materialized view: fix bug in some large modifications to base partitions

Sometimes a single modification to a base partition requires updates to
a large number of view rows. A common example is deletion of a base
partition containing many rows. A large BATCH can have the same effect.

To avoid large allocations, we split the large amount of work into
batches of 100 (max_rows_for_view_updates) rows each. The existing code
assumed that an empty result from one of these batches meant that we
were done. But this assumption was incorrect: there are several cases
in which a base-table update may not need a view update to be generated
(see can_skip_view_updates()), so if all 100 rows in a batch were
skipped, view-update generation stopped prematurely. This patch
includes two tests showing when this bug can happen - one test using a
partition deletion with a USING TIMESTAMP that causes the deletion to
leave the first 100 rows untouched, and a second test using a
specially-crafted large BATCH. These use cases are fairly esoteric, but
in fact hit a user in the wild, which led to the discovery of this bug.
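
To make the failure mode concrete, here is a minimal, self-contained
sketch of the buggy termination logic - not the actual Scylla code; the
names (source, build_batch) and the 250-row input with its first 100
rows skippable are made up for illustration:

    #include <cstdio>
    #include <vector>

    // Simplified model of the bug: the input has 250 rows, each batch
    // examines up to 100 of them, and (hypothetically) the first 100
    // rows are all skippable, i.e. they produce no view updates.
    struct source {
        int next = 0;
        static constexpr int total = 250;
        std::vector<int> build_batch() {
            std::vector<int> out;
            for (int n = 0; n < 100 && next < total; ++n, ++next) {
                if (next >= 100) { // rows 100..249 do affect the view
                    out.push_back(next);
                }
            }
            return out;
        }
    };

    int main() {
        source s;
        int applied = 0;
        while (true) {
            auto updates = s.build_batch();
            if (updates.empty()) {
                // BUG: the first batch is fully skipped, so we break
                // here and never generate the 150 updates for rows
                // 100..249.
                break;
            }
            applied += int(updates.size());
        }
        std::printf("applied %d of 150 expected view updates\n", applied);
    }

Running this prints "applied 0 of 150 expected view updates": the loop
cannot distinguish "this batch was fully skipped" from "there is no
more input".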

The fix is fairly simple: to detect when build_some() is done, it is no
longer enough to check whether it returned zero view-update rows;
rather, build_some() now reports explicitly whether it is done, by
returning an std::optional that is disengaged when the iteration has
finished.
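
The same toy model as above, with the fixed contract - again a hedged
sketch with made-up names, not the committed code: a disengaged
optional means "input exhausted", while an engaged-but-empty vector
means "this batch was fully skipped, keep calling":

    #include <cstdio>
    #include <optional>
    #include <vector>

    struct source {
        int next = 0;
        static constexpr int total = 250;
        // std::nullopt means "input exhausted"; an engaged-but-empty
        // vector means "batch fully skipped, keep calling".
        std::optional<std::vector<int>> build_batch() {
            if (next >= total) {
                return std::nullopt; // only this means "done"
            }
            std::vector<int> out;
            for (int n = 0; n < 100 && next < total; ++n, ++next) {
                if (next >= 100) {
                    out.push_back(next);
                }
            }
            return out; // possibly empty - not a termination signal
        }
    };

    int main() {
        source s;
        int applied = 0;
        while (auto updates = s.build_batch()) {
            applied += int(updates->size()); // empty batches add zero
        }
        std::printf("applied %d of 150 expected view updates\n", applied);
    }

This version prints "applied 150 of 150 expected view updates".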

The patch includes several tests for this bug: they pass on Cassandra,
failed on Scylla before this patch, and pass with it.

Fixes #12297.

Signed-off-by: Nadav Har'El <nyh@scylladb.com>

Closes #12305
nyh authored and denesb committed Dec 14, 2022
1 parent e7d8855 commit 92d03be
Showing 4 changed files with 125 additions and 10 deletions.
6 changes: 5 additions & 1 deletion db/view/view.cc
@@ -1219,8 +1219,12 @@ future<stop_iteration> view_update_builder::stop() const {
     return make_ready_future<stop_iteration>(stop_iteration::yes);
 }
 
-future<utils::chunked_vector<frozen_mutation_and_schema>> view_update_builder::build_some() {
+future<std::optional<utils::chunked_vector<frozen_mutation_and_schema>>> view_update_builder::build_some() {
     (void)co_await advance_all();
+    if (!_update && !_existing) {
+        // Tell the caller there is no more data to build.
+        co_return std::nullopt;
+    }
     bool do_advance_updates = false;
     bool do_advance_existings = false;
     if (_update && _update->is_partition_start()) {
10 changes: 9 additions & 1 deletion db/view/view.hh
@@ -275,7 +275,15 @@ public:
     }
     view_update_builder(view_update_builder&& other) noexcept = default;
 
-    future<utils::chunked_vector<frozen_mutation_and_schema>> build_some();
+    // build_some() works on batches of 100 (max_rows_for_view_updates)
+    // updated rows, but can_skip_view_updates() can decide that some of
+    // these rows do not affect the view, and as a result build_some() can
+    // return fewer than 100 rows - in extreme cases even zero (see issue
+    // #12297). So we can't use an empty returned vector to signify that
+    // the view update building is done - instead, we wrap the return value
+    // in an std::optional, which is disengaged when the iteration is done.
+    future<std::optional<utils::chunked_vector<frozen_mutation_and_schema>>> build_some();
 
     future<> close() noexcept;
16 changes: 8 additions & 8 deletions replica/table.cc
@@ -1809,20 +1809,20 @@ future<> table::generate_and_propagate_view_updates(const schema_ptr& base,

     std::exception_ptr err = nullptr;
     while (true) {
-        utils::chunked_vector<frozen_mutation_and_schema> updates;
+        std::optional<utils::chunked_vector<frozen_mutation_and_schema>> updates;
         try {
             updates = co_await builder.build_some();
         } catch (...) {
             err = std::current_exception();
             break;
         }
-        if (updates.empty()) {
+        if (!updates) {
             break;
         }
-        tracing::trace(tr_state, "Generated {} view update mutations", updates.size());
-        auto units = seastar::consume_units(*_config.view_update_concurrency_semaphore, memory_usage_of(updates));
+        tracing::trace(tr_state, "Generated {} view update mutations", updates->size());
+        auto units = seastar::consume_units(*_config.view_update_concurrency_semaphore, memory_usage_of(*updates));
         try {
-            co_await db::view::mutate_MV(base_token, std::move(updates), _view_stats, *_config.cf_stats, tr_state,
+            co_await db::view::mutate_MV(base_token, std::move(*updates), _view_stats, *_config.cf_stats, tr_state,
                     std::move(units), service::allow_hints::yes, db::view::wait_for_all_updates::no);
         } catch (...) {
             // Ignore exceptions: any individual failure to propagate a view update will be reported
@@ -1947,14 +1947,14 @@ future<> table::populate_views(
     while (true) {
         try {
             auto updates = co_await builder.build_some();
-            if (updates.empty()) {
+            if (!updates) {
                 break;
             }
-            size_t update_size = memory_usage_of(updates);
+            size_t update_size = memory_usage_of(*updates);
             size_t units_to_wait_for = std::min(_config.view_update_concurrency_semaphore_limit, update_size);
             auto units = co_await seastar::get_units(*_config.view_update_concurrency_semaphore, units_to_wait_for);
             units.adopt(seastar::consume_units(*_config.view_update_concurrency_semaphore, update_size - units_to_wait_for));
-            co_await db::view::mutate_MV(base_token, std::move(updates), _view_stats, *_config.cf_stats,
+            co_await db::view::mutate_MV(base_token, std::move(*updates), _view_stats, *_config.cf_stats,
                     tracing::trace_state_ptr(), std::move(units), service::allow_hints::no, db::view::wait_for_all_updates::yes);
         } catch (...) {
             if (!err) {
103 changes: 103 additions & 0 deletions test/cql-pytest/test_materialized_view.py
@@ -541,3 +541,106 @@ def test_view_update_and_alter_base(cql, test_keyspace, scylla_only):
         # Try to modify an item. This failed in #11542.
         cql.execute(f'UPDATE {table} SET v=-1 WHERE p=1')
         assert len(list(cql.execute(f"SELECT v from {mv}"))) == 0
+
+# Reproducer for issue #12297, reproducing a specific way in which a view
+# table could be made inconsistent with the base table:
+# The test writes 500 rows to one partition in a base table, and then uses
+# USING TIMESTAMP with the right value to cause a base partition deletion
+# which deletes not the entire partition but just its last 50 rows. As the
+# 50 rows of the base partition get deleted, we expect 50 rows from the
+# view table to also get deleted - but bug #12297 was that this wasn't
+# happening - rather, all rows remained in the view.
+# The bug cannot be reproduced with 100 rows (deleting the last 10), but
+# 113 rows (leaving 101 rows after deleting the last 12) does reproduce
+# it. Reproducing the bug also required a setup where USING TIMESTAMP
+# deleted the *last* rows - using it to delete the *first* rows did not
+# trigger the bug (the view rows were deleted fine).
+@pytest.mark.parametrize("size", [100, 113, 500])
+def test_long_skipped_view_update_delete_with_timestamp(cql, test_keyspace, size):
+    with new_test_table(cql, test_keyspace, 'p int, c int, x int, y int, primary key (p,c)') as table:
+        with new_materialized_view(cql, table, '*', 'p, x, c', 'p is not null and x is not null and c is not null') as mv:
+            # Write 'size' rows with c=0..(size-1). Because the iteration
+            # is in reverse order, the first row in clustering order (c=0)
+            # will have the latest write timestamp.
+            for i in reversed(range(size)):
+                cql.execute(f'INSERT INTO {table} (p,c,x,y) VALUES (1,{i},{i},{i})')
+            assert list(cql.execute(f"SELECT c FROM {table} WHERE p = 1")) == list(cql.execute(f"SELECT c FROM {mv} WHERE p = 1"))
+            # Get the timestamp of the size*0.9th item. Because we wrote
+            # the items in reverse, items 0.9*size..size all have an
+            # earlier timestamp than that.
+            t = list(cql.execute(f"SELECT writetime(y) FROM {table} WHERE p = 1 and c = {int(size*0.9)}"))[0].writetime_y
+            cql.execute(f'DELETE FROM {table} USING TIMESTAMP {t} WHERE p=1')
+            # After the deletion we expect to see size*0.9 rows remaining
+            # (timestamp ties cannot happen for separate writes; if they
+            # could, we might see slightly fewer), but most importantly,
+            # the view should have exactly the same rows.
+            assert list(cql.execute(f"SELECT c FROM {table} WHERE p = 1")) == list(cql.execute(f"SELECT c FROM {mv} WHERE p = 1"))
+
+# Same test as above, except that here the view partition key is different
+# from the base's, so we can be sure that Scylla needs to go through the
+# loop of deleting many individual view rows and cannot delete an entire
+# view partition in one fell swoop. In the above test, Scylla *may*
+# contain such an optimization (currently it doesn't), so it may reach a
+# different code path.
+def test_long_skipped_view_update_delete_with_timestamp2(cql, test_keyspace):
+    size = 200
+    with new_test_table(cql, test_keyspace, 'p int, c int, x int, y int, primary key (p,c)') as table:
+        with new_materialized_view(cql, table, '*', 'x, p, c', 'p is not null and x is not null and c is not null') as mv:
+            for i in reversed(range(size)):
+                cql.execute(f'INSERT INTO {table} (p,c,x,y) VALUES (1,{i},{i},{i})')
+            assert list(cql.execute(f"SELECT c FROM {table}")) == sorted(list(cql.execute(f"SELECT c FROM {mv}")))
+            t = list(cql.execute(f"SELECT writetime(y) FROM {table} WHERE p = 1 and c = {int(size*0.9)}"))[0].writetime_y
+            cql.execute(f'DELETE FROM {table} USING TIMESTAMP {t} WHERE p=1')
+            assert list(cql.execute(f"SELECT c FROM {table}")) == sorted(list(cql.execute(f"SELECT c FROM {mv}")))
+
+# Another, more fundamental, reproducer for issue #12297, where a certain
+# modification to a base partition modifying more than 100 rows was not
+# applied to the view beyond the 100th row.
+# The test above, test_long_skipped_view_update_delete_with_timestamp,
+# was one such specific case, which involved a partition tombstone and a
+# specific choice of timestamp which causes the first 100 rows to NOT be
+# changed. In this test we show that the bug is not just about do-nothing
+# tombstones: in any base modification which involves more than 100 rows,
+# if the first 100 rows don't change the view (as decided by the
+# can_skip_view_updates() function), the remaining rows are wrongly
+# skipped as well and not applied to the view!
+# The specific case we use here is an update that sets some irrelevant
+# (not-selected-by-the-view) column y on 200 rows, and additionally
+# writes a new row as the 201st row. With bug #12297, that 201st row will
+# be missing in the view.
+def test_long_skipped_view_update_irrelevant_column(cql, test_keyspace):
+    size = 200
+    with new_test_table(cql, test_keyspace, 'p int, c int, x int, y int, primary key (p,c)') as table:
+        # Note that column "y" is not selected by the materialized view
+        with new_materialized_view(cql, table, 'p, x, c', 'p, x, c', 'p is not null and x is not null and c is not null') as mv:
+            for i in range(size):
+                cql.execute(f'INSERT INTO {table} (p,c,x,y) VALUES (1,{i},{i},{i})')
+            # In a single batch (a single mutation), update the "y" column
+            # in all 'size' existing rows, plus add one new row in the
+            # last position (the partition is sorted by the "c" column).
+            # The first 'size' UPDATEs can be skipped in the view (because
+            # y isn't selected), but the last INSERT can't be skipped - it
+            # really adds a new row.
+            cmd = 'BEGIN BATCH '
+            for i in range(size):
+                cmd += f'UPDATE {table} SET y=7 where p=1 and c={i}; '
+            cmd += f'INSERT INTO {table} (p,c,x,y) VALUES (1,{size+1},{size+1},{size+1}); '
+            cmd += 'APPLY BATCH;'
+            cql.execute(cmd)
+            # We should now have the same size+1 rows in both base and view
+            assert list(cql.execute(f"SELECT c FROM {table} WHERE p = 1")) == list(cql.execute(f"SELECT c FROM {mv} WHERE p = 1"))
+
+# After the previous tests checked elaborate conditions where modifying a
+# base-table partition resulted in many skipped view updates, let's also
+# check the more basic situation where the base-table partition
+# modification (in this case, a deletion) results in many view-table
+# updates, all of which should happen even if the code needs to perform
+# them internally in several batches of 100 rows (for example).
+def test_mv_long_delete(cql, test_keyspace):
+    size = 300
+    with new_test_table(cql, test_keyspace, 'p int, c int, x int, y int, primary key (p,c)') as table:
+        with new_materialized_view(cql, table, '*', 'p, x, c', 'p is not null and x is not null and c is not null') as mv:
+            for i in range(size):
+                cql.execute(f'INSERT INTO {table} (p,c,x,y) VALUES (1,{i},{i},{i})')
+            cql.execute(f'DELETE FROM {table} WHERE p=1')
+            assert list(cql.execute(f"SELECT c FROM {table} WHERE p = 1")) == []
+            assert list(cql.execute(f"SELECT c FROM {mv} WHERE p = 1")) == []
