Metadata caching
willdealtry committed Mar 5, 2024
1 parent dcfb80f commit eed4ea8
Showing 54 changed files with 761 additions and 403 deletions.
2 changes: 1 addition & 1 deletion cpp/arcticdb/CMakeLists.txt
@@ -464,7 +464,7 @@ set(arcticdb_srcs
version/version_utils.cpp
version/symbol_list.cpp
version/version_map_batch_methods.cpp
version/symbol_metadata.hpp version/metadata_cache.hpp util/frame_adapter.hpp)
version/symbol_metadata.hpp version/metadata_cache.hpp util/frame_adapter.hpp version/metadata_cache.cpp version/symbol_metadata.cpp)

if(${ARCTICDB_INCLUDE_ROCKSDB})
list (APPEND arcticdb_srcs
16 changes: 14 additions & 2 deletions cpp/arcticdb/async/async_store.hpp
@@ -120,6 +120,7 @@ entity::VariantKey write_sync(
const StreamId &stream_id,
IndexValue start_index,
IndexValue end_index,
timestamp creation_ts,
SegmentInMemory &&segment) override {

util::check(segment.descriptor().id() == stream_id,
@@ -128,12 +129,23 @@ entity::VariantKey write_sync(
segment.descriptor().id());

auto encoded = EncodeAtomTask{
key_type, version_id, stream_id, start_index, end_index, current_timestamp(),
key_type, version_id, stream_id, start_index, end_index, creation_ts,
std::move(segment), codec_, encoding_version_
}();
return WriteSegmentTask{library_}(std::move(encoded));
}

entity::VariantKey write_sync(
stream::KeyType key_type,
VersionId version_id,
const StreamId &stream_id,
IndexValue start_index,
IndexValue end_index,
SegmentInMemory &&segment) override {

return write_sync(key_type, version_id, stream_id, start_index, end_index, current_timestamp(), std::move(segment));
}

entity::VariantKey write_sync(PartialKey pk, SegmentInMemory &&segment) override {
return write_sync(pk.key_type, pk.version_id, pk.stream_id, pk.start_index, pk.end_index, std::move(segment));
}
@@ -157,7 +169,7 @@ void write_compressed_sync(storage::KeySegmentPair &&ks) override {

folly::Future<entity::VariantKey> update(const entity::VariantKey &key,
SegmentInMemory &&segment,
storage::UpdateOpts opts) override {
storage::StorageUpdateOptions opts) override {
auto stream_id = variant_key_id(key);
util::check(segment.descriptor().id() == stream_id,
"Descriptor id mismatch in variant key {} != {}",
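A minimal usage sketch of the two write_sync overloads above, assuming store is an AsyncStore and the remaining names (key_type, version_id, stream_id, start_index, end_index, creation_ts, segment, other_segment) are caller-supplied stand-ins. The new overload lets the caller pin the key's creation timestamp; the shorter signature keeps the previous behaviour of stamping current_timestamp().

    // New overload: the key is stamped with the caller-supplied creation_ts
    // rather than the time of the write call.
    auto key_with_ts = store.write_sync(
        key_type, version_id, stream_id, start_index, end_index,
        creation_ts, std::move(segment));

    // Original signature: forwards to the overload above with current_timestamp(),
    // so existing call sites keep their behaviour unchanged.
    auto key_default_ts = store.write_sync(
        key_type, version_id, stream_id, start_index, end_index,
        std::move(other_segment));
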
4 changes: 2 additions & 2 deletions cpp/arcticdb/async/tasks.hpp
@@ -183,9 +183,9 @@ struct WriteSegmentTask : BaseTask {

struct UpdateSegmentTask : BaseTask {
std::shared_ptr<storage::Library> lib_;
storage::UpdateOpts opts_;
storage::StorageUpdateOptions opts_;

explicit UpdateSegmentTask(std::shared_ptr<storage::Library> lib, storage::UpdateOpts opts) :
explicit UpdateSegmentTask(std::shared_ptr<storage::Library> lib, storage::StorageUpdateOptions opts) :
lib_(std::move(lib)),
opts_(opts) {
}
1 change: 1 addition & 0 deletions cpp/arcticdb/column_store/column.cpp
@@ -173,6 +173,7 @@ void Column::append_sparse_map(const util::BitMagic& bv, position_t at_row) {
void Column::append(const Column& other, position_t at_row) {
if (other.row_count() == 0)
return;

util::check(type() == other.type(), "Cannot append column type {} to column type {}", type(), other.type());
const bool was_sparse = is_sparse();
const bool was_empty = empty();
23 changes: 17 additions & 6 deletions cpp/arcticdb/entity/native_tensor.hpp
@@ -77,7 +77,8 @@ struct NativeTensor {
dt_(other.dt_),
elsize_(other.elsize_),
ptr(other.ptr),
expanded_dim_(other.expanded_dim_){
expanded_dim_(other.expanded_dim_),
native_type_(other.native_type_){
for (ssize_t i = 0; i < std::min(MaxDimensions, ndim_); ++i)
shapes_[i] = other.shapes_[i];

@@ -94,6 +95,7 @@ struct NativeTensor {
swap(left.elsize_, right.elsize_);
swap(left.ptr, right.ptr);
swap(left.expanded_dim_, right.expanded_dim_);
swap(left.native_type_, right.native_type_);
for(ssize_t i = 0; i < MaxDimensions; ++i) {
swap(left.shapes_[i], right.shapes_[i]);
swap(left.strides_[i], right.strides_[i]);
@@ -134,6 +136,14 @@ struct NativeTensor {
return (&(reinterpret_cast<const T *>(ptr)[signed_pos]));
}

void set_is_native_type() {
native_type_ = true;
}

bool is_native_type() const {
return native_type_;
}

// returns number of elements, not bytesize
[[nodiscard]] ssize_t size() const {
return calc_elements(shape(), ndim());
@@ -142,17 +152,18 @@ struct NativeTensor {
NativeTensor &request() { return *this; }

util::MagicNum<'T','n','s','r'> magic_;
int64_t nbytes_;
int ndim_;
int64_t nbytes_ = 0;
int ndim_ = 0;
StrideContainer strides_ = {};
StrideContainer shapes_ = {};
DataType dt_;
stride_t elsize_;
const void *ptr;
stride_t elsize_ = 0;
const void *ptr = nullptr;
/// @note: when iterating strides and shapes we should use the ndim as it is the dimension reported by the
/// API providing the strides and shapes arrays, expanded_dim is what ArcticDB thinks of the tensor and using it
/// can lead to out of bounds reads from strides and shapes.
int expanded_dim_;
int32_t expanded_dim_ = 0;
bool native_type_ = false;
};

template <ssize_t> ssize_t byte_offset_impl(const stride_t* ) { return 0; }
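A short sketch of how the new native_type_ flag might be used, assuming the tensor's data buffer already holds ArcticDB StringPool offsets rather than Python object pointers; make_offset_tensor is a hypothetical helper used only for illustration.

    // Hypothetical helper that builds a NativeTensor whose data buffer holds
    // StringPool offsets instead of PyObject pointers.
    NativeTensor tensor = make_offset_tensor(offsets, num_rows);
    tensor.set_is_native_type();   // record that the buffer is already in native form

    // Consumers such as aggregator_set_data() branch on the flag and copy the
    // offsets directly instead of converting Python strings row by row.
    if (tensor.is_native_type()) {
        // ... fast path: offsets are re-interned into the segment's string pool ...
    }
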
56 changes: 37 additions & 19 deletions cpp/arcticdb/pipeline/frame_utils.hpp
@@ -95,6 +95,7 @@ RawType* flatten_tensor(

template<typename Aggregator>
std::optional<convert::StringEncodingError> aggregator_set_data(
const pipelines::InputTensorFrame& frame,
const TypeDescriptor& type_desc,
const entity::NativeTensor& tensor,
Aggregator& agg,
@@ -146,28 +147,45 @@ std::optional<convert::StringEncodingError> aggregator_set_data(
column.allocate_data(rows_to_write * sizeof(entity::position_t));
auto out_ptr = reinterpret_cast<entity::position_t*>(column.buffer().data());
auto& string_pool = agg.segment().string_pool();
for (size_t s = 0; s < rows_to_write; ++s, ++ptr_data) {
if (*ptr_data == none.ptr()) {
*out_ptr++ = not_a_string();
} else if(is_py_nan(*ptr_data)){
*out_ptr++ = nan_placeholder();
} else {
if constexpr (is_utf_type(slice_value_type(dt))) {
wrapper_or_error = convert::py_unicode_to_buffer(*ptr_data, scoped_gil_lock);
if(tensor.is_native_type()) {
auto offset_data = reinterpret_cast<StringPool::offset_t*>(data);
offset_data += row;
auto input_string_pool = frame.string_pool();
util::check(static_cast<bool>(input_string_pool) || frame.empty(), "Expected string pool for native sequence types");
for (size_t s = 0; s < rows_to_write; ++s, ++ptr_data) {
if (*offset_data == not_a_string() || *offset_data == nan_placeholder()) {
*out_ptr++ = *offset_data;
} else {
wrapper_or_error = convert::pystring_to_buffer(*ptr_data, false);
}
// Cannot use util::variant_match as only one of the branches would have a return type
if (std::holds_alternative<convert::PyStringWrapper>(wrapper_or_error)) {
convert::PyStringWrapper wrapper(std::move(std::get<convert::PyStringWrapper>(wrapper_or_error)));
const auto offset = string_pool.get(wrapper.buffer_, wrapper.length_);
const auto view = input_string_pool->get_view(reinterpret_cast<StringPool::offset_t>(*offset_data));
const auto offset = string_pool.get(view);
*out_ptr++ = offset.offset();
} else if (std::holds_alternative<convert::StringEncodingError>(wrapper_or_error)) {
auto error = std::get<convert::StringEncodingError>(wrapper_or_error);
error.row_index_in_slice_ = s;
return std::optional<convert::StringEncodingError>(error);
}
}
} else {
for (size_t s = 0; s < rows_to_write; ++s, ++ptr_data) {
if (*ptr_data == none.ptr()) {
*out_ptr++ = not_a_string();
} else if (is_py_nan(*ptr_data)) {
*out_ptr++ = nan_placeholder();
} else {
internal::raise<ErrorCode::E_ASSERTION_FAILURE>("Unexpected variant alternative");
if constexpr (is_utf_type(slice_value_type(dt))) {
wrapper_or_error = convert::py_unicode_to_buffer(*ptr_data, scoped_gil_lock);
} else {
wrapper_or_error = convert::pystring_to_buffer(*ptr_data, false);
}
// Cannot use util::variant_match as only one of the branches would have a return type
if (std::holds_alternative<convert::PyStringWrapper>(wrapper_or_error)) {
convert::PyStringWrapper
wrapper(std::move(std::get<convert::PyStringWrapper>(wrapper_or_error)));
const auto offset = string_pool.get(wrapper.buffer_, wrapper.length_);
*out_ptr++ = offset.offset();
} else if (std::holds_alternative<convert::StringEncodingError>(wrapper_or_error)) {
auto error = std::get<convert::StringEncodingError>(wrapper_or_error);
error.row_index_in_slice_ = s;
return std::optional<convert::StringEncodingError>(error);
} else {
internal::raise<ErrorCode::E_ASSERTION_FAILURE>("Unexpected variant alternative");
}
}
}
}
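The native-type branch above skips the Python string conversion entirely: sentinel offsets pass through unchanged, and real offsets are resolved against the frame's string pool and re-interned into the segment's pool. A simplified sketch of that per-row translation, with translate_offset, input_pool and output_pool as hypothetical stand-ins for the logic and the two pools involved.

    // Hypothetical helper mirroring the per-row logic of the native-type branch.
    StringPool::offset_t translate_offset(
            StringPool::offset_t in,
            StringPool& input_pool,     // pool attached to the InputTensorFrame
            StringPool& output_pool) {  // pool owned by the output segment
        if (in == not_a_string() || in == nan_placeholder())
            return in;                                 // None/NaN sentinels pass through
        const auto view = input_pool.get_view(in);     // resolve the cached string
        return output_pool.get(view).offset();         // re-intern, return the new offset
    }
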
3 changes: 1 addition & 2 deletions cpp/arcticdb/pipeline/index_utils.cpp
@@ -17,8 +17,7 @@ folly::Future<entity::AtomKey> write_index(
TimeseriesDescriptor &&metadata,
std::vector<SliceAndKey> &&sk,
const IndexPartialKey &partial_key,
const std::shared_ptr<stream::StreamSink> &sink
) {
const std::shared_ptr<stream::StreamSink> &sink) {
auto slice_and_keys = std::move(sk);
IndexWriter<IndexType> writer(sink, partial_key, std::move(metadata));
for (const auto &slice_and_key : slice_and_keys) {
13 changes: 13 additions & 0 deletions cpp/arcticdb/pipeline/input_tensor_frame.hpp
@@ -14,6 +14,10 @@
#include <arcticdb/entity/types.hpp>
#include <arcticdb/util/flatten_utils.hpp>

namespace arcticdb {
class StringPool;
}

namespace arcticdb::pipelines {

using namespace arcticdb::entity;
@@ -39,6 +43,15 @@ struct InputTensorFrame {
ssize_t num_rows = 0;
mutable ssize_t offset = 0;
mutable bool bucketize_dynamic = 0;
std::shared_ptr<StringPool> string_pool_;

void set_string_pool(std::shared_ptr<StringPool> string_pool) {
string_pool_ = std::move(string_pool);
}

const std::shared_ptr<StringPool>& string_pool() const {
return string_pool_;
}

void set_offset(ssize_t off) const {
offset = off;
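A minimal sketch of how the new string_pool_ member might be wired up on the write path, assuming frame is an InputTensorFrame whose string columns were built as native StringPool offsets (the setup names are stand-ins).

    auto pool = std::make_shared<StringPool>();
    // ... intern strings into `pool` while building the frame's offset tensors ...

    frame.set_string_pool(pool);   // attach the pool that the offsets refer to

    // Later, the write path retrieves it to resolve offsets for native-type columns,
    // e.g. the check used in aggregator_set_data():
    const auto& input_pool = frame.string_pool();
    util::check(static_cast<bool>(input_pool) || frame.empty(),
                "Expected string pool for native sequence types");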