Commit

Add binary encoding format
willdealtry committed Jun 23, 2024
1 parent 1cd7d33 commit 1f6ae95
Showing 180 changed files with 5,543 additions and 3,521 deletions.
24 changes: 17 additions & 7 deletions cpp/arcticdb/CMakeLists.txt
@@ -176,6 +176,7 @@ endif ()

## Core library without python bindings ##
set(arcticdb_srcs
storage/memory_layout.hpp
# header files
async/async_store.hpp
async/batch_read_args.hpp
@@ -188,7 +189,10 @@ set(arcticdb_srcs
codec/lz4.hpp
codec/magic_words.hpp
codec/passthrough.hpp
codec/protobuf_mappings.hpp
codec/slice_data_sink.hpp
codec/segment_header.hpp
codec/segment_identifier.hpp
codec/typed_block_encoder_impl.hpp
codec/zstd.hpp
column_store/block.hpp
@@ -277,6 +281,7 @@ set(arcticdb_srcs
storage/azure/azure_mock_client.hpp
storage/azure/azure_real_client.hpp
storage/azure/azure_storage.hpp
storage/lmdb/lmdb.hpp
storage/lmdb/lmdb_client_wrapper.hpp
storage/lmdb/lmdb_mock_client.hpp
storage/lmdb/lmdb_real_client.hpp
@@ -310,7 +315,7 @@ set(arcticdb_srcs
stream/index.hpp
stream/merge.hpp
stream/merge.hpp
stream/merge.hpp util/ref_counted_map.hpp
stream/merge.hpp
stream/protobuf_mappings.hpp
stream/row_builder.hpp
stream/schema.hpp
@@ -352,8 +357,6 @@ set(arcticdb_srcs
util/preconditions.hpp
util/preprocess.hpp
util/ranges_from_future.hpp
util/ref_counted_map.hpp
util/ref_counted_map.hpp
util/regex_filter.hpp
util/simple_string_hash.hpp
util/slab_allocator.hpp
@@ -390,9 +393,10 @@ set(arcticdb_srcs
codec/codec.cpp
codec/encode_v1.cpp
codec/encode_v2.cpp
codec/encoding_sizes.cpp
codec/encoded_field.cpp
codec/protobuf_mappings.cpp
codec/segment.cpp
codec/variant_encoded_field_collection.cpp
codec/segment_header.cpp
column_store/chunked_buffer.cpp
column_store/column.cpp
column_store/column_data.cpp
@@ -406,6 +410,7 @@ set(arcticdb_srcs
entity/merge_descriptors.cpp
entity/metrics.cpp
entity/performance_tracing.cpp
entity/protobuf_mappings.cpp
entity/types.cpp
entity/type_utils.cpp
entity/types_proto.cpp
@@ -455,6 +460,7 @@ set(arcticdb_srcs
storage/mongo/mongo_mock_client.cpp
storage/mongo/mongo_storage.cpp
storage/s3/nfs_backed_storage.cpp
storage/s3/ec2_utils.cpp
storage/s3/s3_api.cpp
storage/s3/s3_real_client.cpp
storage/s3/s3_mock_client.cpp
@@ -465,6 +471,7 @@ set(arcticdb_srcs
stream/append_map.cpp
stream/index.cpp
stream/piloted_clock.cpp
stream/protobuf_mappings.cpp
toolbox/library_tool.cpp
util/allocator.cpp
util/buffer_pool.cpp
@@ -490,7 +497,7 @@ set(arcticdb_srcs
version/version_map_batch_methods.cpp
storage/s3/ec2_utils.cpp
storage/lmdb/lmdb.hpp
)
)

if(${ARCTICDB_INCLUDE_ROCKSDB})
list (APPEND arcticdb_srcs
@@ -741,6 +748,9 @@ if(${TEST})
set(unit_test_srcs
async/test/test_async.cpp
codec/test/test_codec.cpp
codec/test/test_encode_field_collection.cpp
codec/test/test_segment_header.cpp
codec/test/test_encoded_field.cpp
column_store/test/ingestion_stress_test.cpp
column_store/test/test_column.cpp
column_store/test/test_column_data_random_accessor.cpp
@@ -808,7 +818,7 @@ if(${TEST})
version/test/test_sorting_info_state_machine.cpp
version/test/version_map_model.hpp
storage/test/common.hpp
)
)

set(EXECUTABLE_PERMS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE) # 755

2 changes: 1 addition & 1 deletion cpp/arcticdb/async/async_store.hpp
@@ -338,7 +338,7 @@ std::vector<folly::Future<bool>> batch_key_exists(
}


folly::Future<SliceAndKey> async_write(
folly::Future<SliceAndKey> async_write(
folly::Future<std::tuple<PartialKey, SegmentInMemory, pipelines::FrameSlice>> &&input_fut,
const std::shared_ptr<DeDupMap> &de_dup_map) override {
using KeyOptSegment = std::pair<VariantKey, std::optional<Segment>>;
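A rough caller sketch for the async_write interface shown above. It is not part of the commit: the variables partial_key, segment and frame_slice, and the AsyncStore instance named store, are assumed, and the surrounding ArcticDB headers are required. folly::makeFuture wraps the input tuple into the future the method expects.

// Hypothetical usage, assuming ArcticDB headers and an AsyncStore instance named store.
auto input_fut = folly::makeFuture(
    std::make_tuple(partial_key, std::move(segment), frame_slice));
auto de_dup_map = std::make_shared<DeDupMap>();
// Resolves to the SliceAndKey of the stored (and possibly de-duplicated) segment.
folly::Future<SliceAndKey> written = store.async_write(std::move(input_fut), de_dup_map);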
4 changes: 1 addition & 3 deletions cpp/arcticdb/async/task_scheduler.hpp
@@ -150,15 +150,13 @@ class TaskScheduler {

explicit TaskScheduler(const std::optional<size_t>& cpu_thread_count = std::nullopt, const std::optional<size_t>& io_thread_count = std::nullopt) :
cpu_thread_count_(cpu_thread_count ? *cpu_thread_count : ConfigsMap::instance()->get_int("VersionStore.NumCPUThreads", get_default_num_cpus())),
io_thread_count_(io_thread_count ? *io_thread_count : ConfigsMap::instance()->get_int("VersionStore.NumIOThreads", std::min(100, (int) (cpu_thread_count_ * 1.5)))),
io_thread_count_(io_thread_count ? *io_thread_count : ConfigsMap::instance()->get_int("VersionStore.NumIOThreads", (int) (cpu_thread_count_ * 1.5))),
cpu_exec_(cpu_thread_count_, std::make_shared<InstrumentedNamedFactory>("CPUPool")) ,
io_exec_(io_thread_count_, std::make_shared<InstrumentedNamedFactory>("IOPool")){
util::check(cpu_thread_count_ > 0 && io_thread_count_ > 0, "Zero IO or CPU threads: {} {}", io_thread_count_, cpu_thread_count_);
ARCTICDB_RUNTIME_DEBUG(log::schedule(), "Task scheduler created with {:d} {:d}", cpu_thread_count_, io_thread_count_);
}

~TaskScheduler() = default;

template<class Task>
auto submit_cpu_task(Task &&t) {
auto task = std::forward<decltype(t)>(t);
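The only behavioural change in this hunk is the default IO thread count: the std::min(100, ...) cap is dropped, so when VersionStore.NumIOThreads is unset the pool now defaults to 1.5x the CPU thread count with no upper bound. A minimal stand-alone sketch of the two defaults (the helper names are illustrative, not from the codebase):

#include <algorithm>
#include <cstddef>

// Old default: 1.5x the CPU thread count, capped at 100 IO threads.
int old_default_io_threads(std::size_t cpu_threads) {
    return std::min(100, static_cast<int>(cpu_threads * 1.5));
}

// New default: 1.5x the CPU thread count, uncapped.
int new_default_io_threads(std::size_t cpu_threads) {
    return static_cast<int>(cpu_threads * 1.5);
}

// With 96 CPU threads, for example: old default = 100, new default = 144.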
13 changes: 6 additions & 7 deletions cpp/arcticdb/async/tasks.cpp
@@ -27,10 +27,10 @@ namespace arcticdb::async {
}
}

return index_descriptor(StreamDescriptor::id_from_proto(desc.proto()), idx, fields);
return index_descriptor_from_range(desc.id(), idx, fields);
}
else {
return index_descriptor(StreamDescriptor::id_from_proto(desc.proto()), idx, desc.fields());
return index_descriptor_from_range(desc.id(), idx, desc.fields());
}
});
}
@@ -39,10 +39,10 @@ namespace arcticdb::async {
auto key = std::move(key_segment_pair.atom_key());
auto seg = std::move(key_segment_pair.release_segment());
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment of size {} with key {}",
seg.total_segment_size(),
seg.size(),
key);
auto &hdr = seg.header();
auto desc = StreamDescriptor(std::make_shared<StreamDescriptor::Proto>(std::move(*hdr.mutable_stream_descriptor())), seg.fields_ptr());
const auto& desc = seg.descriptor();
auto descriptor = async::get_filtered_descriptor(desc, columns_to_decode_);
ranges_and_key_.col_range_.second = ranges_and_key_.col_range_.first + (descriptor.field_count() - descriptor.index().field_count());
ARCTICDB_TRACE(log::codec(), "Creating segment");
@@ -53,12 +53,11 @@

pipelines::SliceAndKey DecodeSlicesTask::decode_into_slice(std::pair<Segment, pipelines::SliceAndKey>&& sk_pair) const {
auto [seg, sk] = std::move(sk_pair);
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment of size {} with key {}",
seg.total_segment_size(),
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment with key {}",
variant_key_view(sk.key()));

auto &hdr = seg.header();
auto desc = StreamDescriptor(std::make_shared<StreamDescriptor::Proto>(std::move(*hdr.mutable_stream_descriptor())), seg.fields_ptr());
const auto& desc = seg.descriptor();
auto descriptor = async::get_filtered_descriptor(desc, filter_columns_);
sk.slice_.adjust_columns(descriptor.field_count() - descriptor.index().field_count());

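A condensed view of the decode path after this change, using the names from the diff (the surrounding task state is assumed): the stream descriptor is now taken directly from the segment instead of being rebuilt from the mutable protobuf header, and the output column range is widened by the number of non-index fields in the filtered descriptor.

const auto& desc = seg.descriptor();  // no protobuf round-trip
auto descriptor = async::get_filtered_descriptor(desc, columns_to_decode_);
// Columns in the output slice = filtered fields minus index fields.
ranges_and_key_.col_range_.second =
    ranges_and_key_.col_range_.first + (descriptor.field_count() - descriptor.index().field_count());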
29 changes: 12 additions & 17 deletions cpp/arcticdb/async/tasks.hpp
@@ -89,7 +89,7 @@ struct EncodeAtomTask : BaseTask {
storage::KeySegmentPair encode() {
ARCTICDB_DEBUG(log::codec(), "Encoding object with partial key {}", partial_key_);
auto enc_seg = ::arcticdb::encode_dispatch(std::move(segment_), *codec_meta_, encoding_version_);
auto content_hash = hash_segment_header(enc_seg.header());
auto content_hash = get_segment_hash(enc_seg);

AtomKey k = partial_key_.build_key(creation_ts_, content_hash);
return {std::move(k), std::move(enc_seg)};
@@ -208,7 +208,9 @@ struct KeySegmentContinuation {
};

inline storage::KeySegmentPair read_dispatch(const entity::VariantKey& variant_key, const std::shared_ptr<storage::Library>& lib, const storage::ReadKeyOpts& opts) {
return util::variant_match(variant_key, [&lib, &opts](const auto &key) { return lib->read(key, opts); });
return util::variant_match(variant_key, [&lib, &opts](const auto &key) {
return lib->read(key, opts);
});
}

template <typename Callable>
@@ -322,8 +324,7 @@ struct DecodeSegmentTask : BaseTask {
ARCTICDB_SAMPLE(DecodeAtomTask, 0)

auto key_seg = std::move(ks);
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment of size {} with key {}",
key_seg.segment().total_segment_size(),
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment with key {}",
variant_key_view(key_seg.variant_key()));

return {key_seg.variant_key(), decode_segment(std::move(key_seg.segment()))};
@@ -365,10 +366,10 @@ struct DecodeSlicesTask : BaseTask {
Composite<pipelines::SliceAndKey> operator()(Composite<std::pair<Segment, pipelines::SliceAndKey>> && skp) const {
ARCTICDB_SAMPLE(DecodeSlicesTask, 0)
auto sk_pairs = std::move(skp);
return sk_pairs.transform([that=this] (auto&& ssp){
auto seg_slice_pair = std::forward<decltype(ssp)>(ssp);
return sk_pairs.transform([this] (auto&& ssp){
auto seg_slice_pair = std::move(ssp);
ARCTICDB_DEBUG(log::version(), "Decoding slice {}", seg_slice_pair.second.key());
return that->decode_into_slice(std::move(seg_slice_pair));
return decode_into_slice(std::move(seg_slice_pair));
});
}

@@ -442,8 +443,7 @@ struct DecodeMetadataTask : BaseTask {
std::pair<std::optional<VariantKey>, std::optional<google::protobuf::Any>> operator()(storage::KeySegmentPair &&ks) const {
ARCTICDB_SAMPLE(ReadMetadataTask, 0)
auto key_seg = std::move(ks);
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeMetadataTask decoding segment of size {} with key {}",
key_seg.segment().total_segment_size(), variant_key_view(key_seg.variant_key()));
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeMetadataTask decoding segment with key {}", variant_key_view(key_seg.variant_key()));

auto meta = decode_metadata_from_segment(key_seg.segment());
std::pair<VariantKey, std::optional<google::protobuf::Any>> output;
@@ -463,18 +463,14 @@ struct DecodeTimeseriesDescriptorTask : BaseTask {
std::pair<VariantKey, TimeseriesDescriptor> operator()(storage::KeySegmentPair &&ks) const {
ARCTICDB_SAMPLE(DecodeTimeseriesDescriptorTask, 0)
auto key_seg = std::move(ks);
ARCTICDB_DEBUG(log::storage(), "DecodeTimeseriesDescriptorTask decoding segment of size {} with key {}",
key_seg.segment().total_segment_size(), variant_key_view(key_seg.variant_key()));
ARCTICDB_DEBUG(log::storage(), "DecodeTimeseriesDescriptorTask decoding segment with key {}", variant_key_view(key_seg.variant_key()));

auto maybe_desc = decode_timeseries_descriptor(key_seg.segment());

util::check(static_cast<bool>(maybe_desc), "Failed to decode timeseries descriptor");
return std::make_pair(
std::move(key_seg.variant_key()),
TimeseriesDescriptor{
std::make_shared<TimeseriesDescriptor::Proto>(std::move(std::get<1>(*maybe_desc))),
std::make_shared<FieldCollection>(std::move(std::get<2>(*maybe_desc)))}
);
std::move(*maybe_desc));

}
};
@@ -486,8 +482,7 @@ struct DecodeMetadataAndDescriptorTask : BaseTask {
std::tuple<VariantKey, std::optional<google::protobuf::Any>, StreamDescriptor> operator()(storage::KeySegmentPair &&ks) const {
ARCTICDB_SAMPLE(ReadMetadataAndDescriptorTask, 0)
auto key_seg = std::move(ks);
ARCTICDB_DEBUG(log::storage(), "DecodeMetadataAndDescriptorTask decoding segment of size {} with key {}",
key_seg.segment().total_segment_size(), variant_key_view(key_seg.variant_key()));
ARCTICDB_DEBUG(log::storage(), "DecodeMetadataAndDescriptorTask decoding segment with key {}", variant_key_view(key_seg.variant_key()));

auto [any, descriptor] = decode_metadata_and_descriptor_fields(key_seg.segment());
return std::make_tuple(
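As a brief recap of the hashing change in EncodeAtomTask above (a sketch over the same member variables, not new code): the content hash that goes into the atom key is now computed over the whole encoded segment via get_segment_hash, where previously it was derived from the segment header alone.

auto enc_seg = ::arcticdb::encode_dispatch(std::move(segment_), *codec_meta_, encoding_version_);
auto content_hash = get_segment_hash(enc_seg);   // was: hash_segment_header(enc_seg.header())
AtomKey k = partial_key_.build_key(creation_ts_, content_hash);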
8 changes: 4 additions & 4 deletions cpp/arcticdb/async/test/test_async.cpp
@@ -43,7 +43,7 @@ TEST(Async, SinkBasic) {

auto seg = ac::SegmentInMemory();
aa::EncodeAtomTask enc{
ac::entity::KeyType::GENERATION, ac::entity::VersionId{6}, ac::entity::NumericId{123}, ac::entity::NumericId{456}, ac::timestamp{457}, ac::entity::NumericIndex{999}, std::move(seg), codec_opt, ac::EncodingVersion::V2
ac::entity::KeyType::GENERATION, ac::entity::VersionId{6}, ac::NumericId{123}, ac::NumericId{456}, ac::timestamp{457}, ac::entity::NumericIndex{999}, std::move(seg), codec_opt, ac::EncodingVersion::V2
};

auto v = sched.submit_cpu_task(std::move(enc)).via(&aa::io_executor()).thenValue(aa::WriteSegmentTask{lib}).get();
@@ -52,7 +52,7 @@ TEST(Async, SinkBasic) {
auto default_content_hash = h.digest();

ASSERT_EQ(ac::entity::atom_key_builder().gen_id(6).start_index(456).end_index(457).creation_ts(999)
.content_hash(default_content_hash).build(ac::entity::NumericId{123}, ac::entity::KeyType::GENERATION),
.content_hash(default_content_hash).build(ac::NumericId{123}, ac::entity::KeyType::GENERATION),
to_atom(v)
);
}
@@ -129,10 +129,10 @@ TEST(Async, CollectWithThrow) {
}
auto vec_fut = folly::collectAll(stuff).get();
} catch(std::exception&) {
log::version().info("Caught something");
ARCTICDB_DEBUG(log::version(), "Caught something");
}

log::version().info("Collect returned");
ARCTICDB_DEBUG(log::version(), "Collect returned");
}

using IndexSegmentReader = int;
50 changes: 26 additions & 24 deletions cpp/arcticdb/codec/codec-inl.hpp
@@ -15,8 +15,9 @@
#include <arcticdb/codec/lz4.hpp>
#include <arcticdb/codec/encoded_field.hpp>
#include <arcticdb/codec/magic_words.hpp>

#include <arcticdb/util/bitset.hpp>
#include <arcticdb/util/buffer.hpp>
#include <arcticdb/util/sparse_utils.hpp>

#include <type_traits>

@@ -32,23 +33,23 @@ void decode_block(const BlockType &block, const std::uint8_t *input, T *output) {
arcticdb::detail::PassthroughDecoder::decode_block<T>(input, size_to_decode, output, decoded_size);
} else {
std::uint32_t encoder_version = block.encoder_version();
switch (block.codec().codec_case()) {
case arcticdb::proto::encoding::VariantCodec::kZstd:
arcticdb::detail::ZstdDecoder::decode_block<T>(encoder_version,
input,
size_to_decode,
output,
decoded_size);
break;
case arcticdb::proto::encoding::VariantCodec::kLz4:
arcticdb::detail::Lz4Decoder::decode_block<T>(encoder_version,
input,
size_to_decode,
output,
decoded_size);
break;
default:
util::raise_error_msg("Unsupported block codec {}", block);
switch (block.codec().codec_type()) {
case arcticdb::Codec::ZSTD:
arcticdb::detail::ZstdDecoder::decode_block<T>(encoder_version,
input,
size_to_decode,
output,
decoded_size);
break;
case arcticdb::Codec::LZ4:
arcticdb::detail::Lz4Decoder::decode_block<T>(encoder_version,
input,
size_to_decode,
output,
decoded_size);
break;
default:
util::raise_rte("Unsupported block codec {}", codec_type_to_string(block.codec().codec_type()));
}
}
}
Expand Down Expand Up @@ -78,14 +79,15 @@ std::size_t decode_ndarray(
EncodingVersion encoding_version
) {
ARCTICDB_SUBSAMPLE_AGG(DecodeNdArray)

std::size_t read_bytes = 0;
td.visit_tag([&](auto type_desc_tag) {
using TD = std::decay_t<decltype(type_desc_tag)>;
using T = typename TD::DataTypeTag::raw_type;

const auto data_size = encoding_sizes::data_uncompressed_size(field);
const bool is_empty_array = (data_size == 0) && type_desc_tag.dimension() > Dimension::Dim0;
// Empty array will not contain actual data, however, its sparse map should be loaded
// Empty array types will not contain actual data, however, its sparse map should be loaded
// so that we can distinguish None from []
if(data_size == 0 && !is_empty_array) {
util::check(type_desc_tag.data_type() == DataType::EMPTYVAL,
@@ -152,25 +154,25 @@
return read_bytes;
}

template<class DataSink, typename EncodedFieldType>
template<class DataSink>
std::size_t decode_field(
const TypeDescriptor &td,
const EncodedFieldType &field,
const EncodedFieldImpl &field,
const std::uint8_t *input,
DataSink &data_sink,
std::optional<util::BitMagic>& bv,
EncodingVersion encoding_version) {
size_t magic_size = 0u;
if constexpr(std::is_same_v<EncodedFieldType, EncodedField>) {
if (encoding_version != EncodingVersion::V1) {
magic_size += sizeof(ColumnMagic);
util::check_magic<ColumnMagic>(input);
}

switch (field.encoding_case()) {
case EncodedFieldType::kNdarray:
case EncodedFieldType::NDARRAY:
return decode_ndarray(td, field.ndarray(), input, data_sink, bv, encoding_version) + magic_size;
default:
util::raise_error_msg("Unsupported encoding {}", field);
util::raise_rte("Unsupported encoding {}", field);
}
}

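The decoder dispatch above is where the new binary encoding format is most visible: codec selection now switches on a plain Codec enum carried by the binary-encoded field instead of the protobuf VariantCodec codec_case, and errors are raised with util::raise_rte. A sketch of that dispatch shape follows; the wrapper function and its template parameters are illustrative, while the enum values and decoder calls come from the diff.

// Illustrative wrapper, assuming ArcticDB's codec headers are available.
template <typename T, typename BlockType>
void dispatch_decode(const BlockType& block, const std::uint8_t* input, T* output,
                     std::size_t size_to_decode, std::size_t decoded_size) {
    const std::uint32_t encoder_version = block.encoder_version();
    switch (block.codec().codec_type()) {
        case arcticdb::Codec::ZSTD:
            arcticdb::detail::ZstdDecoder::decode_block<T>(encoder_version, input, size_to_decode, output, decoded_size);
            break;
        case arcticdb::Codec::LZ4:
            arcticdb::detail::Lz4Decoder::decode_block<T>(encoder_version, input, size_to_decode, output, decoded_size);
            break;
        default:
            util::raise_rte("Unsupported block codec {}", codec_type_to_string(block.codec().codec_type()));
    }
}

decode_field is likewise no longer templated on the field type: it always takes an EncodedFieldImpl, verifies the ColumnMagic marker for V2-encoded columns, and currently supports only the NDARRAY encoding case.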
(Diffs for the remaining changed files are not shown here.)
