Skip to content

Commit

Permalink
clp-s: Update core functionality to prepare for generic parser support. (#355)
Browse files Browse the repository at this point in the history

Co-authored-by: wraymo <37269683+wraymo@users.noreply.github.com>
  • Loading branch information
gibber9809 and wraymo committed May 13, 2024
1 parent 0611040 commit 3e95aaf
Show file tree
Hide file tree
Showing 49 changed files with 1,340 additions and 784 deletions.
2 changes: 1 addition & 1 deletion components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -460,5 +460,5 @@ target_link_libraries(unitTest
ZStd::ZStd
)
target_compile_features(unitTest
PRIVATE cxx_std_17
PRIVATE cxx_std_20
)
2 changes: 1 addition & 1 deletion components/core/src/clp/clg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ set(
)

add_executable(clg ${CLG_SOURCES})
target_compile_features(clg PRIVATE cxx_std_17)
target_compile_features(clg PRIVATE cxx_std_20)
target_include_directories(clg PRIVATE "${PROJECT_SOURCE_DIR}/submodules")
target_link_libraries(clg
PRIVATE
Expand Down
2 changes: 1 addition & 1 deletion components/core/src/clp/clo/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ set(
)

add_executable(clo ${CLO_SOURCES} ${REDUCER_SOURCES})
target_compile_features(clo PRIVATE cxx_std_17)
target_compile_features(clo PRIVATE cxx_std_20)
target_include_directories(clo PRIVATE "${PROJECT_SOURCE_DIR}/submodules")
target_link_libraries(clo
PRIVATE
Expand Down
2 changes: 1 addition & 1 deletion components/core/src/clp/clp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ set(
)

add_executable(clp ${CLP_SOURCES})
target_compile_features(clp PRIVATE cxx_std_17)
target_compile_features(clp PRIVATE cxx_std_20)
target_include_directories(clp PRIVATE "${PROJECT_SOURCE_DIR}/submodules")
target_link_libraries(clp
PRIVATE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ set(
)

add_executable(make-dictionaries-readable ${MAKE_DICTIONARIES_READABLE_SOURCES})
target_compile_features(make-dictionaries-readable PRIVATE cxx_std_17)
target_compile_features(make-dictionaries-readable PRIVATE cxx_std_20)
target_include_directories(make-dictionaries-readable PRIVATE "${PROJECT_SOURCE_DIR}/submodules")
target_link_libraries(make-dictionaries-readable
PRIVATE
Expand Down
2 changes: 1 addition & 1 deletion components/core/src/clp/string_utils/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ add_library(
)
add_library(clp::string_utils ALIAS string_utils)
target_include_directories(string_utils PUBLIC ../)
target_compile_features(string_utils PRIVATE cxx_std_17)
target_compile_features(string_utils PRIVATE cxx_std_20)
158 changes: 114 additions & 44 deletions components/core/src/clp_s/ArchiveReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ void ArchiveReader::read_metadata() {
int32_t schema_id;
uint64_t num_messages;
size_t table_offset;
size_t uncompressed_size;

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(schema_id);
ErrorCodeSuccess != error)
Expand All @@ -60,7 +61,13 @@ void ArchiveReader::read_metadata() {
throw OperationFailed(error, __FILENAME__, __LINE__);
}

m_id_to_table_metadata[schema_id] = {num_messages, table_offset};
if (auto error = m_table_metadata_decompressor.try_read_numeric_value(uncompressed_size);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

m_id_to_table_metadata[schema_id] = {num_messages, table_offset, uncompressed_size};
m_schema_ids.push_back(schema_id);
}
m_table_metadata_decompressor.close();
Expand All @@ -74,7 +81,7 @@ void ArchiveReader::read_dictionaries_and_metadata() {
read_metadata();
}

std::unique_ptr<SchemaReader> ArchiveReader::read_table(
SchemaReader& ArchiveReader::read_table(
int32_t schema_id,
bool should_extract_timestamp,
bool should_marshal_records
Expand All @@ -85,93 +92,156 @@ std::unique_ptr<SchemaReader> ArchiveReader::read_table(
throw OperationFailed(ErrorCodeFileNotFound, __FILENAME__, __LINE__);
}

auto schema_reader
auto& schema_reader
= create_schema_reader(schema_id, should_extract_timestamp, should_marshal_records);

m_tables_file_reader.try_seek_from_begin(m_id_to_table_metadata[schema_id].offset);
m_tables_decompressor.open(m_tables_file_reader, cDecompressorFileReadBufferCapacity);
schema_reader->load(m_tables_decompressor);
m_tables_decompressor.close();
schema_reader.load(m_tables_decompressor, m_id_to_table_metadata[schema_id].uncompressed_size);
m_tables_decompressor.close_for_reuse();
return schema_reader;
}

BaseColumnReader*
ArchiveReader::append_reader_column(std::unique_ptr<SchemaReader>& reader, int32_t column_id) {
BaseColumnReader* ArchiveReader::append_reader_column(SchemaReader& reader, int32_t column_id) {
BaseColumnReader* column_reader = nullptr;
auto node = m_schema_tree->get_node(column_id);
std::string key_name = node->get_key_name();
switch (node->get_type()) {
case NodeType::INTEGER:
column_reader = new Int64ColumnReader(key_name, column_id);
break;
case NodeType::FLOAT:
column_reader = new FloatColumnReader(key_name, column_id);
auto const& node = m_schema_tree->get_node(column_id);
switch (node.get_type()) {
case NodeType::Integer:
column_reader = new Int64ColumnReader(column_id);
break;
case NodeType::CLPSTRING:
column_reader = new ClpStringColumnReader(key_name, column_id, m_var_dict, m_log_dict);
case NodeType::Float:
column_reader = new FloatColumnReader(column_id);
break;
case NodeType::VARSTRING:
column_reader = new VariableStringColumnReader(key_name, column_id, m_var_dict);
case NodeType::ClpString:
column_reader = new ClpStringColumnReader(column_id, m_var_dict, m_log_dict);
break;
case NodeType::BOOLEAN:
column_reader = new BooleanColumnReader(key_name, column_id);
case NodeType::VarString:
column_reader = new VariableStringColumnReader(column_id, m_var_dict);
break;
case NodeType::ARRAY:
column_reader = new ClpStringColumnReader(
key_name,
column_id,
m_var_dict,
m_array_dict,
true
);
case NodeType::Boolean:
column_reader = new BooleanColumnReader(column_id);
break;
case NodeType::DATESTRING:
column_reader = new DateStringColumnReader(key_name, column_id, m_timestamp_dict);
case NodeType::UnstructuredArray:
column_reader = new ClpStringColumnReader(column_id, m_var_dict, m_array_dict, true);
break;
case NodeType::OBJECT:
case NodeType::NULLVALUE:
reader->append_column(column_id);
case NodeType::DateString:
column_reader = new DateStringColumnReader(column_id, m_timestamp_dict);
break;
case NodeType::UNKNOWN:
// No need to push columns without associated object readers into the SchemaReader.
case NodeType::Object:
case NodeType::NullValue:
case NodeType::Unknown:
break;
}

if (column_reader) {
reader->append_column(column_reader);
reader.append_column(column_reader);
}
return column_reader;
}

std::unique_ptr<SchemaReader> ArchiveReader::create_schema_reader(
/*
 * Walks the schema entries of one unordered object, builds a column reader for each entry whose
 * node type has one, and appends it to `reader` as an unordered column. Entries flagged by
 * Schema::schema_entry_is_unordered_object are skipped. When `should_marshal_records` is set, the
 * span of appended columns is registered with the reader via mark_unordered_object.
 */
void ArchiveReader::append_unordered_reader_columns(
        SchemaReader& reader,
        NodeType unordered_object_type,
        std::span<int32_t> schema_ids,
        bool should_marshal_records
) {
    // Maps a node type to a freshly allocated column reader, or nullptr when the type has no
    // reader. The raw pointer is handed to the SchemaReader below.
    auto const make_column_reader = [this](NodeType type, int32_t id) -> BaseColumnReader* {
        switch (type) {
            case NodeType::Integer:
                return new Int64ColumnReader(id);
            case NodeType::Float:
                return new FloatColumnReader(id);
            case NodeType::ClpString:
                return new ClpStringColumnReader(id, m_var_dict, m_log_dict);
            case NodeType::VarString:
                return new VariableStringColumnReader(id, m_var_dict);
            case NodeType::Boolean:
                return new BooleanColumnReader(id);
            // UnstructuredArray and DateString are currently not supported inside unordered
            // objects, so they are ignored here.
            case NodeType::UnstructuredArray:
            case NodeType::DateString:
            // These types have no associated column reader, so nothing is pushed into the
            // SchemaReader for them.
            case NodeType::Object:
            case NodeType::NullValue:
            case NodeType::Unknown:
                break;
        }
        return nullptr;
    };

    // Column count before anything is appended; passed to mark_unordered_object as the position
    // where this object's columns begin.
    size_t const first_column_pos = reader.get_column_size();

    // INT32_MAX acts as the "not resolved yet" sentinel for the subtree root.
    int32_t subtree_root_id = INT32_MAX;

    for (auto const column_id : schema_ids) {
        if (Schema::schema_entry_is_unordered_object(column_id)) {
            continue;
        }

        auto const& node = m_schema_tree->get_node(column_id);

        // Resolve the subtree root from the first regular column encountered; the lookup is only
        // repeated while the sentinel value is still in place.
        if (INT32_MAX == subtree_root_id) {
            subtree_root_id = m_schema_tree->find_matching_subtree_root_in_subtree(
                    -1,
                    column_id,
                    unordered_object_type
            );
        }

        if (auto* const column_reader = make_column_reader(node.get_type(), column_id);
            nullptr != column_reader)
        {
            reader.append_unordered_column(column_reader);
        }
    }

    if (false == should_marshal_records) {
        return;
    }
    reader.mark_unordered_object(first_column_pos, subtree_root_id, schema_ids);
}

SchemaReader& ArchiveReader::create_schema_reader(
int32_t schema_id,
bool should_extract_timestamp,
bool should_marshal_records
) {
auto reader = std::make_unique<SchemaReader>(
auto& schema = (*m_schema_map)[schema_id];
m_schema_reader.reset(
m_schema_tree,
schema_id,
schema.get_ordered_schema_view(),
m_id_to_table_metadata[schema_id].num_messages,
should_marshal_records
);
auto timestamp_column_ids = m_timestamp_dict->get_authoritative_timestamp_column_ids();

for (int32_t column_id : (*m_schema_map)[reader->get_schema_id()]) {
BaseColumnReader* column_reader = append_reader_column(reader, column_id);
for (size_t i = 0; i < schema.size(); ++i) {
int32_t column_id = schema[i];
if (Schema::schema_entry_is_unordered_object(column_id)) {
size_t length = Schema::get_unordered_object_length(column_id);
append_unordered_reader_columns(
m_schema_reader,
Schema::get_unordered_object_type(column_id),
schema.get_view(i + 1, length),
should_marshal_records
);
i += length;
continue;
}
BaseColumnReader* column_reader = append_reader_column(m_schema_reader, column_id);

if (should_extract_timestamp && column_reader && timestamp_column_ids.count(column_id) > 0)
{
reader->mark_column_as_timestamp(column_reader);
m_schema_reader.mark_column_as_timestamp(column_reader);
}
}
return reader;
return m_schema_reader;
}

void ArchiveReader::store(FileWriter& writer) {
std::string message;

for (auto& [id, table_metadata] : m_id_to_table_metadata) {
auto schema_reader = read_table(id, false, true);
while (schema_reader->get_next_message(message)) {
auto& schema_reader = read_table(id, false, true);
while (schema_reader.get_next_message(message)) {
writer.write(message.c_str(), message.length());
}
}
Expand Down
27 changes: 23 additions & 4 deletions components/core/src/clp_s/ArchiveReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <map>
#include <set>
#include <span>
#include <utility>

#include <boost/filesystem.hpp>
Expand All @@ -11,6 +12,7 @@
#include "ReaderUtils.hpp"
#include "SchemaReader.hpp"
#include "TimestampDictionaryReader.hpp"
#include "Utils.hpp"

namespace clp_s {
class ArchiveReader {
Expand Down Expand Up @@ -87,7 +89,7 @@ class ArchiveReader {
* @param should_marshal_records
* @return the schema reader
*/
std::unique_ptr<SchemaReader>
SchemaReader&
read_table(int32_t schema_id, bool should_extract_timestamp, bool should_marshal_records);

std::shared_ptr<VariableDictionaryReader> get_variable_dictionary() { return m_var_dict; }
Expand Down Expand Up @@ -127,8 +129,9 @@ class ArchiveReader {
* @param schema_id
* @param should_extract_timestamp
* @param should_marshal_records
* @return a reference to the newly created schema reader initialized with the given parameters
*/
std::unique_ptr<SchemaReader> create_schema_reader(
SchemaReader& create_schema_reader(
int32_t schema_id,
bool should_extract_timestamp,
bool should_marshal_records
Expand All @@ -138,9 +141,24 @@ class ArchiveReader {
* Appends a column to the schema reader.
* @param reader
* @param column_id
* @return a pointer to the newly appended column reader or nullptr if no column reader was
* created
*/
BaseColumnReader*
append_reader_column(std::unique_ptr<SchemaReader>& reader, int32_t column_id);
BaseColumnReader* append_reader_column(SchemaReader& reader, int32_t column_id);

/**
* Appends columns for the entire schema of an unordered object.
* @param reader
* @param unordered_object_type
* @param schema_ids
* @param should_marshal_records
*/
void append_unordered_reader_columns(
SchemaReader& reader,
NodeType unordered_object_type,
std::span<int32_t> schema_ids,
bool should_marshal_records
);

bool m_is_open;
std::string m_archive_path;
Expand All @@ -159,6 +177,7 @@ class ArchiveReader {
FileReader m_table_metadata_file_reader;
ZstdDecompressor m_tables_decompressor;
ZstdDecompressor m_table_metadata_decompressor;
SchemaReader m_schema_reader;
};
} // namespace clp_s

Expand Down
Loading

0 comments on commit 3e95aaf

Please sign in to comment.