diff --git a/build_tooling/parallel_test.sh b/build_tooling/parallel_test.sh
index 7410b1c618..cc12308a41 100755
--- a/build_tooling/parallel_test.sh
+++ b/build_tooling/parallel_test.sh
@@ -9,16 +9,16 @@ echo Saving results to ${TEST_OUTPUT_DIR:="$(realpath "$tooling_dir/../cpp/out")
 catch=`{ which catchsegv 2>/dev/null || echo ; } | tail -n 1`
 
-    set -o xtrace -o pipefail
+set -o xtrace -o pipefail
 
-    # Build a directory that's just the test assets, so can't access other Python source not in the wheel
-    mkdir -p $PARALLEL_TEST_ROOT
-    MSYS=winsymlinks:nativestrict ln -s "$(realpath "$tooling_dir/../python/tests")" $PARALLEL_TEST_ROOT/
-    cd $PARALLEL_TEST_ROOT
+# Build a directory that's just the test assets, so can't access other Python source not in the wheel
+mkdir -p $PARALLEL_TEST_ROOT
+MSYS=winsymlinks:nativestrict ln -s "$(realpath "$tooling_dir/../python/tests")" $PARALLEL_TEST_ROOT/
+cd $PARALLEL_TEST_ROOT
 
-    export ARCTICDB_RAND_SEED=$RANDOM
+export ARCTICDB_RAND_SEED=$RANDOM
 
-    $catch python -m pytest --timeout=3600 $PYTEST_XDIST_MODE -v --log-file="$TEST_OUTPUT_DIR/pytest-logger.$group.log" \
-        --junitxml="$TEST_OUTPUT_DIR/pytest.$group.xml" \
-        --basetemp="$PARALLEL_TEST_ROOT/temp-pytest-output" \
-        "$@" 2>&1 | sed -ur "s#^(tests/.*/([^/]+\.py))?#\2#"
+$catch python -m pytest --timeout=3600 $PYTEST_XDIST_MODE -v --log-file="$TEST_OUTPUT_DIR/pytest-logger.$group.log" \
+    --junitxml="$TEST_OUTPUT_DIR/pytest.$group.xml" \
+    --basetemp="$PARALLEL_TEST_ROOT/temp-pytest-output" \
+    "$@" 2>&1 | sed -ur "s#^(tests/.*/([^/]+\.py))?#\2#"
 
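A note on the pytest hunk above: the final sed stage only rewrites each output line so it is keyed on the bare test-file name, stripping the directory prefix; lines that do not start with a tests/ path pass through unchanged, since the whole capture group is optional. An illustrative before/after pair (the test path is hypothetical, not from a real run):

    tests/unit/arcticdb/version_store/test_basic.py::test_write PASSED
    test_basic.py::test_write PASSED
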
diff --git a/cpp/arcticdb/version/version_map.hpp b/cpp/arcticdb/version/version_map.hpp
index 134cae66a4..a196ad2130 100644
--- a/cpp/arcticdb/version/version_map.hpp
+++ b/cpp/arcticdb/version/version_map.hpp
@@ -246,15 +246,6 @@ class VersionMapImpl {
             log_write(store, key.id(), key.version_id());
     }
 
-    AtomKey write_tombstone_all_key(
-            const std::shared_ptr<Store>& store,
-            const AtomKey& previous_key,
-            const std::shared_ptr<VersionMapEntry>& entry) {
-        auto tombstone_key = write_tombstone_all_key_internal(store, previous_key, entry);
-        write_symbol_ref(store, tombstone_key, std::nullopt, entry->head_.value());
-        return tombstone_key;
-    }
-
     /**
      * Tombstone all non-deleted versions of the given stream and do the related housekeeping.
      * @param first_key_to_tombstone The first key in the version chain that should be tombstoned. When empty
@@ -298,8 +289,8 @@ class VersionMapImpl {
                 __FUNCTION__);
 
         auto [_, result] = tombstone_from_key_or_all_internal(store, key.id(), previous_key, entry);
-        do_write(store, key, entry);
-        write_symbol_ref(store, *entry->keys_.cbegin(), std::nullopt, entry->head_.value());
+        auto previous_index = do_write(store, key, entry);
+        write_symbol_ref(store, *entry->keys_.cbegin(), previous_index, entry->head_.value());
         if (log_changes_)
             log_write(store, key.id(), key.version_id());
 
@@ -369,7 +360,6 @@ class VersionMapImpl {
             }
         }
         new_entry->head_ = write_entry_to_storage(store, stream_id, new_version_id, new_entry);
-        write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
         remove_entry_version_keys(store, entry, stream_id);
         if (validate_)
             new_entry->validate();
@@ -477,7 +467,6 @@ class VersionMapImpl {
         entry->keys_.assign(std::begin(index_keys), std::end(index_keys));
         auto new_version_id = index_keys[0].version_id();
         entry->head_ = write_entry_to_storage(store, stream_id, new_version_id, entry);
-        write_symbol_ref(store, *entry->keys_.cbegin(), std::nullopt, entry->head_.value());
         if (validate_)
             entry->validate();
     }
@@ -502,7 +491,10 @@ class VersionMapImpl {
         return storage_reload(store, stream_id, load_param, load_param.iterate_on_failure_);
     }
 
-    void do_write(
+    /**
+     * Returns the second undeleted index (after the write).
+     */
+    std::optional<AtomKey> do_write(
         std::shared_ptr<Store> store,
         const AtomKey &key,
         const std::shared_ptr<VersionMapEntry> &entry) {
@@ -512,7 +504,7 @@ class VersionMapImpl {
         auto journal_key = to_atom(std::move(journal_single_key(store, key, entry->head_)));
         write_to_entry(entry, key, journal_key);
         auto previous_index = entry->get_second_undeleted_index();
-        write_symbol_ref(store, key, previous_index, journal_key);
+        return previous_index;
     }
 
     AtomKey write_tombstone(
@@ -902,8 +894,7 @@ class VersionMapImpl {
         entry->clear();
         load_via_iteration(store, stream_id, entry, false);
         remove_duplicate_index_keys(entry);
-        auto new_entry = rewrite_entry(store, stream_id, entry);
-        write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
+        rewrite_entry(store, stream_id, entry);
     }
 
     void remove_and_rewrite_version_keys(std::shared_ptr<Store> store, const StreamId& stream_id) {
@@ -913,9 +904,8 @@ class VersionMapImpl {
         entry->clear();
         load_via_iteration(store, stream_id, entry, true);
         remove_duplicate_index_keys(entry);
-        auto new_entry = rewrite_entry(store, stream_id, entry);
+        rewrite_entry(store, stream_id, entry);
         remove_entry_version_keys(store, old_entry, stream_id);
-        write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
     }
 
     void fix_ref_key(std::shared_ptr<Store> store, const StreamId& stream_id) {
@@ -964,8 +954,7 @@ class VersionMapImpl {
         entry->keys_.insert(std::begin(entry->keys_),
                             std::begin(missing_versions), std::end(missing_versions));
         entry->sort();
-        auto new_entry = rewrite_entry(store, stream_id, entry);
-        write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
+        rewrite_entry(store, stream_id, entry);
     }
 
     std::shared_ptr<Lock> get_lock_object(const StreamId& stream_id) const {
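Taken together, the version_map.hpp hunks move symbol-ref writing out of the helpers: do_write no longer writes the ref itself but returns the second undeleted index, and the prune-previous write path threads that value into write_symbol_ref, so the ref key records a previous undeleted index alongside the latest one. The sketch below illustrates why a concurrent reader benefits from that fallback; it is a minimal Python illustration with hypothetical helper names (read_ref, read_index, latest_index, previous_index), not ArcticDB's actual API:

    class KeyNotFound(Exception):
        """Raised by the hypothetical store when a key has already been pruned."""

    def read_latest(store, symbol):
        ref = store.read_ref(symbol)  # ref records the latest and the previous undeleted index
        try:
            return store.read_index(ref.latest_index)
        except KeyNotFound:
            # The latest version was pruned between reading the ref and the index;
            # retry against the previous undeleted index recorded in the ref.
            return store.read_index(ref.previous_index)
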
diff --git a/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py b/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py
new file mode 100644
index 0000000000..4452e8b1b1
--- /dev/null
+++ b/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py
@@ -0,0 +1,49 @@
+import time
+import pytest
+from multiprocessing import Process, Queue
+
+
+@pytest.fixture
+def writer_store(lmdb_version_store_delayed_deletes_v2):
+    return lmdb_version_store_delayed_deletes_v2
+
+
+@pytest.fixture
+def reader_store(lmdb_version_store_delayed_deletes_v2):
+    return lmdb_version_store_delayed_deletes_v2
+
+
+def read_repeatedly(version_store, queue: Queue):
+    while True:
+        try:
+            version_store.read("sym")
+        except Exception as e:
+            queue.put(e)
+            raise  # don't get stuck in the while loop when we already know there's an issue
+        time.sleep(0.1)
+
+
+def write_repeatedly(version_store):
+    while True:
+        version_store.write("sym", [1, 2, 3], prune_previous_version=True)
+        time.sleep(0.1)
+
+
+def test_concurrent_read_write(writer_store, reader_store):
+    """When using delayed deletes, a reader should always be able to read a symbol even if it is being modified
+    and pruned by another process."""
+    writer_store.write("sym", [1, 2, 3], prune_previous_version=True)
+    exceptions_in_reader = Queue()
+    reader = Process(target=read_repeatedly, args=(reader_store, exceptions_in_reader))
+    writer = Process(target=write_repeatedly, args=(writer_store,))
+
+    try:
+        reader.start()
+        writer.start()
+        reader.join(5)
+        writer.join(0.001)
+    finally:
+        writer.terminate()
+        reader.terminate()
+
+    assert exceptions_in_reader.empty()
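
For reference, the new stress test should be runnable on its own from the repository root with something like:

    python -m pytest python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py -v

assuming the lmdb_version_store_delayed_deletes_v2 fixture is supplied by the suite's shared conftest, since it is not defined in this file.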