Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for exporting database to parquet files #2897

Merged
merged 1 commit into from
Feb 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions src/binder/bind/bind_export_database.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "binder/copy/bound_export_database.h"
#include "binder/query/bound_regular_query.h"
#include "common/exception/binder.h"
#include "common/string_utils.h"
#include "main/client_context.h"
#include "parser/parser.h"
#include "parser/port_db.h"
Expand Down Expand Up @@ -44,6 +45,21 @@ static std::vector<ExportedTableData> getExportInfo(
return exportData;
}

FileType getFileType(std::unordered_map<std::string, common::Value>& options) {
auto fileType = FileType::CSV;
if (options.find("FORMAT") != options.end()) {
hououou marked this conversation as resolved.
Show resolved Hide resolved
auto value = options.at("FORMAT");
if (value.getDataType()->getLogicalTypeID() != LogicalTypeID::STRING) {
throw BinderException("The type of format option must be a string.");
}
auto valueStr = value.getValue<std::string>();
StringUtils::toUpper(valueStr);
fileType = FileTypeUtils::fromString(valueStr);
options.erase("FORMAT");
}
return fileType;
}

ExportedTableData Binder::extractExportData(std::string selQuery, std::string tableName) {
auto parsedStatement = Parser::parseQuery(selQuery);
ExportedTableData exportedTableData;
Expand All @@ -65,18 +81,22 @@ std::unique_ptr<BoundStatement> Binder::bindExportDatabaseClause(const Statement
auto& exportDatabaseStatement = ku_dynamic_cast<const Statement&, const ExportDB&>(statement);
auto boundFilePath = exportDatabaseStatement.getFilePath();
auto exportData = getExportInfo(catalog, clientContext->getTx(), this);
// TODO(Jiamin): add more supported file types
auto fileType = FileType::CSV;
auto csvConfig = CSVReaderConfig::construct(
bindParsingOptions(exportDatabaseStatement.getParsingOptionsRef()));
auto parsedOptions = bindParsingOptions(exportDatabaseStatement.getParsingOptionsRef());
auto fileType = getFileType(parsedOptions);
if (fileType != FileType::CSV && fileType != FileType::PARQUET) {
throw BinderException("Export database currently only supports csv and parquet files.");
}
if (fileType != FileType::CSV && parsedOptions.size() != 0) {
throw BinderException{"Only export to csv can have options."};
}
// try to create the directory, if it doesn't exist yet
if (!vfs->fileOrPathExists(boundFilePath)) {
vfs->createDir(boundFilePath);
} else {
throw BinderException(stringFormat("Directory {} already exists.", boundFilePath));
}
return std::make_unique<BoundExportDatabase>(
boundFilePath, fileType, std::move(exportData), csvConfig.option.copy());
boundFilePath, fileType, std::move(exportData), std::move(parsedOptions));
}
} // namespace binder
} // namespace kuzu
2 changes: 2 additions & 0 deletions src/binder/bind/bind_file_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "common/exception/copy.h"
#include "common/file_system/virtual_file_system.h"
#include "common/string_format.h"
#include "common/string_utils.h"

using namespace kuzu::parser;
using namespace kuzu::binder;
Expand Down Expand Up @@ -53,6 +54,7 @@ std::unordered_map<std::string, Value> Binder::bindParsingOptions(
std::unordered_map<std::string, Value> options;
for (auto& option : parsingOptions) {
auto name = option.first;
common::StringUtils::toUpper(name);
auto expr = expressionBinder.bindExpression(*option.second);
KU_ASSERT(expr->expressionType == ExpressionType::LITERAL);
auto literalExpr = ku_dynamic_cast<Expression*, LiteralExpression*>(expr.get());
Expand Down
2 changes: 0 additions & 2 deletions src/common/copier_config/csv_reader_config.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "common/copier_config/csv_reader_config.h"

#include "common/exception/binder.h"
#include "common/string_utils.h"

namespace kuzu {
namespace common {
Expand Down Expand Up @@ -56,7 +55,6 @@ CSVReaderConfig CSVReaderConfig::construct(
auto config = CSVReaderConfig();
for (auto& op : options) {
auto name = op.first;
StringUtils::toUpper(name);
auto isValidStringParsingOption = validateStringParsingOptionName(name);
auto isValidBoolParsingOption = validateBoolParsingOptionName(name);
if (isValidBoolParsingOption) {
Expand Down
13 changes: 13 additions & 0 deletions src/common/copier_config/reader_config.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "common/copier_config/reader_config.h"

#include "common/assert.h"
#include "common/exception/binder.h"
#include "common/exception/copy.h"

namespace kuzu {
Expand Down Expand Up @@ -57,5 +58,17 @@ std::string FileTypeUtils::toString(FileType fileType) {
}
}

FileType FileTypeUtils::fromString(std::string fileType) {
if (fileType == "CSV") {
return FileType::CSV;
} else if (fileType == "PARQUET") {
return FileType::PARQUET;
} else if (fileType == "NPY") {
return FileType::NPY;
} else {
throw BinderException(stringFormat("Unsupported file type: {}.", fileType));
}
}

} // namespace common
} // namespace kuzu
24 changes: 14 additions & 10 deletions src/include/binder/copy/bound_export_database.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#include "binder/binder.h"
#include "binder/bound_statement.h"
#include "binder/query/bound_regular_query.h"
#include "common/copier_config/csv_reader_config.h"
#include "common/copier_config/reader_config.h"

namespace kuzu {
Expand All @@ -21,22 +20,27 @@ struct ExportedTableData {
class BoundExportDatabase : public BoundStatement {
public:
BoundExportDatabase(std::string filePath, common::FileType fileType,
std::vector<ExportedTableData> exportData, common::CSVOption csvOption)
std::vector<ExportedTableData> exportData,
std::unordered_map<std::string, common::Value> csvOption)
: BoundStatement{common::StatementType::EXPORT_DATABASE,
BoundStatementResult::createEmptyResult()},
filePath{std::move(filePath)}, fileType{fileType},
exportData(std::move(exportData)), csvOption{std::move(csvOption)} {}
exportData(std::move(exportData)),
boundFileInfo(fileType, std::vector{std::move(filePath)}) {
boundFileInfo.options = std::move(csvOption);
}

inline std::string getFilePath() const { return filePath; }
inline common::FileType getFileType() const { return fileType; }
inline const common::CSVOption* getCopyOption() const { return &csvOption; }
inline std::string getFilePath() const { return boundFileInfo.filePaths[0]; }
inline common::FileType getFileType() const { return boundFileInfo.fileType; }
inline common::CSVOption getCopyOption() const {
auto csvConfig = common::CSVReaderConfig::construct(boundFileInfo.options);
return csvConfig.option.copy();
}
inline const common::ReaderConfig* getBoundFileInfo() const { return &boundFileInfo; }
inline const std::vector<ExportedTableData>* getExportData() const { return &exportData; }

private:
std::string filePath;
common::FileType fileType;
std::vector<ExportedTableData> exportData;
common::CSVOption csvOption;
common::ReaderConfig boundFileInfo;
};

} // namespace binder
Expand Down
1 change: 1 addition & 0 deletions src/include/common/copier_config/reader_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ enum class FileType : uint8_t {
struct FileTypeUtils {
static FileType getFileTypeFromExtension(std::string_view extension);
static std::string toString(FileType fileType);
static FileType fromString(std::string fileType);
};

struct ReaderConfig {
Expand Down
22 changes: 13 additions & 9 deletions src/include/planner/operator/persistent/logical_export_db.h
Original file line number Diff line number Diff line change
@@ -1,34 +1,38 @@
#pragma once

#include "common/copier_config/csv_reader_config.h"
#include "common/copier_config/reader_config.h"
#include "planner/operator/logical_operator.h"

namespace kuzu {
namespace planner {

class LogicalExportDatabase : public LogicalOperator {
public:
explicit LogicalExportDatabase(std::string filePath, common::CSVOption copyToOption,
std::vector<std::shared_ptr<LogicalOperator>> plans)
explicit LogicalExportDatabase(
common::ReaderConfig boundFileInfo, std::vector<std::shared_ptr<LogicalOperator>> plans)
: LogicalOperator{LogicalOperatorType::EXPORT_DATABASE, std::move(plans)},
filePath{std::move(filePath)}, copyToOption{std::move(copyToOption)} {}
boundFileInfo{std::move(boundFileInfo)} {}

inline std::string getExpressionsForPrinting() const override { return std::string{}; }

void computeFactorizedSchema() override;
void computeFlatSchema() override;

inline std::string getFilePath() const { return filePath; }
inline const common::CSVOption* getCopyOption() const { return &copyToOption; }
inline std::string getFilePath() const { return boundFileInfo.filePaths[0]; }
inline common::FileType getFileType() const { return boundFileInfo.fileType; }
inline common::CSVOption getCopyOption() const {
auto csvConfig = common::CSVReaderConfig::construct(boundFileInfo.options);
return csvConfig.option.copy();
}
inline const common::ReaderConfig* getBoundFileInfo() const { return &boundFileInfo; }

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalExportDatabase>(
filePath, std::move(copyToOption), std::move(children));
return make_unique<LogicalExportDatabase>(std::move(boundFileInfo), std::move(children));
}

private:
std::string filePath;
common::CSVOption copyToOption;
common::ReaderConfig boundFileInfo;
};

} // namespace planner
Expand Down
13 changes: 6 additions & 7 deletions src/include/processor/operator/persistent/export_db.h
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
#pragma once

#include "common/copier_config/csv_reader_config.h"
#include "common/copier_config/reader_config.h"
#include "processor/operator/physical_operator.h"

namespace kuzu {
namespace processor {

class ExportDB : public PhysicalOperator {
public:
ExportDB(std::string filePath, common::CSVOption copyToOption, uint32_t id,
const std::string& paramsString, std::vector<std::unique_ptr<PhysicalOperator>> children)
ExportDB(common::ReaderConfig boundFileInfo, uint32_t id, const std::string& paramsString,
std::vector<std::unique_ptr<PhysicalOperator>> children)
: PhysicalOperator{PhysicalOperatorType::EXPORT_DATABASE, std::move(children), id,
paramsString},
filePath(filePath), copyToOption{std::move(copyToOption)} {}
boundFileInfo{std::move(boundFileInfo)} {}

bool canParallel() const override { return false; }

Expand All @@ -22,12 +22,11 @@ class ExportDB : public PhysicalOperator {

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ExportDB>(
filePath, std::move(copyToOption), id, paramsString, std::move(children));
std::move(boundFileInfo), id, paramsString, std::move(children));
}

private:
std::string filePath;
common::CSVOption copyToOption;
common::ReaderConfig boundFileInfo;
};
} // namespace processor
} // namespace kuzu
15 changes: 9 additions & 6 deletions src/planner/plan/plan_port_db.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#include "binder/copy/bound_export_database.h"
#include "common/string_utils.h"
#include "planner/operator/persistent/logical_copy_to.h"
#include "planner/operator/persistent/logical_export_db.h"
#include "planner/planner.h"

using namespace kuzu::binder;
using namespace kuzu::storage;
using namespace kuzu::catalog;
Expand All @@ -20,18 +20,21 @@ std::unique_ptr<LogicalPlan> Planner::planExportDatabase(const BoundStatement& s
auto exportData = boundExportDatabase.getExportData();
auto logicalOperators = std::vector<std::shared_ptr<LogicalOperator>>();
auto plan = std::make_unique<LogicalPlan>();
auto fileTypeStr = FileTypeUtils::toString(fileType);
StringUtils::toLower(fileTypeStr);
auto copyToSuffix = "." + fileTypeStr;
for (auto& exportTableData : *exportData) {
auto regularQuery = exportTableData.getRegularQuery();
KU_ASSERT(regularQuery->getStatementType() == StatementType::QUERY);
auto tablePlan = getBestPlan(*regularQuery);
auto copyTo =
std::make_shared<LogicalCopyTo>(filePath + "/" + exportTableData.tableName + ".csv",
fileType, exportTableData.columnNames, exportTableData.getColumnTypesRef(),
boundExportDatabase.getCopyOption()->copy(), tablePlan->getLastOperator());
auto copyTo = std::make_shared<LogicalCopyTo>(
filePath + "/" + exportTableData.tableName + copyToSuffix, fileType,
exportTableData.columnNames, exportTableData.getColumnTypesRef(),
boundExportDatabase.getCopyOption(), tablePlan->getLastOperator());
logicalOperators.push_back(std::move(copyTo));
}
auto exportDatabase = make_shared<LogicalExportDatabase>(
filePath, boundExportDatabase.getCopyOption()->copy(), std::move(logicalOperators));
boundExportDatabase.getBoundFileInfo()->copy(), std::move(logicalOperators));
plan->setLastOperator(std::move(exportDatabase));
return plan;
}
Expand Down
5 changes: 2 additions & 3 deletions src/processor/map/map_port_db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapExportDatabase(
planner::LogicalOperator* logicalOperator) {
auto exportDatabase =
ku_dynamic_cast<LogicalOperator*, LogicalExportDatabase*>(logicalOperator);
auto filePath = exportDatabase->getFilePath();
std::vector<std::unique_ptr<PhysicalOperator>> children;
for (auto childCopyTo : exportDatabase->getChildren()) {
auto childPhysicalOperator = mapOperator(childCopyTo.get());
children.push_back(std::move(childPhysicalOperator));
}
std::unique_ptr<ResultSetDescriptor> resultSetDescriptor;
return std::make_unique<ExportDB>(filePath, exportDatabase->getCopyOption()->copy(),
getOperatorID(), exportDatabase->getExpressionsForPrinting(), std::move(children));
return std::make_unique<ExportDB>(exportDatabase->getBoundFileInfo()->copy(), getOperatorID(),
exportDatabase->getExpressionsForPrinting(), std::move(children));
}

} // namespace processor
Expand Down
29 changes: 17 additions & 12 deletions src/processor/operator/persistent/export_db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "catalog/catalog_entry/node_table_catalog_entry.h"
#include "catalog/catalog_entry/rel_table_catalog_entry.h"
#include "common/file_system/virtual_file_system.h"
#include "common/string_utils.h"

using namespace kuzu::common;
using namespace kuzu::transaction;
Expand All @@ -26,10 +27,14 @@ static void writeStringStreamToFile(
}

static void writeCopyStatement(
stringstream& ss, std::string tableName, std::string filePath, std::string copyOption) {
stringstream& ss, std::string tableName, ReaderConfig* boundFileInfo) {
ss << "COPY ";
ss << tableName << " FROM \"" << filePath << "/" << tableName << ".csv";
ss << "\"" << copyOption << std::endl;
ss << tableName << " FROM \"" << boundFileInfo->filePaths[0] << "/" << tableName;
auto fileTypeStr = FileTypeUtils::toString(boundFileInfo->fileType);
StringUtils::toLower(fileTypeStr);
ss << "." << fileTypeStr;
auto csvConfig = common::CSVReaderConfig::construct(boundFileInfo->options);
ss << "\"" << csvConfig.option.toCypher() << std::endl;
}

std::string getSchemaCypher(main::ClientContext* clientContext, transaction::Transaction* tx) {
Expand All @@ -52,16 +57,16 @@ std::string getMacroCypher(catalog::Catalog* catalog, transaction::Transaction*
return ss.str();
}

std::string getCopyCypher(catalog::Catalog* catalog, transaction::Transaction* tx,
std::string filePath, std::string copyOption) {
std::string getCopyCypher(
catalog::Catalog* catalog, transaction::Transaction* tx, ReaderConfig* boundFileInfo) {
stringstream ss;
for (auto& nodeTableEntry : catalog->getNodeTableEntries(tx)) {
auto tableName = nodeTableEntry->getName();
writeCopyStatement(ss, tableName, filePath, copyOption);
writeCopyStatement(ss, tableName, boundFileInfo);
}
for (auto& relTableEntry : catalog->getRelTableEntries(tx)) {
auto tableName = relTableEntry->getName();
writeCopyStatement(ss, tableName, filePath, copyOption);
writeCopyStatement(ss, tableName, boundFileInfo);
}
return ss.str();
}
Expand All @@ -70,17 +75,17 @@ bool ExportDB::getNextTuplesInternal(ExecutionContext* context) {
// write the schema.cypher file
writeStringStreamToFile(context->clientContext->getVFSUnsafe(),
getSchemaCypher(context->clientContext, context->clientContext->getTx()),
filePath + "/schema.cypher");
boundFileInfo.filePaths[0] + "/schema.cypher");
// write macro.cypher file
writeStringStreamToFile(context->clientContext->getVFSUnsafe(),
getMacroCypher(context->clientContext->getCatalog(), context->clientContext->getTx()),
filePath + "/macro.cypher");
boundFileInfo.filePaths[0] + "/macro.cypher");
// write the copy.cypher file
// for every table, we write COPY FROM statement
writeStringStreamToFile(context->clientContext->getVFSUnsafe(),
getCopyCypher(context->clientContext->getCatalog(), context->clientContext->getTx(),
filePath, copyToOption.toCypher()),
filePath + "/copy.cypher");
getCopyCypher(
context->clientContext->getCatalog(), context->clientContext->getTx(), &boundFileInfo),
boundFileInfo.filePaths[0] + "/copy.cypher");
return false;
}
} // namespace processor
Expand Down
Loading
Loading