Fixing thing
willdealtry committed Apr 24, 2024
1 parent 0e9cc22 commit 398eed5
Showing 23 changed files with 167 additions and 143 deletions.
cpp/arcticdb/CMakeLists.txt: 4 changes (2 additions, 2 deletions)
@@ -905,7 +905,7 @@ if(${TEST})
)
endif()

gtest_discover_tests(test_unit_arcticdb PROPERTIES DISCOVERY_TIMEOUT 60)
# gtest_discover_tests(test_unit_arcticdb PROPERTIES DISCOVERY_TIMEOUT 60)

set(benchmark_srcs
stream/test/stream_test_common.cpp
@@ -991,5 +991,5 @@ if(${TEST})
)
endif()

gtest_discover_tests(arcticdb_rapidcheck_tests PROPERTIES DISCOVERY_TIMEOUT 60)
#gtest_discover_tests(arcticdb_rapidcheck_tests PROPERTIES DISCOVERY_TIMEOUT 60)
endif()
cpp/arcticdb/async/tasks.cpp: 5 changes (2 additions, 3 deletions)
@@ -39,7 +39,7 @@ namespace arcticdb::async {
auto key = std::move(key_segment_pair.atom_key());
auto seg = std::move(key_segment_pair.release_segment());
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment of size {} with key {}",
seg.total_segment_size(),
seg.size(),
key);
auto &hdr = seg.header();
const auto& desc = seg.descriptor();
@@ -53,8 +53,7 @@

pipelines::SliceAndKey DecodeSlicesTask::decode_into_slice(std::pair<Segment, pipelines::SliceAndKey>&& sk_pair) const {
auto [seg, sk] = std::move(sk_pair);
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment of size {} with key {}",
seg.total_segment_size(),
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment with key {}",
variant_key_view(sk.key()));

auto &hdr = seg.header();
cpp/arcticdb/async/tasks.hpp: 12 changes (4 additions, 8 deletions)
@@ -323,8 +323,7 @@ struct DecodeSegmentTask : BaseTask {
ARCTICDB_SAMPLE(DecodeAtomTask, 0)

auto key_seg = std::move(ks);
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment of size {} with key {}",
key_seg.segment().total_segment_size(),
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment with key {}",
variant_key_view(key_seg.variant_key()));

return {key_seg.variant_key(), decode_segment(std::move(key_seg.segment()))};
@@ -442,8 +441,7 @@ struct DecodeMetadataTask : BaseTask {
std::pair<std::optional<VariantKey>, std::optional<google::protobuf::Any>> operator()(storage::KeySegmentPair &&ks) const {
ARCTICDB_SAMPLE(ReadMetadataTask, 0)
auto key_seg = std::move(ks);
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeMetadataTask decoding segment of size {} with key {}",
key_seg.segment().total_segment_size(), variant_key_view(key_seg.variant_key()));
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeMetadataTask decoding segment with key {}", variant_key_view(key_seg.variant_key()));

auto meta = decode_metadata_from_segment(key_seg.segment());
std::pair<VariantKey, std::optional<google::protobuf::Any>> output;
@@ -463,8 +461,7 @@ struct DecodeTimeseriesDescriptorTask : BaseTask {
std::pair<VariantKey, TimeseriesDescriptor> operator()(storage::KeySegmentPair &&ks) const {
ARCTICDB_SAMPLE(DecodeTimeseriesDescriptorTask, 0)
auto key_seg = std::move(ks);
ARCTICDB_DEBUG(log::storage(), "DecodeTimeseriesDescriptorTask decoding segment of size {} with key {}",
key_seg.segment().total_segment_size(), variant_key_view(key_seg.variant_key()));
ARCTICDB_DEBUG(log::storage(), "DecodeTimeseriesDescriptorTask decoding segment with key {}", variant_key_view(key_seg.variant_key()));

auto maybe_desc = decode_timeseries_descriptor(key_seg.segment());

@@ -483,8 +480,7 @@ struct DecodeMetadataAndDescriptorTask : BaseTask {
std::tuple<VariantKey, std::optional<google::protobuf::Any>, StreamDescriptor> operator()(storage::KeySegmentPair &&ks) const {
ARCTICDB_SAMPLE(ReadMetadataAndDescriptorTask, 0)
auto key_seg = std::move(ks);
ARCTICDB_DEBUG(log::storage(), "DecodeMetadataAndDescriptorTask decoding segment of size {} with key {}",
key_seg.segment().total_segment_size(), variant_key_view(key_seg.variant_key()));
ARCTICDB_DEBUG(log::storage(), "DecodeMetadataAndDescriptorTask decoding segment with key {}", variant_key_view(key_seg.variant_key()));

auto [any, descriptor] = decode_metadata_and_descriptor_fields(key_seg.segment());
return std::make_tuple(
cpp/arcticdb/codec/encode_v1.cpp: 2 changes (1 addition, 1 deletion)
@@ -162,6 +162,6 @@ namespace arcticdb {
ARCTICDB_DEBUG(log::codec(), "Setting buffer bytes to {}", pos);
out_buffer->set_bytes(pos);
descriptor_data->compressed_bytes_ = pos;
return {std::move(segment_header), std::move(out_buffer), descriptor_data, in_mem_seg.descriptor().fields_ptr(), in_mem_seg.descriptor().id()};
return {std::move(segment_header), std::move(out_buffer), descriptor_data, in_mem_seg.descriptor().fields_ptr(), in_mem_seg.descriptor().id(), std::nullopt};
}
}
cpp/arcticdb/codec/encode_v2.cpp: 2 changes (1 addition, 1 deletion)
@@ -390,7 +390,7 @@ static void encode_encoded_fields(

ARCTICDB_TRACE(log::codec(), "Encoded header: {}", segment_header);
const auto& desc = in_mem_seg.descriptor();
return {std::move(segment_header), std::move(out_buffer), descriptor_data, desc.fields_ptr(), desc.id()};
return {std::move(segment_header), std::move(out_buffer), descriptor_data, desc.fields_ptr(), desc.id(), std::nullopt};
}

} //namespace arcticdb
cpp/arcticdb/codec/encoded_field_collection.hpp: 2 changes (1 addition, 1 deletion)
@@ -39,7 +39,7 @@ class EncodedFieldCollection {
}

EncodedFieldImpl& current() {
return *reinterpret_cast<EncodedFieldImpl*>(buffer_->ptr_cast<uint8_t>(pos_, sizeof(EncodedFieldImpl)));
return *reinterpret_cast<EncodedFieldImpl*>(buffer_->ptr_cast<uint8_t>(pos_, EncodedFieldImpl::Size));
}

EncodedFieldImpl& operator*() {
cpp/arcticdb/codec/segment.cpp: 25 changes (12 additions, 13 deletions)
@@ -44,8 +44,8 @@ SegmentCompressedSize compressed(const SegmentHeader &seg_hdr, const std::option
if (seg_hdr.has_metadata_field())
metadata_size = encoding_sizes::ndarray_field_compressed_size(seg_hdr.metadata_field().ndarray());

auto buffer_size = 0U;
auto body_size = 0U;
size_t buffer_size;
size_t body_size;
if(seg_hdr.encoding_version() == EncodingVersion::V1) {
const auto fields_size = encoding_sizes::segment_compressed_size(proto_wrapper->proto().fields());
ARCTICDB_DEBUG(log::codec(), "Calculating total size: {} fields + {} metadata + {} string pool = {}", fields_size, metadata_size, string_pool_size, fields_size + metadata_size + string_pool_size);
@@ -209,7 +209,7 @@ Segment Segment::from_bytes(const std::uint8_t* src, std::size_t readable_size,
}

set_body_fields(seg_hdr, src);
return {std::move(seg_hdr), std::move(variant_buffer), std::move(desc_data), std::move(fields), stream_id};
return {std::move(seg_hdr), std::move(variant_buffer), std::move(desc_data), std::move(fields), stream_id, readable_size};
}

Segment Segment::from_buffer(const std::shared_ptr<Buffer>& buffer) {
@@ -234,12 +234,12 @@ Segment Segment::from_buffer(const std::shared_ptr<Buffer>& buffer) {
set_body_fields(seg_hdr, src);
buffer->set_preamble(FIXED_HEADER_SIZE + fixed_hdr->header_bytes);
ARCTICDB_SUBSAMPLE(CreateSegment, 0)
return{std::move(seg_hdr), buffer, std::move(desc_data), std::move(fields), stream_id};
return{std::move(seg_hdr), buffer, std::move(desc_data), std::move(fields), stream_id, readable_size};
}

size_t Segment::write_proto_header(uint8_t* dst) const {
size_t Segment::write_proto_header(uint8_t* dst) {
const auto& header = generate_header_proto();
const auto hdr_size = proto_size();
const auto& header = header_proto();
FixedHeader hdr = {MAGIC_NUMBER, HEADER_VERSION_V1, std::uint32_t(hdr_size)};
write_fixed_header(dst, hdr);

@@ -264,7 +264,7 @@ std::pair<uint8_t*, size_t> Segment::serialize_header_v2(size_t expected_bytes)
auto* dst = buffer->preamble();
write_fixed_header(dst, hdr);
header_.serialize_to_bytes(dst + FIXED_HEADER_SIZE, expected_bytes);
return std::make_pair(buffer->preamble(), total_segment_size());
return std::make_pair(buffer->preamble(), calculate_size());
}

std::pair<uint8_t*, size_t> Segment::serialize_v1_header_in_place(size_t total_hdr_size) {
Expand All @@ -273,19 +273,19 @@ std::pair<uint8_t*, size_t> Segment::serialize_v1_header_in_place(size_t total_h
util::check(base_ptr + total_hdr_size == buffer->data(), "Expected base ptr to align with data ptr, {} != {}",fmt::ptr(base_ptr + total_hdr_size),fmt::ptr(buffer->data()));
write_proto_header(base_ptr);
ARCTICDB_TRACE(log::storage(), "Header fits in internal buffer {:x} with {} bytes space: {}", intptr_t (base_ptr), buffer->preamble_bytes() - total_hdr_size,dump_bytes(buffer->data(), buffer->bytes(), 100u));
return std::make_pair(base_ptr, total_segment_size());
return std::make_pair(base_ptr, calculate_size());
}

std::tuple<uint8_t*, size_t, std::unique_ptr<Buffer>> Segment::serialize_v1_header_to_buffer(size_t hdr_size) {
std::tuple<uint8_t*, size_t, std::unique_ptr<Buffer>> Segment::serialize_v1_header_to_buffer(size_t hdr_size) {
auto tmp = std::make_unique<Buffer>();
ARCTICDB_DEBUG(log::storage(), "Header doesn't fit in internal buffer, needed {} bytes but had {}, writing to temp buffer at {:x}", hdr_size, buffer_.preamble_bytes(),uintptr_t(tmp->data()));
tmp->ensure(total_segment_size());
tmp->ensure(calculate_size());
auto* dst = tmp->preamble();
write_proto_header(dst);
std::memcpy(dst + FIXED_HEADER_SIZE + hdr_size,
buffer().data(),
buffer().bytes());
return std::make_tuple(tmp->preamble(), total_segment_size(), std::move(tmp));
return std::make_tuple(tmp->preamble(), calculate_size(), std::move(tmp));
}

std::tuple<uint8_t*, size_t, std::unique_ptr<Buffer>> Segment::serialize_header_v1() {
@@ -322,8 +322,7 @@ std::tuple<uint8_t*, size_t, std::unique_ptr<Buffer>> Segment::serialize_header(
return desc_.fields(pos);
}

const arcticdb::proto::encoding::SegmentHeader& Segment::header_proto() const {
util::check(header_.encoding_version() == EncodingVersion::V1, "Got proto request in V2 encoding");
const arcticdb::proto::encoding::SegmentHeader& Segment::generate_header_proto() {
if(!proto_)
proto_ = std::make_unique<arcticdb::proto::encoding::SegmentHeader>(generate_proto_header(header_, desc_));

cpp/arcticdb/codec/segment.hpp: 103 changes (65 additions, 38 deletions)
@@ -46,12 +46,6 @@ SegmentHeaderProtoWrapper decode_protobuf_header(const uint8_t* data, size_t hea

arcticdb::proto::encoding::SegmentHeader generate_proto_header(const SegmentHeader& header, const StreamDescriptor& desc);

template<typename T, typename = std::enable_if_t<std::is_integral_v<T>>>
constexpr EncodingVersion to_encoding_version(T encoding_version) {
util::check(encoding_version >= 0 && encoding_version < uint16_t(EncodingVersion::COUNT), "Invalid encoding version");
return static_cast<EncodingVersion>(encoding_version);
}

static constexpr uint16_t HEADER_VERSION_V1 = 1;
static constexpr uint16_t HEADER_VERSION_V2 = 2;

@@ -76,35 +70,51 @@ class Segment {
public:
Segment() = default;

Segment(SegmentHeader&& header, std::shared_ptr<Buffer> buffer, std::shared_ptr<SegmentDescriptorImpl> data, std::shared_ptr<FieldCollection> fields, StreamId stream_id) :
header_(std::move(header)),
buffer_(std::move(buffer)),
desc_(std::move(data), std::move(fields), std::move(stream_id)){
Segment(
SegmentHeader&& header,
std::shared_ptr<Buffer> buffer,
std::shared_ptr<SegmentDescriptorImpl> data,
std::shared_ptr<FieldCollection> fields,
StreamId stream_id,
size_t size) :
header_(std::move(header)),
buffer_(std::move(buffer)),
desc_(std::move(data), std::move(fields), std::move(stream_id)),
size_(size) {
}

Segment(SegmentHeader&& header, BufferView buffer, std::shared_ptr<SegmentDescriptorImpl> data, std::shared_ptr<FieldCollection> fields, StreamId stream_id) :
header_(std::move(header)),
buffer_(buffer),
desc_(std::move(data), std::move(fields), std::move(stream_id)) {
Segment(
SegmentHeader&& header,
BufferView buffer,
std::shared_ptr<SegmentDescriptorImpl> data,
std::shared_ptr<FieldCollection> fields,
StreamId stream_id,
const std::optional<size_t>& size) :
header_(std::move(header)),
buffer_(buffer),
desc_(std::move(data), std::move(fields), std::move(stream_id)),
size_(size) {
}

Segment(SegmentHeader&& header, VariantBuffer &&buffer, std::shared_ptr<SegmentDescriptorImpl> data, std::shared_ptr<FieldCollection> fields, StreamId stream_id) :
header_(std::move(header)),
buffer_(std::move(buffer)),
desc_(std::move(data), std::move(fields), std::move(stream_id)) {
}

Segment(SegmentHeader&& header, VariantBuffer &&buffer, StreamDescriptor&& desc) :
header_(std::move(header)),
buffer_(std::move(buffer)),
desc_(std::move(desc)) {
Segment(
SegmentHeader&& header,
VariantBuffer &&buffer,
std::shared_ptr<SegmentDescriptorImpl> data,
std::shared_ptr<FieldCollection> fields,
StreamId stream_id,
const std::optional<size_t> size) :
header_(std::move(header)),
buffer_(std::move(buffer)),
desc_(std::move(data), std::move(fields), std::move(stream_id)),
size_(size) {
}

Segment(Segment &&that) noexcept {
using std::swap;
swap(header_, that.header_);
swap(desc_, that.desc_);
swap(keepalive_, that.keepalive_);
swap(size_, that.size_);
buffer_.move_buffer(std::move(that.buffer_));
}

@@ -113,6 +123,7 @@ class Segment {
swap(header_, that.header_);
swap(desc_, that.desc_);
swap(keepalive_, that.keepalive_);
swap(size_, that.size_);
buffer_.move_buffer(std::move(that.buffer_));
return *this;
}
@@ -131,22 +142,33 @@

std::tuple<uint8_t*, size_t, std::unique_ptr<Buffer>> serialize_header();

size_t write_proto_header(uint8_t* dst) const;
size_t write_proto_header(uint8_t* dst);

[[nodiscard]] std::size_t total_segment_size() const {
return FIXED_HEADER_SIZE + segment_header_bytes_size() + buffer_bytes();
[[nodiscard]] std::size_t size() const {
util::check(size_.has_value(), "Segment size has not been set");
return *size_;
}

[[nodiscard]] size_t proto_size() const {
if(!proto_size_)
proto_size_ = header_proto().ByteSizeLong();
[[nodiscard]] std::size_t calculate_size() {
if(!size_.has_value())
size_ = FIXED_HEADER_SIZE + segment_header_bytes_size() + buffer_bytes();

return *proto_size_;
return *size_;
}

[[nodiscard]] std::size_t segment_header_bytes_size() const {
if(header_.encoding_version() == EncodingVersion::V1)
const arcticdb::proto::encoding::SegmentHeader& generate_header_proto();

[[nodiscard]] size_t proto_size() {
util::check(static_cast<bool>(proto_), "Proto has not been generated");

return proto_->ByteSizeLong();
}

[[nodiscard]] std::size_t segment_header_bytes_size() {
if(header_.encoding_version() == EncodingVersion::V1) {
generate_header_proto();
return proto_size();
}
else
return header_.bytes();
}
@@ -177,8 +199,6 @@

[[nodiscard]] const Field& fields(size_t pos) const;

const arcticdb::proto::encoding::SegmentHeader& header_proto() const;

void force_own_buffer() {
buffer_.force_own_buffer();
keepalive_.reset();
@@ -206,10 +226,17 @@
}

Segment clone() const {
return Segment{header_.clone(), buffer_.clone(), desc_.clone()};
return Segment{header_.clone(), buffer_.clone(), desc_.clone(), size_};
}

private:
Segment(SegmentHeader&& header, VariantBuffer&& buffer, StreamDescriptor&& desc, const std::optional<size_t> size) :
header_(std::move(header)),
buffer_(std::move(buffer)),
desc_(std::move(desc)),
size_(size) {
}

std::tuple<uint8_t*, size_t, std::unique_ptr<Buffer>> serialize_v1_header_to_buffer(size_t total_hdr_size);
std::pair<uint8_t*, size_t> serialize_v1_header_in_place(size_t total_header_size);
std::tuple<uint8_t*, size_t, std::unique_ptr<Buffer>> serialize_header_v1();
@@ -220,8 +247,8 @@
VariantBuffer buffer_;
StreamDescriptor desc_;
std::any keepalive_;
mutable std::unique_ptr<arcticdb::proto::encoding::SegmentHeader> proto_;
mutable std::optional<size_t> proto_size_;
std::unique_ptr<arcticdb::proto::encoding::SegmentHeader> proto_;
std::optional<size_t> size_;
};

} //namespace arcticdb
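For orientation, here is a minimal usage sketch (not part of the commit) of the Segment size API visible in this diff, assuming the calls shown above and in the tests below (calculate_size, write_to, from_bytes, size); the surrounding function name is illustrative only. calculate_size() computes and caches the encoded size of a segment about to be serialized, while size() requires the value to have been recorded already, for example by Segment::from_bytes().

// Illustrative sketch only; not code from this commit.
#include <cstdint>
#include <vector>

void roundtrip_sketch(arcticdb::Segment&& seg) {
    std::vector<uint8_t> vec;
    // Computes fixed header + segment header + body bytes and caches the result in size_.
    const auto bytes = seg.calculate_size();
    vec.resize(bytes);
    seg.write_to(vec.data());

    // A segment decoded from bytes has its size set at construction,
    // so size() can be called without tripping the util::check.
    auto unserialized = arcticdb::Segment::from_bytes(vec.data(), bytes);
    const auto stored = unserialized.size();
    (void)stored;  // e.g. compare stored against bytes
}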
cpp/arcticdb/codec/test/test_codec.cpp: 6 changes (3 additions, 3 deletions)
@@ -444,7 +444,7 @@ TEST(Segment, RoundtripTimeseriesDescriptorWriteToBufferV1) {
auto copy = in_mem_seg.clone();
auto seg = encode_v1(std::move(in_mem_seg), codec::default_lz4_codec());
std::vector<uint8_t> vec;
const auto bytes = seg.total_segment_size();
const auto bytes = seg.calculate_size();
vec.resize(bytes);
seg.write_to(vec.data());
auto unserialized = Segment::from_bytes(vec.data(), bytes);
@@ -462,7 +462,7 @@ TEST(Segment, RoundtripStringsWriteToBufferV1) {
auto copy = in_mem_seg.clone();
auto seg = encode_v1(std::move(in_mem_seg), codec::default_lz4_codec());
std::vector<uint8_t> vec;
const auto bytes = seg.total_segment_size();
const auto bytes = seg.calculate_size();
vec.resize(bytes);
seg.write_to(vec.data());
auto unserialized = Segment::from_bytes(vec.data(), bytes);
@@ -501,7 +501,7 @@ TEST(Segment, RoundtripTimeseriesDescriptorWriteToBufferV2) {
auto copy = in_mem_seg.clone();
auto seg = encode_v2(std::move(in_mem_seg), codec::default_lz4_codec());
std::vector<uint8_t> vec;
const auto bytes = seg.total_segment_size();
const auto bytes = seg.calculate_size();
log::codec().info("## Resizing buffer to {} bytes", bytes);
vec.resize(bytes);
seg.write_to(vec.data());
cpp/arcticdb/entity/timeseries_descriptor.hpp: 2 changes (1 addition, 1 deletion)
@@ -50,7 +50,7 @@ struct TimeseriesDescriptor {
return *frame_data_;
}

[[nodiscard]] IndexDescriptor index() const {
[[nodiscard]] IndexDescriptorImpl index() const {
return segment_desc_->index_;
}

(The remaining 13 changed files are not shown in this view.)
