Skip to content

Commit

Permalink
compaction: do not write expired cell as dead cell if it can be purge…
Browse files Browse the repository at this point in the history
…d right away

When compacting a fully expired sstable, we're not allowing that sstable
to be purged because expired cell is *unconditionally* converted into a
dead cell. Why not check if the expired cell can be purged instead using
gc before and max purgeable timestamp?

Currently, we need two compactions to get rid of a fully expired sstable
which cells could have always been purged.

look at this sstable with expired cell:
  {
    "partition" : {
      "key" : [ "2" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 120,
        "liveness_info" : { "tstamp" : "2017-04-09T17:07:12.702597Z",
"ttl" : 20, "expires_at" : "2017-04-09T17:07:32Z", "expired" : true },
        "cells" : [
          { "name" : "country", "value" : "1" },
        ]

now this sstable data after first compaction:
[shard 0] compaction - Compacted 1 sstables to [...]. 120 bytes to 79
(~65% of original) in 229ms = 0.000328997MB/s.

  {
    ...
    "rows" : [
      {
        "type" : "row",
        "position" : 79,
        "cells" : [
          { "name" : "country", "deletion_info" :
{ "local_delete_time" : "2017-04-09T17:07:12Z" },
            "tstamp" : "2017-04-09T17:07:12.702597Z"
          },
        ]

now another compaction will actually get rid of data:
compaction - Compacted 1 sstables to []. 79 bytes to 0 (~0% of original)
in 1ms = 0MB/s. ~2 total partitions merged to 0

NOTE:
It's a waste of time to wait for second compaction because the expired
cell could have been purged at first compaction because it satisfied
gc_before and max purgeable timestamp.

Fixes #2249, #2253

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20170413001049.9663-1-raphaelsc@scylladb.com>
  • Loading branch information
raphaelsc authored and avikivity committed Apr 13, 2017
1 parent ecb8ee4 commit a6f8f4f
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 5 deletions.
11 changes: 9 additions & 2 deletions mutation_partition.cc
Expand Up @@ -1534,12 +1534,19 @@ bool row::compact_and_expire(const schema& s, column_kind kind, tombstone tomb,
const column_definition& def = s.column_at(kind, id);
if (def.is_atomic()) {
atomic_cell_view cell = c.as_atomic_cell();
auto can_erase_cell = [&] {
return cell.deletion_time() < gc_before && can_gc(tombstone(cell.timestamp(), cell.deletion_time()));
};

if (cell.is_covered_by(tomb, def.is_counter())) {
erase = true;
} else if (cell.has_expired(query_time)) {
c = atomic_cell::make_dead(cell.timestamp(), cell.deletion_time());
erase = can_erase_cell();
if (!erase) {
c = atomic_cell::make_dead(cell.timestamp(), cell.deletion_time());
}
} else if (!cell.is_live()) {
erase = cell.deletion_time() < gc_before && can_gc(tombstone(cell.timestamp(), cell.deletion_time()));
erase = can_erase_cell();
} else {
any_live |= true;
}
Expand Down
72 changes: 72 additions & 0 deletions tests/sstable_datafile_test.cc
Expand Up @@ -2284,13 +2284,39 @@ SEASTAR_TEST_CASE(tombstone_purge_test) {
return m;
};

auto make_expiring = [&] (partition_key key, bool ttl) {
mutation m(key, s);
m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)),
gc_clock::now().time_since_epoch().count(), gc_clock::duration(ttl));
return m;
};

auto make_delete = [&] (partition_key key) {
mutation m(key, s);
tombstone tomb(next_timestamp(), gc_clock::now());
m.partition().apply(tomb);
return m;
};

auto assert_that_produces_dead_cell = [&] (auto& sst, partition_key& key) {
auto reader = make_lw_shared(sstable_reader(sst, s));
(*reader)().then([&key] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([reader, s, &key] (mutation_opt m) {
BOOST_REQUIRE(m);
BOOST_REQUIRE(m->key().equal(*s, key));
auto& rows = m->partition().clustered_rows();
BOOST_REQUIRE_EQUAL(rows.calculate_size(), 1);
auto& row = rows.begin()->row();
auto& cells = row.cells();
BOOST_REQUIRE_EQUAL(cells.size(), 1);
BOOST_REQUIRE(!cells.cell_at(s->get_column_definition("value")->id).as_atomic_cell().is_live());
return (*reader)();
}).then([reader, s] (streamed_mutation_opt m) {
BOOST_REQUIRE(!m);
}).get();
};

auto alpha = partition_key::from_exploded(*s, {to_bytes("alpha")});
auto beta = partition_key::from_exploded(*s, {to_bytes("beta")});

Expand Down Expand Up @@ -2369,6 +2395,52 @@ SEASTAR_TEST_CASE(tombstone_purge_test) {
.produces(mut3)
.produces_end_of_stream();
}

{
// check that expired cell will not be purged if it will ressurect overwritten data.
auto mut1 = make_insert(alpha);
auto mut2 = make_expiring(alpha, 1);

auto sst1 = make_sstable_containing(sst_gen, {mut1});
auto sst2 = make_sstable_containing(sst_gen, {mut2});

forward_jump_clocks(std::chrono::seconds(5));

auto result = compact({sst1, sst2}, {sst2});
BOOST_REQUIRE_EQUAL(1, result.size());
assert_that_produces_dead_cell(result[0], alpha);

result = compact({sst1, sst2}, {sst1, sst2});
BOOST_REQUIRE_EQUAL(0, result.size());
}
{
auto mut1 = make_insert(alpha);
auto mut2 = make_expiring(beta, 1);

auto sst1 = make_sstable_containing(sst_gen, {mut1});
auto sst2 = make_sstable_containing(sst_gen, {mut2});

forward_jump_clocks(std::chrono::seconds(5));

auto result = compact({sst1, sst2}, {sst2});
BOOST_REQUIRE_EQUAL(0, result.size());
}
{
auto mut1 = make_insert(alpha);
auto mut2 = make_expiring(alpha, 1);
auto mut3 = make_insert(beta);

auto sst1 = make_sstable_containing(sst_gen, {mut1});
auto sst2 = make_sstable_containing(sst_gen, {mut2, mut3});

forward_jump_clocks(std::chrono::seconds(5));

auto result = compact({sst1, sst2}, {sst1, sst2});
BOOST_REQUIRE_EQUAL(1, result.size());
assert_that(sstable_reader(result[0], s))
.produces(mut3)
.produces_end_of_stream();
}
});
}

Expand Down
12 changes: 9 additions & 3 deletions types.cc
Expand Up @@ -2149,14 +2149,20 @@ bool collection_type_impl::mutation::compact_and_expire(tombstone base_tomb, gc_
std::vector<std::pair<bytes, atomic_cell>> survivors;
for (auto&& name_and_cell : cells) {
atomic_cell& cell = name_and_cell.second;
auto cannot_erase_cell = [&] {
return cell.deletion_time() >= gc_before || !can_gc(tombstone(cell.timestamp(), cell.deletion_time()));
};

if (cell.is_covered_by(tomb, false)) {
continue;
}
if (cell.has_expired(query_time)) {
survivors.emplace_back(std::make_pair(
std::move(name_and_cell.first), atomic_cell::make_dead(cell.timestamp(), cell.deletion_time())));
if (cannot_erase_cell()) {
survivors.emplace_back(std::make_pair(
std::move(name_and_cell.first), atomic_cell::make_dead(cell.timestamp(), cell.deletion_time())));
}
} else if (!cell.is_live()) {
if (cell.deletion_time() >= gc_before || !can_gc(tombstone(cell.timestamp(), cell.deletion_time()))) {
if (cannot_erase_cell()) {
survivors.emplace_back(std::move(name_and_cell));
}
} else {
Expand Down

0 comments on commit a6f8f4f

Please sign in to comment.