From e5be2f34f52447627e4d7434c79ff855345ec21a Mon Sep 17 00:00:00 2001
From: Alex Seaton
Date: Thu, 9 May 2024 11:50:29 +0100
Subject: [PATCH 1/4] Only write ref key once when writing with
 prune_previous_versions

---
 cpp/arcticdb/version/version_map.hpp      | 20 +++------
 .../test_concurrent_read_and_write.py     | 45 +++++++++++++++++++
 2 files changed, 52 insertions(+), 13 deletions(-)
 create mode 100644 python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py

diff --git a/cpp/arcticdb/version/version_map.hpp b/cpp/arcticdb/version/version_map.hpp
index 134cae66a4..7d4f1eb465 100644
--- a/cpp/arcticdb/version/version_map.hpp
+++ b/cpp/arcticdb/version/version_map.hpp
@@ -246,15 +246,6 @@ class VersionMapImpl {
         log_write(store, key.id(), key.version_id());
     }
 
-    AtomKey write_tombstone_all_key(
-            const std::shared_ptr<Store>& store,
-            const AtomKey& previous_key,
-            const std::shared_ptr<VersionMapEntry>& entry) {
-        auto tombstone_key = write_tombstone_all_key_internal(store, previous_key, entry);
-        write_symbol_ref(store, tombstone_key, std::nullopt, entry->head_.value());
-        return tombstone_key;
-    }
-
     /**
      * Tombstone all non-deleted versions of the given stream and do the related housekeeping.
      * @param first_key_to_tombstone The first key in the version chain that should be tombstoned. When empty
@@ -298,8 +289,8 @@ class VersionMapImpl {
                 __FUNCTION__);
 
         auto [_, result] = tombstone_from_key_or_all_internal(store, key.id(), previous_key, entry);
-        do_write(store, key, entry);
-        write_symbol_ref(store, *entry->keys_.cbegin(), std::nullopt, entry->head_.value());
+        auto previous_index = do_write(store, key, entry);
+        write_symbol_ref(store, *entry->keys_.cbegin(), previous_index, entry->head_.value());
         if (log_changes_)
             log_write(store, key.id(), key.version_id());
 
@@ -502,7 +493,10 @@ class VersionMapImpl {
         return storage_reload(store, stream_id, load_param, load_param.iterate_on_failure_);
     }
 
-    void do_write(
+    /**
+     * Returns the second undeleted index (after the write).
+     */
+    std::optional<AtomKey> do_write(
         std::shared_ptr<Store> store,
         const AtomKey &key,
         const std::shared_ptr<VersionMapEntry> &entry) {
@@ -512,7 +506,7 @@ class VersionMapImpl {
         auto journal_key = to_atom(std::move(journal_single_key(store, key, entry->head_)));
         write_to_entry(entry, key, journal_key);
         auto previous_index = entry->get_second_undeleted_index();
-        write_symbol_ref(store, key, previous_index, journal_key);
+        return previous_index;
     }
 
     AtomKey write_tombstone(
diff --git a/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py b/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py
new file mode 100644
index 0000000000..b050ecf9e3
--- /dev/null
+++ b/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py
@@ -0,0 +1,45 @@
+import time
+import pytest
+from multiprocessing import Process, Queue
+
+
+@pytest.fixture
+def writer_store(lmdb_version_store_delayed_deletes_v2):
+    return lmdb_version_store_delayed_deletes_v2
+
+
+@pytest.fixture
+def reader_store(lmdb_version_store_delayed_deletes_v2):
+    return lmdb_version_store_delayed_deletes_v2
+
+
+def read_repeatedly(version_store, queue: Queue):
+    while True:
+        try:
+            version_store.read("sym")
+        except Exception as e:
+            queue.put(e)
+            raise  # don't get stuck in the while loop when we already know there's an issue
+        time.sleep(0.1)
+
+
+def write_repeatedly(version_store):
+    while True:
+        version_store.write("sym", [1, 2, 3], prune_previous_version=True)
+        time.sleep(0.1)
+
+
+def test_concurrent_read_write(writer_store, reader_store):
+    """When using delayed deletes, a reader should always be able to read a symbol even if it is being modified
+    and pruned by another process."""
+    writer_store.write("sym", [1, 2, 3], prune_previous_version=True)
+    exceptions_in_reader = Queue()
+    reader = Process(target=read_repeatedly, args=(reader_store, exceptions_in_reader))
+    writer = Process(target=write_repeatedly, args=(writer_store,))
+
+    reader.start()
+    writer.start()
+    reader.join(5)
+    writer.join(0.001)
+
+    assert exceptions_in_reader.empty()

From 6ca50a05f0c63fa6844e91c3038b9b168a856b04 Mon Sep 17 00:00:00 2001
From: Alex Seaton
Date: Wed, 15 May 2024 08:18:30 +0100
Subject: [PATCH 2/4] Terminate the child processes so that pytest can
 terminate; otherwise pytest continues to run indefinitely even after the
 tests have passed.
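
For context: Python's multiprocessing joins all non-daemon child processes at
interpreter shutdown, which is why pytest hangs while the `while True` workers
are still alive. A minimal standalone sketch of the failure mode, plus one
alternative fix (daemon children instead of explicit terminate() calls; an
illustration only, not what this patch does):

    import time
    from multiprocessing import Process

    def loop_forever():
        while True:
            time.sleep(0.1)

    if __name__ == "__main__":
        # A non-daemon child would keep the interpreter alive at exit,
        # because multiprocessing joins it during shutdown: the same
        # reason the pytest run hangs. Daemon children are instead
        # killed automatically when the parent exits.
        child = Process(target=loop_forever, daemon=True)
        child.start()
        time.sleep(0.5)
        # No join()/terminate() needed: the daemon child dies with the parent.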
---
 build_tooling/parallel_test.sh            | 20 +++++++++----------
 .../test_concurrent_read_and_write.py     |  2 ++
 2 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/build_tooling/parallel_test.sh b/build_tooling/parallel_test.sh
index 7410b1c618..cc12308a41 100755
--- a/build_tooling/parallel_test.sh
+++ b/build_tooling/parallel_test.sh
@@ -9,16 +9,16 @@ echo Saving results to ${TEST_OUTPUT_DIR:="$(realpath "$tooling_dir/../cpp/out")
 
 catch=`{ which catchsegv 2>/dev/null || echo ; } | tail -n 1`
 
-    set -o xtrace -o pipefail
+set -o xtrace -o pipefail
 
-    # Build a directory that's just the test assets, so can't access other Python source not in the wheel
-    mkdir -p $PARALLEL_TEST_ROOT
-    MSYS=winsymlinks:nativestrict ln -s "$(realpath "$tooling_dir/../python/tests")" $PARALLEL_TEST_ROOT/
-    cd $PARALLEL_TEST_ROOT
+# Build a directory that's just the test assets, so can't access other Python source not in the wheel
+mkdir -p $PARALLEL_TEST_ROOT
+MSYS=winsymlinks:nativestrict ln -s "$(realpath "$tooling_dir/../python/tests")" $PARALLEL_TEST_ROOT/
+cd $PARALLEL_TEST_ROOT
 
-    export ARCTICDB_RAND_SEED=$RANDOM
+export ARCTICDB_RAND_SEED=$RANDOM
 
-    $catch python -m pytest --timeout=3600 $PYTEST_XDIST_MODE -v --log-file="$TEST_OUTPUT_DIR/pytest-logger.$group.log" \
-        --junitxml="$TEST_OUTPUT_DIR/pytest.$group.xml" \
-        --basetemp="$PARALLEL_TEST_ROOT/temp-pytest-output" \
-        "$@" 2>&1 | sed -ur "s#^(tests/.*/([^/]+\.py))?#\2#"
+$catch python -m pytest --timeout=3600 $PYTEST_XDIST_MODE -v --log-file="$TEST_OUTPUT_DIR/pytest-logger.$group.log" \
+    --junitxml="$TEST_OUTPUT_DIR/pytest.$group.xml" \
+    --basetemp="$PARALLEL_TEST_ROOT/temp-pytest-output" \
+    "$@" 2>&1 | sed -ur "s#^(tests/.*/([^/]+\.py))?#\2#"
diff --git a/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py b/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py
index b050ecf9e3..d2767c5626 100644
--- a/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py
+++ b/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py
@@ -41,5 +41,7 @@ def test_concurrent_read_write(writer_store, reader_store):
     writer.start()
     reader.join(5)
     writer.join(0.001)
+    writer.terminate()
+    reader.terminate()
 
     assert exceptions_in_reader.empty()

From 5d863ebb3061036c791ea9dd50f8b300b2eb9db4 Mon Sep 17 00:00:00 2001
From: Alex Seaton
Date: Wed, 15 May 2024 09:53:45 +0100
Subject: [PATCH 3/4] Fix places where write_entry_to_storage and its callers
 both write the symbol ref key.

Usage:

write_entry_to_storage
|_ compact_and_remove_deleted_indexes (unused, no impact)
   |_ writes the symbol ref a second time
|_ overwrite_symbol_tree
   |_ writes the symbol ref a second time
|_ rewrite_entry
   |_ recover_deleted
      |_ writes the symbol ref a second time
   |_ remove_and_rewrite_version_keys
      |_ writes the symbol ref a second time
   |_ scan_and_rewrite
      |_ writes the symbol ref a second time

overwrite_symbol_tree is used in:
- the initial sync process in replication (no real impact; we don't expect
  data to be readable during the initial sync)
- fix_symbol_trees (unused)

recover_deleted is only used in tests.

remove_and_rewrite_version_keys is unused.

scan_and_rewrite is used in fix_ref_key, which is unused.

Testing is limited as most call sites are unused. I verified that replication
still works correctly after this change.
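
To make the hazard concrete, here is a schematic, runnable toy model (invented
names, a plain Python dict and threads, not ArcticDB code) of the invariant
these fixes restore: readers resolve the ref slot first, so a writer must
publish the new version chain fully and then update the ref exactly once.
Writing the ref a second time per logical update only widens the window in
which a reader can observe a state the writer is about to replace.

    import threading

    store = {"ref": ("version", 0), ("version", 0): "data-0"}
    errors = []
    done = threading.Event()

    def writer():
        for v in range(1, 20000):
            store[("version", v)] = f"data-{v}"  # publish the data first...
            store["ref"] = ("version", v)        # ...then flip the ref once
        done.set()

    def reader():
        while not done.is_set():
            key = store["ref"]
            if key not in store:  # a torn publish would show up here
                errors.append(key)

    t_w = threading.Thread(target=writer)
    t_r = threading.Thread(target=reader)
    t_r.start()
    t_w.start()
    t_w.join()
    t_r.join()
    assert not errors  # holds because the ref is written once, after the data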
---
 cpp/arcticdb/version/version_map.hpp | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/cpp/arcticdb/version/version_map.hpp b/cpp/arcticdb/version/version_map.hpp
index 7d4f1eb465..a196ad2130 100644
--- a/cpp/arcticdb/version/version_map.hpp
+++ b/cpp/arcticdb/version/version_map.hpp
@@ -360,7 +360,6 @@ class VersionMapImpl {
             }
         }
         new_entry->head_ = write_entry_to_storage(store, stream_id, new_version_id, new_entry);
-        write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
         remove_entry_version_keys(store, entry, stream_id);
         if (validate_)
             new_entry->validate();
@@ -468,7 +467,6 @@ class VersionMapImpl {
         entry->keys_.assign(std::begin(index_keys), std::end(index_keys));
         auto new_version_id = index_keys[0].version_id();
         entry->head_ = write_entry_to_storage(store, stream_id, new_version_id, entry);
-        write_symbol_ref(store, *entry->keys_.cbegin(), std::nullopt, entry->head_.value());
         if (validate_)
             entry->validate();
     }
@@ -896,8 +894,7 @@ class VersionMapImpl {
         entry->clear();
         load_via_iteration(store, stream_id, entry, false);
         remove_duplicate_index_keys(entry);
-        auto new_entry = rewrite_entry(store, stream_id, entry);
-        write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
+        rewrite_entry(store, stream_id, entry);
     }
 
     void remove_and_rewrite_version_keys(std::shared_ptr<Store> store, const StreamId& stream_id) {
@@ -907,9 +904,8 @@ class VersionMapImpl {
         entry->clear();
         load_via_iteration(store, stream_id, entry, true);
         remove_duplicate_index_keys(entry);
-        auto new_entry = rewrite_entry(store, stream_id, entry);
+        rewrite_entry(store, stream_id, entry);
         remove_entry_version_keys(store, old_entry, stream_id);
-        write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
     }
 
     void fix_ref_key(std::shared_ptr<Store> store, const StreamId& stream_id) {
@@ -958,8 +954,7 @@ class VersionMapImpl {
         entry->keys_.insert(std::begin(entry->keys_), std::begin(missing_versions), std::end(missing_versions));
         entry->sort();
 
-        auto new_entry = rewrite_entry(store, stream_id, entry);
-        write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
+        rewrite_entry(store, stream_id, entry);
     }
 
     std::shared_ptr<Lock> get_lock_object(const StreamId& stream_id) const {

From 88d534e654c4c79e9c4c949e66682fd5202de6ff Mon Sep 17 00:00:00 2001
From: Alex Seaton
Date: Wed, 15 May 2024 09:56:00 +0100
Subject: [PATCH 4/4] Make sure that we always terminate the child processes
 so that we don't hang the CI suite for 6 hours...
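
For illustration, the try/finally below could also be packaged as a small
context manager, so any future test that spawns worker processes gets the
same always-terminate guarantee (a hypothetical helper named reaped, not part
of this patch):

    from contextlib import contextmanager

    @contextmanager
    def reaped(*processes):
        try:
            yield processes
        finally:
            for p in processes:
                if p.is_alive():
                    p.terminate()
                if p.pid is not None:
                    p.join()  # reap started children so no zombie outlives the test

The test body would then read:

    with reaped(reader, writer):
        reader.start()
        writer.start()
        reader.join(5)
        writer.join(0.001)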
---
 .../test_concurrent_read_and_write.py | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py b/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py
index d2767c5626..4452e8b1b1 100644
--- a/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py
+++ b/python/tests/stress/arcticdb/version_store/test_concurrent_read_and_write.py
@@ -37,11 +37,13 @@ def test_concurrent_read_write(writer_store, reader_store):
     reader = Process(target=read_repeatedly, args=(reader_store, exceptions_in_reader))
     writer = Process(target=write_repeatedly, args=(writer_store,))
 
-    reader.start()
-    writer.start()
-    reader.join(5)
-    writer.join(0.001)
-    writer.terminate()
-    reader.terminate()
+    try:
+        reader.start()
+        writer.start()
+        reader.join(5)
+        writer.join(0.001)
+    finally:
+        writer.terminate()
+        reader.terminate()
 
     assert exceptions_in_reader.empty()
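
A final note on the assertion itself: multiprocessing.Queue.empty() is
documented as unreliable, because items travel to the parent through a
background feeder thread and may not be visible immediately after the reader
is terminated. If this test ever flakes, a stricter final check is to drain
the queue with a timeout (sketch with an invented helper name):

    from queue import Empty

    def assert_no_exceptions(q, timeout=1.0):
        try:
            exc = q.get(timeout=timeout)
        except Empty:
            return  # nothing arrived within the timeout: the reader never raised
        raise AssertionError(f"reader process raised: {exc!r}")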