Only write ref key once when writing with prune_previous_versions #1560

Merged 4 commits on May 15, 2024
20 changes: 10 additions & 10 deletions build_tooling/parallel_test.sh
@@ -9,16 +9,16 @@ echo Saving results to ${TEST_OUTPUT_DIR:="$(realpath "$tooling_dir/../cpp/out")

catch=`{ which catchsegv 2>/dev/null || echo ; } | tail -n 1`

set -o xtrace -o pipefail

# Build a directory that's just the test assets, so can't access other Python source not in the wheel
mkdir -p $PARALLEL_TEST_ROOT
MSYS=winsymlinks:nativestrict ln -s "$(realpath "$tooling_dir/../python/tests")" $PARALLEL_TEST_ROOT/
cd $PARALLEL_TEST_ROOT

export ARCTICDB_RAND_SEED=$RANDOM

$catch python -m pytest --timeout=3600 $PYTEST_XDIST_MODE -v --log-file="$TEST_OUTPUT_DIR/pytest-logger.$group.log" \
    --junitxml="$TEST_OUTPUT_DIR/pytest.$group.xml" \
    --basetemp="$PARALLEL_TEST_ROOT/temp-pytest-output" \
    "$@" 2>&1 | sed -ur "s#^(tests/.*/([^/]+\.py))?#\2#"
31 changes: 10 additions & 21 deletions cpp/arcticdb/version/version_map.hpp
@@ -246,15 +246,6 @@ class VersionMapImpl {
log_write(store, key.id(), key.version_id());
}

-AtomKey write_tombstone_all_key(
-        const std::shared_ptr<Store>& store,
-        const AtomKey& previous_key,
-        const std::shared_ptr<VersionMapEntry>& entry) {
-    auto tombstone_key = write_tombstone_all_key_internal(store, previous_key, entry);
-    write_symbol_ref(store, tombstone_key, std::nullopt, entry->head_.value());
-    return tombstone_key;
-}

/**
* Tombstone all non-deleted versions of the given stream and do the related housekeeping.
* @param first_key_to_tombstone The first key in the version chain that should be tombstoned. When empty
@@ -298,8 +289,8 @@
__FUNCTION__);
auto [_, result] = tombstone_from_key_or_all_internal(store, key.id(), previous_key, entry);

-do_write(store, key, entry);
-write_symbol_ref(store, *entry->keys_.cbegin(), std::nullopt, entry->head_.value());
+auto previous_index = do_write(store, key, entry);
+write_symbol_ref(store, *entry->keys_.cbegin(), previous_index, entry->head_.value());

if (log_changes_)
log_write(store, key.id(), key.version_id());
@@ -369,7 +360,6 @@
}
}
new_entry->head_ = write_entry_to_storage(store, stream_id, new_version_id, new_entry);
-write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
remove_entry_version_keys(store, entry, stream_id);
if (validate_)
new_entry->validate();
@@ -477,7 +467,6 @@
entry->keys_.assign(std::begin(index_keys), std::end(index_keys));
auto new_version_id = index_keys[0].version_id();
entry->head_ = write_entry_to_storage(store, stream_id, new_version_id, entry);
-write_symbol_ref(store, *entry->keys_.cbegin(), std::nullopt, entry->head_.value());
if (validate_)
entry->validate();
}
@@ -502,7 +491,10 @@
return storage_reload(store, stream_id, load_param, load_param.iterate_on_failure_);
}

-void do_write(
+/**
+ * Returns the second undeleted index (after the write).
+ */
+std::optional<AtomKey> do_write(
std::shared_ptr<Store> store,
const AtomKey &key,
const std::shared_ptr<VersionMapEntry> &entry) {
@@ -512,7 +504,7 @@
auto journal_key = to_atom(std::move(journal_single_key(store, key, entry->head_)));
write_to_entry(entry, key, journal_key);
auto previous_index = entry->get_second_undeleted_index();
-write_symbol_ref(store, key, previous_index, journal_key);
+return previous_index;
}

AtomKey write_tombstone(
@@ -902,8 +894,7 @@
entry->clear();
load_via_iteration(store, stream_id, entry, false);
remove_duplicate_index_keys(entry);
-auto new_entry = rewrite_entry(store, stream_id, entry);
-write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
+rewrite_entry(store, stream_id, entry);
}

void remove_and_rewrite_version_keys(std::shared_ptr<Store> store, const StreamId& stream_id) {
@@ -913,9 +904,8 @@
entry->clear();
load_via_iteration(store, stream_id, entry, true);
remove_duplicate_index_keys(entry);
-auto new_entry = rewrite_entry(store, stream_id, entry);
+rewrite_entry(store, stream_id, entry);
remove_entry_version_keys(store, old_entry, stream_id);
-write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
}

void fix_ref_key(std::shared_ptr<Store> store, const StreamId& stream_id) {
@@ -964,8 +954,7 @@

entry->keys_.insert(std::begin(entry->keys_), std::begin(missing_versions), std::end(missing_versions));
entry->sort();
-auto new_entry = rewrite_entry(store, stream_id, entry);
-write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
+rewrite_entry(store, stream_id, entry);
}

std::shared_ptr<Lock> get_lock_object(const StreamId& stream_id) const {
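To make the new call pattern concrete, here is a minimal, self-contained sketch of the flow the diff settles on: do_write journals the new version and returns the previous undeleted index, and each caller then issues exactly one write_symbol_ref, rather than the ref key being written both inside do_write and again by callers such as write_version and the rewrite paths above. The types below are simplified, hypothetical stand-ins for illustration only, not ArcticDB's real AtomKey/VersionMapEntry/Store classes.

#include <cstdint>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-in for ArcticDB's AtomKey.
struct AtomKey {
    std::string id;
    std::uint64_t version;
};

// Hypothetical stand-in for VersionMapEntry: the version chain, newest first.
struct VersionMapEntry {
    std::vector<AtomKey> keys_;

    // Stand-in for get_second_undeleted_index(): the index key just below the head.
    std::optional<AtomKey> get_second_undeleted_index() const {
        return keys_.size() > 1 ? std::optional<AtomKey>(keys_[1]) : std::nullopt;
    }
};

// After this PR, do_write journals the new version and returns the previous
// undeleted index instead of writing the symbol ref itself.
std::optional<AtomKey> do_write(const AtomKey& key, VersionMapEntry& entry) {
    entry.keys_.insert(entry.keys_.begin(), key);
    return entry.get_second_undeleted_index();
}

// Stand-in for write_symbol_ref: with the new flow this runs exactly once per
// write, even when the caller goes on to tombstone/prune older versions.
void write_symbol_ref(const AtomKey& key, const std::optional<AtomKey>& previous_index) {
    std::cout << "ref key write: " << key.id << "@v" << key.version << " (prev: "
              << (previous_index ? "v" + std::to_string(previous_index->version) : "none")
              << ")\n";
}

int main() {
    VersionMapEntry entry;

    AtomKey v0{"sym", 0};
    auto prev0 = do_write(v0, entry);
    write_symbol_ref(v0, prev0);  // single ref write for v0 (prev: none)

    AtomKey v1{"sym", 1};
    auto prev1 = do_write(v1, entry);
    write_symbol_ref(v1, prev1);  // single ref write for v1 (prev: v0)
}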
New test file: 49 additions
@@ -0,0 +1,49 @@
import time
import pytest
from multiprocessing import Process, Queue


@pytest.fixture
def writer_store(lmdb_version_store_delayed_deletes_v2):
return lmdb_version_store_delayed_deletes_v2


@pytest.fixture
def reader_store(lmdb_version_store_delayed_deletes_v2):
return lmdb_version_store_delayed_deletes_v2


def read_repeatedly(version_store, queue: Queue):
while True:
try:
version_store.read("sym")
except Exception as e:
queue.put(e)
raise # don't get stuck in the while loop when we already know there's an issue
time.sleep(0.1)


def write_repeatedly(version_store):
while True:
version_store.write("sym", [1, 2, 3], prune_previous_version=True)
time.sleep(0.1)


def test_concurrent_read_write(writer_store, reader_store):
"""When using delayed deletes, a reader should always be able to read a symbol even if it is being modified
and pruned by another process."""
writer_store.write("sym", [1, 2, 3], prune_previous_version=True)
exceptions_in_reader = Queue()
reader = Process(target=read_repeatedly, args=(reader_store, exceptions_in_reader))
writer = Process(target=write_repeatedly, args=(writer_store,))

try:
reader.start()
writer.start()
reader.join(5)
writer.join(0.001)
finally:
writer.terminate()
reader.terminate()

assert exceptions_in_reader.empty()
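A note on the timing in this test: reader.join(5) gives the reader roughly five seconds of reads while the writer keeps writing with pruning, writer.join(0.001) returns almost immediately, and the finally block then terminates both long-running processes. The queue stays empty only if every read that raced a prune succeeded, which is the behaviour that writing the ref key only once is meant to guarantee.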