WIP Metadata cache work #1361
Draft: wants to merge 5 commits into master
Changes from all commits
8 changes: 7 additions & 1 deletion cpp/arcticdb/CMakeLists.txt
@@ -476,7 +476,13 @@ set(arcticdb_srcs
version/symbol_list.cpp
version/version_map_batch_methods.cpp
storage/mongo/mongo_client_wrapper.hpp
-)
+version/symbol_metadata.hpp
+version/metadata_cache.hpp
+util/frame_adapter.hpp
+version/symbol_metadata.hpp
+version/metadata_cache.hpp
+version/metadata_cache.cpp
+version/symbol_metadata.cpp)

if(${ARCTICDB_INCLUDE_ROCKSDB})
list (APPEND arcticdb_srcs
16 changes: 14 additions & 2 deletions cpp/arcticdb/async/async_store.hpp
@@ -120,6 +120,7 @@ entity::VariantKey write_sync(
const StreamId &stream_id,
IndexValue start_index,
IndexValue end_index,
+timestamp creation_ts,
SegmentInMemory &&segment) override {

util::check(segment.descriptor().id() == stream_id,
@@ -128,12 +129,23 @@ entity::VariantKey write_sync(
segment.descriptor().id());

auto encoded = EncodeAtomTask{
-key_type, version_id, stream_id, start_index, end_index, current_timestamp(),
+key_type, version_id, stream_id, start_index, end_index, creation_ts,
std::move(segment), codec_, encoding_version_
}();
return WriteSegmentTask{library_}(std::move(encoded));
}

+entity::VariantKey write_sync(
+        stream::KeyType key_type,
+        VersionId version_id,
+        const StreamId &stream_id,
+        IndexValue start_index,
+        IndexValue end_index,
+        SegmentInMemory &&segment) override {
+
+    return write_sync(key_type, version_id, stream_id, start_index, end_index, current_timestamp(), std::move(segment));
+}

entity::VariantKey write_sync(PartialKey pk, SegmentInMemory &&segment) override {
return write_sync(pk.key_type, pk.version_id, pk.stream_id, pk.start_index, pk.end_index, std::move(segment));
}
@@ -157,7 +169,7 @@ void write_compressed_sync(storage::KeySegmentPair &&ks) override {

folly::Future<entity::VariantKey> update(const entity::VariantKey &key,
SegmentInMemory &&segment,
-storage::UpdateOpts opts) override {
+storage::StorageUpdateOptions opts) override {
auto stream_id = variant_key_id(key);
util::check(segment.descriptor().id() == stream_id,
"Descriptor id mismatch in variant key {} != {}",
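Note on the hunks above: `write_sync` gains a primary overload that takes an explicit `creation_ts`, and the old signature now just delegates to it with `current_timestamp()`, so a metadata cache can pin key timestamps while existing call sites keep their behaviour. A minimal standalone sketch of that delegation pattern, with simplified stand-in types rather than the real ArcticDB signatures:

#include <chrono>
#include <cstdint>
#include <string>
#include <utility>

using timestamp = std::int64_t;

struct Segment { std::string id; };                            // stand-in for SegmentInMemory
struct Key { std::string stream_id; timestamp creation_ts; };  // stand-in for VariantKey

timestamp current_timestamp() {
    return std::chrono::duration_cast<std::chrono::nanoseconds>(
        std::chrono::system_clock::now().time_since_epoch()).count();
}

// Primary overload: the caller controls the creation timestamp.
Key write_sync(const std::string& stream_id, timestamp creation_ts, Segment&& segment) {
    (void)segment;  // encoding and storage elided in this sketch
    return Key{stream_id, creation_ts};
}

// Compatibility overload: preserves the old "stamp with now" behaviour
// by delegating, so the timestamp policy lives in exactly one place.
Key write_sync(const std::string& stream_id, Segment&& segment) {
    return write_sync(stream_id, current_timestamp(), std::move(segment));
}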
4 changes: 2 additions & 2 deletions cpp/arcticdb/async/tasks.hpp
@@ -183,9 +183,9 @@ struct WriteSegmentTask : BaseTask {

struct UpdateSegmentTask : BaseTask {
std::shared_ptr<storage::Library> lib_;
-storage::UpdateOpts opts_;
+storage::StorageUpdateOptions opts_;

-explicit UpdateSegmentTask(std::shared_ptr<storage::Library> lib, storage::UpdateOpts opts) :
+explicit UpdateSegmentTask(std::shared_ptr<storage::Library> lib, storage::StorageUpdateOptions opts) :
lib_(std::move(lib)),
opts_(opts) {
}
1 change: 1 addition & 0 deletions cpp/arcticdb/column_store/column.cpp
@@ -173,6 +173,7 @@ void Column::append_sparse_map(const util::BitMagic& bv, position_t at_row) {
void Column::append(const Column& other, position_t at_row) {
if (other.row_count() == 0)
return;
+
util::check(type() == other.type(), "Cannot append column type {} to column type {}", type(), other.type());
const bool was_sparse = is_sparse();
const bool was_empty = empty();
23 changes: 17 additions & 6 deletions cpp/arcticdb/entity/native_tensor.hpp
@@ -77,7 +77,8 @@ struct NativeTensor {
dt_(other.dt_),
elsize_(other.elsize_),
ptr(other.ptr),
-expanded_dim_(other.expanded_dim_){
+expanded_dim_(other.expanded_dim_),
+native_type_(other.native_type_){
for (ssize_t i = 0; i < std::min(MaxDimensions, ndim_); ++i)
shapes_[i] = other.shapes_[i];

@@ -94,6 +95,7 @@ struct NativeTensor {
swap(left.elsize_, right.elsize_);
swap(left.ptr, right.ptr);
swap(left.expanded_dim_, right.expanded_dim_);
+swap(left.native_type_, right.native_type_);
for(ssize_t i = 0; i < MaxDimensions; ++i) {
swap(left.shapes_[i], right.shapes_[i]);
swap(left.strides_[i], right.strides_[i]);
@@ -134,6 +136,14 @@ struct NativeTensor {
return (&(reinterpret_cast<const T *>(ptr)[signed_pos]));
}

+void set_is_native_type() {
+    native_type_ = true;
+}
+
+bool is_native_type() const {
+    return native_type_;
+}
+
// returns number of elements, not bytesize
[[nodiscard]] ssize_t size() const {
return calc_elements(shape(), ndim());
@@ -142,17 +152,18 @@
NativeTensor &request() { return *this; }

util::MagicNum<'T','n','s','r'> magic_;
-int64_t nbytes_;
-int ndim_;
+int64_t nbytes_ = 0;
+int ndim_ = 0;
StrideContainer strides_ = {};
StrideContainer shapes_ = {};
DataType dt_;
-stride_t elsize_;
-const void *ptr;
+stride_t elsize_ = 0;
+const void *ptr = nullptr;
/// @note: when iterating strides and shapes we should use the ndim as it is the dimension reported by the
/// API providing the strides and shapes arrays, expanded_dim is what ArcticDB thinks of the tensor and using it
/// can lead to out of bounds reads from strides and shapes.
-int expanded_dim_;
+int32_t expanded_dim_ = 0;
+bool native_type_ = false;
};

template <ssize_t> ssize_t byte_offset_impl(const stride_t* ) { return 0; }
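Two things happen in `NativeTensor` besides adding the `native_type_` flag: the flag is threaded through the copy constructor and `swap` (easy to forget when adding a member), and the scalar members gain in-class initializers, so a default-constructed tensor header no longer carries indeterminate `nbytes_`/`ndim_`/`ptr` values. A small illustration of the initializer point, with illustrative types rather than ArcticDB code:

#include <cassert>
#include <cstdint>

// Without in-class initializers these members are indeterminate after
// default construction; reading them is undefined behaviour.
struct TensorHeaderBefore {
    std::int64_t nbytes;
    int ndim;
    const void* ptr;
};

// With initializers, a default-constructed header has a well-defined
// zero state, and a newly added flag reliably starts out false.
struct TensorHeaderAfter {
    std::int64_t nbytes = 0;
    int ndim = 0;
    const void* ptr = nullptr;
    bool native_type = false;
};

int main() {
    TensorHeaderAfter h;
    assert(h.nbytes == 0 && h.ndim == 0 && h.ptr == nullptr && !h.native_type);
    return 0;
}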
56 changes: 37 additions & 19 deletions cpp/arcticdb/pipeline/frame_utils.hpp
@@ -95,6 +95,7 @@ RawType* flatten_tensor(

template<typename Aggregator>
std::optional<convert::StringEncodingError> aggregator_set_data(
+const pipelines::InputTensorFrame& frame,
const TypeDescriptor& type_desc,
const entity::NativeTensor& tensor,
Aggregator& agg,
@@ -146,28 +147,45 @@ std::optional<convert::StringEncodingError> aggregator_set_data(
column.allocate_data(rows_to_write * sizeof(entity::position_t));
auto out_ptr = reinterpret_cast<entity::position_t*>(column.buffer().data());
auto& string_pool = agg.segment().string_pool();
-for (size_t s = 0; s < rows_to_write; ++s, ++ptr_data) {
-    if (*ptr_data == none.ptr()) {
-        *out_ptr++ = not_a_string();
-    } else if(is_py_nan(*ptr_data)){
-        *out_ptr++ = nan_placeholder();
-    } else {
-        if constexpr (is_utf_type(slice_value_type(dt))) {
-            wrapper_or_error = convert::py_unicode_to_buffer(*ptr_data, scoped_gil_lock);
+if(tensor.is_native_type()) {
+    auto offset_data = reinterpret_cast<StringPool::offset_t*>(data);
+    offset_data += row;
+    auto input_string_pool = frame.string_pool();
+    util::check(static_cast<bool>(input_string_pool) || frame.empty(), "Expected string pool for native sequence types");
+    for (size_t s = 0; s < rows_to_write; ++s, ++offset_data) {
+        if (*offset_data == not_a_string() || *offset_data == nan_placeholder()) {
+            *out_ptr++ = *offset_data;
         } else {
-            wrapper_or_error = convert::pystring_to_buffer(*ptr_data, false);
-        }
-        // Cannot use util::variant_match as only one of the branches would have a return type
-        if (std::holds_alternative<convert::PyStringWrapper>(wrapper_or_error)) {
-            convert::PyStringWrapper wrapper(std::move(std::get<convert::PyStringWrapper>(wrapper_or_error)));
-            const auto offset = string_pool.get(wrapper.buffer_, wrapper.length_);
+            const auto view = input_string_pool->get_view(*offset_data);
+            const auto offset = string_pool.get(view);
             *out_ptr++ = offset.offset();
-        } else if (std::holds_alternative<convert::StringEncodingError>(wrapper_or_error)) {
-            auto error = std::get<convert::StringEncodingError>(wrapper_or_error);
-            error.row_index_in_slice_ = s;
-            return std::optional<convert::StringEncodingError>(error);
         }
     }
+} else {
+    for (size_t s = 0; s < rows_to_write; ++s, ++ptr_data) {
+        if (*ptr_data == none.ptr()) {
+            *out_ptr++ = not_a_string();
+        } else if (is_py_nan(*ptr_data)) {
+            *out_ptr++ = nan_placeholder();
         } else {
-            internal::raise<ErrorCode::E_ASSERTION_FAILURE>("Unexpected variant alternative");
+            if constexpr (is_utf_type(slice_value_type(dt))) {
+                wrapper_or_error = convert::py_unicode_to_buffer(*ptr_data, scoped_gil_lock);
+            } else {
+                wrapper_or_error = convert::pystring_to_buffer(*ptr_data, false);
+            }
+            // Cannot use util::variant_match as only one of the branches would have a return type
+            if (std::holds_alternative<convert::PyStringWrapper>(wrapper_or_error)) {
+                convert::PyStringWrapper wrapper(std::move(std::get<convert::PyStringWrapper>(wrapper_or_error)));
+                const auto offset = string_pool.get(wrapper.buffer_, wrapper.length_);
+                *out_ptr++ = offset.offset();
+            } else if (std::holds_alternative<convert::StringEncodingError>(wrapper_or_error)) {
+                auto error = std::get<convert::StringEncodingError>(wrapper_or_error);
+                error.row_index_in_slice_ = s;
+                return std::optional<convert::StringEncodingError>(error);
+            } else {
+                internal::raise<ErrorCode::E_ASSERTION_FAILURE>("Unexpected variant alternative");
+            }
         }
     }
+}
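The new branch is the heart of this change: when a tensor is flagged as native, its string column already holds `StringPool` offsets rather than `PyObject*`, so each value is re-interned from the input frame's pool into the segment's pool without touching Python at all (no GIL, no per-string encoding checks). A self-contained sketch of that pool-to-pool re-interning, with a toy pool standing in for ArcticDB's `StringPool` (the real API differs; only `get(view)` and `get_view(offset)` are mirrored here):

#include <cstdint>
#include <string>
#include <string_view>
#include <unordered_map>
#include <vector>

// Toy interning pool: stores each distinct string once and hands back
// a stable integer offset for it.
class Pool {
public:
    std::uint64_t get(std::string_view s) {
        auto [it, inserted] = index_.try_emplace(std::string(s), strings_.size());
        if (inserted)
            strings_.emplace_back(s);
        return it->second;
    }
    std::string_view get_view(std::uint64_t offset) const { return strings_[offset]; }
private:
    std::unordered_map<std::string, std::uint64_t> index_;
    std::vector<std::string> strings_;
};

// Fast path from the diff: every source value is already an offset into
// src_pool, so it is looked up there and re-interned into dst_pool; in the
// real code, not-a-string / NaN sentinels are copied through unchanged.
std::vector<std::uint64_t> reintern_column(const std::vector<std::uint64_t>& src_offsets,
                                           const Pool& src_pool,
                                           Pool& dst_pool) {
    std::vector<std::uint64_t> out;
    out.reserve(src_offsets.size());
    for (const auto off : src_offsets)
        out.push_back(dst_pool.get(src_pool.get_view(off)));
    return out;
}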
3 changes: 1 addition & 2 deletions cpp/arcticdb/pipeline/index_utils.cpp
@@ -19,8 +19,7 @@ folly::Future<entity::AtomKey> write_index(
TimeseriesDescriptor &&metadata,
std::vector<SliceAndKey> &&sk,
const IndexPartialKey &partial_key,
-const std::shared_ptr<stream::StreamSink> &sink
-) {
+const std::shared_ptr<stream::StreamSink> &sink) {
auto slice_and_keys = std::move(sk);
IndexWriter<IndexType> writer(sink, partial_key, std::move(metadata));
for (const auto &slice_and_key : slice_and_keys) {
13 changes: 13 additions & 0 deletions cpp/arcticdb/pipeline/input_tensor_frame.hpp
@@ -14,6 +14,10 @@
#include <arcticdb/entity/types.hpp>
#include <arcticdb/util/flatten_utils.hpp>

+namespace arcticdb {
+class StringPool;
+}
+
namespace arcticdb::pipelines {

using namespace arcticdb::entity;
@@ -39,6 +43,15 @@ struct InputTensorFrame {
ssize_t num_rows = 0;
mutable ssize_t offset = 0;
mutable bool bucketize_dynamic = 0;
+std::shared_ptr<StringPool> string_pool_;
+
+void set_string_pool(std::shared_ptr<StringPool> string_pool) {
+    string_pool_ = std::move(string_pool);
+}
+
+const std::shared_ptr<StringPool>& string_pool() const {
+    return string_pool_;
+}

void set_offset(ssize_t off) const {
offset = off;
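`StringPool` is only forward-declared in this header, which works because the frame stores nothing but a `std::shared_ptr` to it: `shared_ptr` type-erases its deleter at construction, so the complete type is needed only where the pointer is created, not where it is stored or destroyed. A minimal demonstration of the idiom, with illustrative names:

#include <iostream>
#include <memory>
#include <utility>

class Widget;  // forward declaration is all the holder needs

struct Holder {
    std::shared_ptr<Widget> widget_;  // fine with an incomplete type
    void set(std::shared_ptr<Widget> w) { widget_ = std::move(w); }
};

class Widget {  // complete type, needed only where the object is created
public:
    ~Widget() { std::cout << "Widget destroyed\n"; }
};

int main() {
    Holder h;
    h.set(std::make_shared<Widget>());  // deleter captured here, with Widget complete
    return 0;
}  // h's shared_ptr runs the stored deleter; no complete type required here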