Skip to content

Commit

Permalink
Merge "table: Prevent resurrecting data from memtable on compaction" …
Browse files Browse the repository at this point in the history
…from Mikołaj

"
Mutations are not guaranteed to come in the order of their timestamps.
If there is an expired tombstone in the sstable and a repair inserts old
data into memtable, the compaction would not consider memtable data and
purge the tombstone leading to data resurrection. The solution is to
disallow purging tombstones newer than min memtable timestamp. If there
are no memtables, max timestamp is used.
"

* 'check-memtable-at-compact-tombstone-discard/v2' of github.com:mikolajsieluzycki/scylla:
  table: Prevent resurrecting data from memtable on compaction
  table: Add min_memtable_timestamp function to table
  • Loading branch information
avikivity authored and xemul committed Dec 9, 2021
2 parents 2ec36a6 + 504efe0 commit 242e191
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 1 deletion.
2 changes: 1 addition & 1 deletion compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ std::ostream& operator<<(std::ostream& os, pretty_printed_throughput tp) {

static api::timestamp_type get_max_purgeable_timestamp(const table_state& table_s, sstable_set::incremental_selector& selector,
const std::unordered_set<shared_sstable>& compacting_set, const dht::decorated_key& dk) {
auto timestamp = api::max_timestamp;
auto timestamp = table_s.min_memtable_timestamp();
std::optional<utils::hashed_key> hk;
for (auto&& sst : boost::range::join(selector.select(dk).sstables, table_s.compacted_undeleted_sstables())) {
if (compacting_set.contains(sst)) {
Expand Down
1 change: 1 addition & 0 deletions compaction/table_state.hh
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public:
virtual sstables::compaction_strategy& get_compaction_strategy() const noexcept = 0;
virtual reader_permit make_compaction_reader_permit() const = 0;
virtual sstables::sstable_writer_config configure_writer(sstring origin) const = 0;
virtual api::timestamp_type min_memtable_timestamp() const = 0;
};

}
Expand Down
4 changes: 4 additions & 0 deletions database.hh
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ private:
std::optional<shared_future<>> _flush_coalescing;
seastar::scheduling_group _compaction_scheduling_group;
table_stats& _table_stats;
public:
using iterator = decltype(_memtables)::iterator;
using const_iterator = decltype(_memtables)::const_iterator;
public:
memtable_list(
seal_immediate_fn_type seal_immediate_fn,
Expand Down Expand Up @@ -726,6 +729,7 @@ public:
using const_mutation_partition_ptr = std::unique_ptr<const mutation_partition>;
using const_row_ptr = std::unique_ptr<const row>;
memtable& active_memtable() { return _memtables->active_memtable(); }
api::timestamp_type min_memtable_timestamp() const;
const row_cache& get_row_cache() const {
return _cache;
}
Expand Down
17 changes: 17 additions & 0 deletions table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include "db/commitlog/commitlog.hh"

#include <boost/range/algorithm/remove_if.hpp>
#include <boost/range/algorithm.hpp>

static logging::logger tlogger("table");
static seastar::metrics::label column_family_label("cf");
Expand Down Expand Up @@ -275,6 +276,19 @@ future<std::vector<locked_cell>> table::lock_counter_cells(const mutation& m, db
return _counter_cell_locks->lock_cells(m.decorated_key(), partition_cells_range(m.partition()), timeout);
}

api::timestamp_type table::min_memtable_timestamp() const {
if (_memtables->empty()) {
return api::max_timestamp;
}

return *boost::range::min_element(
*_memtables
| boost::adaptors::transformed(
[](const shared_memtable& m) { return m->get_min_timestamp(); }
)
);
}

// Not performance critical. Currently used for testing only.
future<bool>
table::for_all_partitions_slow(schema_ptr s, reader_permit permit, std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const {
Expand Down Expand Up @@ -2400,6 +2414,9 @@ class table::table_state : public compaction::table_state {
sstables::sstable_writer_config configure_writer(sstring origin) const override {
return _t.get_sstables_manager().configure_writer(std::move(origin));
}
api::timestamp_type min_memtable_timestamp() const override {
return _t.min_memtable_timestamp();
}
};

compaction::table_state& table::as_table_state() const noexcept {
Expand Down
68 changes: 68 additions & 0 deletions test/boost/memtable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
#include <seastar/testing/thread_test_case.hh>
#include "schema_builder.hh"
#include <seastar/util/closeable.hh>
#include "service/migration_manager.hh"

#include <seastar/core/thread.hh>
#include "memtable.hh"
#include "test/lib/cql_test_env.hh"
#include "test/lib/cql_assertions.hh"
#include "test/lib/mutation_source_test.hh"
#include "test/lib/mutation_assertions.hh"
#include "test/lib/flat_mutation_reader_assertions.hh"
Expand Down Expand Up @@ -702,4 +704,70 @@ SEASTAR_TEST_CASE(memtable_flush_compresses_mutations) {
}, db_config);
}

SEASTAR_TEST_CASE(sstable_compaction_does_not_resurrect_data) {
auto db_config = make_shared<db::config>();
db_config->enable_cache.set(false);
return do_with_cql_env_thread([](cql_test_env& env) {
database& db = env.local_db();
service::migration_manager& mm = env.migration_manager().local();

sstring ks_name = "ks";
sstring table_name = "table_name";

schema_ptr s = schema_builder(ks_name, table_name)
.with_column(to_bytes("pk"), int32_type, column_kind::partition_key)
.with_column(to_bytes("ck"), int32_type, column_kind::clustering_key)
.with_column(to_bytes("id"), int32_type)
.set_gc_grace_seconds(1)
.build();
mm.announce_new_column_family(s).get();

table& t = db.find_column_family(ks_name, table_name);

dht::decorated_key pk = dht::decorate_key(*s, partition_key::from_single_value(*s, serialized(1)));
clustering_key ck_to_delete = clustering_key::from_single_value(*s, serialized(2));
clustering_key ck = clustering_key::from_single_value(*s, serialized(3));

api::timestamp_type insertion_timestamp_before_delete = api::new_timestamp();
forward_jump_clocks(1s);
api::timestamp_type deletion_timestamp = api::new_timestamp();
forward_jump_clocks(1s);
api::timestamp_type insertion_timestamp_after_delete = api::new_timestamp();

mutation m_delete = mutation(s, pk);
m_delete.partition().apply_delete(
*s,
ck_to_delete,
tombstone{deletion_timestamp, gc_clock::now()});
t.apply(m_delete);

// Insert data that won't be removed by tombstone to prevent compaction from skipping whole partition
mutation m_insert = mutation(s, pk);
m_insert.set_clustered_cell(ck, to_bytes("id"), data_value(3), insertion_timestamp_after_delete);
t.apply(m_insert);

// Flush and wait until the gc_grace_seconds pass
t.flush().get();
forward_jump_clocks(2s);

// Apply the past mutation to memtable to simulate repair. This row should be deleted by tombstone
mutation m_past_insert = mutation(s, pk);
m_past_insert.set_clustered_cell(
ck_to_delete,
to_bytes("id"),
data_value(4),
insertion_timestamp_before_delete);
t.apply(m_past_insert);

// Trigger compaction. If all goes well, compaction should check if a relevant row is in the memtable
// and should not purge the tombstone.
t.compact_all_sstables().get();

// If we get additional row (1, 2, 4), that means the tombstone was purged and data was resurrected
assert_that(env.execute_cql(format("SELECT * FROM {}.{};", ks_name, table_name)).get0())
.is_rows()
.with_rows_ignore_order({
{serialized(1), serialized(3), serialized(3)},
});
}, db_config);
}
4 changes: 4 additions & 0 deletions test/boost/sstable_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ class table_state_for_test : public table_state {
sstables::sstable_writer_config configure_writer(sstring origin) const override {
return _env.manager().configure_writer(std::move(origin));
}

api::timestamp_type min_memtable_timestamp() const override {
return _t->min_memtable_timestamp();
}
};

static std::unique_ptr<table_state> make_table_state_for_test(column_family_for_tests& t, test_env& env) {
Expand Down

0 comments on commit 242e191

Please sign in to comment.