Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy rdf graph #2602

Merged
merged 1 commit into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions dataset/copy-test/rdf/copy.cypher
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
COPY taxonomy_resource_t FROM "dataset/copy-test/rdf/taxonomy.ttl" ;
COPY taxonomy_literal_t FROM "dataset/copy-test/rdf/taxonomy.ttl" ;
COPY taxonomy_resource_triples_t FROM "dataset/copy-test/rdf/taxonomy.ttl" ;
COPY taxonomy_literal_triples_t FROM "dataset/copy-test/rdf/taxonomy.ttl" ;
COPY taxonomy FROM "dataset/copy-test/rdf/taxonomy.ttl" ;
5 changes: 1 addition & 4 deletions dataset/rdf/rdfox_example/copy.cypher
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
COPY example_resource_t FROM "dataset/rdf/rdfox_example/data.ttl";
COPY example_literal_t FROM "dataset/rdf/rdfox_example/data.ttl";
COPY example_resource_triples_t FROM "dataset/rdf/rdfox_example/data.ttl";
COPY example_literal_triples_t FROM "dataset/rdf/rdfox_example/data.ttl";
COPY example FROM "dataset/rdf/rdfox_example/data.ttl";
5 changes: 1 addition & 4 deletions dataset/rdf/spb1k/copy.cypher
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
COPY spb_resource_t FROM "dataset/rdf/spb1k/*.nq";
COPY spb_literal_t FROM glob("dataset/rdf/spb1k/*.nq");
COPY spb_resource_triples_t FROM glob("dataset/rdf/spb1k/*.nq");
COPY spb_literal_triples_t FROM glob("dataset/rdf/spb1k/*.nq");
COPY spb FROM "dataset/rdf/spb1k/*.nq";
28 changes: 7 additions & 21 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& stat
auto tableID = catalog.getTableID(clientContext->getTx(), tableName);
auto tableSchema = catalog.getTableSchema(clientContext->getTx(), tableID);
switch (tableSchema->tableType) {
case TableType::REL_GROUP:
case TableType::RDF: {
case TableType::REL_GROUP: {
throw BinderException(stringFormat("Cannot copy into {} table with type {}.", tableName,
TableTypeUtils::toString(tableSchema->tableType)));
}
Expand All @@ -88,24 +87,11 @@ std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& stat
}
switch (tableSchema->tableType) {
case TableType::NODE:
switch (readerConfig->fileType) {
case FileType::TURTLE:
case FileType::NQUADS: {
return bindCopyRdfNodeFrom(statement, std::move(readerConfig), tableSchema);
}
default:
return bindCopyNodeFrom(statement, std::move(readerConfig), tableSchema);
}
case TableType::REL: {
switch (readerConfig->fileType) {
case FileType::TURTLE:
case FileType::NQUADS: {
return bindCopyRdfRelFrom(statement, std::move(readerConfig), tableSchema);
}
default:
return bindCopyRelFrom(statement, std::move(readerConfig), tableSchema);
}
}
return bindCopyNodeFrom(statement, std::move(readerConfig), tableSchema);
case TableType::REL:
return bindCopyRelFrom(statement, std::move(readerConfig), tableSchema);
case TableType::RDF:
return bindCopyRdfFrom(statement, std::move(readerConfig), tableSchema);
// LCOV_EXCL_START
default: {
KU_UNREACHABLE;
Expand Down Expand Up @@ -133,7 +119,7 @@ std::unique_ptr<BoundStatement> Binder::bindCopyNodeFrom(const Statement& statem
columns.push_back(createVariable(bindData->columnNames[i], *bindData->columnTypes[i]));
}
auto offset = expressionBinder.createVariableExpression(
LogicalType(LogicalTypeID::INT64), InternalKeyword::ANONYMOUS);
LogicalType(LogicalTypeID::INT64), std::string(InternalKeyword::ANONYMOUS));
auto boundFileScanInfo =
std::make_unique<BoundFileScanInfo>(func, std::move(bindData), columns, std::move(offset));
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(
Expand Down
2 changes: 1 addition & 1 deletion src/binder/bind/bind_graph_pattern.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ std::shared_ptr<RelExpression> Binder::createRecursiveQueryRel(const parser::Rel
}
}
auto nodePredicateExecutionFlag = expressionBinder.createVariableExpression(
LogicalType{LogicalTypeID::BOOL}, InternalKeyword::ANONYMOUS);
LogicalType{LogicalTypeID::BOOL}, std::string(InternalKeyword::ANONYMOUS));
if (nodePredicate != nullptr) {
nodePredicate = expressionBinder.combineBooleanExpressions(
ExpressionType::OR, nodePredicate, nodePredicateExecutionFlag);
Expand Down
202 changes: 99 additions & 103 deletions src/binder/bind/copy/bind_copy_rdf_graph.cpp
Original file line number Diff line number Diff line change
@@ -1,121 +1,117 @@
#include "binder/binder.h"
#include "binder/copy/bound_copy_from.h"
#include "catalog/rel_table_schema.h"
#include "catalog/rdf_graph_schema.h"
#include "common/constants.h"
#include "common/copier_config/rdf_reader_config.h"
#include "common/keyword/rdf_keyword.h"
#include "common/types/rdf_variant_type.h"
#include "function/table_functions/bind_input.h"
#include "main/client_context.h"
#include "processor/operator/persistent/reader/rdf/rdf_scan.h"

using namespace kuzu::binder;
using namespace kuzu::catalog;
using namespace kuzu::common;
using namespace kuzu::parser;
using namespace kuzu::function;
using namespace kuzu::processor;

namespace kuzu {
namespace binder {

std::unique_ptr<BoundStatement> Binder::bindCopyRdfNodeFrom(const Statement& /*statement*/,
std::unique_ptr<ReaderConfig> config, TableSchema* tableSchema) {
auto func = getScanFunction(config->fileType, *config);
bool containsSerial;
std::vector<std::string> columnNames;
std::vector<std::unique_ptr<common::LogicalType>> columnTypes;
columnNames.emplace_back(rdf::IRI);
RdfReaderMode mode;
if (tableSchema->tableName.ends_with(rdf::RESOURCE_TABLE_SUFFIX)) {
containsSerial = false;
columnTypes.push_back(LogicalType::STRING());
mode = RdfReaderMode::RESOURCE;
} else {
KU_ASSERT(tableSchema->tableName.ends_with(rdf::LITERAL_TABLE_SUFFIX));
containsSerial = true;
columnTypes.push_back(RdfVariantType::getType());
mode = RdfReaderMode::LITERAL;
}
config->extraConfig = std::make_unique<RdfReaderConfig>(mode);
auto bindInput = std::make_unique<function::ScanTableFuncBindInput>(
memoryManager, *config, columnNames, std::move(columnTypes), vfs);
auto bindData =
func->bindFunc(clientContext, bindInput.get(), (Catalog*)&catalog, storageManager);
expression_vector columns;
for (auto i = 0u; i < bindData->columnTypes.size(); i++) {
columns.push_back(createVariable(bindData->columnNames[i], *bindData->columnTypes[i]));
}
std::unique_ptr<BoundStatement> Binder::bindCopyRdfFrom(
const parser::Statement&, std::unique_ptr<ReaderConfig> config, TableSchema* tableSchema) {
auto rdfSchema = ku_dynamic_cast<TableSchema*, RdfGraphSchema*>(tableSchema);
auto offset = expressionBinder.createVariableExpression(
*LogicalType::INT64(), std::string(InternalKeyword::ROW_OFFSET));
auto boundFileScanInfo =
std::make_unique<BoundFileScanInfo>(func, std::move(bindData), columns, std::move(offset));
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(
tableSchema, std::move(boundFileScanInfo), containsSerial, nullptr /* extraInfo */);
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
}

std::unique_ptr<BoundStatement> Binder::bindCopyRdfRelFrom(const Statement& /*statement*/,
std::unique_ptr<ReaderConfig> config, TableSchema* tableSchema) {
// Bind Scan
auto func = getScanFunction(config->fileType, *config);
auto containsSerial = false;
std::vector<std::string> columnNames;
logical_types_t columnTypes;
RdfReaderMode mode;
columnNames.emplace_back(rdf::SUBJECT);
columnNames.emplace_back(rdf::PREDICATE);
columnTypes.push_back(LogicalType::STRING());
columnTypes.push_back(LogicalType::STRING());
if (tableSchema->tableName.ends_with(rdf::RESOURCE_TRIPLE_TABLE_SUFFIX)) {
mode = RdfReaderMode::RESOURCE_TRIPLE;
columnNames.emplace_back(rdf::OBJECT);
columnTypes.push_back(LogicalType::STRING());
} else {
mode = RdfReaderMode::LITERAL_TRIPLE;
columnNames.emplace_back(InternalKeyword::DST_OFFSET);
columnTypes.push_back(LogicalType::INT64());
}
config->extraConfig = std::make_unique<RdfReaderConfig>(mode);
auto bindInput = std::make_unique<function::ScanTableFuncBindInput>(
memoryManager, *config, columnNames, std::move(columnTypes), vfs);
auto bindData =
func->bindFunc(clientContext, bindInput.get(), (Catalog*)&catalog, storageManager);
expression_vector columns;
for (auto i = 0u; i < bindData->columnTypes.size(); i++) {
columns.push_back(createVariable(bindData->columnNames[i], *bindData->columnTypes[i]));
}
auto offset = expressionBinder.createVariableExpression(
LogicalType(LogicalTypeID::INT64), std::string(InternalKeyword::ROW_OFFSET));
auto boundFileScanInfo =
std::make_unique<BoundFileScanInfo>(func, std::move(bindData), columns, offset);
// Bind Copy
auto rrrSchema = ku_dynamic_cast<TableSchema*, RelTableSchema*>(tableSchema);
auto rTableID = rrrSchema->getSrcTableID();
auto extraInfo = std::make_unique<ExtraBoundCopyRelInfo>();
auto s = columns[0];
auto p = columns[1];
auto sOffset = createVariable(InternalKeyword::SRC_OFFSET, LogicalTypeID::INT64);
auto pOffset = createVariable(rdf::PID, LogicalTypeID::INT64);
auto sLookUpInfo =
std::make_unique<IndexLookupInfo>(rTableID, sOffset, s, s->getDataType().copy());
auto pLookUpInfo =
std::make_unique<IndexLookupInfo>(rTableID, pOffset, p, p->getDataType().copy());
extraInfo->infos.push_back(std::move(sLookUpInfo));
extraInfo->infos.push_back(std::move(pLookUpInfo));
if (mode == RdfReaderMode::RESOURCE_TRIPLE) {
auto o = columns[2];
auto oOffset = createVariable(InternalKeyword::DST_OFFSET, LogicalTypeID::INT64);
auto oLookUpInfo =
std::make_unique<IndexLookupInfo>(rTableID, oOffset, o, o->getDataType().copy());
extraInfo->infos.push_back(std::move(oLookUpInfo));
extraInfo->toOffset = oOffset;
} else {
extraInfo->toOffset = columns[2];
// This is a temporary hack to make sure object offset will not be projected out.
extraInfo->propertyColumns.push_back(columns[2]);
}
extraInfo->fromOffset = sOffset;
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(
tableSchema, std::move(boundFileScanInfo), containsSerial, std::move(extraInfo));
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
*LogicalType::INT64(), InternalKeyword::ROW_OFFSET);
auto r = expressionBinder.createVariableExpression(*LogicalType::STRING(), rdf::IRI);
auto l = expressionBinder.createVariableExpression(*RdfVariantType::getType(), rdf::IRI);
auto s = expressionBinder.createVariableExpression(*LogicalType::STRING(), rdf::SUBJECT);
auto p = expressionBinder.createVariableExpression(*LogicalType::STRING(), rdf::PREDICATE);
auto o = expressionBinder.createVariableExpression(*LogicalType::STRING(), rdf::OBJECT);
auto sOffset = expressionBinder.createVariableExpression(
*LogicalType::INT64(), InternalKeyword::SRC_OFFSET);
auto pOffset = expressionBinder.createVariableExpression(*LogicalType::INT64(), rdf::PID);
auto oOffset = expressionBinder.createVariableExpression(
*LogicalType::INT64(), InternalKeyword::DST_OFFSET);
auto scanFunc = getScanFunction(config->fileType, *config);
// Bind copy resource.
auto rScanFunc = scanFunc;
auto rBindInput = std::make_unique<RdfScanBindInput>();
rBindInput->mode = RdfReaderMode::RESOURCE;
rBindInput->config = config->copy();
auto rBindData =
rScanFunc->bindFunc(clientContext, rBindInput.get(), (Catalog*)&catalog, storageManager);
auto rColumns = expression_vector{r};
auto rScanInfo = std::make_unique<BoundFileScanInfo>(
rScanFunc, std::move(rBindData), std::move(rColumns), offset);
auto rTableID = rdfSchema->getResourceTableID();
auto rSchema = catalog.getTableSchema(clientContext->getTx(), rTableID);
auto rCopyInfo =
std::make_unique<BoundCopyFromInfo>(rSchema, std::move(rScanInfo), false, nullptr);
// Bind copy literal.
auto lScanFunc = scanFunc;
auto lBindInput = std::make_unique<RdfScanBindInput>();
lBindInput->mode = RdfReaderMode::LITERAL;
lBindInput->config = config->copy();
auto lBindData =
lScanFunc->bindFunc(clientContext, lBindInput.get(), (Catalog*)&catalog, storageManager);
auto lColumns = expression_vector{l};
auto lScanInfo = std::make_unique<BoundFileScanInfo>(
lScanFunc, std::move(lBindData), std::move(lColumns), offset);
auto lTableID = rdfSchema->getLiteralTableID();
auto lSchema = catalog.getTableSchema(clientContext->getTx(), lTableID);
auto lCopyInfo =
std::make_unique<BoundCopyFromInfo>(lSchema, std::move(lScanInfo), true, nullptr);
// Bind copy resource triples
auto rrrScanFunc = scanFunc;
auto rrrBindInput = std::make_unique<RdfScanBindInput>();
rrrBindInput->mode = RdfReaderMode::RESOURCE_TRIPLE;
rrrBindInput->config = config->copy();
auto rrrBindData = rrrScanFunc->bindFunc(
clientContext, rrrBindInput.get(), (Catalog*)&catalog, storageManager);
auto rrrColumns = expression_vector{s, p, o};
auto rrrScanInfo = std::make_unique<BoundFileScanInfo>(
rrrScanFunc, std::move(rrrBindData), rrrColumns, offset);
auto rrrTableID = rdfSchema->getResourceTripleTableID();
auto rrrSchema = catalog.getTableSchema(clientContext->getTx(), rrrTableID);
auto rrrExtraInfo = std::make_unique<ExtraBoundCopyRelInfo>();
auto sLookUp = std::make_unique<IndexLookupInfo>(rTableID, sOffset, s, s->getDataType().copy());
auto pLookUp = std::make_unique<IndexLookupInfo>(rTableID, pOffset, p, p->getDataType().copy());
auto oLookUp = std::make_unique<IndexLookupInfo>(rTableID, oOffset, o, o->getDataType().copy());
rrrExtraInfo->infos.push_back(sLookUp->copy());
rrrExtraInfo->infos.push_back(pLookUp->copy());
rrrExtraInfo->infos.push_back(oLookUp->copy());
rrrExtraInfo->fromOffset = sOffset;
rrrExtraInfo->toOffset = oOffset;
auto rrrCopyInfo = std::make_unique<BoundCopyFromInfo>(
rrrSchema, std::move(rrrScanInfo), false, std::move(rrrExtraInfo));
// Bind copy literal triples
auto rrlScanFunc = scanFunc;
auto rrlBindInput = std::make_unique<RdfScanBindInput>();
rrlBindInput->mode = RdfReaderMode::LITERAL_TRIPLE;
rrlBindInput->config = config->copy();
auto rrlBindData = rrlScanFunc->bindFunc(
clientContext, rrlBindInput.get(), (Catalog*)&catalog, storageManager);
auto rrlColumns = expression_vector{s, p, oOffset};
auto rrlScanInfo = std::make_unique<BoundFileScanInfo>(
rrlScanFunc, std::move(rrlBindData), rrlColumns, offset);
auto rrlTableID = rdfSchema->getLiteralTripleTableID();
auto rrlSchema = catalog.getTableSchema(clientContext->getTx(), rrlTableID);
auto rrlExtraInfo = std::make_unique<ExtraBoundCopyRelInfo>();
rrlExtraInfo->infos.push_back(sLookUp->copy());
rrlExtraInfo->infos.push_back(pLookUp->copy());
rrlExtraInfo->propertyColumns.push_back(oOffset);
rrlExtraInfo->fromOffset = sOffset;
rrlExtraInfo->toOffset = oOffset;
auto rrLCopyInfo = std::make_unique<BoundCopyFromInfo>(
rrlSchema, std::move(rrlScanInfo), false, std::move(rrlExtraInfo));
// Bind copy rdf
auto rdfExtraInfo = std::make_unique<ExtraBoundCopyRdfInfo>();
rdfExtraInfo->rInfo = std::move(rCopyInfo);
rdfExtraInfo->lInfo = std::move(lCopyInfo);
rdfExtraInfo->rrrInfo = std::move(rrrCopyInfo);
rdfExtraInfo->rrlInfo = std::move(rrLCopyInfo);
auto rdfCopyInfo =
std::make_unique<BoundCopyFromInfo>(rdfSchema, nullptr, false, std::move(rdfExtraInfo));
return std::make_unique<BoundCopyFrom>(std::move(rdfCopyInfo));
}

} // namespace binder
Expand Down
5 changes: 5 additions & 0 deletions src/binder/bind_expression/bind_variable_expression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ std::shared_ptr<Expression> ExpressionBinder::bindVariableExpression(
throw BinderException("Variable " + parsedExpression.getRawName() + " is not in scope.");
}

std::shared_ptr<Expression> ExpressionBinder::createVariableExpression(
common::LogicalType logicalType, std::string_view name) {
return createVariableExpression(logicalType, std::string(name));
}

std::shared_ptr<Expression> ExpressionBinder::createVariableExpression(
LogicalType logicalType, std::string name) {
return std::make_shared<VariableExpression>(
Expand Down
2 changes: 1 addition & 1 deletion src/function/built_in_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "processor/operator/persistent/reader/csv/serial_csv_reader.h"
#include "processor/operator/persistent/reader/npy/npy_reader.h"
#include "processor/operator/persistent/reader/parquet/parquet_reader.h"
#include "processor/operator/persistent/reader/rdf/rdf_reader.h"
#include "processor/operator/persistent/reader/rdf/rdf_scan.h"

using namespace kuzu::common;

Expand Down
4 changes: 1 addition & 3 deletions src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,9 @@ class Binder {
std::unique_ptr<BoundStatement> bindCopyFromClause(const parser::Statement& statement);
std::unique_ptr<BoundStatement> bindCopyNodeFrom(const parser::Statement& statement,
std::unique_ptr<common::ReaderConfig> config, catalog::TableSchema* tableSchema);
std::unique_ptr<BoundStatement> bindCopyRdfNodeFrom(const parser::Statement& statement,
std::unique_ptr<common::ReaderConfig> config, catalog::TableSchema* tableSchema);
std::unique_ptr<BoundStatement> bindCopyRelFrom(const parser::Statement& statement,
std::unique_ptr<common::ReaderConfig> config, catalog::TableSchema* tableSchema);
std::unique_ptr<BoundStatement> bindCopyRdfRelFrom(const parser::Statement& statement,
std::unique_ptr<BoundStatement> bindCopyRdfFrom(const parser::Statement& statement,
std::unique_ptr<common::ReaderConfig> config, catalog::TableSchema* tableSchema);
void bindExpectedNodeColumns(catalog::TableSchema* tableSchema,
const std::vector<std::string>& inputColumnNames, std::vector<std::string>& columnNames,
Expand Down
Loading