Skip to content

Commit

Permalink
WIP descriptor changes
Browse files Browse the repository at this point in the history
  • Loading branch information
willdealtry committed Apr 24, 2024
1 parent 0859017 commit 2789314
Show file tree
Hide file tree
Showing 157 changed files with 4,614 additions and 2,989 deletions.
21 changes: 10 additions & 11 deletions cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ set(arcticdb_srcs
entity/type_conversion.hpp
entity/types.hpp
entity/type_utils.hpp
entity/types_proto.hpp
entity/types-inl.hpp
entity/variant_key.hpp
entity/versioned_item.hpp
Expand Down Expand Up @@ -310,7 +309,7 @@ set(arcticdb_srcs
stream/index.hpp
stream/merge.hpp
stream/merge.hpp
stream/merge.hpp util/ref_counted_map.hpp
stream/merge.hpp
stream/protobuf_mappings.hpp
stream/row_builder.hpp
stream/schema.hpp
Expand Down Expand Up @@ -353,8 +352,6 @@ set(arcticdb_srcs
util/preconditions.hpp
util/preprocess.hpp
util/ranges_from_future.hpp
util/ref_counted_map.hpp
util/ref_counted_map.hpp
util/regex_filter.hpp
util/simple_string_hash.hpp
util/slab_allocator.hpp
Expand Down Expand Up @@ -390,9 +387,7 @@ set(arcticdb_srcs
codec/codec.cpp
codec/encode_v1.cpp
codec/encode_v2.cpp
codec/encoding_sizes.cpp
codec/segment.cpp
codec/variant_encoded_field_collection.cpp
column_store/chunked_buffer.cpp
column_store/column.cpp
column_store/column_data.cpp
Expand All @@ -408,7 +403,6 @@ set(arcticdb_srcs
entity/performance_tracing.cpp
entity/types.cpp
entity/type_utils.cpp
entity/types_proto.cpp
log/log.cpp
pipeline/column_mapping.cpp
pipeline/column_stats.cpp
Expand Down Expand Up @@ -487,7 +481,12 @@ set(arcticdb_srcs
version/symbol_list.cpp
version/version_map_batch_methods.cpp
storage/s3/ec2_utils.cpp
)
codec/segment_header.hpp
codec/protobuf_mappings.hpp
codec/encoded_field.cpp
memory_layout.hpp
codec/protobuf_mappings.cpp
entity/protobuf_mappings.cpp entity/types_proto.hpp entity/types_proto.cpp stream/protobuf_mappings.cpp codec/segment_header.cpp codec/segment_identifier.hpp)

if(${ARCTICDB_INCLUDE_ROCKSDB})
list (APPEND arcticdb_srcs
Expand Down Expand Up @@ -831,7 +830,7 @@ if(${TEST})
version/test/test_sorting_info_state_machine.cpp
version/test/version_map_model.hpp
storage/test/common.hpp
)
codec/test/test_encode_field_collection.cpp codec/test/test_segment_header.cpp codec/test/test_encoded_field.cpp)

set(EXECUTABLE_PERMS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE) # 755

Expand Down Expand Up @@ -906,7 +905,7 @@ if(${TEST})
)
endif()

gtest_discover_tests(test_unit_arcticdb PROPERTIES DISCOVERY_TIMEOUT 60)
# gtest_discover_tests(test_unit_arcticdb PROPERTIES DISCOVERY_TIMEOUT 60)

set(benchmark_srcs
stream/test/stream_test_common.cpp
Expand Down Expand Up @@ -992,5 +991,5 @@ if(${TEST})
)
endif()

gtest_discover_tests(arcticdb_rapidcheck_tests PROPERTIES DISCOVERY_TIMEOUT 60)
#gtest_discover_tests(arcticdb_rapidcheck_tests PROPERTIES DISCOVERY_TIMEOUT 60)
endif()
6 changes: 4 additions & 2 deletions cpp/arcticdb/async/task_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,16 @@ class TaskScheduler {

explicit TaskScheduler(const std::optional<size_t>& cpu_thread_count = std::nullopt, const std::optional<size_t>& io_thread_count = std::nullopt) :
cpu_thread_count_(cpu_thread_count ? *cpu_thread_count : ConfigsMap::instance()->get_int("VersionStore.NumCPUThreads", get_default_num_cpus())),
io_thread_count_(io_thread_count ? *io_thread_count : ConfigsMap::instance()->get_int("VersionStore.NumIOThreads", std::min(100, (int) (cpu_thread_count_ * 1.5)))),
io_thread_count_(io_thread_count ? *io_thread_count : ConfigsMap::instance()->get_int("VersionStore.NumIOThreads", (int) (cpu_thread_count_ * 1.5))),
cpu_exec_(cpu_thread_count_, std::make_shared<InstrumentedNamedFactory>("CPUPool")) ,
io_exec_(io_thread_count_, std::make_shared<InstrumentedNamedFactory>("IOPool")){
util::check(cpu_thread_count_ > 0 && io_thread_count_ > 0, "Zero IO or CPU threads: {} {}", io_thread_count_, cpu_thread_count_);
ARCTICDB_RUNTIME_DEBUG(log::schedule(), "Task scheduler created with {:d} {:d}", cpu_thread_count_, io_thread_count_);
}

~TaskScheduler() = default;
~TaskScheduler() {
log::version().info("#### Destroying task scheduler");
}

template<class Task>
auto submit_cpu_task(Task &&t) {
Expand Down
13 changes: 6 additions & 7 deletions cpp/arcticdb/async/tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ namespace arcticdb::async {
}
}

return index_descriptor(StreamDescriptor::id_from_proto(desc.proto()), idx, fields);
return index_descriptor_from_range(desc.id(), idx, fields);
}
else {
return index_descriptor(StreamDescriptor::id_from_proto(desc.proto()), idx, desc.fields());
return index_descriptor_from_range(desc.id(), idx, desc.fields());
}
});
}
Expand All @@ -39,10 +39,10 @@ namespace arcticdb::async {
auto key = std::move(key_segment_pair.atom_key());
auto seg = std::move(key_segment_pair.release_segment());
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment of size {} with key {}",
seg.total_segment_size(),
seg.size(),
key);
auto &hdr = seg.header();
auto desc = StreamDescriptor(std::make_shared<StreamDescriptor::Proto>(std::move(*hdr.mutable_stream_descriptor())), seg.fields_ptr());
const auto& desc = seg.descriptor();
auto descriptor = async::get_filtered_descriptor(desc, columns_to_decode_);
ranges_and_key_.col_range_.second = ranges_and_key_.col_range_.first + (descriptor.field_count() - descriptor.index().field_count());
ARCTICDB_TRACE(log::codec(), "Creating segment");
Expand All @@ -53,12 +53,11 @@ namespace arcticdb::async {

pipelines::SliceAndKey DecodeSlicesTask::decode_into_slice(std::pair<Segment, pipelines::SliceAndKey>&& sk_pair) const {
auto [seg, sk] = std::move(sk_pair);
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment of size {} with key {}",
seg.total_segment_size(),
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment with key {}",
variant_key_view(sk.key()));

auto &hdr = seg.header();
auto desc = StreamDescriptor(std::make_shared<StreamDescriptor::Proto>(std::move(*hdr.mutable_stream_descriptor())), seg.fields_ptr());
const auto& desc = seg.descriptor();
auto descriptor = async::get_filtered_descriptor(desc, filter_columns_);
sk.slice_.adjust_columns(descriptor.field_count() - descriptor.index().field_count());

Expand Down
23 changes: 9 additions & 14 deletions cpp/arcticdb/async/tasks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ struct KeySegmentContinuation {
};

inline storage::KeySegmentPair read_dispatch(const entity::VariantKey& variant_key, const std::shared_ptr<storage::Library>& lib, const storage::ReadKeyOpts& opts) {
return util::variant_match(variant_key, [&lib, &opts](const auto &key) { return lib->read(key, opts); });
return util::variant_match(variant_key, [&lib, &opts](const auto &key) {
return lib->read(key, opts);
});
}

template <typename Callable>
Expand Down Expand Up @@ -321,8 +323,7 @@ struct DecodeSegmentTask : BaseTask {
ARCTICDB_SAMPLE(DecodeAtomTask, 0)

auto key_seg = std::move(ks);
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment of size {} with key {}",
key_seg.segment().total_segment_size(),
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment with key {}",
variant_key_view(key_seg.variant_key()));

return {key_seg.variant_key(), decode_segment(std::move(key_seg.segment()))};
Expand Down Expand Up @@ -365,7 +366,7 @@ struct DecodeSlicesTask : BaseTask {
ARCTICDB_SAMPLE(DecodeSlicesTask, 0)
auto sk_pairs = std::move(skp);
return sk_pairs.transform([that=this] (auto&& ssp){
auto seg_slice_pair = std::forward<decltype(ssp)>(ssp);
auto seg_slice_pair = std::move(ssp);
ARCTICDB_DEBUG(log::version(), "Decoding slice {}", seg_slice_pair.second.key());
return that->decode_into_slice(std::move(seg_slice_pair));
});
Expand Down Expand Up @@ -440,8 +441,7 @@ struct DecodeMetadataTask : BaseTask {
std::pair<std::optional<VariantKey>, std::optional<google::protobuf::Any>> operator()(storage::KeySegmentPair &&ks) const {
ARCTICDB_SAMPLE(ReadMetadataTask, 0)
auto key_seg = std::move(ks);
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeMetadataTask decoding segment of size {} with key {}",
key_seg.segment().total_segment_size(), variant_key_view(key_seg.variant_key()));
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeMetadataTask decoding segment with key {}", variant_key_view(key_seg.variant_key()));

auto meta = decode_metadata_from_segment(key_seg.segment());
std::pair<VariantKey, std::optional<google::protobuf::Any>> output;
Expand All @@ -461,18 +461,14 @@ struct DecodeTimeseriesDescriptorTask : BaseTask {
std::pair<VariantKey, TimeseriesDescriptor> operator()(storage::KeySegmentPair &&ks) const {
ARCTICDB_SAMPLE(DecodeTimeseriesDescriptorTask, 0)
auto key_seg = std::move(ks);
ARCTICDB_DEBUG(log::storage(), "DecodeTimeseriesDescriptorTask decoding segment of size {} with key {}",
key_seg.segment().total_segment_size(), variant_key_view(key_seg.variant_key()));
ARCTICDB_DEBUG(log::storage(), "DecodeTimeseriesDescriptorTask decoding segment with key {}", variant_key_view(key_seg.variant_key()));

auto maybe_desc = decode_timeseries_descriptor(key_seg.segment());

util::check(static_cast<bool>(maybe_desc), "Failed to decode timeseries descriptor");
return std::make_pair(
std::move(key_seg.variant_key()),
TimeseriesDescriptor{
std::make_shared<TimeseriesDescriptor::Proto>(std::move(std::get<1>(*maybe_desc))),
std::make_shared<FieldCollection>(std::move(std::get<2>(*maybe_desc)))}
);
std::move(*maybe_desc));

}
};
Expand All @@ -484,8 +480,7 @@ struct DecodeMetadataAndDescriptorTask : BaseTask {
std::tuple<VariantKey, std::optional<google::protobuf::Any>, StreamDescriptor> operator()(storage::KeySegmentPair &&ks) const {
ARCTICDB_SAMPLE(ReadMetadataAndDescriptorTask, 0)
auto key_seg = std::move(ks);
ARCTICDB_DEBUG(log::storage(), "DecodeMetadataAndDescriptorTask decoding segment of size {} with key {}",
key_seg.segment().total_segment_size(), variant_key_view(key_seg.variant_key()));
ARCTICDB_DEBUG(log::storage(), "DecodeMetadataAndDescriptorTask decoding segment with key {}", variant_key_view(key_seg.variant_key()));

auto [any, descriptor] = decode_metadata_and_descriptor_fields(key_seg.segment());
return std::make_tuple(
Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/async/test/test_async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ TEST(Async, SinkBasic) {

auto seg = ac::SegmentInMemory();
aa::EncodeAtomTask enc{
ac::entity::KeyType::GENERATION, ac::entity::VersionId{6}, ac::entity::NumericId{123}, ac::entity::NumericId{456}, ac::timestamp{457}, {ac::entity::NumericIndex{999}}, std::move(seg), codec_opt, ac::EncodingVersion::V2
ac::entity::KeyType::GENERATION, ac::entity::VersionId{6}, ac::NumericId{123}, ac::NumericId{456}, ac::timestamp{457}, ac::entity::NumericIndex{999}, std::move(seg), codec_opt, ac::EncodingVersion::V2
};

auto v = sched.submit_cpu_task(std::move(enc)).via(&aa::io_executor()).thenValue(aa::WriteSegmentTask{lib}).get();
Expand All @@ -52,7 +52,7 @@ TEST(Async, SinkBasic) {
auto default_content_hash = h.digest();

ASSERT_EQ(ac::entity::atom_key_builder().gen_id(6).start_index(456).end_index(457).creation_ts(999)
.content_hash(default_content_hash).build(ac::entity::NumericId{123}, ac::entity::KeyType::GENERATION),
.content_hash(default_content_hash).build(ac::NumericId{123}, ac::entity::KeyType::GENERATION),
to_atom(v)
);
}
Expand Down
51 changes: 27 additions & 24 deletions cpp/arcticdb/codec/codec-inl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
#include <arcticdb/codec/lz4.hpp>
#include <arcticdb/codec/encoded_field.hpp>
#include <arcticdb/codec/magic_words.hpp>

#include <arcticdb/util/bitset.hpp>
#include <arcticdb/util/buffer.hpp>
#include <arcticdb/util/sparse_utils.hpp>
#include <google/protobuf/text_format.h>

#include <type_traits>

Expand All @@ -32,23 +34,23 @@ void decode_block(const BlockType &block, const std::uint8_t *input, T *output)
arcticdb::detail::PassthroughDecoder::decode_block<T>(input, size_to_decode, output, decoded_size);
} else {
std::uint32_t encoder_version = block.encoder_version();
switch (block.codec().codec_case()) {
case arcticdb::proto::encoding::VariantCodec::kZstd:
arcticdb::detail::ZstdDecoder::decode_block<T>(encoder_version,
input,
size_to_decode,
output,
decoded_size);
break;
case arcticdb::proto::encoding::VariantCodec::kLz4:
arcticdb::detail::Lz4Decoder::decode_block<T>(encoder_version,
input,
size_to_decode,
output,
decoded_size);
break;
default:
util::raise_error_msg("Unsupported block codec {}", block);
switch (block.codec().codec_type()) {
case arcticdb::Codec::ZSTD:
arcticdb::detail::ZstdDecoder::decode_block<T>(encoder_version,
input,
size_to_decode,
output,
decoded_size);
break;
case arcticdb::Codec::LZ4:
arcticdb::detail::Lz4Decoder::decode_block<T>(encoder_version,
input,
size_to_decode,
output,
decoded_size);
break;
default:
util::raise_rte("Unsupported block codec {}", codec_type_to_string(block.codec().codec_type()));
}
}
}
Expand Down Expand Up @@ -78,14 +80,15 @@ std::size_t decode_ndarray(
EncodingVersion encoding_version
) {
ARCTICDB_SUBSAMPLE_AGG(DecodeNdArray)

std::size_t read_bytes = 0;
td.visit_tag([&](auto type_desc_tag) {
using TD = std::decay_t<decltype(type_desc_tag)>;
using T = typename TD::DataTypeTag::raw_type;

const auto data_size = encoding_sizes::data_uncompressed_size(field);
const bool is_empty_array = (data_size == 0) && type_desc_tag.dimension() > Dimension::Dim0;
// Empty array will not contain actual data, however, its sparse map should be loaded
// Empty array types will not contain actual data, however, its sparse map should be loaded
// so that we can distinguish None from []
if(data_size == 0 && !is_empty_array) {
util::check(type_desc_tag.data_type() == DataType::EMPTYVAL,
Expand Down Expand Up @@ -152,25 +155,25 @@ std::size_t decode_ndarray(
return read_bytes;
}

template<class DataSink, typename EncodedFieldType>
template<class DataSink>
std::size_t decode_field(
const TypeDescriptor &td,
const EncodedFieldType &field,
const EncodedFieldImpl &field,
const std::uint8_t *input,
DataSink &data_sink,
std::optional<util::BitMagic>& bv,
EncodingVersion encoding_version) {
size_t magic_size = 0u;
if constexpr(std::is_same_v<EncodedFieldType, EncodedField>) {
if (encoding_version != EncodingVersion::V1) {
magic_size += sizeof(ColumnMagic);
util::check_magic<ColumnMagic>(input);
}

switch (field.encoding_case()) {
case EncodedFieldType::kNdarray:
case EncodedFieldType::NDARRAY:
return decode_ndarray(td, field.ndarray(), input, data_sink, bv, encoding_version) + magic_size;
default:
util::raise_error_msg("Unsupported encoding {}", field);
util::raise_rte("Unsupported encoding {}", field);
}
}

Expand Down
Loading

0 comments on commit 2789314

Please sign in to comment.