Skip to content

Commit

Permalink
Fridayfrolics
Browse files Browse the repository at this point in the history
  • Loading branch information
willdealtry committed Feb 16, 2024
1 parent 6494270 commit 468ae02
Show file tree
Hide file tree
Showing 61 changed files with 919 additions and 723 deletions.
2 changes: 1 addition & 1 deletion cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ set(arcticdb_srcs
version/version_utils.cpp
version/symbol_list.cpp
version/version_map_batch_methods.cpp
codec/segment_header.hpp codec/protobuf_encoding.hpp codec/encoded_field.cpp codec/encoding_version.hpp memory_layout.hpp)
codec/segment_header.hpp codec/protobuf_mappings.hpp codec/encoded_field.cpp codec/encoding_version.hpp memory_layout.hpp codec/protobuf_mappings.cpp entity/protobuf_mappings.cpp)

if(${ARCTICDB_INCLUDE_ROCKSDB})
list (APPEND arcticdb_srcs
Expand Down
64 changes: 44 additions & 20 deletions cpp/arcticdb/codec/codec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@ constexpr TypeDescriptor metadata_type_desc() {
};
}

// Type descriptor for the buffer that carries encoded-block metadata:
// a one-dimensional run of raw bytes.
constexpr TypeDescriptor encoded_blocks_type_desc() {
    return {DataType::UINT8, Dimension::Dim1};
}

Segment encode_dispatch(
SegmentInMemory&& in_mem_seg,
Expand Down Expand Up @@ -116,6 +111,17 @@ std::optional<google::protobuf::Any> decode_metadata(
}
}

/// Unpacks the segment's Any-wrapped metadata into a FrameMetadata protobuf.
/// @param hdr Segment header (unused here; kept for interface parity with the
///            neighbouring decode helpers).
/// @param res In-memory segment whose metadata must already be populated.
/// @return Newly allocated FrameMetadata unpacked from res.metadata().
std::shared_ptr<arcticdb::proto::descriptors::FrameMetadata> extract_frame_metadata(
    SegmentHeader& hdr,
    SegmentInMemory& res
    ) {
    util::check(res.has_metadata(), "Cannot extract frame metadata as it is null");
    // Allocate the target message before unpacking: UnpackTo writes into an
    // existing object, so calling it through a default-constructed (null)
    // shared_ptr's get() would dereference nullptr.
    auto output = std::make_shared<arcticdb::proto::descriptors::FrameMetadata>();
    // UnpackTo returns false when the Any payload is not a FrameMetadata —
    // don't ignore that silently.
    util::check(res.metadata()->UnpackTo(output.get()), "Failed to unpack frame metadata");
    return output; //TODO nuke metadata?
}


void decode_metadata(
const SegmentHeader& hdr,
const uint8_t*& data,
Expand All @@ -137,21 +143,35 @@ std::optional<google::protobuf::Any> decode_metadata_from_segment(const Segment
return decode_metadata(hdr, data, begin);
}

Buffer decode_encoded_fields(
// Decodes the per-column encoded-field descriptions from the segment body
// into an EncodedFieldCollection.
// @param hdr   Segment header; must have its column_fields block set.
// @param data  Pointer to the compressed field data within the segment body.
// @param begin Start of the segment body; only used for trace logging offsets.
// @return Collection built from the decoded column's buffer and shapes.
EncodedFieldCollection decode_encoded_fields(
const SegmentHeader& hdr,
const uint8_t* data,
const uint8_t* begin ARCTICDB_UNUSED) {
ARCTICDB_TRACE(log::codec(), "Decoding encoded fields");

util::check(hdr.has_column_fields(), "Expected encoded field description to be set in header");
std::optional<util::BitMagic> bv;
// Size the staging column from the header's recorded uncompressed size so
// decode_field has a large-enough destination.
const auto uncompressed_size = encoding_sizes::uncompressed_size(hdr.column_fields());
constexpr auto type_desc = encoded_fields_type_desc();
Column encoded_column(type_desc, uncompressed_size, true, false);
decode_field(type_desc, hdr.column_fields(), data, encoded_column, bv, hdr.encoding_version());

ARCTICDB_TRACE(log::codec(), "Decoded encoded fields at position {}", data-begin);
// Hand the decoded bytes and shape information over to the collection;
// the staging column is consumed by the two release calls.
return {std::move(encoded_column.release_buffer()), std::move(encoded_column.release_shapes())};
}

// Reads a FrameDescriptorImpl straight out of the raw segment bytes and
// advances `data` past it.
// @param hdr   Segment header (unused; kept for signature parity with the
//              other read/decode helpers).
// @param data  In/out cursor into the segment body; on return it points just
//              past the frame descriptor.
// @param begin Start of the segment body (unused).
// @param end   One-past-the-end of the segment body, used for bounds checking.
// @return Copy of the frame descriptor read from the buffer.
FrameDescriptorImpl read_frame_descriptor(
    const SegmentHeader& hdr,
    const uint8_t*& data,
    const uint8_t* begin ARCTICDB_UNUSED,
    const uint8_t* end) {
    util::check_magic<FrameDataMagic>(data);
    // Guard against a truncated segment before dereferencing the cursor.
    util::check(data + sizeof(FrameDescriptorImpl) <= end, "Buffer overflow reading frame descriptor");
    // NOTE(review): this assumes FrameDescriptorImpl is trivially copyable and
    // that the serialized layout is suitably aligned at `data` — confirm, or
    // switch to a memcpy-based read if alignment cannot be guaranteed.
    auto* frame_descriptor = reinterpret_cast<const FrameDescriptorImpl*>(data);
    data += sizeof(FrameDescriptorImpl);
    return *frame_descriptor;
}


std::optional<FieldCollection> decode_index_fields(
const SegmentHeader& hdr,
const uint8_t*& data,
Expand Down Expand Up @@ -309,9 +329,13 @@ void decode_v2(const Segment& segment,
data += encoding_sizes::field_compressed_size(hdr.descriptor_field());

util::check_magic<IndexMagic>(data);
if(auto index_fields = decode_index_fields(hdr, data, begin, end); index_fields)
res.set_index_fields(std::make_shared<FieldCollection>(std::move(*index_fields)));

if(hdr.has_index_descriptor_field()) {
auto index_frame_descriptor = read_frame_descriptor(hdr, data, begin, end);
auto index_fields = decode_index_fields(hdr, data, begin, end);
auto frame_metadata = extract_frame_metadata(hdr, res);
util::check(index_fields.has_value(), "Failed to get index fields");
res.set_index_descriptorstd::make_shared<FieldCollection>(std::move(*index_fields)));
}
util::check(hdr.has_column_fields(), "Expected column fields in v2 encoding");
util::check_magic<EncodedMagic>(encoded_fields_ptr);
if (data!=end) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/codec/codec.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Segment encode_dispatch(
const arcticdb::proto::encoding::VariantCodec &codec_opts,
EncodingVersion encoding_version);

Buffer decode_encoded_fields(
EncodedFieldCollection decode_encoded_fields(
const SegmentHeader& hdr,
const uint8_t* data,
const uint8_t* begin ARCTICDB_UNUSED);
Expand Down
16 changes: 10 additions & 6 deletions cpp/arcticdb/codec/encode_v1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ namespace arcticdb {
// might be non-zero.
max_compressed_bytes += Encoder::max_compressed_size(codec_opts, *block);
}
add_bitmagic_compressed_size(column_data, uncompressed_bytes, max_compressed_bytes);
add_bitmagic_compressed_size(column_data, max_compressed_bytes, uncompressed_bytes);
return std::make_pair(uncompressed_bytes, max_compressed_bytes);
});
}
Expand All @@ -70,7 +70,7 @@ namespace arcticdb {
Buffer& out,
std::ptrdiff_t& pos
) {
column_data.type().visit_tag([&](auto type_desc_tag) {
column_data.type().visit_tag([&codec_opts, &column_data, &field, &out, &pos](auto type_desc_tag) {
using TDT = decltype(type_desc_tag);
using Encoder = TypedBlockEncoderImpl<TypedBlockData, TDT, EncodingVersion::V1>;
ARCTICDB_TRACE(log::codec(), "Column data has {} blocks", column_data.num_blocks());
Expand Down Expand Up @@ -115,7 +115,7 @@ namespace arcticdb {
std::ptrdiff_t pos = 0;
static auto block_to_header_ratio = ConfigsMap::instance()->get_int("Codec.EstimatedHeaderRatio", 75);
const auto preamble = in_mem_seg.num_blocks() * block_to_header_ratio;
auto [max_compressed_size, uncompressed_size, encoded_blocks_bytes] = max_compressed_size_v1(in_mem_seg, codec_opts);
auto [max_compressed_size, uncompressed_size, encoded_buffer_size] = max_compressed_size_v1(in_mem_seg, codec_opts);
ARCTICDB_TRACE(log::codec(), "Estimated max buffer requirement: {}", max_compressed_size);
auto out_buffer = std::make_shared<Buffer>(max_compressed_size, preamble);
ColumnEncoderV1 encoder;
Expand All @@ -124,18 +124,22 @@ namespace arcticdb {
auto descriptor_data = in_mem_seg.descriptor().data_ptr();
descriptor_data->uncompressed_bytes_ = uncompressed_size;

EncodedFieldCollection encoded_fields(encoded_buffer_size, in_mem_seg.num_columns());
auto encoded_field_pos = 0u;

encode_metadata<EncodingPolicyV1>(in_mem_seg, segment_header, codec_opts, *out_buffer, pos);

if(in_mem_seg.row_count() > 0) {
ARCTICDB_TRACE(log::codec(), "Encoding fields");
for (std::size_t column_index = 0; column_index < in_mem_seg.num_columns(); ++column_index) {
auto column_data = in_mem_seg.column_data(column_index);
auto *encoded_field = segment_header->mutable_fields()->Add();
encoder.encode(codec_opts, column_data, encoded_field, *out_buffer, pos);
auto* column_field = encoded_fields.add_field(column_index, encoded_field_pos);
encoder.encode(codec_opts, column_data, *column_field, *out_buffer, pos);
ARCTICDB_TRACE(log::codec(), "Encoded column {}: ({}) to position {}", column_index, in_mem_seg.descriptor().fields(column_index).name(), pos);
}
encode_string_pool<EncodingPolicyV1>(in_mem_seg, *segment_header, codec_opts, *out_buffer, pos);
encode_string_pool<EncodingPolicyV1>(in_mem_seg, segment_header, codec_opts, *out_buffer, pos);
}
segment_header.set_body_fields(EncodedFieldCollection(std::move(encoded_fields)));
ARCTICDB_DEBUG(log::codec(), "Setting buffer bytes to {}", pos);
out_buffer->set_bytes(pos);
descriptor_data->compressed_bytes_ = pos;
Expand Down
Loading

0 comments on commit 468ae02

Please sign in to comment.