Skip to content

Commit

Permalink
Merge pull request #2602 from kuzudb/copy-rdf-graph
Browse files Browse the repository at this point in the history
Copy rdf graph
  • Loading branch information
andyfengHKU committed Dec 21, 2023
2 parents d591bff + ef72884 commit dd0efd3
Show file tree
Hide file tree
Showing 42 changed files with 682 additions and 430 deletions.
5 changes: 1 addition & 4 deletions dataset/copy-test/rdf/copy.cypher
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
COPY taxonomy_resource_t FROM "dataset/copy-test/rdf/taxonomy.ttl" ;
COPY taxonomy_literal_t FROM "dataset/copy-test/rdf/taxonomy.ttl" ;
COPY taxonomy_resource_triples_t FROM "dataset/copy-test/rdf/taxonomy.ttl" ;
COPY taxonomy_literal_triples_t FROM "dataset/copy-test/rdf/taxonomy.ttl" ;
COPY taxonomy FROM "dataset/copy-test/rdf/taxonomy.ttl" ;
5 changes: 1 addition & 4 deletions dataset/rdf/rdfox_example/copy.cypher
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
COPY example_resource_t FROM "dataset/rdf/rdfox_example/data.ttl";
COPY example_literal_t FROM "dataset/rdf/rdfox_example/data.ttl";
COPY example_resource_triples_t FROM "dataset/rdf/rdfox_example/data.ttl";
COPY example_literal_triples_t FROM "dataset/rdf/rdfox_example/data.ttl";
COPY example FROM "dataset/rdf/rdfox_example/data.ttl";
5 changes: 1 addition & 4 deletions dataset/rdf/spb1k/copy.cypher
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
COPY spb_resource_t FROM "dataset/rdf/spb1k/*.nq";
COPY spb_literal_t FROM glob("dataset/rdf/spb1k/*.nq");
COPY spb_resource_triples_t FROM glob("dataset/rdf/spb1k/*.nq");
COPY spb_literal_triples_t FROM glob("dataset/rdf/spb1k/*.nq");
COPY spb FROM "dataset/rdf/spb1k/*.nq";
28 changes: 7 additions & 21 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& stat
auto tableID = catalog.getTableID(clientContext->getTx(), tableName);
auto tableSchema = catalog.getTableSchema(clientContext->getTx(), tableID);
switch (tableSchema->tableType) {
case TableType::REL_GROUP:
case TableType::RDF: {
case TableType::REL_GROUP: {
throw BinderException(stringFormat("Cannot copy into {} table with type {}.", tableName,
TableTypeUtils::toString(tableSchema->tableType)));
}
Expand All @@ -88,24 +87,11 @@ std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& stat
}
switch (tableSchema->tableType) {
case TableType::NODE:
switch (readerConfig->fileType) {
case FileType::TURTLE:
case FileType::NQUADS: {
return bindCopyRdfNodeFrom(statement, std::move(readerConfig), tableSchema);
}
default:
return bindCopyNodeFrom(statement, std::move(readerConfig), tableSchema);
}
case TableType::REL: {
switch (readerConfig->fileType) {
case FileType::TURTLE:
case FileType::NQUADS: {
return bindCopyRdfRelFrom(statement, std::move(readerConfig), tableSchema);
}
default:
return bindCopyRelFrom(statement, std::move(readerConfig), tableSchema);
}
}
return bindCopyNodeFrom(statement, std::move(readerConfig), tableSchema);
case TableType::REL:
return bindCopyRelFrom(statement, std::move(readerConfig), tableSchema);
case TableType::RDF:
return bindCopyRdfFrom(statement, std::move(readerConfig), tableSchema);
// LCOV_EXCL_START
default: {
KU_UNREACHABLE;
Expand Down Expand Up @@ -133,7 +119,7 @@ std::unique_ptr<BoundStatement> Binder::bindCopyNodeFrom(const Statement& statem
columns.push_back(createVariable(bindData->columnNames[i], *bindData->columnTypes[i]));
}
auto offset = expressionBinder.createVariableExpression(
LogicalType(LogicalTypeID::INT64), InternalKeyword::ANONYMOUS);
LogicalType(LogicalTypeID::INT64), std::string(InternalKeyword::ANONYMOUS));
auto boundFileScanInfo =
std::make_unique<BoundFileScanInfo>(func, std::move(bindData), columns, std::move(offset));
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(
Expand Down
2 changes: 1 addition & 1 deletion src/binder/bind/bind_graph_pattern.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ std::shared_ptr<RelExpression> Binder::createRecursiveQueryRel(const parser::Rel
}
}
auto nodePredicateExecutionFlag = expressionBinder.createVariableExpression(
LogicalType{LogicalTypeID::BOOL}, InternalKeyword::ANONYMOUS);
LogicalType{LogicalTypeID::BOOL}, std::string(InternalKeyword::ANONYMOUS));
if (nodePredicate != nullptr) {
nodePredicate = expressionBinder.combineBooleanExpressions(
ExpressionType::OR, nodePredicate, nodePredicateExecutionFlag);
Expand Down
202 changes: 99 additions & 103 deletions src/binder/bind/copy/bind_copy_rdf_graph.cpp
Original file line number Diff line number Diff line change
@@ -1,121 +1,117 @@
#include "binder/binder.h"
#include "binder/copy/bound_copy_from.h"
#include "catalog/rel_table_schema.h"
#include "catalog/rdf_graph_schema.h"
#include "common/constants.h"
#include "common/copier_config/rdf_reader_config.h"
#include "common/keyword/rdf_keyword.h"
#include "common/types/rdf_variant_type.h"
#include "function/table_functions/bind_input.h"
#include "main/client_context.h"
#include "processor/operator/persistent/reader/rdf/rdf_scan.h"

using namespace kuzu::binder;
using namespace kuzu::catalog;
using namespace kuzu::common;
using namespace kuzu::parser;
using namespace kuzu::function;
using namespace kuzu::processor;

namespace kuzu {
namespace binder {

std::unique_ptr<BoundStatement> Binder::bindCopyRdfNodeFrom(const Statement& /*statement*/,
std::unique_ptr<ReaderConfig> config, TableSchema* tableSchema) {
auto func = getScanFunction(config->fileType, *config);
bool containsSerial;
std::vector<std::string> columnNames;
std::vector<std::unique_ptr<common::LogicalType>> columnTypes;
columnNames.emplace_back(rdf::IRI);
RdfReaderMode mode;
if (tableSchema->tableName.ends_with(rdf::RESOURCE_TABLE_SUFFIX)) {
containsSerial = false;
columnTypes.push_back(LogicalType::STRING());
mode = RdfReaderMode::RESOURCE;
} else {
KU_ASSERT(tableSchema->tableName.ends_with(rdf::LITERAL_TABLE_SUFFIX));
containsSerial = true;
columnTypes.push_back(RdfVariantType::getType());
mode = RdfReaderMode::LITERAL;
}
config->extraConfig = std::make_unique<RdfReaderConfig>(mode);
auto bindInput = std::make_unique<function::ScanTableFuncBindInput>(
memoryManager, *config, columnNames, std::move(columnTypes), vfs);
auto bindData =
func->bindFunc(clientContext, bindInput.get(), (Catalog*)&catalog, storageManager);
expression_vector columns;
for (auto i = 0u; i < bindData->columnTypes.size(); i++) {
columns.push_back(createVariable(bindData->columnNames[i], *bindData->columnTypes[i]));
}
std::unique_ptr<BoundStatement> Binder::bindCopyRdfFrom(
const parser::Statement&, std::unique_ptr<ReaderConfig> config, TableSchema* tableSchema) {
auto rdfSchema = ku_dynamic_cast<TableSchema*, RdfGraphSchema*>(tableSchema);
auto offset = expressionBinder.createVariableExpression(
*LogicalType::INT64(), std::string(InternalKeyword::ROW_OFFSET));
auto boundFileScanInfo =
std::make_unique<BoundFileScanInfo>(func, std::move(bindData), columns, std::move(offset));
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(
tableSchema, std::move(boundFileScanInfo), containsSerial, nullptr /* extraInfo */);
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
}

std::unique_ptr<BoundStatement> Binder::bindCopyRdfRelFrom(const Statement& /*statement*/,
std::unique_ptr<ReaderConfig> config, TableSchema* tableSchema) {
// Bind Scan
auto func = getScanFunction(config->fileType, *config);
auto containsSerial = false;
std::vector<std::string> columnNames;
logical_types_t columnTypes;
RdfReaderMode mode;
columnNames.emplace_back(rdf::SUBJECT);
columnNames.emplace_back(rdf::PREDICATE);
columnTypes.push_back(LogicalType::STRING());
columnTypes.push_back(LogicalType::STRING());
if (tableSchema->tableName.ends_with(rdf::RESOURCE_TRIPLE_TABLE_SUFFIX)) {
mode = RdfReaderMode::RESOURCE_TRIPLE;
columnNames.emplace_back(rdf::OBJECT);
columnTypes.push_back(LogicalType::STRING());
} else {
mode = RdfReaderMode::LITERAL_TRIPLE;
columnNames.emplace_back(InternalKeyword::DST_OFFSET);
columnTypes.push_back(LogicalType::INT64());
}
config->extraConfig = std::make_unique<RdfReaderConfig>(mode);
auto bindInput = std::make_unique<function::ScanTableFuncBindInput>(
memoryManager, *config, columnNames, std::move(columnTypes), vfs);
auto bindData =
func->bindFunc(clientContext, bindInput.get(), (Catalog*)&catalog, storageManager);
expression_vector columns;
for (auto i = 0u; i < bindData->columnTypes.size(); i++) {
columns.push_back(createVariable(bindData->columnNames[i], *bindData->columnTypes[i]));
}
auto offset = expressionBinder.createVariableExpression(
LogicalType(LogicalTypeID::INT64), std::string(InternalKeyword::ROW_OFFSET));
auto boundFileScanInfo =
std::make_unique<BoundFileScanInfo>(func, std::move(bindData), columns, offset);
// Bind Copy
auto rrrSchema = ku_dynamic_cast<TableSchema*, RelTableSchema*>(tableSchema);
auto rTableID = rrrSchema->getSrcTableID();
auto extraInfo = std::make_unique<ExtraBoundCopyRelInfo>();
auto s = columns[0];
auto p = columns[1];
auto sOffset = createVariable(InternalKeyword::SRC_OFFSET, LogicalTypeID::INT64);
auto pOffset = createVariable(rdf::PID, LogicalTypeID::INT64);
auto sLookUpInfo =
std::make_unique<IndexLookupInfo>(rTableID, sOffset, s, s->getDataType().copy());
auto pLookUpInfo =
std::make_unique<IndexLookupInfo>(rTableID, pOffset, p, p->getDataType().copy());
extraInfo->infos.push_back(std::move(sLookUpInfo));
extraInfo->infos.push_back(std::move(pLookUpInfo));
if (mode == RdfReaderMode::RESOURCE_TRIPLE) {
auto o = columns[2];
auto oOffset = createVariable(InternalKeyword::DST_OFFSET, LogicalTypeID::INT64);
auto oLookUpInfo =
std::make_unique<IndexLookupInfo>(rTableID, oOffset, o, o->getDataType().copy());
extraInfo->infos.push_back(std::move(oLookUpInfo));
extraInfo->toOffset = oOffset;
} else {
extraInfo->toOffset = columns[2];
// This is a temporary hack to make sure object offset will not be projected out.
extraInfo->propertyColumns.push_back(columns[2]);
}
extraInfo->fromOffset = sOffset;
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(
tableSchema, std::move(boundFileScanInfo), containsSerial, std::move(extraInfo));
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
*LogicalType::INT64(), InternalKeyword::ROW_OFFSET);
auto r = expressionBinder.createVariableExpression(*LogicalType::STRING(), rdf::IRI);
auto l = expressionBinder.createVariableExpression(*RdfVariantType::getType(), rdf::IRI);
auto s = expressionBinder.createVariableExpression(*LogicalType::STRING(), rdf::SUBJECT);
auto p = expressionBinder.createVariableExpression(*LogicalType::STRING(), rdf::PREDICATE);
auto o = expressionBinder.createVariableExpression(*LogicalType::STRING(), rdf::OBJECT);
auto sOffset = expressionBinder.createVariableExpression(
*LogicalType::INT64(), InternalKeyword::SRC_OFFSET);
auto pOffset = expressionBinder.createVariableExpression(*LogicalType::INT64(), rdf::PID);
auto oOffset = expressionBinder.createVariableExpression(
*LogicalType::INT64(), InternalKeyword::DST_OFFSET);
auto scanFunc = getScanFunction(config->fileType, *config);
// Bind copy resource.
auto rScanFunc = scanFunc;
auto rBindInput = std::make_unique<RdfScanBindInput>();
rBindInput->mode = RdfReaderMode::RESOURCE;
rBindInput->config = config->copy();
auto rBindData =
rScanFunc->bindFunc(clientContext, rBindInput.get(), (Catalog*)&catalog, storageManager);
auto rColumns = expression_vector{r};
auto rScanInfo = std::make_unique<BoundFileScanInfo>(
rScanFunc, std::move(rBindData), std::move(rColumns), offset);
auto rTableID = rdfSchema->getResourceTableID();
auto rSchema = catalog.getTableSchema(clientContext->getTx(), rTableID);
auto rCopyInfo =
std::make_unique<BoundCopyFromInfo>(rSchema, std::move(rScanInfo), false, nullptr);
// Bind copy literal.
auto lScanFunc = scanFunc;
auto lBindInput = std::make_unique<RdfScanBindInput>();
lBindInput->mode = RdfReaderMode::LITERAL;
lBindInput->config = config->copy();
auto lBindData =
lScanFunc->bindFunc(clientContext, lBindInput.get(), (Catalog*)&catalog, storageManager);
auto lColumns = expression_vector{l};
auto lScanInfo = std::make_unique<BoundFileScanInfo>(
lScanFunc, std::move(lBindData), std::move(lColumns), offset);
auto lTableID = rdfSchema->getLiteralTableID();
auto lSchema = catalog.getTableSchema(clientContext->getTx(), lTableID);
auto lCopyInfo =
std::make_unique<BoundCopyFromInfo>(lSchema, std::move(lScanInfo), true, nullptr);
// Bind copy resource triples
auto rrrScanFunc = scanFunc;
auto rrrBindInput = std::make_unique<RdfScanBindInput>();
rrrBindInput->mode = RdfReaderMode::RESOURCE_TRIPLE;
rrrBindInput->config = config->copy();
auto rrrBindData = rrrScanFunc->bindFunc(
clientContext, rrrBindInput.get(), (Catalog*)&catalog, storageManager);
auto rrrColumns = expression_vector{s, p, o};
auto rrrScanInfo = std::make_unique<BoundFileScanInfo>(
rrrScanFunc, std::move(rrrBindData), rrrColumns, offset);
auto rrrTableID = rdfSchema->getResourceTripleTableID();
auto rrrSchema = catalog.getTableSchema(clientContext->getTx(), rrrTableID);
auto rrrExtraInfo = std::make_unique<ExtraBoundCopyRelInfo>();
auto sLookUp = std::make_unique<IndexLookupInfo>(rTableID, sOffset, s, s->getDataType().copy());
auto pLookUp = std::make_unique<IndexLookupInfo>(rTableID, pOffset, p, p->getDataType().copy());
auto oLookUp = std::make_unique<IndexLookupInfo>(rTableID, oOffset, o, o->getDataType().copy());
rrrExtraInfo->infos.push_back(sLookUp->copy());
rrrExtraInfo->infos.push_back(pLookUp->copy());
rrrExtraInfo->infos.push_back(oLookUp->copy());
rrrExtraInfo->fromOffset = sOffset;
rrrExtraInfo->toOffset = oOffset;
auto rrrCopyInfo = std::make_unique<BoundCopyFromInfo>(
rrrSchema, std::move(rrrScanInfo), false, std::move(rrrExtraInfo));
// Bind copy literal triples
auto rrlScanFunc = scanFunc;
auto rrlBindInput = std::make_unique<RdfScanBindInput>();
rrlBindInput->mode = RdfReaderMode::LITERAL_TRIPLE;
rrlBindInput->config = config->copy();
auto rrlBindData = rrlScanFunc->bindFunc(
clientContext, rrlBindInput.get(), (Catalog*)&catalog, storageManager);
auto rrlColumns = expression_vector{s, p, oOffset};
auto rrlScanInfo = std::make_unique<BoundFileScanInfo>(
rrlScanFunc, std::move(rrlBindData), rrlColumns, offset);
auto rrlTableID = rdfSchema->getLiteralTripleTableID();
auto rrlSchema = catalog.getTableSchema(clientContext->getTx(), rrlTableID);
auto rrlExtraInfo = std::make_unique<ExtraBoundCopyRelInfo>();
rrlExtraInfo->infos.push_back(sLookUp->copy());
rrlExtraInfo->infos.push_back(pLookUp->copy());
rrlExtraInfo->propertyColumns.push_back(oOffset);
rrlExtraInfo->fromOffset = sOffset;
rrlExtraInfo->toOffset = oOffset;
auto rrLCopyInfo = std::make_unique<BoundCopyFromInfo>(
rrlSchema, std::move(rrlScanInfo), false, std::move(rrlExtraInfo));
// Bind copy rdf
auto rdfExtraInfo = std::make_unique<ExtraBoundCopyRdfInfo>();
rdfExtraInfo->rInfo = std::move(rCopyInfo);
rdfExtraInfo->lInfo = std::move(lCopyInfo);
rdfExtraInfo->rrrInfo = std::move(rrrCopyInfo);
rdfExtraInfo->rrlInfo = std::move(rrLCopyInfo);
auto rdfCopyInfo =
std::make_unique<BoundCopyFromInfo>(rdfSchema, nullptr, false, std::move(rdfExtraInfo));
return std::make_unique<BoundCopyFrom>(std::move(rdfCopyInfo));
}

} // namespace binder
Expand Down
5 changes: 5 additions & 0 deletions src/binder/bind_expression/bind_variable_expression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ std::shared_ptr<Expression> ExpressionBinder::bindVariableExpression(
throw BinderException("Variable " + parsedExpression.getRawName() + " is not in scope.");
}

std::shared_ptr<Expression> ExpressionBinder::createVariableExpression(
common::LogicalType logicalType, std::string_view name) {
return createVariableExpression(logicalType, std::string(name));
}

std::shared_ptr<Expression> ExpressionBinder::createVariableExpression(
LogicalType logicalType, std::string name) {
return std::make_shared<VariableExpression>(
Expand Down
2 changes: 1 addition & 1 deletion src/function/built_in_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "processor/operator/persistent/reader/csv/serial_csv_reader.h"
#include "processor/operator/persistent/reader/npy/npy_reader.h"
#include "processor/operator/persistent/reader/parquet/parquet_reader.h"
#include "processor/operator/persistent/reader/rdf/rdf_reader.h"
#include "processor/operator/persistent/reader/rdf/rdf_scan.h"

using namespace kuzu::common;

Expand Down
4 changes: 1 addition & 3 deletions src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,9 @@ class Binder {
std::unique_ptr<BoundStatement> bindCopyFromClause(const parser::Statement& statement);
std::unique_ptr<BoundStatement> bindCopyNodeFrom(const parser::Statement& statement,
std::unique_ptr<common::ReaderConfig> config, catalog::TableSchema* tableSchema);
std::unique_ptr<BoundStatement> bindCopyRdfNodeFrom(const parser::Statement& statement,
std::unique_ptr<common::ReaderConfig> config, catalog::TableSchema* tableSchema);
std::unique_ptr<BoundStatement> bindCopyRelFrom(const parser::Statement& statement,
std::unique_ptr<common::ReaderConfig> config, catalog::TableSchema* tableSchema);
std::unique_ptr<BoundStatement> bindCopyRdfRelFrom(const parser::Statement& statement,
std::unique_ptr<BoundStatement> bindCopyRdfFrom(const parser::Statement& statement,
std::unique_ptr<common::ReaderConfig> config, catalog::TableSchema* tableSchema);
void bindExpectedNodeColumns(catalog::TableSchema* tableSchema,
const std::vector<std::string>& inputColumnNames, std::vector<std::string>& columnNames,
Expand Down
Loading

0 comments on commit dd0efd3

Please sign in to comment.