Skip to content

Commit

Permalink
Post rebase fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
willdealtry committed Mar 5, 2024
1 parent 73c2a35 commit 761b0c2
Show file tree
Hide file tree
Showing 10 changed files with 13 additions and 22 deletions.
1 change: 0 additions & 1 deletion cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,6 @@ set(arcticdb_srcs
version/version_utils.cpp
version/symbol_list.cpp
version/version_map_batch_methods.cpp
<<<<<<< HEAD
storage/mongo/mongo_client_wrapper.hpp
version/symbol_metadata.hpp
version/metadata_cache.hpp
Expand Down
2 changes: 2 additions & 0 deletions cpp/arcticdb/pipeline/write_frame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ std::tuple<stream::StreamSink::PartialKey, SegmentInMemory, FrameSlice> WriteToS
if (frame_->desc.index().field_count() > 0) {
util::check(static_cast<bool>(frame_->index_tensor), "Got null index tensor in write_slices");
auto opt_error = aggregator_set_data(
*frame_,
frame_->desc.fields(0).type(),
frame_->index_tensor.value(),
agg, 0, rows_to_write, offset_in_frame, slice_num_for_column_, regular_slice_size, false);
Expand All @@ -97,6 +98,7 @@ std::tuple<stream::StreamSink::PartialKey, SegmentInMemory, FrameSlice> WriteToS
auto& fd = slice_.non_index_field(col);
auto& tensor = frame_->field_tensors[slice_.absolute_field_col(col)];
auto opt_error = aggregator_set_data(
*frame_,
fd.type(),
tensor, agg, abs_col, rows_to_write, offset_in_frame, slice_num_for_column_,
regular_slice_size, sparsify_floats_);
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/file/mapped_file_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ void MappedFileStorage::do_write(Composite<KeySegmentPair>&& kvs) {
});
}

void MappedFileStorage::do_update(Composite<KeySegmentPair>&&, UpdateOpts) {
void MappedFileStorage::do_update(Composite<KeySegmentPair>&&, storage::StorageUpdateOptions) {
util::raise_rte("Update not implemented for file storages");
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/file/mapped_file_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class MappedFileStorage final : public SingleFileStorage {

void do_write(Composite<KeySegmentPair>&& kvs) override;

[[noreturn]] void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) override;
[[noreturn]] void do_update(Composite<KeySegmentPair>&& kvs, storage::StorageUpdateOptions opts) override;

void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, storage::ReadKeyOpts opts) override;

Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/test/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ inline void write_in_store(Storage &store, std::string symbol) {

inline void update_in_store(Storage &store, std::string symbol) {
auto variant_key = get_test_key(symbol);
store.update(KeySegmentPair(std::move(variant_key), get_test_segment()), arcticdb::storage::UpdateOpts{});
store.update(KeySegmentPair(std::move(variant_key), get_test_segment()), arcticdb::storage::StorageUpdateOptions{});
}

inline bool exists_in_store(Storage &store, std::string symbol) {
Expand Down
6 changes: 0 additions & 6 deletions cpp/arcticdb/storage/test/test_storage_exceptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,8 @@ TEST(S3MockStorageTest, TestPermissionErrorException) {
failureSymbol = s3::MockS3Client::get_failure_trigger("sym3", s3::S3Operation::PUT, Aws::S3::S3Errors::INVALID_ACCESS_KEY_ID);

ASSERT_THROW({
<<<<<<< HEAD
update_in_store(*storage, failureSymbol);
}, StoragePermissionException);
=======
storage->update(std::move(kv), storage::StorageUpdateOptions{});
}, PermissionException);
>>>>>>> eed4ea88 (Metadata caching)

}

TEST(S3MockStorageTest, TestS3RetryableException) {
Expand Down
8 changes: 3 additions & 5 deletions cpp/arcticdb/version/local_versioned_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,6 @@ VersionedItem LocalVersionedEngine::update_internal(
const UpdateQuery& query,
const std::shared_ptr<InputTensorFrame>& frame,
const ModificationOptions& options) {
TimeseriesInfo ts_info;
ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: update");
auto update_info = get_latest_undeleted_version_and_next_version_id(store(),
version_map(),
Expand All @@ -587,8 +586,7 @@ VersionedItem LocalVersionedEngine::update_internal(
query,
frame,
get_write_options(),
options,
ts_info);
options);

write_version(options.prune_previous_versions_, versioned_item.key_, update_info.previous_index_key_);
return versioned_item;
Expand Down Expand Up @@ -736,7 +734,8 @@ VersionedItem LocalVersionedEngine::write_versioned_dataframe_internal(
if(cfg().symbol_list())
symbol_list().add_symbol(store(), stream_id, versioned_item.key_.version_id());

write_version_and_prune_previous(prune_previous_versions, versioned_item.key_, deleted ? std::nullopt : maybe_prev);
write_version(prune_previous_versions, versioned_item.key_, deleted ? std::nullopt : maybe_prev);

if(cfg().metadata_cache())
write_symbol_metadata(store(), stream_id, versioned_item.key_.start_time(), versioned_item.key_.end_time(), ts_info.total_rows_, versioned_item.key_.creation_ts());

Expand Down Expand Up @@ -1421,7 +1420,6 @@ VersionedItem LocalVersionedEngine::append_internal(
} else {
if(options.upsert_) {
auto write_options = get_write_options();
version_store::TimeseriesInfo ts_info;
auto versioned_item = write_dataframe_impl(store_,
update_info.next_version_id_,
frame,
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/version/metadata_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ std::unordered_map<entity::StreamId, SymbolMetadata> get_symbol_metadata(

auto version_map = std::make_shared<VersionMap>();
auto version_futures =
batch_get_versions_async(engine.get_store(), version_map, missing_symbols, version_queries, false);
batch_get_versions_async(engine.get_store(), version_map, missing_symbols, version_queries);
std::vector<folly::Future<DescriptorItem>> descriptor_futures;
for (auto &&[idx, version_fut] : folly::enumerate(version_futures)) {
descriptor_futures.emplace_back(
Expand Down
7 changes: 3 additions & 4 deletions cpp/arcticdb/version/version_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,7 @@ VersionedItem update_impl(
const UpdateQuery& query,
const std::shared_ptr<InputTensorFrame>& frame,
const WriteOptions&& options,
const version_store::ModificationOptions& update_options,
version_store::TimeseriesInfo& ts_info) {
const version_store::ModificationOptions& update_options) {
util::check(update_info.previous_index_key_.has_value(), "Cannot update as there is no previous index key to update into");
const StreamId stream_id = frame->desc.id();
ARCTICDB_DEBUG(log::version(), "Update versioned dataframe for stream_id: {} , version_id = {}", stream_id, update_info.previous_index_key_->version_id());
Expand All @@ -338,8 +337,8 @@ VersionedItem update_impl(
util::check(index_desc.kind() == IndexDescriptor::TIMESTAMP, "Update not supported for non-timeseries indexes");
sorted_data_check_update(*frame, index_segment_reader);
bool bucketize_dynamic = index_segment_reader.bucketize_dynamic();
(void)check_and_mark_slices(index_segment_reader, update_options.dynamic_schema_,, false, std::nullopt, bucketize_dynamic);
fix_descriptor_mismatch_or_throw(UPDATE, update_options.dynamic_schema_,, index_segment_reader, *frame);
(void)check_and_mark_slices(index_segment_reader, update_options.dynamic_schema_, false, std::nullopt, bucketize_dynamic);
fix_descriptor_mismatch_or_throw(UPDATE, update_options.dynamic_schema_, index_segment_reader, *frame);

std::vector<FilterQuery<index::IndexSegmentReader>> queries =
build_update_query_filters<index::IndexSegmentReader>(query.row_filter, frame->index, frame->index_range, update_options.dynamic_schema_, index_segment_reader.bucketize_dynamic());
Expand Down
3 changes: 1 addition & 2 deletions cpp/arcticdb/version/version_core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ VersionedItem update_impl(
const UpdateQuery & query,
const std::shared_ptr<InputTensorFrame>& frame,
const WriteOptions&& write_options,
const ModificationOptions& update_options,
TimeseriesInfo& ts_info);
const ModificationOptions& update_options);

VersionedItem delete_range_impl(
const std::shared_ptr<Store>& store,
Expand Down

0 comments on commit 761b0c2

Please sign in to comment.