Skip to content

Commit

Permalink
Merge pull request #2897 from kuzudb/export_db_parquet
Browse files Browse the repository at this point in the history
add support for exporting database to parquet files
  • Loading branch information
hououou committed Feb 18, 2024
2 parents 044f293 + cc95076 commit a38e0dc
Show file tree
Hide file tree
Showing 12 changed files with 142 additions and 56 deletions.
30 changes: 25 additions & 5 deletions src/binder/bind/bind_export_database.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "binder/copy/bound_export_database.h"
#include "binder/query/bound_regular_query.h"
#include "common/exception/binder.h"
#include "common/string_utils.h"
#include "main/client_context.h"
#include "parser/parser.h"
#include "parser/port_db.h"
Expand Down Expand Up @@ -44,6 +45,21 @@ static std::vector<ExportedTableData> getExportInfo(
return exportData;
}

FileType getFileType(std::unordered_map<std::string, common::Value>& options) {
auto fileType = FileType::CSV;
if (options.find("FORMAT") != options.end()) {
auto value = options.at("FORMAT");
if (value.getDataType()->getLogicalTypeID() != LogicalTypeID::STRING) {
throw BinderException("The type of format option must be a string.");
}
auto valueStr = value.getValue<std::string>();
StringUtils::toUpper(valueStr);
fileType = FileTypeUtils::fromString(valueStr);
options.erase("FORMAT");
}
return fileType;
}

ExportedTableData Binder::extractExportData(std::string selQuery, std::string tableName) {
auto parsedStatement = Parser::parseQuery(selQuery);
ExportedTableData exportedTableData;
Expand All @@ -65,18 +81,22 @@ std::unique_ptr<BoundStatement> Binder::bindExportDatabaseClause(const Statement
auto& exportDatabaseStatement = ku_dynamic_cast<const Statement&, const ExportDB&>(statement);
auto boundFilePath = exportDatabaseStatement.getFilePath();
auto exportData = getExportInfo(catalog, clientContext->getTx(), this);
// TODO(Jiamin): add more supported file types
auto fileType = FileType::CSV;
auto csvConfig = CSVReaderConfig::construct(
bindParsingOptions(exportDatabaseStatement.getParsingOptionsRef()));
auto parsedOptions = bindParsingOptions(exportDatabaseStatement.getParsingOptionsRef());
auto fileType = getFileType(parsedOptions);
if (fileType != FileType::CSV && fileType != FileType::PARQUET) {
throw BinderException("Export database currently only supports csv and parquet files.");
}
if (fileType != FileType::CSV && parsedOptions.size() != 0) {
throw BinderException{"Only export to csv can have options."};
}
// try to create the directory, if it doesn't exist yet
if (!vfs->fileOrPathExists(boundFilePath)) {
vfs->createDir(boundFilePath);
} else {
throw BinderException(stringFormat("Directory {} already exists.", boundFilePath));
}
return std::make_unique<BoundExportDatabase>(
boundFilePath, fileType, std::move(exportData), csvConfig.option.copy());
boundFilePath, fileType, std::move(exportData), std::move(parsedOptions));
}
} // namespace binder
} // namespace kuzu
2 changes: 2 additions & 0 deletions src/binder/bind/bind_file_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "common/exception/copy.h"
#include "common/file_system/virtual_file_system.h"
#include "common/string_format.h"
#include "common/string_utils.h"

using namespace kuzu::parser;
using namespace kuzu::binder;
Expand Down Expand Up @@ -53,6 +54,7 @@ std::unordered_map<std::string, Value> Binder::bindParsingOptions(
std::unordered_map<std::string, Value> options;
for (auto& option : parsingOptions) {
auto name = option.first;
common::StringUtils::toUpper(name);
auto expr = expressionBinder.bindExpression(*option.second);
KU_ASSERT(expr->expressionType == ExpressionType::LITERAL);
auto literalExpr = ku_dynamic_cast<Expression*, LiteralExpression*>(expr.get());
Expand Down
2 changes: 0 additions & 2 deletions src/common/copier_config/csv_reader_config.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "common/copier_config/csv_reader_config.h"

#include "common/exception/binder.h"
#include "common/string_utils.h"

namespace kuzu {
namespace common {
Expand Down Expand Up @@ -56,7 +55,6 @@ CSVReaderConfig CSVReaderConfig::construct(
auto config = CSVReaderConfig();
for (auto& op : options) {
auto name = op.first;
StringUtils::toUpper(name);
auto isValidStringParsingOption = validateStringParsingOptionName(name);
auto isValidBoolParsingOption = validateBoolParsingOptionName(name);
if (isValidBoolParsingOption) {
Expand Down
13 changes: 13 additions & 0 deletions src/common/copier_config/reader_config.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "common/copier_config/reader_config.h"

#include "common/assert.h"
#include "common/exception/binder.h"
#include "common/exception/copy.h"

namespace kuzu {
Expand Down Expand Up @@ -57,5 +58,17 @@ std::string FileTypeUtils::toString(FileType fileType) {
}
}

FileType FileTypeUtils::fromString(std::string fileType) {
if (fileType == "CSV") {
return FileType::CSV;
} else if (fileType == "PARQUET") {
return FileType::PARQUET;
} else if (fileType == "NPY") {
return FileType::NPY;
} else {
throw BinderException(stringFormat("Unsupported file type: {}.", fileType));
}
}

} // namespace common
} // namespace kuzu
24 changes: 14 additions & 10 deletions src/include/binder/copy/bound_export_database.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#include "binder/binder.h"
#include "binder/bound_statement.h"
#include "binder/query/bound_regular_query.h"
#include "common/copier_config/csv_reader_config.h"
#include "common/copier_config/reader_config.h"

namespace kuzu {
Expand All @@ -21,22 +20,27 @@ struct ExportedTableData {
class BoundExportDatabase : public BoundStatement {
public:
BoundExportDatabase(std::string filePath, common::FileType fileType,
std::vector<ExportedTableData> exportData, common::CSVOption csvOption)
std::vector<ExportedTableData> exportData,
std::unordered_map<std::string, common::Value> csvOption)
: BoundStatement{common::StatementType::EXPORT_DATABASE,
BoundStatementResult::createEmptyResult()},
filePath{std::move(filePath)}, fileType{fileType},
exportData(std::move(exportData)), csvOption{std::move(csvOption)} {}
exportData(std::move(exportData)),
boundFileInfo(fileType, std::vector{std::move(filePath)}) {
boundFileInfo.options = std::move(csvOption);
}

inline std::string getFilePath() const { return filePath; }
inline common::FileType getFileType() const { return fileType; }
inline const common::CSVOption* getCopyOption() const { return &csvOption; }
inline std::string getFilePath() const { return boundFileInfo.filePaths[0]; }
inline common::FileType getFileType() const { return boundFileInfo.fileType; }
inline common::CSVOption getCopyOption() const {
auto csvConfig = common::CSVReaderConfig::construct(boundFileInfo.options);
return csvConfig.option.copy();
}
inline const common::ReaderConfig* getBoundFileInfo() const { return &boundFileInfo; }
inline const std::vector<ExportedTableData>* getExportData() const { return &exportData; }

private:
std::string filePath;
common::FileType fileType;
std::vector<ExportedTableData> exportData;
common::CSVOption csvOption;
common::ReaderConfig boundFileInfo;
};

} // namespace binder
Expand Down
1 change: 1 addition & 0 deletions src/include/common/copier_config/reader_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ enum class FileType : uint8_t {
struct FileTypeUtils {
static FileType getFileTypeFromExtension(std::string_view extension);
static std::string toString(FileType fileType);
static FileType fromString(std::string fileType);
};

struct ReaderConfig {
Expand Down
22 changes: 13 additions & 9 deletions src/include/planner/operator/persistent/logical_export_db.h
Original file line number Diff line number Diff line change
@@ -1,34 +1,38 @@
#pragma once

#include "common/copier_config/csv_reader_config.h"
#include "common/copier_config/reader_config.h"
#include "planner/operator/logical_operator.h"

namespace kuzu {
namespace planner {

class LogicalExportDatabase : public LogicalOperator {
public:
explicit LogicalExportDatabase(std::string filePath, common::CSVOption copyToOption,
std::vector<std::shared_ptr<LogicalOperator>> plans)
explicit LogicalExportDatabase(
common::ReaderConfig boundFileInfo, std::vector<std::shared_ptr<LogicalOperator>> plans)
: LogicalOperator{LogicalOperatorType::EXPORT_DATABASE, std::move(plans)},
filePath{std::move(filePath)}, copyToOption{std::move(copyToOption)} {}
boundFileInfo{std::move(boundFileInfo)} {}

inline std::string getExpressionsForPrinting() const override { return std::string{}; }

void computeFactorizedSchema() override;
void computeFlatSchema() override;

inline std::string getFilePath() const { return filePath; }
inline const common::CSVOption* getCopyOption() const { return &copyToOption; }
inline std::string getFilePath() const { return boundFileInfo.filePaths[0]; }
inline common::FileType getFileType() const { return boundFileInfo.fileType; }
inline common::CSVOption getCopyOption() const {
auto csvConfig = common::CSVReaderConfig::construct(boundFileInfo.options);
return csvConfig.option.copy();
}
inline const common::ReaderConfig* getBoundFileInfo() const { return &boundFileInfo; }

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalExportDatabase>(
filePath, std::move(copyToOption), std::move(children));
return make_unique<LogicalExportDatabase>(std::move(boundFileInfo), std::move(children));
}

private:
std::string filePath;
common::CSVOption copyToOption;
common::ReaderConfig boundFileInfo;
};

} // namespace planner
Expand Down
13 changes: 6 additions & 7 deletions src/include/processor/operator/persistent/export_db.h
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
#pragma once

#include "common/copier_config/csv_reader_config.h"
#include "common/copier_config/reader_config.h"
#include "processor/operator/physical_operator.h"

namespace kuzu {
namespace processor {

class ExportDB : public PhysicalOperator {
public:
ExportDB(std::string filePath, common::CSVOption copyToOption, uint32_t id,
const std::string& paramsString, std::vector<std::unique_ptr<PhysicalOperator>> children)
ExportDB(common::ReaderConfig boundFileInfo, uint32_t id, const std::string& paramsString,
std::vector<std::unique_ptr<PhysicalOperator>> children)
: PhysicalOperator{PhysicalOperatorType::EXPORT_DATABASE, std::move(children), id,
paramsString},
filePath(filePath), copyToOption{std::move(copyToOption)} {}
boundFileInfo{std::move(boundFileInfo)} {}

bool canParallel() const override { return false; }

Expand All @@ -22,12 +22,11 @@ class ExportDB : public PhysicalOperator {

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ExportDB>(
filePath, std::move(copyToOption), id, paramsString, std::move(children));
std::move(boundFileInfo), id, paramsString, std::move(children));
}

private:
std::string filePath;
common::CSVOption copyToOption;
common::ReaderConfig boundFileInfo;
};
} // namespace processor
} // namespace kuzu
15 changes: 9 additions & 6 deletions src/planner/plan/plan_port_db.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#include "binder/copy/bound_export_database.h"
#include "common/string_utils.h"
#include "planner/operator/persistent/logical_copy_to.h"
#include "planner/operator/persistent/logical_export_db.h"
#include "planner/planner.h"

using namespace kuzu::binder;
using namespace kuzu::storage;
using namespace kuzu::catalog;
Expand All @@ -20,18 +20,21 @@ std::unique_ptr<LogicalPlan> Planner::planExportDatabase(const BoundStatement& s
auto exportData = boundExportDatabase.getExportData();
auto logicalOperators = std::vector<std::shared_ptr<LogicalOperator>>();
auto plan = std::make_unique<LogicalPlan>();
auto fileTypeStr = FileTypeUtils::toString(fileType);
StringUtils::toLower(fileTypeStr);
auto copyToSuffix = "." + fileTypeStr;
for (auto& exportTableData : *exportData) {
auto regularQuery = exportTableData.getRegularQuery();
KU_ASSERT(regularQuery->getStatementType() == StatementType::QUERY);
auto tablePlan = getBestPlan(*regularQuery);
auto copyTo =
std::make_shared<LogicalCopyTo>(filePath + "/" + exportTableData.tableName + ".csv",
fileType, exportTableData.columnNames, exportTableData.getColumnTypesRef(),
boundExportDatabase.getCopyOption()->copy(), tablePlan->getLastOperator());
auto copyTo = std::make_shared<LogicalCopyTo>(
filePath + "/" + exportTableData.tableName + copyToSuffix, fileType,
exportTableData.columnNames, exportTableData.getColumnTypesRef(),
boundExportDatabase.getCopyOption(), tablePlan->getLastOperator());
logicalOperators.push_back(std::move(copyTo));
}
auto exportDatabase = make_shared<LogicalExportDatabase>(
filePath, boundExportDatabase.getCopyOption()->copy(), std::move(logicalOperators));
boundExportDatabase.getBoundFileInfo()->copy(), std::move(logicalOperators));
plan->setLastOperator(std::move(exportDatabase));
return plan;
}
Expand Down
5 changes: 2 additions & 3 deletions src/processor/map/map_port_db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapExportDatabase(
planner::LogicalOperator* logicalOperator) {
auto exportDatabase =
ku_dynamic_cast<LogicalOperator*, LogicalExportDatabase*>(logicalOperator);
auto filePath = exportDatabase->getFilePath();
std::vector<std::unique_ptr<PhysicalOperator>> children;
for (auto childCopyTo : exportDatabase->getChildren()) {
auto childPhysicalOperator = mapOperator(childCopyTo.get());
children.push_back(std::move(childPhysicalOperator));
}
std::unique_ptr<ResultSetDescriptor> resultSetDescriptor;
return std::make_unique<ExportDB>(filePath, exportDatabase->getCopyOption()->copy(),
getOperatorID(), exportDatabase->getExpressionsForPrinting(), std::move(children));
return std::make_unique<ExportDB>(exportDatabase->getBoundFileInfo()->copy(), getOperatorID(),
exportDatabase->getExpressionsForPrinting(), std::move(children));
}

} // namespace processor
Expand Down
29 changes: 17 additions & 12 deletions src/processor/operator/persistent/export_db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "catalog/catalog_entry/node_table_catalog_entry.h"
#include "catalog/catalog_entry/rel_table_catalog_entry.h"
#include "common/file_system/virtual_file_system.h"
#include "common/string_utils.h"

using namespace kuzu::common;
using namespace kuzu::transaction;
Expand All @@ -26,10 +27,14 @@ static void writeStringStreamToFile(
}

static void writeCopyStatement(
stringstream& ss, std::string tableName, std::string filePath, std::string copyOption) {
stringstream& ss, std::string tableName, ReaderConfig* boundFileInfo) {
ss << "COPY ";
ss << tableName << " FROM \"" << filePath << "/" << tableName << ".csv";
ss << "\"" << copyOption << std::endl;
ss << tableName << " FROM \"" << boundFileInfo->filePaths[0] << "/" << tableName;
auto fileTypeStr = FileTypeUtils::toString(boundFileInfo->fileType);
StringUtils::toLower(fileTypeStr);
ss << "." << fileTypeStr;
auto csvConfig = common::CSVReaderConfig::construct(boundFileInfo->options);
ss << "\"" << csvConfig.option.toCypher() << std::endl;
}

std::string getSchemaCypher(main::ClientContext* clientContext, transaction::Transaction* tx) {
Expand All @@ -52,16 +57,16 @@ std::string getMacroCypher(catalog::Catalog* catalog, transaction::Transaction*
return ss.str();
}

std::string getCopyCypher(catalog::Catalog* catalog, transaction::Transaction* tx,
std::string filePath, std::string copyOption) {
std::string getCopyCypher(
catalog::Catalog* catalog, transaction::Transaction* tx, ReaderConfig* boundFileInfo) {
stringstream ss;
for (auto& nodeTableEntry : catalog->getNodeTableEntries(tx)) {
auto tableName = nodeTableEntry->getName();
writeCopyStatement(ss, tableName, filePath, copyOption);
writeCopyStatement(ss, tableName, boundFileInfo);
}
for (auto& relTableEntry : catalog->getRelTableEntries(tx)) {
auto tableName = relTableEntry->getName();
writeCopyStatement(ss, tableName, filePath, copyOption);
writeCopyStatement(ss, tableName, boundFileInfo);
}
return ss.str();
}
Expand All @@ -70,17 +75,17 @@ bool ExportDB::getNextTuplesInternal(ExecutionContext* context) {
// write the schema.cypher file
writeStringStreamToFile(context->clientContext->getVFSUnsafe(),
getSchemaCypher(context->clientContext, context->clientContext->getTx()),
filePath + "/schema.cypher");
boundFileInfo.filePaths[0] + "/schema.cypher");
// write macro.cypher file
writeStringStreamToFile(context->clientContext->getVFSUnsafe(),
getMacroCypher(context->clientContext->getCatalog(), context->clientContext->getTx()),
filePath + "/macro.cypher");
boundFileInfo.filePaths[0] + "/macro.cypher");
// write the copy.cypher file
// for every table, we write COPY FROM statement
writeStringStreamToFile(context->clientContext->getVFSUnsafe(),
getCopyCypher(context->clientContext->getCatalog(), context->clientContext->getTx(),
filePath, copyToOption.toCypher()),
filePath + "/copy.cypher");
getCopyCypher(
context->clientContext->getCatalog(), context->clientContext->getTx(), &boundFileInfo),
boundFileInfo.filePaths[0] + "/copy.cypher");
return false;
}
} // namespace processor
Expand Down
Loading

0 comments on commit a38e0dc

Please sign in to comment.