diff --git a/src/binder/bind/bind_copy.cpp b/src/binder/bind/bind_copy.cpp index 62101ba5d45..68a67799d6f 100644 --- a/src/binder/bind/bind_copy.cpp +++ b/src/binder/bind/bind_copy.cpp @@ -38,9 +38,9 @@ std::unique_ptr Binder::bindCopyToClause(const Statement& statem if (fileType != FileType::CSV && copyToStatement.getParsingOptionsRef().size() != 0) { throw BinderException{"Only copy to csv can have options."}; } - auto csvOption = bindParsingOptions(copyToStatement.getParsingOptionsRef()); + auto csvConfig = bindParsingOptions(copyToStatement.getParsingOptionsRef()); return std::make_unique(boundFilePath, fileType, std::move(columnNames), - std::move(columnTypes), std::move(query), std::move(csvOption)); + std::move(columnTypes), std::move(query), std::move(csvConfig->option.copy())); } // As a temporary constraint, we require npy files loaded with COPY FROM BY COLUMN keyword. @@ -77,11 +77,11 @@ std::unique_ptr Binder::bindCopyFromClause(const Statement& stat default: break; } - auto csvReaderConfig = bindParsingOptions(copyStatement.getParsingOptionsRef()); + auto csvConfig = bindParsingOptions(copyStatement.getParsingOptionsRef()); auto filePaths = bindFilePaths(copyStatement.getFilePaths()); auto fileType = bindFileType(filePaths); auto readerConfig = - std::make_unique(fileType, std::move(filePaths), std::move(csvReaderConfig)); + std::make_unique(fileType, std::move(filePaths), std::move(csvConfig)); validateByColumnKeyword(readerConfig->fileType, copyStatement.byColumn()); if (readerConfig->fileType == FileType::NPY) { validateCopyNpyNotForRelTables(tableSchema); @@ -111,7 +111,7 @@ std::unique_ptr Binder::bindCopyFromClause(const Statement& stat std::unique_ptr Binder::bindCopyNodeFrom(const Statement& statement, std::unique_ptr config, TableSchema* tableSchema) { auto& copyStatement = reinterpret_cast(statement); - auto func = getScanFunction(config->fileType, config->csvReaderConfig->parallel); + auto func = getScanFunction(config->fileType, *config); // For table with SERIAL columns, we need to read in serial from files. auto containsSerial = tableSchema->containsColumnType(LogicalType(LogicalTypeID::SERIAL)); std::vector expectedColumnNames; @@ -137,7 +137,7 @@ std::unique_ptr Binder::bindCopyNodeFrom(const Statement& statem std::unique_ptr Binder::bindCopyRelFrom(const parser::Statement& statement, std::unique_ptr config, TableSchema* tableSchema) { auto& copyStatement = reinterpret_cast(statement); - auto func = getScanFunction(config->fileType, config->csvReaderConfig->parallel); + auto func = getScanFunction(config->fileType, *config); // For table with SERIAL columns, we need to read in serial from files. auto containsSerial = tableSchema->containsColumnType(LogicalType(LogicalTypeID::SERIAL)); KU_ASSERT(containsSerial == false); diff --git a/src/binder/bind/bind_file_scan.cpp b/src/binder/bind/bind_file_scan.cpp index 58c7fcd0fc9..2154a66f54c 100644 --- a/src/binder/bind/bind_file_scan.cpp +++ b/src/binder/bind/bind_file_scan.cpp @@ -1,5 +1,6 @@ #include "binder/binder.h" #include "binder/expression/literal_expression.h" +#include "common/copier_config/csv_reader_config.h" #include "common/exception/binder.h" #include "common/exception/copy.h" #include "common/string_format.h" @@ -63,23 +64,23 @@ static char bindParsingOptionValue(std::string value) { } static void bindBoolParsingOption( - CSVReaderConfig& csvReaderConfig, const std::string& optionName, bool optionValue) { + CSVReaderConfig& config, const std::string& optionName, bool optionValue) { if (optionName == "HEADER") { - csvReaderConfig.hasHeader = optionValue; + config.option.hasHeader = optionValue; } else if (optionName == "PARALLEL") { - csvReaderConfig.parallel = optionValue; + config.parallel = optionValue; } } static void bindStringParsingOption( - CSVReaderConfig& csvReaderConfig, const std::string& optionName, std::string& optionValue) { + CSVReaderConfig& config, const std::string& optionName, std::string& optionValue) { auto parsingOptionValue = bindParsingOptionValue(optionValue); if (optionName == "ESCAPE") { - csvReaderConfig.escapeChar = parsingOptionValue; + config.option.escapeChar = parsingOptionValue; } else if (optionName == "DELIM") { - csvReaderConfig.delimiter = parsingOptionValue; + config.option.delimiter = parsingOptionValue; } else if (optionName == "QUOTE") { - csvReaderConfig.quoteChar = parsingOptionValue; + config.option.quoteChar = parsingOptionValue; } } diff --git a/src/binder/bind/bind_reading_clause.cpp b/src/binder/bind/bind_reading_clause.cpp index dce431b541f..a4e1656a628 100644 --- a/src/binder/bind/bind_reading_clause.cpp +++ b/src/binder/bind/bind_reading_clause.cpp @@ -170,8 +170,7 @@ std::unique_ptr Binder::bindLoadFrom( expectedColumnNames.push_back(name); expectedColumnTypes.push_back(bindDataType(type)); } - auto scanFunction = - getScanFunction(readerConfig->fileType, readerConfig->csvReaderConfig->parallel); + auto scanFunction = getScanFunction(readerConfig->fileType, *readerConfig); auto bindInput = std::make_unique(memoryManager, *readerConfig, std::move(expectedColumnNames), std::move(expectedColumnTypes)); auto bindData = scanFunction->bindFunc(clientContext, bindInput.get(), (Catalog*)&catalog); diff --git a/src/binder/bind/copy/bind_copy_rdf_graph.cpp b/src/binder/bind/copy/bind_copy_rdf_graph.cpp index 1faa8a20d2e..60578daaf94 100644 --- a/src/binder/bind/copy/bind_copy_rdf_graph.cpp +++ b/src/binder/bind/copy/bind_copy_rdf_graph.cpp @@ -16,7 +16,7 @@ namespace binder { std::unique_ptr Binder::bindCopyRdfNodeFrom(const Statement& /*statement*/, std::unique_ptr config, TableSchema* tableSchema) { - auto func = getScanFunction(config->fileType, config->csvReaderConfig->parallel); + auto func = getScanFunction(config->fileType, *config); bool containsSerial; auto stringType = LogicalType{LogicalTypeID::STRING}; std::vector columnNames; @@ -25,13 +25,13 @@ std::unique_ptr Binder::bindCopyRdfNodeFrom(const Statement& /*s if (tableSchema->tableName.ends_with(rdf::RESOURCE_TABLE_SUFFIX)) { containsSerial = false; columnTypes.push_back(stringType.copy()); - config->rdfReaderConfig = + config->extraConfig = std::make_unique(RdfReaderMode::RESOURCE, nullptr /* index */); } else { KU_ASSERT(tableSchema->tableName.ends_with(rdf::LITERAL_TABLE_SUFFIX)); containsSerial = true; columnTypes.push_back(RdfVariantType::getType()); - config->rdfReaderConfig = + config->extraConfig = std::make_unique(RdfReaderMode::LITERAL, nullptr /* index */); } auto bindInput = std::make_unique( @@ -52,7 +52,7 @@ std::unique_ptr Binder::bindCopyRdfNodeFrom(const Statement& /*s std::unique_ptr Binder::bindCopyRdfRelFrom(const Statement& /*statement*/, std::unique_ptr config, TableSchema* tableSchema) { - auto func = getScanFunction(config->fileType, config->csvReaderConfig->parallel); + auto func = getScanFunction(config->fileType, *config); auto containsSerial = false; std::vector columnNames; columnNames.emplace_back(InternalKeyword::SRC_OFFSET); @@ -67,10 +67,10 @@ std::unique_ptr Binder::bindCopyRdfRelFrom(const Statement& /*st auto resourceTableID = relTableSchema->getSrcTableID(); auto index = storageManager->getPKIndex(resourceTableID); if (tableSchema->tableName.ends_with(rdf::RESOURCE_TRIPLE_TABLE_SUFFIX)) { - config->rdfReaderConfig = + config->extraConfig = std::make_unique(RdfReaderMode::RESOURCE_TRIPLE, index); } else { - config->rdfReaderConfig = + config->extraConfig = std::make_unique(RdfReaderMode::LITERAL_TRIPLE, index); } auto bindInput = std::make_unique( diff --git a/src/binder/binder.cpp b/src/binder/binder.cpp index 57eb4a51d81..b2f89b37cd7 100644 --- a/src/binder/binder.cpp +++ b/src/binder/binder.cpp @@ -1,6 +1,7 @@ #include "binder/binder.h" #include "binder/bound_statement_rewriter.h" +#include "common/copier_config/csv_reader_config.h" #include "common/exception/binder.h" #include "common/string_format.h" #include "function/table_functions.h" @@ -209,25 +210,27 @@ void Binder::restoreScope(std::unique_ptr prevVariableScope) { scope = std::move(prevVariableScope); } -function::TableFunction* Binder::getScanFunction(common::FileType fileType, bool isParallel) { +function::TableFunction* Binder::getScanFunction(FileType fileType, const ReaderConfig& config) { function::Function* func; auto stringType = LogicalType(LogicalTypeID::STRING); std::vector inputTypes; inputTypes.push_back(&stringType); + auto functions = catalog.getBuiltInFunctions(); switch (fileType) { case common::FileType::PARQUET: { - func = - catalog.getBuiltInFunctions()->matchScalarFunction(READ_PARQUET_FUNC_NAME, inputTypes); + func = functions->matchScalarFunction(READ_PARQUET_FUNC_NAME, inputTypes); } break; case common::FileType::NPY: { - func = catalog.getBuiltInFunctions()->matchScalarFunction(READ_NPY_FUNC_NAME, inputTypes); + func = functions->matchScalarFunction(READ_NPY_FUNC_NAME, inputTypes); } break; case common::FileType::CSV: { - func = catalog.getBuiltInFunctions()->matchScalarFunction( - isParallel ? READ_CSV_PARALLEL_FUNC_NAME : READ_CSV_SERIAL_FUNC_NAME, inputTypes); + auto csvConfig = reinterpret_cast(config.extraConfig.get()); + func = functions->matchScalarFunction( + csvConfig->parallel ? READ_CSV_PARALLEL_FUNC_NAME : READ_CSV_SERIAL_FUNC_NAME, + inputTypes); } break; case common::FileType::TURTLE: { - func = catalog.getBuiltInFunctions()->matchScalarFunction(READ_RDF_FUNC_NAME, inputTypes); + func = functions->matchScalarFunction(READ_RDF_FUNC_NAME, inputTypes); } break; default: KU_UNREACHABLE; diff --git a/src/common/copier_config/CMakeLists.txt b/src/common/copier_config/CMakeLists.txt index fba3f7abf2a..cfa682baaa8 100644 --- a/src/common/copier_config/CMakeLists.txt +++ b/src/common/copier_config/CMakeLists.txt @@ -1,6 +1,6 @@ add_library(kuzu_common_copier_config OBJECT - copier_config.cpp) + reader_config.cpp) set(ALL_OBJECT_FILES ${ALL_OBJECT_FILES} $ diff --git a/src/common/copier_config/copier_config.cpp b/src/common/copier_config/reader_config.cpp similarity index 95% rename from src/common/copier_config/copier_config.cpp rename to src/common/copier_config/reader_config.cpp index 20742f9f972..9d033c1565f 100644 --- a/src/common/copier_config/copier_config.cpp +++ b/src/common/copier_config/reader_config.cpp @@ -1,4 +1,4 @@ -#include "common/copier_config/copier_config.h" +#include "common/copier_config/reader_config.h" #include "common/assert.h" #include "common/exception/copy.h" diff --git a/src/function/cast/cast_fixed_list.cpp b/src/function/cast/cast_fixed_list.cpp index a3806321de7..b7e9dab5eb0 100644 --- a/src/function/cast/cast_fixed_list.cpp +++ b/src/function/cast/cast_fixed_list.cpp @@ -166,21 +166,20 @@ void CastFixedList::stringtoFixedListCastExecFunction( const std::vector>& params, ValueVector& result, void* dataPtr) { KU_ASSERT(params.size() == 1); const auto& param = params[0]; - auto csvReaderConfig = &reinterpret_cast(dataPtr)->csvConfig; + auto option = &reinterpret_cast(dataPtr)->csvConfig.option; if (param->state->isFlat()) { auto inputPos = param->state->selVector->selectedPositions[0]; auto resultPos = result.state->selVector->selectedPositions[0]; result.setNull(resultPos, param->isNull(inputPos)); if (!result.isNull(inputPos)) { CastString::castToFixedList( - param->getValue(inputPos), &result, resultPos, csvReaderConfig); + param->getValue(inputPos), &result, resultPos, option); } } else if (param->state->selVector->isUnfiltered()) { for (auto i = 0u; i < param->state->selVector->selectedSize; i++) { result.setNull(i, param->isNull(i)); if (!result.isNull(i)) { - CastString::castToFixedList( - param->getValue(i), &result, i, csvReaderConfig); + CastString::castToFixedList(param->getValue(i), &result, i, option); } } } else { @@ -189,7 +188,7 @@ void CastFixedList::stringtoFixedListCastExecFunction( result.setNull(pos, param->isNull(pos)); if (!result.isNull(pos)) { CastString::castToFixedList( - param->getValue(pos), &result, pos, csvReaderConfig); + param->getValue(pos), &result, pos, option); } } } @@ -209,14 +208,12 @@ void CastFixedList::stringtoFixedListCastExecFunction const std::vector>& params, ValueVector& result, void* dataPtr) { KU_ASSERT(params.size() == 1); auto numOfEntries = reinterpret_cast(dataPtr)->numOfEntries; - auto csvReaderConfig = &reinterpret_cast(dataPtr)->csvConfig; - + auto option = &reinterpret_cast(dataPtr)->csvConfig.option; auto inputVector = params[0].get(); for (auto i = 0u; i < numOfEntries; i++) { result.setNull(i, inputVector->isNull(i)); if (!result.isNull(i)) { - CastString::castToFixedList( - inputVector->getValue(i), &result, i, csvReaderConfig); + CastString::castToFixedList(inputVector->getValue(i), &result, i, option); } } } diff --git a/src/function/cast_from_string_functions.cpp b/src/function/cast_from_string_functions.cpp index fa32e963150..b4ac78e5ede 100644 --- a/src/function/cast_from_string_functions.cpp +++ b/src/function/cast_from_string_functions.cpp @@ -16,102 +16,102 @@ namespace function { struct CastStringHelper { template static void cast(const char* input, uint64_t len, T& result, ValueVector* /*vector*/ = nullptr, - uint64_t /*rowToAdd*/ = 0, const CSVReaderConfig* /*csvReaderConfig*/ = nullptr) { + uint64_t /*rowToAdd*/ = 0, const CSVOption* /*option*/ = nullptr) { simpleIntegerCast(input, len, result, LogicalType{LogicalTypeID::INT64}); } static void castToFixedList(const char* input, uint64_t len, ValueVector* vector, - uint64_t rowToAdd, const CSVReaderConfig* csvReaderConfig); + uint64_t rowToAdd, const CSVOption* option); }; template<> inline void CastStringHelper::cast(const char* input, uint64_t len, int128_t& result, - ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { simpleInt128Cast(input, len, result); } template<> inline void CastStringHelper::cast(const char* input, uint64_t len, int32_t& result, - ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { simpleIntegerCast(input, len, result, LogicalType{LogicalTypeID::INT32}); } template<> inline void CastStringHelper::cast(const char* input, uint64_t len, int16_t& result, - ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { simpleIntegerCast(input, len, result, LogicalType{LogicalTypeID::INT16}); } template<> inline void CastStringHelper::cast(const char* input, uint64_t len, int8_t& result, - ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { simpleIntegerCast(input, len, result, LogicalType{LogicalTypeID::INT8}); } template<> inline void CastStringHelper::cast(const char* input, uint64_t len, uint64_t& result, - ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { simpleIntegerCast(input, len, result, LogicalType{LogicalTypeID::UINT64}); } template<> inline void CastStringHelper::cast(const char* input, uint64_t len, uint32_t& result, - ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { simpleIntegerCast(input, len, result, LogicalType{LogicalTypeID::UINT32}); } template<> inline void CastStringHelper::cast(const char* input, uint64_t len, uint16_t& result, - ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { simpleIntegerCast(input, len, result, LogicalType{LogicalTypeID::UINT16}); } template<> inline void CastStringHelper::cast(const char* input, uint64_t len, uint8_t& result, - ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { simpleIntegerCast(input, len, result, LogicalType{LogicalTypeID::UINT8}); } template<> inline void CastStringHelper::cast(const char* input, uint64_t len, float_t& result, - ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { doubleCast(input, len, result, LogicalType{LogicalTypeID::FLOAT}); } template<> inline void CastStringHelper::cast(const char* input, uint64_t len, double_t& result, - ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { doubleCast(input, len, result, LogicalType{LogicalTypeID::DOUBLE}); } template<> inline void CastStringHelper::cast(const char* input, uint64_t len, bool& result, - ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { castStringToBool(input, len, result); } template<> inline void CastStringHelper::cast(const char* input, uint64_t len, date_t& result, - ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { result = Date::fromCString(input, len); } template<> inline void CastStringHelper::cast(const char* input, uint64_t len, timestamp_t& result, - ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { result = Timestamp::fromCString(input, len); } template<> inline void CastStringHelper::cast(const char* input, uint64_t len, interval_t& result, - ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*vector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { result = Interval::fromCString(input, len); } // ---------------------- cast String to Blob ------------------------------ // template<> void CastString::operation(const ku_string_t& input, blob_t& result, ValueVector* resultVector, - uint64_t /*rowToAdd*/, const CSVReaderConfig* /*csvReaderConfig*/) { + uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { result.value.len = Blob::getBlobSize(input); if (!ku_string_t::isShortString(result.value.len)) { auto overflowBuffer = StringVector::getInMemOverflowBuffer(resultVector); @@ -127,7 +127,7 @@ void CastString::operation(const ku_string_t& input, blob_t& result, ValueVector template<> void CastStringHelper::cast(const char* input, uint64_t len, blob_t& /*result*/, - ValueVector* vector, uint64_t rowToAdd, const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* vector, uint64_t rowToAdd, const CSVOption* /*option*/) { // base case: blob auto blobBuffer = std::make_unique(len); auto blobLen = Blob::fromString(input, len, blobBuffer.get()); @@ -160,8 +160,8 @@ static bool skipToCloseQuotes(const char*& input, const char* end) { return false; } -static bool skipToClose(const char*& input, const char* end, uint64_t& lvl, char target, - const CSVReaderConfig* csvReaderConfig) { +static bool skipToClose( + const char*& input, const char* end, uint64_t& lvl, char target, const CSVOption* option) { input++; while (input != end) { if (*input == '\'') { @@ -169,12 +169,11 @@ static bool skipToClose(const char*& input, const char* end, uint64_t& lvl, char return false; } } else if (*input == '{') { // must have closing brackets {, ] if they are not quoted - if (!skipToClose(input, end, lvl, '}', csvReaderConfig)) { + if (!skipToClose(input, end, lvl, '}', option)) { return false; } } else if (*input == CopyConstants::DEFAULT_CSV_LIST_BEGIN_CHAR) { - if (!skipToClose( - input, end, lvl, CopyConstants::DEFAULT_CSV_LIST_END_CHAR, csvReaderConfig)) { + if (!skipToClose(input, end, lvl, CopyConstants::DEFAULT_CSV_LIST_END_CHAR, option)) { return false; } lvl++; // nested one more level @@ -214,11 +213,11 @@ struct CountPartOperation { uint64_t count = 0; static inline bool handleKey( - const char* /*start*/, const char* /*end*/, const CSVReaderConfig* /*config*/) { + const char* /*start*/, const char* /*end*/, const CSVOption* /*config*/) { return true; } inline void handleValue( - const char* /*start*/, const char* /*end*/, const CSVReaderConfig* /*config*/) { + const char* /*start*/, const char* /*end*/, const CSVOption* /*config*/) { count++; } }; @@ -230,16 +229,15 @@ struct SplitStringListOperation { uint64_t& offset; ValueVector* resultVector; - void handleValue(const char* start, const char* end, const CSVReaderConfig* csvReaderConfig) { - CastString::copyStringToVector(resultVector, offset, - std::string_view{start, (uint32_t)(end - start)}, csvReaderConfig); + void handleValue(const char* start, const char* end, const CSVOption* option) { + CastString::copyStringToVector( + resultVector, offset, std::string_view{start, (uint32_t)(end - start)}, option); offset++; } }; template -static bool splitCStringList( - const char* input, uint64_t len, T& state, const CSVReaderConfig* csvReaderConfig) { +static bool splitCStringList(const char* input, uint64_t len, T& state, const CSVOption* option) { auto end = input + len; uint64_t lvl = 1; bool seen_value = false; @@ -255,8 +253,7 @@ static bool splitCStringList( while (input < end) { auto ch = *input; if (ch == CopyConstants::DEFAULT_CSV_LIST_BEGIN_CHAR) { - if (!skipToClose( - input, end, ++lvl, CopyConstants::DEFAULT_CSV_LIST_END_CHAR, csvReaderConfig)) { + if (!skipToClose(input, end, ++lvl, CopyConstants::DEFAULT_CSV_LIST_END_CHAR, option)) { return false; } } else if (ch == '\'' || ch == '"') { @@ -265,11 +262,11 @@ static bool splitCStringList( } } else if (ch == '{') { uint64_t struct_lvl = 0; - skipToClose(input, end, struct_lvl, '}', csvReaderConfig); - } else if (ch == csvReaderConfig->delimiter || + skipToClose(input, end, struct_lvl, '}', option); + } else if (ch == option->delimiter || ch == CopyConstants::DEFAULT_CSV_LIST_END_CHAR) { // split if (ch != CopyConstants::DEFAULT_CSV_LIST_END_CHAR || start_ptr < input || seen_value) { - state.handleValue(start_ptr, input, csvReaderConfig); + state.handleValue(start_ptr, input, option); seen_value = true; } if (ch == CopyConstants::DEFAULT_CSV_LIST_END_CHAR) { // last ] @@ -286,9 +283,9 @@ static bool splitCStringList( } template -static inline void startListCast(const char* input, uint64_t len, T split, - const CSVReaderConfig* csvReaderConfig, ValueVector* vector) { - if (!splitCStringList(input, len, split, csvReaderConfig)) { +static inline void startListCast( + const char* input, uint64_t len, T split, const CSVOption* option, ValueVector* vector) { + if (!splitCStringList(input, len, split, option)) { throw ConversionException("Cast failed. " + std::string{input, len} + " is not in " + vector->dataType.toString() + " range."); } @@ -296,24 +293,24 @@ static inline void startListCast(const char* input, uint64_t len, T split, template<> void CastStringHelper::cast(const char* input, uint64_t len, list_entry_t& /*result*/, - ValueVector* vector, uint64_t rowToAdd, const CSVReaderConfig* csvReaderConfig) { + ValueVector* vector, uint64_t rowToAdd, const CSVOption* option) { // calculate the number of elements in array CountPartOperation state; - splitCStringList(input, len, state, csvReaderConfig); + splitCStringList(input, len, state, option); auto list_entry = ListVector::addList(vector, state.count); vector->setValue(rowToAdd, list_entry); auto listDataVector = ListVector::getDataVector(vector); SplitStringListOperation split{list_entry.offset, listDataVector}; - startListCast(input, len, split, csvReaderConfig, vector); + startListCast(input, len, split, option, vector); } template<> void CastString::operation(const ku_string_t& input, list_entry_t& result, - ValueVector* resultVector, uint64_t rowToAdd, const CSVReaderConfig* csvReaderConfig) { + ValueVector* resultVector, uint64_t rowToAdd, const CSVOption* option) { CastStringHelper::cast(reinterpret_cast(input.getData()), input.len, result, - resultVector, rowToAdd, csvReaderConfig); + resultVector, rowToAdd, option); } // ---------------------- cast String to FixedList ------------------------------ // @@ -325,8 +322,7 @@ struct SplitStringFixedListOperation { uint64_t& offset; ValueVector* resultVector; - void handleValue( - const char* start, const char* end, const CSVReaderConfig* /*csvReaderConfig*/) { + void handleValue(const char* start, const char* end, const CSVOption* /*option*/) { T value; auto str = std::string_view{start, (uint32_t)(end - start)}; if (str.empty() || isNull(str)) { @@ -348,13 +344,13 @@ static void validateNumElementsInList(uint64_t numElementsRead, const LogicalTyp } void CastStringHelper::castToFixedList(const char* input, uint64_t len, ValueVector* vector, - uint64_t rowToAdd, const CSVReaderConfig* csvReaderConfig) { + uint64_t rowToAdd, const CSVOption* option) { KU_ASSERT(vector->dataType.getLogicalTypeID() == LogicalTypeID::FIXED_LIST); auto childDataType = FixedListType::getChildType(&vector->dataType); // calculate the number of elements in array CountPartOperation state; - splitCStringList(input, len, state, csvReaderConfig); + splitCStringList(input, len, state, option); validateNumElementsInList(state.count, vector->dataType); auto startOffset = state.count * rowToAdd; @@ -362,23 +358,23 @@ void CastStringHelper::castToFixedList(const char* input, uint64_t len, ValueVec // TODO(Kebing): currently only allow these type case LogicalTypeID::INT64: { SplitStringFixedListOperation split{startOffset, vector}; - startListCast(input, len, split, csvReaderConfig, vector); + startListCast(input, len, split, option, vector); } break; case LogicalTypeID::INT32: { SplitStringFixedListOperation split{startOffset, vector}; - startListCast(input, len, split, csvReaderConfig, vector); + startListCast(input, len, split, option, vector); } break; case LogicalTypeID::INT16: { SplitStringFixedListOperation split{startOffset, vector}; - startListCast(input, len, split, csvReaderConfig, vector); + startListCast(input, len, split, option, vector); } break; case LogicalTypeID::FLOAT: { SplitStringFixedListOperation split{startOffset, vector}; - startListCast(input, len, split, csvReaderConfig, vector); + startListCast(input, len, split, option, vector); } break; case LogicalTypeID::DOUBLE: { SplitStringFixedListOperation split{startOffset, vector}; - startListCast(input, len, split, csvReaderConfig, vector); + startListCast(input, len, split, option, vector); } break; default: { throw NotImplementedException("Unsupported data type: Function::castStringToFixedList"); @@ -387,9 +383,9 @@ void CastStringHelper::castToFixedList(const char* input, uint64_t len, ValueVec } void CastString::castToFixedList(const ku_string_t& input, ValueVector* resultVector, - uint64_t rowToAdd, const CSVReaderConfig* csvReaderConfig) { - CastStringHelper::castToFixedList(reinterpret_cast(input.getData()), input.len, - resultVector, rowToAdd, csvReaderConfig); + uint64_t rowToAdd, const CSVOption* option) { + CastStringHelper::castToFixedList( + reinterpret_cast(input.getData()), input.len, resultVector, rowToAdd, option); } // ---------------------- cast String to Map ------------------------------ // @@ -400,25 +396,23 @@ struct SplitStringMapOperation { uint64_t& offset; ValueVector* resultVector; - inline bool handleKey( - const char* start, const char* end, const CSVReaderConfig* csvReaderConfig) { + inline bool handleKey(const char* start, const char* end, const CSVOption* option) { trimRightWhitespace(start, end); CastString::copyStringToVector(StructVector::getFieldVector(resultVector, 0).get(), offset, - std::string_view{start, (uint32_t)(end - start)}, csvReaderConfig); + std::string_view{start, (uint32_t)(end - start)}, option); return true; } - inline void handleValue( - const char* start, const char* end, const CSVReaderConfig* csvReaderConfig) { + inline void handleValue(const char* start, const char* end, const CSVOption* option) { trimRightWhitespace(start, end); CastString::copyStringToVector(StructVector::getFieldVector(resultVector, 1).get(), - offset++, std::string_view{start, (uint32_t)(end - start)}, csvReaderConfig); + offset++, std::string_view{start, (uint32_t)(end - start)}, option); } }; template static bool parseKeyOrValue(const char*& input, const char* end, T& state, bool isKey, - bool& closeBracket, const CSVReaderConfig* csvReaderConfig) { + bool& closeBracket, const CSVOption* option) { auto start = input; uint64_t lvl = 0; @@ -428,18 +422,17 @@ static bool parseKeyOrValue(const char*& input, const char* end, T& state, bool return false; } } else if (*input == '{') { - if (!skipToClose(input, end, lvl, '}', csvReaderConfig)) { + if (!skipToClose(input, end, lvl, '}', option)) { return false; } } else if (*input == CopyConstants::DEFAULT_CSV_LIST_BEGIN_CHAR) { - if (!skipToClose( - input, end, lvl, CopyConstants::DEFAULT_CSV_LIST_END_CHAR, csvReaderConfig)) { + if (!skipToClose(input, end, lvl, CopyConstants::DEFAULT_CSV_LIST_END_CHAR, option)) { return false; } } else if (isKey && *input == '=') { - return state.handleKey(start, input, csvReaderConfig); - } else if (!isKey && (*input == csvReaderConfig->delimiter || *input == '}')) { - state.handleValue(start, input, csvReaderConfig); + return state.handleKey(start, input, option); + } else if (!isKey && (*input == option->delimiter || *input == '}')) { + state.handleValue(start, input, option); if (*input == '}') { closeBracket = true; } @@ -452,8 +445,7 @@ static bool parseKeyOrValue(const char*& input, const char* end, T& state, bool // Split map of format: {a=12,b=13} template -static bool splitCStringMap( - const char* input, uint64_t len, T& state, const CSVReaderConfig* csvReaderConfig) { +static bool splitCStringMap(const char* input, uint64_t len, T& state, const CSVOption* option) { auto end = input + len; bool closeBracket = false; @@ -471,11 +463,11 @@ static bool splitCStringMap( } while (input < end) { - if (!parseKeyOrValue(input, end, state, true, closeBracket, csvReaderConfig)) { + if (!parseKeyOrValue(input, end, state, true, closeBracket, option)) { return false; } skipWhitespace(++input, end); - if (!parseKeyOrValue(input, end, state, false, closeBracket, csvReaderConfig)) { + if (!parseKeyOrValue(input, end, state, false, closeBracket, option)) { return false; } skipWhitespace(++input, end); @@ -488,17 +480,17 @@ static bool splitCStringMap( template<> void CastStringHelper::cast(const char* input, uint64_t len, map_entry_t& /*result*/, - ValueVector* vector, uint64_t rowToAdd, const CSVReaderConfig* csvReaderConfig) { + ValueVector* vector, uint64_t rowToAdd, const CSVOption* option) { // count the number of maps in map CountPartOperation state; - splitCStringMap(input, len, state, csvReaderConfig); + splitCStringMap(input, len, state, option); auto list_entry = ListVector::addList(vector, state.count); vector->setValue(rowToAdd, list_entry); auto structVector = ListVector::getDataVector(vector); SplitStringMapOperation split{list_entry.offset, structVector}; - if (!splitCStringMap(input, len, split, csvReaderConfig)) { + if (!splitCStringMap(input, len, split, option)) { throw ConversionException("Cast failed. " + std::string{input, len} + " is not in " + vector->dataType.toString() + " range."); } @@ -506,9 +498,9 @@ void CastStringHelper::cast(const char* input, uint64_t len, map_entry_t& /*resu template<> void CastString::operation(const ku_string_t& input, map_entry_t& result, ValueVector* resultVector, - uint64_t rowToAdd, const CSVReaderConfig* csvReaderConfig) { + uint64_t rowToAdd, const CSVOption* option) { CastStringHelper::cast(reinterpret_cast(input.getData()), input.len, result, - resultVector, rowToAdd, csvReaderConfig); + resultVector, rowToAdd, option); } // ---------------------- cast String to Struct ------------------------------ // @@ -523,7 +515,7 @@ static bool parseStructFieldName(const char*& input, const char* end) { } static bool parseStructFieldValue( - const char*& input, const char* end, const CSVReaderConfig* csvReaderConfig, bool& closeBrack) { + const char*& input, const char* end, const CSVOption* option, bool& closeBrack) { uint64_t lvl = 0; while (input < end) { if (*input == '"' || *input == '\'') { @@ -531,15 +523,14 @@ static bool parseStructFieldValue( return false; } } else if (*input == '{') { - if (!skipToClose(input, end, lvl, '}', csvReaderConfig)) { + if (!skipToClose(input, end, lvl, '}', option)) { return false; } } else if (*input == CopyConstants::DEFAULT_CSV_LIST_BEGIN_CHAR) { - if (!skipToClose( - input, end, ++lvl, CopyConstants::DEFAULT_CSV_LIST_END_CHAR, csvReaderConfig)) { + if (!skipToClose(input, end, ++lvl, CopyConstants::DEFAULT_CSV_LIST_END_CHAR, option)) { return false; } - } else if (*input == csvReaderConfig->delimiter || *input == '}') { + } else if (*input == option->delimiter || *input == '}') { if (*input == '}') { closeBrack = true; } @@ -551,7 +542,7 @@ static bool parseStructFieldValue( } static bool tryCastStringToStruct(const char* input, uint64_t len, ValueVector* vector, - uint64_t rowToAdd, const CSVReaderConfig* csvReaderConfig) { + uint64_t rowToAdd, const CSVOption* option) { // default values to NULL auto fieldVectors = StructVector::getFieldVectors(vector); for (auto& fieldVector : fieldVectors) { @@ -590,7 +581,7 @@ static bool tryCastStringToStruct(const char* input, uint64_t len, ValueVector* skipWhitespace(++input, end); auto valStart = input; - if (!parseStructFieldValue(input, end, csvReaderConfig, closeBracket)) { // find value + if (!parseStructFieldValue(input, end, option, closeBracket)) { // find value return false; } auto valEnd = input; @@ -600,7 +591,7 @@ static bool tryCastStringToStruct(const char* input, uint64_t len, ValueVector* auto fieldVector = StructVector::getFieldVector(vector, fieldIdx).get(); fieldVector->setNull(rowToAdd, false); CastString::copyStringToVector(fieldVector, rowToAdd, - std::string_view{valStart, (uint32_t)(valEnd - valStart)}, csvReaderConfig); + std::string_view{valStart, (uint32_t)(valEnd - valStart)}, option); if (closeBracket) { return (input == end); @@ -611,8 +602,8 @@ static bool tryCastStringToStruct(const char* input, uint64_t len, ValueVector* template<> void CastStringHelper::cast(const char* input, uint64_t len, struct_entry_t& /*result*/, - ValueVector* vector, uint64_t rowToAdd, const CSVReaderConfig* csvReaderConfig) { - if (!tryCastStringToStruct(input, len, vector, rowToAdd, csvReaderConfig)) { + ValueVector* vector, uint64_t rowToAdd, const CSVOption* option) { + if (!tryCastStringToStruct(input, len, vector, rowToAdd, option)) { throw ConversionException("Cast failed. " + std::string{input, len} + " is not in " + vector->dataType.toString() + " range."); } @@ -620,9 +611,9 @@ void CastStringHelper::cast(const char* input, uint64_t len, struct_entry_t& /*r template<> void CastString::operation(const ku_string_t& input, struct_entry_t& result, - ValueVector* resultVector, uint64_t rowToAdd, const CSVReaderConfig* csvReaderConfig) { + ValueVector* resultVector, uint64_t rowToAdd, const CSVOption* option) { CastStringHelper::cast(reinterpret_cast(input.getData()), input.len, result, - resultVector, rowToAdd, csvReaderConfig); + resultVector, rowToAdd, option); } // ---------------------- cast String to Union ------------------------------ // @@ -725,7 +716,7 @@ static bool tryCastUnionField( template<> void CastStringHelper::cast(const char* input, uint64_t len, union_entry_t& /*result*/, - ValueVector* vector, uint64_t rowToAdd, const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* vector, uint64_t rowToAdd, const CSVOption* /*option*/) { auto& type = vector->dataType; union_field_idx_t selectedFieldIdx = INVALID_STRUCT_FIELD_IDX; @@ -753,13 +744,13 @@ void CastStringHelper::cast(const char* input, uint64_t len, union_entry_t& /*re template<> void CastString::operation(const ku_string_t& input, union_entry_t& result, - ValueVector* resultVector, uint64_t rowToAdd, const CSVReaderConfig* CSVReaderConfig) { + ValueVector* resultVector, uint64_t rowToAdd, const CSVOption* CSVOption) { CastStringHelper::cast(reinterpret_cast(input.getData()), input.len, result, - resultVector, rowToAdd, CSVReaderConfig); + resultVector, rowToAdd, CSVOption); } -void CastString::copyStringToVector(ValueVector* vector, uint64_t rowToAdd, std::string_view strVal, - const CSVReaderConfig* csvReaderConfig) { +void CastString::copyStringToVector( + ValueVector* vector, uint64_t rowToAdd, std::string_view strVal, const CSVOption* option) { auto& type = vector->dataType; if (strVal.empty() || isNull(strVal)) { @@ -831,8 +822,7 @@ void CastString::copyStringToVector(ValueVector* vector, uint64_t rowToAdd, std: } break; case LogicalTypeID::BLOB: { blob_t val; - CastStringHelper::cast( - strVal.data(), strVal.length(), val, vector, rowToAdd, csvReaderConfig); + CastStringHelper::cast(strVal.data(), strVal.length(), val, vector, rowToAdd, option); } break; case LogicalTypeID::STRING: { if (!utf8proc::Utf8Proc::isValid(strVal.data(), strVal.length())) { @@ -857,27 +847,22 @@ void CastString::copyStringToVector(ValueVector* vector, uint64_t rowToAdd, std: } break; case LogicalTypeID::MAP: { map_entry_t val; - CastStringHelper::cast( - strVal.data(), strVal.length(), val, vector, rowToAdd, csvReaderConfig); + CastStringHelper::cast(strVal.data(), strVal.length(), val, vector, rowToAdd, option); } break; case LogicalTypeID::VAR_LIST: { list_entry_t val; - CastStringHelper::cast( - strVal.data(), strVal.length(), val, vector, rowToAdd, csvReaderConfig); + CastStringHelper::cast(strVal.data(), strVal.length(), val, vector, rowToAdd, option); } break; case LogicalTypeID::FIXED_LIST: { - CastStringHelper::castToFixedList( - strVal.data(), strVal.length(), vector, rowToAdd, csvReaderConfig); + CastStringHelper::castToFixedList(strVal.data(), strVal.length(), vector, rowToAdd, option); } break; case LogicalTypeID::STRUCT: { struct_entry_t val; - CastStringHelper::cast( - strVal.data(), strVal.length(), val, vector, rowToAdd, csvReaderConfig); + CastStringHelper::cast(strVal.data(), strVal.length(), val, vector, rowToAdd, option); } break; case LogicalTypeID::UNION: { union_entry_t val; - CastStringHelper::cast( - strVal.data(), strVal.length(), val, vector, rowToAdd, csvReaderConfig); + CastStringHelper::cast(strVal.data(), strVal.length(), val, vector, rowToAdd, option); } break; default: { KU_UNREACHABLE; diff --git a/src/include/binder/binder.h b/src/include/binder/binder.h index b076cd3992f..153fcd79a6b 100644 --- a/src/include/binder/binder.h +++ b/src/include/binder/binder.h @@ -3,7 +3,7 @@ #include "binder/query/bound_regular_query.h" #include "binder/query/query_graph.h" #include "catalog/catalog.h" -#include "common/copier_config/copier_config.h" +#include "common/copier_config/reader_config.h" #include "expression_binder.h" #include "parser/query/graph_pattern/pattern_element.h" #include "parser/query/regular_query.h" @@ -285,7 +285,8 @@ class Binder { std::unique_ptr saveScope(); void restoreScope(std::unique_ptr prevVariableScope); - function::TableFunction* getScanFunction(common::FileType fileType, bool isParallel); + function::TableFunction* getScanFunction( + common::FileType fileType, const common::ReaderConfig& config); private: const catalog::Catalog& catalog; diff --git a/src/include/binder/copy/bound_copy_to.h b/src/include/binder/copy/bound_copy_to.h index fae8273b9c4..86f3514af12 100644 --- a/src/include/binder/copy/bound_copy_to.h +++ b/src/include/binder/copy/bound_copy_to.h @@ -1,7 +1,7 @@ #pragma once #include "binder/query/bound_regular_query.h" -#include "common/copier_config/copier_config.h" +#include "common/copier_config/reader_config.h" namespace kuzu { namespace binder { diff --git a/src/include/common/copier_config/copier_config.h b/src/include/common/copier_config/copier_config.h deleted file mode 100644 index 2001e2df9e2..00000000000 --- a/src/include/common/copier_config/copier_config.h +++ /dev/null @@ -1,80 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "common/constants.h" -#include "rdf_config.h" - -namespace kuzu { -namespace common { - -struct CSVOption { - CSVOption() - : escapeChar{CopyConstants::DEFAULT_CSV_ESCAPE_CHAR}, - delimiter{CopyConstants::DEFAULT_CSV_DELIMITER}, - quoteChar{CopyConstants::DEFAULT_CSV_QUOTE_CHAR}, - hasHeader{CopyConstants::DEFAULT_CSV_HAS_HEADER} {} - - virtual ~CSVOption() = default; - - virtual std::unique_ptr copyCSVOption() const { - return std::make_unique(*this); - } - - // TODO(Xiyang): Add newline character option and delimiter can be a string. - char escapeChar; - char delimiter; - char quoteChar; - bool hasHeader; -}; - -struct CSVReaderConfig : public CSVOption { - bool parallel; - - CSVReaderConfig() : CSVOption{}, parallel{CopyConstants::DEFAULT_CSV_PARALLEL} {} - - inline std::unique_ptr copy() { - return std::make_unique(*this); - } -}; - -enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3, TURTLE = 4 }; - -struct FileTypeUtils { - static FileType getFileTypeFromExtension(std::string_view extension); - static std::string toString(FileType fileType); -}; - -struct ReaderConfig { - FileType fileType = FileType::UNKNOWN; - std::vector filePaths; - std::unique_ptr csvReaderConfig = nullptr; - // NOTE: Do not try to refactor this with CSVReaderConfig. We might remove this. - std::unique_ptr rdfReaderConfig; - - ReaderConfig(FileType fileType, std::vector filePaths, - std::unique_ptr csvReaderConfig) - : fileType{fileType}, filePaths{std::move(filePaths)}, csvReaderConfig{ - std::move(csvReaderConfig)} {} - - ReaderConfig(const ReaderConfig& other) : fileType{other.fileType}, filePaths{other.filePaths} { - if (other.csvReaderConfig != nullptr) { - this->csvReaderConfig = other.csvReaderConfig->copy(); - } - if (other.rdfReaderConfig != nullptr) { - this->rdfReaderConfig = other.rdfReaderConfig->copy(); - } - } - ReaderConfig(ReaderConfig&& other) = default; - - inline uint32_t getNumFiles() const { return filePaths.size(); } - - inline std::unique_ptr copy() const { - return std::make_unique(*this); - } -}; - -} // namespace common -} // namespace kuzu diff --git a/src/include/common/copier_config/csv_reader_config.h b/src/include/common/copier_config/csv_reader_config.h new file mode 100644 index 00000000000..027b0150df7 --- /dev/null +++ b/src/include/common/copier_config/csv_reader_config.h @@ -0,0 +1,46 @@ +#pragma once + +#include + +#include "common/constants.h" +#include "reader_config.h" + +namespace kuzu { +namespace common { + +struct CSVOption { + // TODO(Xiyang): Add newline character option and delimiter can be a string. + char escapeChar; + char delimiter; + char quoteChar; + bool hasHeader; + + CSVOption() + : escapeChar{CopyConstants::DEFAULT_CSV_ESCAPE_CHAR}, + delimiter{CopyConstants::DEFAULT_CSV_DELIMITER}, + quoteChar{CopyConstants::DEFAULT_CSV_QUOTE_CHAR}, + hasHeader{CopyConstants::DEFAULT_CSV_HAS_HEADER} {} + CSVOption(const CSVOption& other) + : escapeChar{other.escapeChar}, delimiter{other.delimiter}, quoteChar{other.quoteChar}, + hasHeader{other.hasHeader} {} + + virtual ~CSVOption() = default; + + inline std::unique_ptr copy() const { return std::make_unique(*this); } +}; + +struct CSVReaderConfig final : public ExtraReaderConfig { + CSVOption option; + bool parallel; + + CSVReaderConfig() : option{}, parallel{CopyConstants::DEFAULT_CSV_PARALLEL} {} + CSVReaderConfig(const CSVReaderConfig& other) + : option{other.option}, parallel{other.parallel} {} + + inline std::unique_ptr copy() override { + return std::make_unique(*this); + } +}; + +} // namespace common +} // namespace kuzu diff --git a/src/include/common/copier_config/rdf_config.h b/src/include/common/copier_config/rdf_reader_config.h similarity index 80% rename from src/include/common/copier_config/rdf_config.h rename to src/include/common/copier_config/rdf_reader_config.h index 0636a4c1898..a0adb2c06df 100644 --- a/src/include/common/copier_config/rdf_config.h +++ b/src/include/common/copier_config/rdf_reader_config.h @@ -1,7 +1,8 @@ #pragma once #include -#include + +#include "reader_config.h" namespace kuzu { @@ -18,7 +19,7 @@ enum class RdfReaderMode : uint8_t { LITERAL_TRIPLE = 3, }; -struct RdfReaderConfig { +struct RdfReaderConfig final : public ExtraReaderConfig { RdfReaderMode mode; storage::PrimaryKeyIndex* index; @@ -26,7 +27,7 @@ struct RdfReaderConfig { : mode{mode}, index{index} {} RdfReaderConfig(const RdfReaderConfig& other) : mode{other.mode}, index{other.index} {} - inline std::unique_ptr copy() const { + inline std::unique_ptr copy() override { return std::make_unique(*this); } }; diff --git a/src/include/common/copier_config/reader_config.h b/src/include/common/copier_config/reader_config.h new file mode 100644 index 00000000000..cc9e69d13d2 --- /dev/null +++ b/src/include/common/copier_config/reader_config.h @@ -0,0 +1,46 @@ +#pragma once + +#include +#include +#include + +namespace kuzu { +namespace common { + +enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3, TURTLE = 4 }; + +struct FileTypeUtils { + static FileType getFileTypeFromExtension(std::string_view extension); + static std::string toString(FileType fileType); +}; + +struct ExtraReaderConfig { + virtual ~ExtraReaderConfig() = default; + virtual std::unique_ptr copy() = 0; +}; + +struct ReaderConfig { + FileType fileType = FileType::UNKNOWN; + std::vector filePaths; + std::unique_ptr extraConfig; + + ReaderConfig(FileType fileType, std::vector filePaths, + std::unique_ptr extraConfig) + : fileType{fileType}, filePaths{std::move(filePaths)}, extraConfig{std::move(extraConfig)} { + } + ReaderConfig(const ReaderConfig& other) : fileType{other.fileType}, filePaths{other.filePaths} { + if (other.extraConfig != nullptr) { + extraConfig = other.extraConfig->copy(); + } + } + ReaderConfig(ReaderConfig&& other) = default; + + inline uint32_t getNumFiles() const { return filePaths.size(); } + + inline std::unique_ptr copy() const { + return std::make_unique(*this); + } +}; + +} // namespace common +} // namespace kuzu diff --git a/src/include/function/cast/functions/cast_from_string_functions.h b/src/include/function/cast/functions/cast_from_string_functions.h index 43136d1e16d..7e6bcd5b65c 100644 --- a/src/include/function/cast/functions/cast_from_string_functions.h +++ b/src/include/function/cast/functions/cast_from_string_functions.h @@ -1,7 +1,7 @@ #pragma once #include "cast_string_non_nested_functions.h" -#include "common/copier_config/copier_config.h" +#include "common/copier_config/csv_reader_config.h" #include "common/type_utils.h" #include "common/types/blob.h" #include "common/vector/value_vector.h" @@ -12,8 +12,8 @@ namespace kuzu { namespace function { struct CastString { - static void copyStringToVector(ValueVector* vector, uint64_t rowToAdd, std::string_view strVal, - const CSVReaderConfig* csvReaderConfig); + static void copyStringToVector( + ValueVector* vector, uint64_t rowToAdd, std::string_view strVal, const CSVOption* option); template static inline bool tryCast(const ku_string_t& input, T& result) { @@ -25,142 +25,128 @@ struct CastString { template static inline void operation(const ku_string_t& input, T& result, ValueVector* /*resultVector*/ = nullptr, uint64_t /*rowToAdd*/ = 0, - const CSVReaderConfig* /*csvReaderConfig*/ = nullptr) { + const CSVOption* /*option*/ = nullptr) { // base case: int64 simpleIntegerCast(reinterpret_cast(input.getData()), input.len, result, LogicalType{LogicalTypeID::INT64}); } static void castToFixedList(const ku_string_t& input, ValueVector* resultVector, - uint64_t rowToAdd, const CSVReaderConfig* csvReaderConfig); + uint64_t rowToAdd, const CSVOption* option); }; template<> inline void CastString::operation(const ku_string_t& input, int128_t& result, - ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, - const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { simpleInt128Cast(reinterpret_cast(input.getData()), input.len, result); } template<> inline void CastString::operation(const ku_string_t& input, int32_t& result, - ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, - const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { simpleIntegerCast(reinterpret_cast(input.getData()), input.len, result, LogicalType{LogicalTypeID::INT32}); } template<> inline void CastString::operation(const ku_string_t& input, int16_t& result, - ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, - const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { simpleIntegerCast(reinterpret_cast(input.getData()), input.len, result, LogicalType{LogicalTypeID::INT16}); } template<> inline void CastString::operation(const ku_string_t& input, int8_t& result, - ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, - const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { simpleIntegerCast(reinterpret_cast(input.getData()), input.len, result, LogicalType{LogicalTypeID::INT8}); } template<> inline void CastString::operation(const ku_string_t& input, uint64_t& result, - ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, - const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { simpleIntegerCast(reinterpret_cast(input.getData()), input.len, result, LogicalType{LogicalTypeID::UINT64}); } template<> inline void CastString::operation(const ku_string_t& input, uint32_t& result, - ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, - const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { simpleIntegerCast(reinterpret_cast(input.getData()), input.len, result, LogicalType{LogicalTypeID::UINT32}); } template<> inline void CastString::operation(const ku_string_t& input, uint16_t& result, - ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, - const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { simpleIntegerCast(reinterpret_cast(input.getData()), input.len, result, LogicalType{LogicalTypeID::UINT16}); } template<> inline void CastString::operation(const ku_string_t& input, uint8_t& result, - ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, - const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { simpleIntegerCast(reinterpret_cast(input.getData()), input.len, result, LogicalType{LogicalTypeID::UINT8}); } template<> inline void CastString::operation(const ku_string_t& input, float_t& result, - ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, - const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { doubleCast(reinterpret_cast(input.getData()), input.len, result, LogicalType{LogicalTypeID::FLOAT}); } template<> inline void CastString::operation(const ku_string_t& input, double_t& result, - ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, - const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { doubleCast(reinterpret_cast(input.getData()), input.len, result, LogicalType{LogicalTypeID::DOUBLE}); } template<> inline void CastString::operation(const ku_string_t& input, date_t& result, - ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, - const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { result = Date::fromCString((const char*)input.getData(), input.len); } template<> inline void CastString::operation(const ku_string_t& input, timestamp_t& result, - ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, - const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { result = Timestamp::fromCString((const char*)input.getData(), input.len); } template<> inline void CastString::operation(const ku_string_t& input, interval_t& result, - ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, - const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { result = Interval::fromCString((const char*)input.getData(), input.len); } template<> inline void CastString::operation(const ku_string_t& input, bool& result, - ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, - const CSVReaderConfig* /*csvReaderConfig*/) { + ValueVector* /*resultVector*/, uint64_t /*rowToAdd*/, const CSVOption* /*option*/) { castStringToBool(reinterpret_cast(input.getData()), input.len, result); } template<> void CastString::operation(const ku_string_t& input, blob_t& result, ValueVector* resultVector, - uint64_t rowToAdd, const CSVReaderConfig* csvReaderConfig); + uint64_t rowToAdd, const CSVOption* option); template<> void CastString::operation(const ku_string_t& input, list_entry_t& result, - ValueVector* resultVector, uint64_t rowToAdd, const CSVReaderConfig* csvReaderConfig); + ValueVector* resultVector, uint64_t rowToAdd, const CSVOption* option); template<> void CastString::operation(const ku_string_t& input, map_entry_t& result, ValueVector* resultVector, - uint64_t rowToAdd, const CSVReaderConfig* csvReaderConfig); + uint64_t rowToAdd, const CSVOption* option); template<> void CastString::operation(const ku_string_t& input, struct_entry_t& result, - ValueVector* resultVector, uint64_t rowToAdd, const CSVReaderConfig* csvReaderConfig); + ValueVector* resultVector, uint64_t rowToAdd, const CSVOption* option); template<> void CastString::operation(const ku_string_t& input, union_entry_t& result, - ValueVector* resultVector, uint64_t rowToAdd, const CSVReaderConfig* csvReaderConfig); + ValueVector* resultVector, uint64_t rowToAdd, const CSVOption* option); } // namespace function } // namespace kuzu diff --git a/src/include/function/function.h b/src/include/function/function.h index 2b02ce3577d..511981397e5 100644 --- a/src/include/function/function.h +++ b/src/include/function/function.h @@ -1,7 +1,7 @@ #pragma once #include "binder/expression/expression.h" -#include "common/copier_config/copier_config.h" +#include "common/copier_config/csv_reader_config.h" namespace kuzu { namespace function { diff --git a/src/include/function/table_functions/bind_data.h b/src/include/function/table_functions/bind_data.h index 8a36339a18c..c85945f9b40 100644 --- a/src/include/function/table_functions/bind_data.h +++ b/src/include/function/table_functions/bind_data.h @@ -1,6 +1,8 @@ #pragma once -#include "common/copier_config/copier_config.h" +#include "common/copier_config/csv_reader_config.h" +#include "common/copier_config/rdf_reader_config.h" +#include "common/copier_config/reader_config.h" #include "common/types/types.h" #include "storage/buffer_manager/memory_manager.h" @@ -8,12 +10,14 @@ namespace kuzu { namespace function { struct TableFuncBindData { - std::vector> columnTypes; + common::logical_types_t columnTypes; std::vector columnNames; - TableFuncBindData(std::vector> columnTypes, - std::vector columnNames) + TableFuncBindData(common::logical_types_t columnTypes, std::vector columnNames) : columnTypes{std::move(columnTypes)}, columnNames{std::move(columnNames)} {} + TableFuncBindData(const TableFuncBindData& other) + : columnTypes{common::LogicalType::copy(other.columnTypes)}, columnNames{ + other.columnNames} {} virtual ~TableFuncBindData() = default; @@ -24,15 +28,15 @@ struct ScanBindData : public TableFuncBindData { storage::MemoryManager* mm; common::ReaderConfig config; - ScanBindData(std::vector> columnTypes, - std::vector columnNames, storage::MemoryManager* mm, - const common::ReaderConfig& config) + ScanBindData(common::logical_types_t columnTypes, std::vector columnNames, + storage::MemoryManager* mm, const common::ReaderConfig& config) : TableFuncBindData{std::move(columnTypes), std::move(columnNames)}, mm{mm}, config{ config} {} + ScanBindData(const ScanBindData& other) + : TableFuncBindData{other}, mm{other.mm}, config{other.config} {} - std::unique_ptr copy() override { - return std::make_unique( - common::LogicalType::copy(columnTypes), columnNames, mm, config); + inline std::unique_ptr copy() override { + return std::make_unique(*this); } }; diff --git a/src/include/function/table_functions/bind_input.h b/src/include/function/table_functions/bind_input.h index 1007fbfd265..7b967d9084f 100644 --- a/src/include/function/table_functions/bind_input.h +++ b/src/include/function/table_functions/bind_input.h @@ -2,7 +2,7 @@ #include -#include "common/copier_config/copier_config.h" +#include "common/copier_config/reader_config.h" #include "common/types/value/value.h" #include "storage/buffer_manager/memory_manager.h" diff --git a/src/include/function/unary_function_executor.h b/src/include/function/unary_function_executor.h index b4403ea5ec9..7dbd1cbce0d 100644 --- a/src/include/function/unary_function_executor.h +++ b/src/include/function/unary_function_executor.h @@ -41,7 +41,7 @@ struct UnaryCastStringFunctionWrapper { auto resultVector_ = (common::ValueVector*)resultVector; FUNC::operation(inputVector_.getValue(inputPos), resultVector_->getValue(resultPos), resultVector_, inputPos, - &reinterpret_cast(dataPtr)->csvConfig); + &reinterpret_cast(dataPtr)->csvConfig.option); } }; diff --git a/src/include/planner/operator/persistent/logical_copy_to.h b/src/include/planner/operator/persistent/logical_copy_to.h index 24a75475dab..463dfdf602a 100644 --- a/src/include/planner/operator/persistent/logical_copy_to.h +++ b/src/include/planner/operator/persistent/logical_copy_to.h @@ -1,6 +1,6 @@ #pragma once -#include "common/copier_config/copier_config.h" +#include "common/copier_config/csv_reader_config.h" #include "planner/operator/logical_operator.h" namespace kuzu { @@ -33,8 +33,7 @@ class LogicalCopyTo : public LogicalOperator { inline std::unique_ptr copy() override { return make_unique(filePath, fileType, columnNames, - common::LogicalType::copy(columnTypes), copyToOption->copyCSVOption(), - children[0]->copy()); + common::LogicalType::copy(columnTypes), copyToOption->copy(), children[0]->copy()); } private: diff --git a/src/include/processor/operator/persistent/copy_to_csv.h b/src/include/processor/operator/persistent/copy_to_csv.h index d26458c80ea..5ff023e6777 100644 --- a/src/include/processor/operator/persistent/copy_to_csv.h +++ b/src/include/processor/operator/persistent/copy_to_csv.h @@ -1,5 +1,6 @@ #pragma once +#include "common/copier_config/csv_reader_config.h" #include "common/file_utils.h" #include "common/serializer/buffered_serializer.h" #include "copy_to.h" @@ -22,7 +23,7 @@ struct CopyToCSVInfo final : public CopyToInfo { inline std::unique_ptr copy() override { return std::make_unique( - names, dataPoses, fileName, isFlat, copyToOption->copyCSVOption()); + names, dataPoses, fileName, isFlat, copyToOption->copy()); } }; diff --git a/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h b/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h index 97ec7a05654..dc723931e10 100644 --- a/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h +++ b/src/include/processor/operator/persistent/reader/csv/base_csv_reader.h @@ -4,7 +4,7 @@ #include #include -#include "common/copier_config/copier_config.h" +#include "common/copier_config/csv_reader_config.h" #include "common/data_chunk/data_chunk.h" #include "common/types/types.h" @@ -16,7 +16,7 @@ class BaseCSVReader { public: BaseCSVReader( - const std::string& filePath, const common::ReaderConfig& readerConfig, uint64_t numColumns); + const std::string& filePath, const common::CSVOption& option, uint64_t numColumns); virtual ~BaseCSVReader(); @@ -63,7 +63,7 @@ class BaseCSVReader { protected: std::string filePath; - common::CSVReaderConfig csvReaderConfig; + common::CSVOption option; uint64_t numColumns; int fd; diff --git a/src/include/processor/operator/persistent/reader/csv/parallel_csv_reader.h b/src/include/processor/operator/persistent/reader/csv/parallel_csv_reader.h index 8abb35f11d9..52d4be19ac3 100644 --- a/src/include/processor/operator/persistent/reader/csv/parallel_csv_reader.h +++ b/src/include/processor/operator/persistent/reader/csv/parallel_csv_reader.h @@ -17,7 +17,7 @@ class ParallelCSVReader final : public BaseCSVReader { public: ParallelCSVReader( - const std::string& filePath, const common::ReaderConfig& readerConfig, uint64_t numColumns); + const std::string& filePath, const common::CSVOption& option, uint64_t numColumns); bool hasMoreToRead() const; uint64_t parseBlock(common::block_idx_t blockIdx, common::DataChunk& resultChunk) override; diff --git a/src/include/processor/operator/persistent/reader/csv/serial_csv_reader.h b/src/include/processor/operator/persistent/reader/csv/serial_csv_reader.h index e3363cb6cbd..70daa9fc437 100644 --- a/src/include/processor/operator/persistent/reader/csv/serial_csv_reader.h +++ b/src/include/processor/operator/persistent/reader/csv/serial_csv_reader.h @@ -13,7 +13,7 @@ namespace processor { class SerialCSVReader final : public BaseCSVReader { public: SerialCSVReader( - const std::string& filePath, const common::ReaderConfig& readerConfig, uint64_t numColumns); + const std::string& filePath, const common::CSVOption& option, uint64_t numColumns); //! Sniffs CSV dialect and determines skip rows, header row, column types and column names std::vector> sniffCSV(); diff --git a/src/include/processor/operator/persistent/reader/rdf/rdf_reader.h b/src/include/processor/operator/persistent/reader/rdf/rdf_reader.h index 3b22fab7c81..77cc5084d27 100644 --- a/src/include/processor/operator/persistent/reader/rdf/rdf_reader.h +++ b/src/include/processor/operator/persistent/reader/rdf/rdf_reader.h @@ -1,6 +1,6 @@ #pragma once -#include "common/copier_config/rdf_config.h" +#include "common/copier_config/rdf_reader_config.h" #include "common/data_chunk/data_chunk.h" #include "function/scalar_function.h" #include "function/table_functions.h" @@ -13,7 +13,7 @@ namespace processor { class RDFReader { public: - RDFReader(std::string filePath, std::unique_ptr config); + RDFReader(std::string filePath, const common::RdfReaderConfig& config); ~RDFReader(); @@ -33,7 +33,8 @@ class RDFReader { private: const std::string filePath; - std::unique_ptr config; + common::RdfReaderMode mode; + storage::PrimaryKeyIndex* index; FILE* fp; SerdReader* reader; diff --git a/src/processor/map/map_copy_from.cpp b/src/processor/map/map_copy_from.cpp index a54534fdf08..31402b6addf 100644 --- a/src/processor/map/map_copy_from.cpp +++ b/src/processor/map/map_copy_from.cpp @@ -130,7 +130,8 @@ std::unique_ptr PlanMapper::mapCopyNodeFrom(LogicalOperator* l reinterpret_cast(copyFromInfo->fileScanInfo->bindData.get()) ->config; if (readerConfig.fileType == FileType::TURTLE && - readerConfig.rdfReaderConfig->mode == RdfReaderMode::RESOURCE) { + reinterpret_cast(readerConfig.extraConfig.get())->mode == + RdfReaderMode::RESOURCE) { copyNode = std::make_unique(sharedState, std::move(info), std::make_unique(copyFrom->getSchema()), std::move(prevOperator), getOperatorID(), copyFrom->getExpressionsForPrinting()); diff --git a/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp index 01b86db3ed9..6786c8e80f7 100644 --- a/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/base_csv_reader.cpp @@ -20,9 +20,9 @@ namespace kuzu { namespace processor { BaseCSVReader::BaseCSVReader( - const std::string& filePath, const common::ReaderConfig& readerConfig, uint64_t numColumns) - : filePath{filePath}, csvReaderConfig{*readerConfig.csvReaderConfig}, numColumns(numColumns), - fd(-1), buffer(nullptr), bufferSize(0), position(0), rowEmpty(false) { + const std::string& filePath, const common::CSVOption& option, uint64_t numColumns) + : filePath{filePath}, option{option}, numColumns(numColumns), fd(-1), buffer(nullptr), + bufferSize(0), position(0), rowEmpty(false) { // TODO(Ziyi): should we wrap this fd using kuzu file handler? fd = open(filePath.c_str(), O_RDONLY #ifdef _WIN32 @@ -79,7 +79,7 @@ uint64_t BaseCSVReader::countRows() { } else if (buffer[position] == '\n') { position++; goto line_start; - } else if (buffer[position] == csvReaderConfig.quoteChar) { + } else if (buffer[position] == option.quoteChar) { position++; goto in_quotes; } else { @@ -105,10 +105,10 @@ uint64_t BaseCSVReader::countRows() { } do { - if (buffer[position] == csvReaderConfig.quoteChar) { + if (buffer[position] == option.quoteChar) { position++; goto normal; - } else if (buffer[position] == csvReaderConfig.escapeChar) { + } else if (buffer[position] == option.escapeChar) { position++; goto escape; } else { @@ -166,7 +166,7 @@ void BaseCSVReader::addValue(Driver& driver, uint64_t rowNum, column_id_t column void BaseCSVReader::handleFirstBlock() { readBOM(); - if (csvReaderConfig.hasHeader) { + if (option.hasHeader) { readHeader(); } } @@ -251,7 +251,7 @@ uint64_t BaseCSVReader::parseCSV(Driver& driver) { value_start: /* state: value_start */ // this state parses the first character of a value - if (buffer[position] == csvReaderConfig.quoteChar) { + if (buffer[position] == option.quoteChar) { [[unlikely]] // quote: actual value starts in the next position // move to in_quotes state @@ -270,7 +270,7 @@ uint64_t BaseCSVReader::parseCSV(Driver& driver) { // newline do { for (; position < bufferSize; position++) { - if (buffer[position] == csvReaderConfig.delimiter) { + if (buffer[position] == option.delimiter) { // delimiter: end the value and add it to the chunk goto add_value; } else if (isNewLine(buffer[position])) { @@ -285,7 +285,7 @@ uint64_t BaseCSVReader::parseCSV(Driver& driver) { goto final_state; add_value: // We get here after we have a delimiter. - KU_ASSERT(buffer[position] == csvReaderConfig.delimiter); + KU_ASSERT(buffer[position] == option.delimiter); // Trim one character if we have quotes. addValue(driver, rowNum, column, std::string_view(buffer.get() + start, position - start - hasQuotes), escapePositions); @@ -334,10 +334,10 @@ add_row : { position++; do { for (; position < bufferSize; position++) { - if (buffer[position] == csvReaderConfig.quoteChar) { + if (buffer[position] == option.quoteChar) { // quote: move to unquoted state goto unquote; - } else if (buffer[position] == csvReaderConfig.escapeChar) { + } else if (buffer[position] == option.escapeChar) { // escape: store the escaped position and move to handle_escape state escapePositions.push_back(position - start); goto handle_escape; @@ -351,7 +351,7 @@ add_row : { throw CopyException(stringFormat( "Error in file {} on line {}: unterminated quotes.", filePath, getLineNumber())); unquote: - KU_ASSERT(hasQuotes && buffer[position] == csvReaderConfig.quoteChar); + KU_ASSERT(hasQuotes && buffer[position] == option.quoteChar); // this state handles the state directly after we unquote // in this state we expect either another quote (entering the quoted state again, and // escaping the quote) or a delimiter/newline, ending the current value and moving on to the @@ -361,12 +361,12 @@ add_row : { // file ends right after unquote, go to final state goto final_state; } - if (buffer[position] == csvReaderConfig.quoteChar && - (!csvReaderConfig.escapeChar || csvReaderConfig.escapeChar == csvReaderConfig.quoteChar)) { + if (buffer[position] == option.quoteChar && + (!option.escapeChar || option.escapeChar == option.quoteChar)) { // escaped quote, return to quoted state and store escape position escapePositions.push_back(position - start); goto in_quotes; - } else if (buffer[position] == csvReaderConfig.delimiter || + } else if (buffer[position] == option.delimiter || buffer[position] == CopyConstants::DEFAULT_CSV_LIST_END_CHAR) { // delimiter, add value goto add_value; @@ -387,8 +387,7 @@ add_row : { [[unlikely]] throw CopyException(stringFormat( "Error in file {} on line {}: escape at end of file.", filePath, getLineNumber())); } - if (buffer[position] != csvReaderConfig.quoteChar && - buffer[position] != csvReaderConfig.escapeChar) { + if (buffer[position] != option.quoteChar && buffer[position] != option.escapeChar) { [[unlikely]] throw CopyException(stringFormat( "Error in file {} on line {}: neither QUOTE nor ESCAPE is proceeded by ESCAPE.", filePath, getLineNumber())); diff --git a/src/processor/operator/persistent/reader/csv/driver.cpp b/src/processor/operator/persistent/reader/csv/driver.cpp index 7594d9aa31b..84131074ff7 100644 --- a/src/processor/operator/persistent/reader/csv/driver.cpp +++ b/src/processor/operator/persistent/reader/csv/driver.cpp @@ -37,7 +37,7 @@ void ParsingDriver::addValue( } function::CastString::copyStringToVector( - chunk.getValueVector(columnIdx).get(), rowNum, value, &reader->csvReaderConfig); + chunk.getValueVector(columnIdx).get(), rowNum, value, &reader->option); } bool ParsingDriver::addRow(uint64_t /*rowNum*/, common::column_id_t columnCount) { diff --git a/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp index d26e113d9da..22d990bca8d 100644 --- a/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/parallel_csv_reader.cpp @@ -21,8 +21,8 @@ namespace kuzu { namespace processor { ParallelCSVReader::ParallelCSVReader( - const std::string& filePath, const common::ReaderConfig& readerConfig, uint64_t numColumns) - : BaseCSVReader{filePath, readerConfig, numColumns} {} + const std::string& filePath, const common::CSVOption& option, uint64_t numColumns) + : BaseCSVReader{filePath, option, numColumns} {} bool ParallelCSVReader::hasMoreToRead() const { // If we haven't started the first block yet or are done our block, get the next block. @@ -35,7 +35,7 @@ uint64_t ParallelCSVReader::parseBlock( seekToBlockStart(); if (blockIdx == 0) { readBOM(); - if (csvReaderConfig.hasHeader) { + if (option.hasHeader) { readHeader(); } } @@ -141,9 +141,11 @@ void ParallelCSVScan::tableFunc(TableFunctionInput& input, common::DataChunk& ou } if (fileIdx != parallelCSVLocalState->fileIdx) { parallelCSVLocalState->fileIdx = fileIdx; + auto csvConfig = reinterpret_cast( + parallelCSVSharedState->readerConfig.extraConfig.get()); parallelCSVLocalState->reader = std::make_unique( - parallelCSVSharedState->readerConfig.filePaths[fileIdx], - parallelCSVSharedState->readerConfig, parallelCSVSharedState->numColumns); + parallelCSVSharedState->readerConfig.filePaths[fileIdx], csvConfig->option, + parallelCSVSharedState->numColumns); } auto numRowsRead = parallelCSVLocalState->reader->parseBlock(blockIdx, outputChunk); outputChunk.state->selVector->selectedSize = numRowsRead; @@ -175,10 +177,11 @@ std::unique_ptr ParallelCSVScan::bindFunc( std::unique_ptr ParallelCSVScan::initSharedState( function::TableFunctionInitInput& input) { auto bindData = reinterpret_cast(input.bindData); + auto csvConfig = reinterpret_cast(bindData->config.extraConfig.get()); common::row_idx_t numRows = 0; for (const auto& path : bindData->config.filePaths) { auto reader = - make_unique(path, bindData->config, bindData->columnNames.size()); + make_unique(path, csvConfig->option, bindData->columnNames.size()); numRows += reader->countRows(); } return std::make_unique( @@ -188,10 +191,11 @@ std::unique_ptr ParallelCSVScan::initSharedState std::unique_ptr ParallelCSVScan::initLocalState( function::TableFunctionInitInput& /*input*/, function::TableFuncSharedState* state) { auto localState = std::make_unique(); - auto scanSharedState = reinterpret_cast(state); - localState->reader = - std::make_unique(scanSharedState->readerConfig.filePaths[0], - scanSharedState->readerConfig, scanSharedState->numColumns); + auto sharedState = reinterpret_cast(state); + auto csvConfig = + reinterpret_cast(sharedState->readerConfig.extraConfig.get()); + localState->reader = std::make_unique( + sharedState->readerConfig.filePaths[0], csvConfig->option, sharedState->numColumns); localState->fileIdx = 0; return localState; } diff --git a/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp b/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp index 1754bf18747..35766f492bc 100644 --- a/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp +++ b/src/processor/operator/persistent/reader/csv/serial_csv_reader.cpp @@ -11,13 +11,13 @@ namespace kuzu { namespace processor { SerialCSVReader::SerialCSVReader( - const std::string& filePath, const common::ReaderConfig& readerConfig, uint64_t numColumns) - : BaseCSVReader{filePath, readerConfig, numColumns} {} + const std::string& filePath, const common::CSVOption& option, uint64_t numColumns) + : BaseCSVReader{filePath, option, numColumns} {} std::vector> SerialCSVReader::sniffCSV() { readBOM(); - if (csvReaderConfig.hasHeader) { + if (option.hasHeader) { SniffCSVNameAndTypeDriver driver; parseCSV(driver); return driver.columns; @@ -61,8 +61,9 @@ void SerialCSVScanSharedState::read(common::DataChunk& outputChunk) { void SerialCSVScanSharedState::initReader() { if (fileIdx < readerConfig.getNumFiles()) { + auto csvConfig = reinterpret_cast(readerConfig.extraConfig.get()); reader = std::make_unique( - readerConfig.filePaths[fileIdx], readerConfig, numColumns); + readerConfig.filePaths[fileIdx], csvConfig->option, numColumns); } } @@ -97,10 +98,11 @@ std::unique_ptr SerialCSVScan::bindFunc( std::unique_ptr SerialCSVScan::initSharedState( function::TableFunctionInitInput& input) { auto bindData = reinterpret_cast(input.bindData); + auto csvConfig = reinterpret_cast(bindData->config.extraConfig.get()); common::row_idx_t numRows = 0; for (const auto& path : bindData->config.filePaths) { auto reader = - make_unique(path, bindData->config, bindData->columnNames.size()); + make_unique(path, csvConfig->option, bindData->columnNames.size()); numRows += reader->countRows(); } return std::make_unique( @@ -127,8 +129,9 @@ void SerialCSVScan::bindColumns(const ReaderConfig& readerConfig, void SerialCSVScan::bindColumns(const common::ReaderConfig& readerConfig, uint32_t fileIdx, std::vector& columnNames, std::vector>& columnTypes) { + auto csvConfig = reinterpret_cast(readerConfig.extraConfig.get()); auto csvReader = - SerialCSVReader(readerConfig.filePaths[fileIdx], readerConfig, 0 /* numColumns */); + SerialCSVReader(readerConfig.filePaths[fileIdx], csvConfig->option, 0 /* numColumns */); auto sniffedColumns = csvReader.sniffCSV(); for (auto& [name, type] : sniffedColumns) { columnNames.push_back(name); diff --git a/src/processor/operator/persistent/reader/rdf/rdf_reader.cpp b/src/processor/operator/persistent/reader/rdf/rdf_reader.cpp index 177014ca996..6984f56f8a9 100644 --- a/src/processor/operator/persistent/reader/rdf/rdf_reader.cpp +++ b/src/processor/operator/persistent/reader/rdf/rdf_reader.cpp @@ -20,8 +20,8 @@ using namespace kuzu::function; namespace kuzu { namespace processor { -RDFReader::RDFReader(std::string filePath, std::unique_ptr config) - : filePath{std::move(filePath)}, config{std::move(config)}, rowOffset{0}, +RDFReader::RDFReader(std::string filePath, const common::RdfReaderConfig& config) + : filePath{std::move(filePath)}, mode{config.mode}, index{config.index}, rowOffset{0}, vectorSize{0}, sVector{nullptr}, pVector{nullptr}, oVector{nullptr}, status{SERD_SUCCESS} { std::string fileName = this->filePath.substr(this->filePath.find_last_of("/\\") + 1); fp = fopen(this->filePath.c_str(), "rb"); @@ -156,7 +156,7 @@ SerdStatus RDFReader::readerStatementSink(void* handle, SerdStatementFlags /*fla } auto reader = reinterpret_cast(handle); - switch (reader->config->mode) { + switch (reader->mode) { case RdfReaderMode::RESOURCE: { addResourceToVector(reader->sVector, reader->vectorSize++, subject); addResourceToVector(reader->sVector, reader->vectorSize++, predicate); @@ -175,9 +175,9 @@ SerdStatus RDFReader::readerStatementSink(void* handle, SerdStatementFlags /*fla if (object->type == SERD_LITERAL) { return SERD_SUCCESS; } - auto subjectOffset = lookupResourceNode(reader->config->index, subject); - auto predicateOffset = lookupResourceNode(reader->config->index, predicate); - auto objectOffset = lookupResourceNode(reader->config->index, object); + auto subjectOffset = lookupResourceNode(reader->index, subject); + auto predicateOffset = lookupResourceNode(reader->index, predicate); + auto objectOffset = lookupResourceNode(reader->index, object); reader->sVector->setValue(reader->rowOffset, subjectOffset); reader->pVector->setValue(reader->rowOffset, predicateOffset); reader->oVector->setValue(reader->rowOffset, objectOffset); @@ -188,8 +188,8 @@ SerdStatus RDFReader::readerStatementSink(void* handle, SerdStatementFlags /*fla if (object->type != SERD_LITERAL) { return SERD_SUCCESS; } - auto subjectOffset = lookupResourceNode(reader->config->index, subject); - auto predicateOffset = lookupResourceNode(reader->config->index, predicate); + auto subjectOffset = lookupResourceNode(reader->index, subject); + auto predicateOffset = lookupResourceNode(reader->index, predicate); auto objectOffset = reader->rowOffset; reader->sVector->setValue(reader->rowOffset, subjectOffset); reader->pVector->setValue(reader->rowOffset, predicateOffset); @@ -246,7 +246,7 @@ offset_t RDFReader::read(DataChunk* dataChunk) { return 0; } - switch (config->mode) { + switch (mode) { case common::RdfReaderMode::RESOURCE_TRIPLE: case common::RdfReaderMode::LITERAL_TRIPLE: { sVector = dataChunk->getValueVector(0).get(); @@ -301,17 +301,17 @@ std::unique_ptr RdfScan::bindFunc(main::ClientConte std::unique_ptr RdfScan::initSharedState( function::TableFunctionInitInput& input) { auto bindData = reinterpret_cast(input.bindData); - auto reader = make_unique( - bindData->config.filePaths[0], bindData->config.rdfReaderConfig->copy()); + auto rdfConfig = reinterpret_cast(bindData->config.extraConfig.get()); + auto reader = make_unique(bindData->config.filePaths[0], *rdfConfig); return std::make_unique(bindData->config, reader->countLine()); } std::unique_ptr RdfScan::initLocalState( function::TableFunctionInitInput& input, function::TableFuncSharedState* /*state*/) { auto bindData = reinterpret_cast(input.bindData); + auto rdfConfig = reinterpret_cast(bindData->config.extraConfig.get()); auto localState = std::make_unique(); - localState->reader = std::make_unique( - bindData->config.filePaths[0], bindData->config.rdfReaderConfig->copy()); + localState->reader = std::make_unique(bindData->config.filePaths[0], *rdfConfig); return localState; }