Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
willdealtry committed Mar 6, 2024
1 parent 106fda3 commit f5f7bb2
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 32 deletions.
20 changes: 15 additions & 5 deletions cpp/arcticdb/version/local_versioned_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ folly::Future<DescriptorItem> LocalVersionedEngine::get_descriptor_async(
return std::move(version_fut)
.thenValue([this, &stream_id, &version_query](std::optional<AtomKey>&& key){
missing_data::check<ErrorCode::E_NO_SUCH_VERSION>(key.has_value(),
"Unable to retrieve descriptor data. {}@{}: version not found", stream_id, version_query);
"Unable to retrieve descriptor data in get_descriptor_async. {}@{}: version not found", stream_id, version_query);
return get_descriptor(std::move(key.value()));
}).via(&async::cpu_executor());
}
Expand Down Expand Up @@ -678,7 +678,7 @@ std::vector<std::variant<VersionedItem, DataError>> LocalVersionedEngine::batch_
})
.thenValue([this, prune_previous_versions](auto&& index_key_and_update_info){
auto&& [index_key, update_info] = index_key_and_update_info;
return write_index_key_to_version_map_async(version_map(), std::move(index_key), std::move(update_info), prune_previous_versions, !update_info.previous_index_key_.has_value());
return write_index_key_to_version_map_async(version_map(), std::move(index_key), std::move(update_info), prune_previous_versions, !update_info.previous_index_key_.has_value(), cfg().metadata_cache());
}));
}

Expand Down Expand Up @@ -1259,7 +1259,8 @@ folly::Future<VersionedItem> LocalVersionedEngine::write_index_key_to_version_ma
AtomKey&& index_key,
UpdateInfo&& stream_update_info,
bool prune_previous_versions,
bool add_new_symbol = true) {
bool add_new_symbol,
bool add_metadata_cache) {

folly::Future<folly::Unit> write_version_fut;

Expand All @@ -1272,12 +1273,21 @@ folly::Future<VersionedItem> LocalVersionedEngine::write_index_key_to_version_ma
write_version_fut = async::submit_io_task(WriteVersionTask{store(), version_map, index_key, stream_update_info.previous_index_key_});
}

if(add_metadata_cache) {
write_version_fut = std::move(write_version_fut).then([this, index_key] (auto&&) {
return async::submit_io_task(WriteSymbolMetadataTask{store(), index_key.id(), index_key.start_index(), index_key.end_index(), 0, index_key.creation_ts()});
});
};

if(add_new_symbol){
write_version_fut = std::move(write_version_fut)
.then([this, index_key_id = index_key.id(), reference_id = index_key.version_id()](auto &&) {
return async::submit_io_task(WriteSymbolTask(store(), symbol_list_ptr(), index_key_id, reference_id));
});
}



return std::move(write_version_fut)
.thenValue([index_key = std::move(index_key)](auto &&) mutable {
return VersionedItem(std::move(index_key));
Expand Down Expand Up @@ -1365,7 +1375,7 @@ std::vector<std::variant<VersionedItem, DataError>> LocalVersionedEngine::batch_
})
.thenValue([this, prune_previous_versions](auto&& index_key_and_update_info){
auto&& [index_key, update_info] = index_key_and_update_info;
return write_index_key_to_version_map_async(version_map(), std::move(index_key), std::move(update_info), prune_previous_versions);
return write_index_key_to_version_map_async(version_map(), std::move(index_key), std::move(update_info), prune_previous_versions, true, cfg().metadata_cache());
})
);
}
Expand Down Expand Up @@ -1476,7 +1486,7 @@ std::vector<std::variant<VersionedItem, DataError>> LocalVersionedEngine::batch_
})
.thenValue([this, prune_previous_versions=options.prune_previous_versions_](auto&& index_key_and_update_info) -> folly::Future<VersionedItem> {
auto&& [index_key, update_info] = index_key_and_update_info;
return write_index_key_to_version_map_async(version_map(), std::move(index_key), std::move(update_info), prune_previous_versions);
return write_index_key_to_version_map_async(version_map(), std::move(index_key), std::move(update_info), prune_previous_versions, true, cfg().metadata_cache());
})
);
}
Expand Down
3 changes: 2 additions & 1 deletion cpp/arcticdb/version/local_versioned_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,8 @@ class LocalVersionedEngine : public VersionedEngine {
AtomKey&& index_key,
UpdateInfo&& stream_update_info,
bool prune_previous_versions,
bool add_new_symbol);
bool add_new_symbol,
bool add_metadata_cache);

void write_version(
bool prune_previous_versions,
Expand Down
27 changes: 15 additions & 12 deletions cpp/arcticdb/version/metadata_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,37 +83,40 @@ std::unordered_map<entity::StreamId, SymbolMetadata> get_symbol_metadata(
descriptor_futures.emplace_back(
engine.get_descriptor_async(std::move(version_fut), missing_symbols[idx], version_queries[idx]));
}
auto descriptors = folly::collect(descriptor_futures).get();
auto descriptors = folly::collectAll(descriptor_futures).get();
for (const auto &item : descriptors) {
arcticdb::proto::descriptors::TimeSeriesDescriptor tsd;
item.timeseries_descriptor()->UnpackTo(&tsd);
output.try_emplace(item.symbol(),
item.creation_ts(),
as_time(*item.start_index()),
as_time(*item.end_index()),
tsd.total_rows());
if(item.hasValue()) {
arcticdb::proto::descriptors::TimeSeriesDescriptor tsd;
item->timeseries_descriptor()->UnpackTo(&tsd);
output.try_emplace(item->symbol(),
item->creation_ts(),
as_time(*item->start_index()),
as_time(*item->end_index()),
tsd.total_rows());
}
}
}
return output;
}

void compact_symbol_metadata(
VersionedItem compact_symbol_metadata(
version_store::LocalVersionedEngine &engine) {
auto [segment, keys] = compact_metadata_keys(engine.get_store());
const auto compacted_rows = segment.row_count();
if(compacted_rows == 0) {
util::check(keys.empty(), "No rows in metadata cache segment but collected {} keys", keys.size());
ARCTICDB_DEBUG(log::version(), "Compact symbol metadata found no keys to compact");
return;
return VersionedItem{};
}

SegmentToInputFrameAdapter frame_adapter(std::move(segment));
version_store::ModificationOptions options;
options.upsert_ = true;
version_store::TimeseriesInfo ts_info;
engine.append_internal(StringId{MetadataSymbol}, frame_adapter.input_frame_, options);
auto versioned_item = engine.append_internal(StringId{MetadataSymbol}, frame_adapter.input_frame_, options);
ARCTICDB_DEBUG(log::version(), "Compact symbol metadata found {} keys and compacted {} rows", keys.size(), compacted_rows);
//delete_keys(engine.get_store(), std::move(keys), {});
delete_keys(engine.get_store(), std::move(keys), {});
return versioned_item;
}

} // namespace arcticdb
2 changes: 1 addition & 1 deletion cpp/arcticdb/version/metadata_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ std::unordered_map<entity::StreamId, SymbolMetadata> get_symbol_metadata(
timestamp from_time,
uint64_t lookback_seconds);

void compact_symbol_metadata(
VersionedItem compact_symbol_metadata(
version_store::LocalVersionedEngine &engine);

} // namespace arcticdb
4 changes: 4 additions & 0 deletions cpp/arcticdb/version/symbol_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ std::pair<SegmentInMemory, std::vector<AtomKey>> compact_metadata_keys(const std
using namespace arcticdb::stream;

auto keys = scan_metadata_keys(store, [] (const auto&) { return true; });
std::sort(std::begin(keys), std::end(keys), [] (const auto& l, const auto& r) {
return l.creation_ts() < r.creation_ts();
});

using AggregatorType = Aggregator<stream::TimeseriesIndex, FixedSchema, NeverSegmentPolicy>;
SegmentInMemory output;
AggregatorType agg{FixedSchema{symbol_metadata_descriptor(), symbol_metadata_index()}, [&output](auto&& segment) {
Expand Down
29 changes: 29 additions & 0 deletions cpp/arcticdb/version/symbol_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,33 @@ constexpr timestamp calc_start_time(
) {
return from_time - (static_cast<timestamp>(lookback_seconds) * 1000000000);
}

// Task object (derives from async::BaseTask) that captures everything needed
// for a single write_symbol_metadata() call and performs that call when invoked.
struct WriteSymbolMetadataTask : async::BaseTask {
    std::shared_ptr<Store> store_;     // store forwarded to write_symbol_metadata
    entity::StreamId symbol_;          // symbol the metadata entry refers to
    entity::IndexValue start_index_;   // first index value recorded for the symbol
    entity::IndexValue end_index_;     // last index value recorded for the symbol
    uint64_t total_rows_;              // row count recorded for the symbol
    entity::timestamp update_time_;    // timestamp recorded for the entry

    // All arguments are taken by value and moved into the members, so callers
    // may pass either lvalues (copied once) or rvalues (moved).
    WriteSymbolMetadataTask(
            std::shared_ptr<Store> store,
            entity::StreamId symbol,
            entity::IndexValue start_index,
            entity::IndexValue end_index,
            uint64_t total_rows,
            entity::timestamp update_time)
        : store_{std::move(store)},
          symbol_{std::move(symbol)},
          start_index_{std::move(start_index)},
          end_index_{std::move(end_index)},
          total_rows_{total_rows},
          update_time_{update_time} {}

    // Forwards the captured values to write_symbol_metadata(). The call is
    // made purely for its effect; folly::Unit stands in for "no result".
    folly::Unit operator()() const {
        write_symbol_metadata(
            store_,
            symbol_,
            start_index_,
            end_index_,
            total_rows_,
            update_time_);
        return folly::Unit{};
    }
};
} //namespace arcticdb
2 changes: 1 addition & 1 deletion python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ def lmdb_version_store_column_buckets(version_store_factory):

@pytest.fixture
def lmdb_version_store_metadata_cache(version_store_factory):
return version_store_factory(metadata_cache=True)
return version_store_factory(lmdb_config={"map_size": 2**30}, metadata_cache=True)


@pytest.fixture
Expand Down
26 changes: 14 additions & 12 deletions python/tests/unit/arcticdb/version_store/test_metadata_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,32 @@ def test_metadata_cache_compact(lmdb_version_store_metadata_cache):
lib.write("symbol", df)
lib.version_store.compact_symbol_info_cache()
cache = lib.version_store.get_symbol_info_cache(["symbol"], int(pd.Timestamp.utcnow().value), 1000)
print(cache)
assert len(cache) == 1
assert cache["symbol"].total_rows == 10


def test_metadata_cache_stress(lmdb_version_store_metadata_cache):
lib = lmdb_version_store_metadata_cache
num_symbols = 10000;
num_symbols = 100000;
dfs = []
symbols = []
df = pd.DataFrame({"x": np.arange(10)})
for i in range(num_symbols):
lib.write("symbol_{}".format(i), df)
dfs.append(df)
symbols.append("symbol_{}".format(i))

lib.batch_write(symbols, dfs)

time.sleep(2)
start = time.time()
lib.version_store.compact_symbol_info_cache()
vit = lib.version_store.compact_symbol_info_cache()
elapsed = time.time() - start
print("Compaction time: {}".format(elapsed))
print(vit)

lib.version_store.compact_symbol_info_cache()
lib.version_store.get_symbol_info_cache(["symbol"], int(pd.Timestamp.utcnow().value), 1000)
elapsed = time.time() - start
print("Read time: {}".format(elapsed))
print(cache)


def test_metadata_cache_batch_write(lmdb_version_store_metadata_cache):
Expand All @@ -51,12 +56,9 @@ def test_metadata_cache_batch_write(lmdb_version_store_metadata_cache):
symbols.append("symbol_{}".format(i))

lib.batch_write(symbols, dfs)
time.sleep(2)

start = time.time()
lib.version_store.compact_symbol_info_cache()
elapsed = time.time() - start
print("Compaction time: {}".format(elapsed))

lib.version_store.compact_symbol_info_cache()
elapsed = time.time() - start
print("Compaction time: {}".format(elapsed))
cache = lib.version_store.get_symbol_info_cache(["symbol"], int(pd.Timestamp.utcnow().value), 1)
assert len(cache) == num_symbols

0 comments on commit f5f7bb2

Please sign in to comment.