Encoding version 2: binary-only memory layout #1317

Merged · 2 commits · Jun 25, 2024
24 changes: 17 additions & 7 deletions cpp/arcticdb/CMakeLists.txt
@@ -176,6 +176,7 @@ endif ()

## Core library without python bindings ##
set(arcticdb_srcs
+ storage/memory_layout.hpp
# header files
async/async_store.hpp
async/batch_read_args.hpp
@@ -188,7 +189,10 @@ set(arcticdb_srcs
codec/lz4.hpp
codec/magic_words.hpp
codec/passthrough.hpp
+ codec/protobuf_mappings.hpp
codec/slice_data_sink.hpp
+ codec/segment_header.hpp
+ codec/segment_identifier.hpp
codec/typed_block_encoder_impl.hpp
codec/zstd.hpp
column_store/block.hpp
@@ -277,6 +281,7 @@ set(arcticdb_srcs
storage/azure/azure_mock_client.hpp
storage/azure/azure_real_client.hpp
storage/azure/azure_storage.hpp
+ storage/lmdb/lmdb.hpp
storage/lmdb/lmdb_client_wrapper.hpp
storage/lmdb/lmdb_mock_client.hpp
storage/lmdb/lmdb_real_client.hpp
@@ -310,7 +315,7 @@ set(arcticdb_srcs
stream/index.hpp
stream/merge.hpp
stream/merge.hpp
- stream/merge.hpp util/ref_counted_map.hpp
+ stream/merge.hpp
stream/protobuf_mappings.hpp
stream/row_builder.hpp
stream/schema.hpp
@@ -352,8 +357,6 @@ set(arcticdb_srcs
util/preconditions.hpp
util/preprocess.hpp
util/ranges_from_future.hpp
- util/ref_counted_map.hpp
- util/ref_counted_map.hpp
util/regex_filter.hpp
util/simple_string_hash.hpp
util/slab_allocator.hpp
@@ -390,9 +393,10 @@ set(arcticdb_srcs
codec/codec.cpp
codec/encode_v1.cpp
codec/encode_v2.cpp
- codec/encoding_sizes.cpp
+ codec/encoded_field.cpp
+ codec/protobuf_mappings.cpp
codec/segment.cpp
- codec/variant_encoded_field_collection.cpp
+ codec/segment_header.cpp
column_store/chunked_buffer.cpp
column_store/column.cpp
column_store/column_data.cpp
@@ -406,6 +410,7 @@ set(arcticdb_srcs
entity/merge_descriptors.cpp
entity/metrics.cpp
entity/performance_tracing.cpp
+ entity/protobuf_mappings.cpp
entity/types.cpp
entity/type_utils.cpp
entity/types_proto.cpp
@@ -455,6 +460,7 @@ set(arcticdb_srcs
storage/mongo/mongo_mock_client.cpp
storage/mongo/mongo_storage.cpp
storage/s3/nfs_backed_storage.cpp
+ storage/s3/ec2_utils.cpp
storage/s3/s3_api.cpp
storage/s3/s3_real_client.cpp
storage/s3/s3_mock_client.cpp
@@ -465,6 +471,7 @@ set(arcticdb_srcs
stream/append_map.cpp
stream/index.cpp
stream/piloted_clock.cpp
+ stream/protobuf_mappings.cpp
toolbox/library_tool.cpp
util/allocator.cpp
util/buffer_pool.cpp
@@ -490,7 +497,7 @@ set(arcticdb_srcs
version/version_map_batch_methods.cpp
- storage/s3/ec2_utils.cpp
- storage/lmdb/lmdb.hpp
)

if(${ARCTICDB_INCLUDE_ROCKSDB})
list (APPEND arcticdb_srcs
@@ -741,6 +748,9 @@ if(${TEST})
set(unit_test_srcs
async/test/test_async.cpp
codec/test/test_codec.cpp
+ codec/test/test_encode_field_collection.cpp
+ codec/test/test_segment_header.cpp
+ codec/test/test_encoded_field.cpp
column_store/test/ingestion_stress_test.cpp
column_store/test/test_column.cpp
column_store/test/test_column_data_random_accessor.cpp
@@ -808,7 +818,7 @@ if(${TEST})
version/test/test_sorting_info_state_machine.cpp
version/test/version_map_model.hpp
storage/test/common.hpp
)

set(EXECUTABLE_PERMS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE) # 755

2 changes: 1 addition & 1 deletion cpp/arcticdb/async/async_store.hpp
@@ -338,7 +338,7 @@ std::vector<folly::Future<bool>> batch_key_exists(
}


folly::Future<SliceAndKey> async_write(
folly::Future<std::tuple<PartialKey, SegmentInMemory, pipelines::FrameSlice>> &&input_fut,
const std::shared_ptr<DeDupMap> &de_dup_map) override {
using KeyOptSegment = std::pair<VariantKey, std::optional<Segment>>;
4 changes: 1 addition & 3 deletions cpp/arcticdb/async/task_scheduler.hpp
@@ -150,15 +150,13 @@ class TaskScheduler {

explicit TaskScheduler(const std::optional<size_t>& cpu_thread_count = std::nullopt, const std::optional<size_t>& io_thread_count = std::nullopt) :
cpu_thread_count_(cpu_thread_count ? *cpu_thread_count : ConfigsMap::instance()->get_int("VersionStore.NumCPUThreads", get_default_num_cpus())),
- io_thread_count_(io_thread_count ? *io_thread_count : ConfigsMap::instance()->get_int("VersionStore.NumIOThreads", std::min(100, (int) (cpu_thread_count_ * 1.5)))),
+ io_thread_count_(io_thread_count ? *io_thread_count : ConfigsMap::instance()->get_int("VersionStore.NumIOThreads", (int) (cpu_thread_count_ * 1.5))),
cpu_exec_(cpu_thread_count_, std::make_shared<InstrumentedNamedFactory>("CPUPool")) ,
io_exec_(io_thread_count_, std::make_shared<InstrumentedNamedFactory>("IOPool")){
util::check(cpu_thread_count_ > 0 && io_thread_count_ > 0, "Zero IO or CPU threads: {} {}", io_thread_count_, cpu_thread_count_);
ARCTICDB_RUNTIME_DEBUG(log::schedule(), "Task scheduler created with {:d} {:d}", cpu_thread_count_, io_thread_count_);
}

- ~TaskScheduler() = default;

template<class Task>
auto submit_cpu_task(Task &&t) {
auto task = std::forward<decltype(t)>(t);
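The constructor change above drops the `std::min(100, ...)` cap, so the default IO thread count now scales linearly at 1.5x the CPU thread count; an explicit argument or the `VersionStore.NumIOThreads` config key still takes precedence. Below is a minimal, self-contained sketch of that resolution order. `get_config_int` is a hypothetical stand-in for ArcticDB's `ConfigsMap`, not its real API:

```cpp
#include <cstdio>
#include <optional>
#include <thread>

// Hypothetical stand-in for a config lookup: returns the configured value
// or the supplied default. In this sketch nothing is ever configured.
int get_config_int(const char* /*key*/, int default_value) {
    return default_value;
}

int get_default_num_cpus() {
    return static_cast<int>(std::thread::hardware_concurrency());
}

// Resolution order mirroring the constructor: explicit argument wins,
// then the config key, then a computed default. Post-PR the IO default
// is simply 1.5x the CPU count, with no std::min(100, ...) cap.
int resolve_io_threads(std::optional<int> io_override, int cpu_threads) {
    return io_override ? *io_override
                       : get_config_int("VersionStore.NumIOThreads",
                                        static_cast<int>(cpu_threads * 1.5));
}

int main() {
    const int cpu = get_default_num_cpus();
    std::printf("cpu=%d io=%d\n", cpu, resolve_io_threads(std::nullopt, cpu));
}
```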
13 changes: 6 additions & 7 deletions cpp/arcticdb/async/tasks.cpp
@@ -27,10 +27,10 @@ namespace arcticdb::async {
}
}

- return index_descriptor(StreamDescriptor::id_from_proto(desc.proto()), idx, fields);
+ return index_descriptor_from_range(desc.id(), idx, fields);
}
else {
- return index_descriptor(StreamDescriptor::id_from_proto(desc.proto()), idx, desc.fields());
+ return index_descriptor_from_range(desc.id(), idx, desc.fields());
}
});
}
@@ -39,10 +39,10 @@ namespace arcticdb::async {
auto key = std::move(key_segment_pair.atom_key());
auto seg = std::move(key_segment_pair.release_segment());
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment of size {} with key {}",
- seg.total_segment_size(),
+ seg.size(),
key);
- auto &hdr = seg.header();
- auto desc = StreamDescriptor(std::make_shared<StreamDescriptor::Proto>(std::move(*hdr.mutable_stream_descriptor())), seg.fields_ptr());
+ const auto& desc = seg.descriptor();
auto descriptor = async::get_filtered_descriptor(desc, columns_to_decode_);
ranges_and_key_.col_range_.second = ranges_and_key_.col_range_.first + (descriptor.field_count() - descriptor.index().field_count());
ARCTICDB_TRACE(log::codec(), "Creating segment");
@@ -53,12 +53,11 @@

pipelines::SliceAndKey DecodeSlicesTask::decode_into_slice(std::pair<Segment, pipelines::SliceAndKey>&& sk_pair) const {
auto [seg, sk] = std::move(sk_pair);
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment of size {} with key {}",
seg.total_segment_size(),
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment with key {}",
variant_key_view(sk.key()));

- auto &hdr = seg.header();
- auto desc = StreamDescriptor(std::make_shared<StreamDescriptor::Proto>(std::move(*hdr.mutable_stream_descriptor())), seg.fields_ptr());
+ const auto& desc = seg.descriptor();
auto descriptor = async::get_filtered_descriptor(desc, filter_columns_);
sk.slice_.adjust_columns(descriptor.field_count() - descriptor.index().field_count());

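Both decode paths above stop rebuilding a `StreamDescriptor` by moving the protobuf out of the segment header (which effectively left the header moved-from) and instead read `Segment::descriptor()`, the segment's own in-memory descriptor. A toy sketch of that accessor pattern, with deliberately simplified stand-in types rather than ArcticDB's real `Segment`:

```cpp
#include <cstdio>
#include <string>
#include <vector>

// Toy stand-ins; the real StreamDescriptor/Segment are far richer.
struct StreamDescriptor {
    std::string id;
    std::vector<std::string> fields;
};

class Segment {
public:
    explicit Segment(StreamDescriptor desc) : desc_(std::move(desc)) {}

    // Post-PR style: the descriptor is owned, in-memory state; hand out a
    // const reference instead of re-materializing it from a serialized
    // (previously protobuf) header on every access.
    const StreamDescriptor& descriptor() const { return desc_; }

private:
    StreamDescriptor desc_;
};

int main() {
    Segment seg{{"sym", {"time", "price"}}};
    const auto& desc = seg.descriptor();  // no copy, no move-out of the header
    std::printf("%s: %zu fields\n", desc.id.c_str(), desc.fields.size());
}
```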
29 changes: 12 additions & 17 deletions cpp/arcticdb/async/tasks.hpp
@@ -89,7 +89,7 @@ struct EncodeAtomTask : BaseTask {
storage::KeySegmentPair encode() {
ARCTICDB_DEBUG(log::codec(), "Encoding object with partial key {}", partial_key_);
auto enc_seg = ::arcticdb::encode_dispatch(std::move(segment_), *codec_meta_, encoding_version_);
- auto content_hash = hash_segment_header(enc_seg.header());
+ auto content_hash = get_segment_hash(enc_seg);

AtomKey k = partial_key_.build_key(creation_ts_, content_hash);
return {std::move(k), std::move(enc_seg)};
@@ -208,7 +208,9 @@ struct KeySegmentContinuation {
};

inline storage::KeySegmentPair read_dispatch(const entity::VariantKey& variant_key, const std::shared_ptr<storage::Library>& lib, const storage::ReadKeyOpts& opts) {
- return util::variant_match(variant_key, [&lib, &opts](const auto &key) { return lib->read(key, opts); });
+ return util::variant_match(variant_key, [&lib, &opts](const auto &key) {
+     return lib->read(key, opts);
+ });
}

template <typename Callable>
@@ -322,8 +324,7 @@ struct DecodeSegmentTask : BaseTask {
ARCTICDB_SAMPLE(DecodeAtomTask, 0)

auto key_seg = std::move(ks);
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment of size {} with key {}",
key_seg.segment().total_segment_size(),
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment with key {}",
variant_key_view(key_seg.variant_key()));

return {key_seg.variant_key(), decode_segment(std::move(key_seg.segment()))};
@@ -365,10 +366,10 @@ struct DecodeSlicesTask : BaseTask {
Composite<pipelines::SliceAndKey> operator()(Composite<std::pair<Segment, pipelines::SliceAndKey>> && skp) const {
ARCTICDB_SAMPLE(DecodeSlicesTask, 0)
auto sk_pairs = std::move(skp);
- return sk_pairs.transform([that=this] (auto&& ssp){
- auto seg_slice_pair = std::forward<decltype(ssp)>(ssp);
+ return sk_pairs.transform([this] (auto&& ssp){
+ auto seg_slice_pair = std::move(ssp);
ARCTICDB_DEBUG(log::version(), "Decoding slice {}", seg_slice_pair.second.key());
- return that->decode_into_slice(std::move(seg_slice_pair));
+ return decode_into_slice(std::move(seg_slice_pair));
});
}

@@ -442,8 +443,7 @@ struct DecodeMetadataTask : BaseTask {
std::pair<std::optional<VariantKey>, std::optional<google::protobuf::Any>> operator()(storage::KeySegmentPair &&ks) const {
ARCTICDB_SAMPLE(ReadMetadataTask, 0)
auto key_seg = std::move(ks);
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeMetadataTask decoding segment of size {} with key {}",
key_seg.segment().total_segment_size(), variant_key_view(key_seg.variant_key()));
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeMetadataTask decoding segment with key {}", variant_key_view(key_seg.variant_key()));

auto meta = decode_metadata_from_segment(key_seg.segment());
std::pair<VariantKey, std::optional<google::protobuf::Any>> output;
@@ -463,18 +463,14 @@ struct DecodeTimeseriesDescriptorTask : BaseTask {
std::pair<VariantKey, TimeseriesDescriptor> operator()(storage::KeySegmentPair &&ks) const {
ARCTICDB_SAMPLE(DecodeTimeseriesDescriptorTask, 0)
auto key_seg = std::move(ks);
ARCTICDB_DEBUG(log::storage(), "DecodeTimeseriesDescriptorTask decoding segment of size {} with key {}",
key_seg.segment().total_segment_size(), variant_key_view(key_seg.variant_key()));
ARCTICDB_DEBUG(log::storage(), "DecodeTimeseriesDescriptorTask decoding segment with key {}", variant_key_view(key_seg.variant_key()));

auto maybe_desc = decode_timeseries_descriptor(key_seg.segment());

util::check(static_cast<bool>(maybe_desc), "Failed to decode timeseries descriptor");
return std::make_pair(
std::move(key_seg.variant_key()),
- TimeseriesDescriptor{
- std::make_shared<TimeseriesDescriptor::Proto>(std::move(std::get<1>(*maybe_desc))),
- std::make_shared<FieldCollection>(std::move(std::get<2>(*maybe_desc)))}
- );
+ std::move(*maybe_desc));

}
};
@@ -486,8 +482,7 @@ struct DecodeMetadataAndDescriptorTask : BaseTask {
std::tuple<VariantKey, std::optional<google::protobuf::Any>, StreamDescriptor> operator()(storage::KeySegmentPair &&ks) const {
ARCTICDB_SAMPLE(ReadMetadataAndDescriptorTask, 0)
auto key_seg = std::move(ks);
ARCTICDB_DEBUG(log::storage(), "DecodeMetadataAndDescriptorTask decoding segment of size {} with key {}",
key_seg.segment().total_segment_size(), variant_key_view(key_seg.variant_key()));
ARCTICDB_DEBUG(log::storage(), "DecodeMetadataAndDescriptorTask decoding segment with key {}", variant_key_view(key_seg.variant_key()));

auto [any, descriptor] = decode_metadata_and_descriptor_fields(key_seg.segment());
return std::make_tuple(
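`read_dispatch` above fans out over the key variant via `util::variant_match`, now reformatted across several lines. A self-contained sketch of the underlying technique, `std::visit` over an overload set, using toy key types in place of ArcticDB's `AtomKey` and `RefKey`:

```cpp
#include <cstdint>
#include <cstdio>
#include <string>
#include <variant>

// Toy key types standing in for arcticdb's AtomKey/RefKey.
struct AtomKey { std::uint64_t version; };
struct RefKey  { std::string name; };
using VariantKey = std::variant<AtomKey, RefKey>;

// Minimal equivalent of util::variant_match: merge a set of lambdas into
// one overload set and std::visit with it (C++17).
template <class... Ts> struct overloaded : Ts... { using Ts::operator()...; };
template <class... Ts> overloaded(Ts...) -> overloaded<Ts...>;

std::string read(const VariantKey& key) {
    return std::visit(overloaded{
        [](const AtomKey& k) { return "atom v" + std::to_string(k.version); },
        [](const RefKey& k)  { return "ref " + k.name; },
    }, key);
}

int main() {
    std::printf("%s\n", read(VariantKey{AtomKey{2}}).c_str());
    std::printf("%s\n", read(VariantKey{RefKey{"snapshot"}}).c_str());
}
```

In `read_dispatch` itself a single generic lambda serves every alternative, since each key type supports the same `lib->read(key, opts)` call; the overload-set form above is the general shape of the pattern.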
8 changes: 4 additions & 4 deletions cpp/arcticdb/async/test/test_async.cpp
@@ -43,7 +43,7 @@ TEST(Async, SinkBasic) {

auto seg = ac::SegmentInMemory();
aa::EncodeAtomTask enc{
- ac::entity::KeyType::GENERATION, ac::entity::VersionId{6}, ac::entity::NumericId{123}, ac::entity::NumericId{456}, ac::timestamp{457}, ac::entity::NumericIndex{999}, std::move(seg), codec_opt, ac::EncodingVersion::V2
+ ac::entity::KeyType::GENERATION, ac::entity::VersionId{6}, ac::NumericId{123}, ac::NumericId{456}, ac::timestamp{457}, ac::entity::NumericIndex{999}, std::move(seg), codec_opt, ac::EncodingVersion::V2
};

auto v = sched.submit_cpu_task(std::move(enc)).via(&aa::io_executor()).thenValue(aa::WriteSegmentTask{lib}).get();
@@ -52,7 +52,7 @@
auto default_content_hash = h.digest();

ASSERT_EQ(ac::entity::atom_key_builder().gen_id(6).start_index(456).end_index(457).creation_ts(999)
- .content_hash(default_content_hash).build(ac::entity::NumericId{123}, ac::entity::KeyType::GENERATION),
+ .content_hash(default_content_hash).build(ac::NumericId{123}, ac::entity::KeyType::GENERATION),
to_atom(v)
);
}
@@ -129,10 +129,10 @@ TEST(Async, CollectWithThrow) {
}
auto vec_fut = folly::collectAll(stuff).get();
} catch(std::exception&) {
log::version().info("Caught something");
ARCTICDB_DEBUG(log::version(), "Caught something");
}

log::version().info("Collect returned");
ARCTICDB_DEBUG(log::version(), "Collect returned");
}

using IndexSegmentReader = int;
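A note on `CollectWithThrow`: `folly::collectAll` delivers each task's outcome as a `folly::Try`, so an exception thrown inside a task does not propagate out of `.get()`; the surrounding try/catch guards failures while setting up the futures. A small sketch of those semantics (it assumes folly is available; the inline executor is just for illustration):

```cpp
#include <folly/executors/InlineExecutor.h>
#include <folly/futures/Future.h>
#include <cstdio>
#include <stdexcept>
#include <vector>

int main() {
    auto& exec = folly::InlineExecutor::instance();
    std::vector<folly::Future<int>> futs;
    futs.push_back(folly::makeFuture(1));
    futs.push_back(
        folly::makeFuture().via(&exec).thenValue(
            [](folly::Unit) -> int { throw std::runtime_error("boom"); }));

    // collectAll never throws for task failures: each outcome arrives as a
    // folly::Try, so the exception must be inspected, not caught.
    auto results = folly::collectAll(futs).get();
    for (auto& t : results)
        std::printf("%s\n", t.hasException() ? "failed" : "ok");
}
```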
50 changes: 26 additions & 24 deletions cpp/arcticdb/codec/codec-inl.hpp
@@ -15,8 +15,9 @@
#include <arcticdb/codec/lz4.hpp>
#include <arcticdb/codec/encoded_field.hpp>
#include <arcticdb/codec/magic_words.hpp>

#include <arcticdb/util/bitset.hpp>
#include <arcticdb/util/buffer.hpp>
+ #include <arcticdb/util/sparse_utils.hpp>

#include <type_traits>

@@ -32,23 +33,23 @@ void decode_block(const BlockType &block, const std::uint8_t *input, T *output) {
arcticdb::detail::PassthroughDecoder::decode_block<T>(input, size_to_decode, output, decoded_size);
} else {
std::uint32_t encoder_version = block.encoder_version();
- switch (block.codec().codec_case()) {
- case arcticdb::proto::encoding::VariantCodec::kZstd:
- arcticdb::detail::ZstdDecoder::decode_block<T>(encoder_version,
- input,
- size_to_decode,
- output,
- decoded_size);
- break;
- case arcticdb::proto::encoding::VariantCodec::kLz4:
- arcticdb::detail::Lz4Decoder::decode_block<T>(encoder_version,
- input,
- size_to_decode,
- output,
- decoded_size);
- break;
- default:
- util::raise_error_msg("Unsupported block codec {}", block);
+ switch (block.codec().codec_type()) {
+ case arcticdb::Codec::ZSTD:
+ arcticdb::detail::ZstdDecoder::decode_block<T>(encoder_version,
+ input,
+ size_to_decode,
+ output,
+ decoded_size);
+ break;
+ case arcticdb::Codec::LZ4:
+ arcticdb::detail::Lz4Decoder::decode_block<T>(encoder_version,
+ input,
+ size_to_decode,
+ output,
+ decoded_size);
+ break;
+ default:
+ util::raise_rte("Unsupported block codec {}", codec_type_to_string(block.codec().codec_type()));
}
}
}
@@ -78,14 +79,15 @@ std::size_t decode_ndarray(
EncodingVersion encoding_version
) {
ARCTICDB_SUBSAMPLE_AGG(DecodeNdArray)

std::size_t read_bytes = 0;
td.visit_tag([&](auto type_desc_tag) {
using TD = std::decay_t<decltype(type_desc_tag)>;
using T = typename TD::DataTypeTag::raw_type;

const auto data_size = encoding_sizes::data_uncompressed_size(field);
const bool is_empty_array = (data_size == 0) && type_desc_tag.dimension() > Dimension::Dim0;
// Empty array will not contain actual data, however, its sparse map should be loaded
// Empty array types will not contain actual data, however, its sparse map should be loaded
// so that we can distinguish None from []
if(data_size == 0 && !is_empty_array) {
util::check(type_desc_tag.data_type() == DataType::EMPTYVAL,
@@ -152,25 +154,25 @@
return read_bytes;
}

- template<class DataSink, typename EncodedFieldType>
+ template<class DataSink>
std::size_t decode_field(
const TypeDescriptor &td,
- const EncodedFieldType &field,
+ const EncodedFieldImpl &field,
const std::uint8_t *input,
DataSink &data_sink,
std::optional<util::BitMagic>& bv,
EncodingVersion encoding_version) {
size_t magic_size = 0u;
- if constexpr(std::is_same_v<EncodedFieldType, EncodedField>) {
if (encoding_version != EncodingVersion::V1) {
magic_size += sizeof(ColumnMagic);
util::check_magic<ColumnMagic>(input);
}

switch (field.encoding_case()) {
- case EncodedFieldType::kNdarray:
+ case EncodedFieldType::NDARRAY:
return decode_ndarray(td, field.ndarray(), input, data_sink, bv, encoding_version) + magic_size;
default:
util::raise_error_msg("Unsupported encoding {}", field);
util::raise_rte("Unsupported encoding {}", field);
}
}

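The `decode_block` and `decode_field` changes above are the heart of the version-2, binary-only layout: codec identification moves from the protobuf `VariantCodec` oneof (`codec_case()`, `kZstd`) to a plain enum read straight from the binary header (`codec_type()`, `Codec::ZSTD`), and errors are raised with `util::raise_rte`. A self-contained sketch of dispatching on such an enum; the types here are toys, not ArcticDB's real `Codec` definition:

```cpp
#include <cstdint>
#include <cstdio>
#include <stdexcept>
#include <string>

// Toy version of the v2 idea: the codec is a fixed-width enum stored
// directly in the binary header, not a protobuf oneof.
enum class Codec : std::uint16_t { UNKNOWN = 0, ZSTD, LZ4, PASSTHROUGH };

const char* codec_type_to_string(Codec c) {
    switch (c) {
    case Codec::ZSTD:        return "ZSTD";
    case Codec::LZ4:         return "LZ4";
    case Codec::PASSTHROUGH: return "PASSTHROUGH";
    default:                 return "UNKNOWN";
    }
}

std::size_t decode_block(Codec codec, const std::uint8_t* /*in*/, std::size_t in_size) {
    switch (codec) {
    case Codec::ZSTD:  /* ZstdDecoder::decode_block(...) would go here */ return in_size;
    case Codec::LZ4:   /* Lz4Decoder::decode_block(...) would go here  */ return in_size;
    default:
        throw std::runtime_error(std::string("Unsupported block codec ") +
                                 codec_type_to_string(codec));
    }
}

int main() {
    std::uint8_t buf[8]{};
    std::printf("decoded %zu bytes\n", decode_block(Codec::ZSTD, buf, sizeof buf));
}
```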