Skip to content

Commit

Permalink
replica: make_[multishard_]streaming_reader(): make compaction_time m…
Browse files Browse the repository at this point in the history
…andatory

Now that all users have opted in unconditionally, there is no point in
keeping this optional. Make it mandatory to make sure there are no
opt-out by mistake.
The global override via enable_compacting_data_for_streaming_and_repair
config item still remains, allowing compaction to be force turned-off.
  • Loading branch information
denesb committed Jul 27, 2023
1 parent fdaf908 commit b599f15
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 16 deletions.
8 changes: 4 additions & 4 deletions replica/database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1536,10 +1536,10 @@ class streaming_reader_lifecycle_policy
};
distributed<replica::database>& _db;
table_id _table_id;
std::optional<gc_clock::time_point> _compaction_time;
gc_clock::time_point _compaction_time;
std::vector<reader_context> _contexts;
public:
streaming_reader_lifecycle_policy(distributed<replica::database>& db, table_id table_id, std::optional<gc_clock::time_point> compaction_time)
streaming_reader_lifecycle_policy(distributed<replica::database>& db, table_id table_id, gc_clock::time_point compaction_time)
: _db(db)
, _table_id(table_id)
, _compaction_time(compaction_time)
Expand Down Expand Up @@ -2911,7 +2911,7 @@ void database::unplug_view_update_generator() noexcept {
flat_mutation_reader_v2 make_multishard_streaming_reader(distributed<replica::database>& db,
schema_ptr schema, reader_permit permit,
std::function<std::optional<dht::partition_range>()> range_generator,
std::optional<gc_clock::time_point> compaction_time) {
gc_clock::time_point compaction_time) {

auto& table = db.local().find_column_family(schema);
auto erm = table.get_effective_replication_map();
Expand All @@ -2932,7 +2932,7 @@ flat_mutation_reader_v2 make_multishard_streaming_reader(distributed<replica::da
}

flat_mutation_reader_v2 make_multishard_streaming_reader(distributed<replica::database>& db,
schema_ptr schema, reader_permit permit, const dht::partition_range& range, std::optional<gc_clock::time_point> compaction_time)
schema_ptr schema, reader_permit permit, const dht::partition_range& range, gc_clock::time_point compaction_time)
{
const auto table_id = schema->id();
const auto& full_slice = schema->full_slice();
Expand Down
12 changes: 6 additions & 6 deletions replica/database.hh
Original file line number Diff line number Diff line change
Expand Up @@ -691,21 +691,21 @@ public:
// When compaction_time is engaged, the reader's output will be compacted, with the provided query time.
// This compaction doesn't do tombstone garbage collection.
flat_mutation_reader_v2 make_streaming_reader(schema_ptr schema, reader_permit permit,
const dht::partition_range_vector& ranges, std::optional<gc_clock::time_point> compaction_time) const;
const dht::partition_range_vector& ranges, gc_clock::time_point compaction_time) const;

// Single range overload.
flat_mutation_reader_v2 make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
const query::partition_slice& slice,
mutation_reader::forwarding fwd_mr,
std::optional<gc_clock::time_point> compaction_time) const;
gc_clock::time_point compaction_time) const;

flat_mutation_reader_v2 make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range, std::optional<gc_clock::time_point> compaction_time) {
flat_mutation_reader_v2 make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range, gc_clock::time_point compaction_time) {
return make_streaming_reader(schema, std::move(permit), range, schema->full_slice(), mutation_reader::forwarding::no, compaction_time);
}

// Stream reader from the given sstables
flat_mutation_reader_v2 make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
lw_shared_ptr<sstables::sstable_set> sstables, std::optional<gc_clock::time_point> compaction_time) const;
lw_shared_ptr<sstables::sstable_set> sstables, gc_clock::time_point compaction_time) const;

// Make a reader which reads only from the row-cache.
// The reader doens't populate the cache, it reads only what is in the cache
Expand Down Expand Up @@ -1786,9 +1786,9 @@ future<> start_large_data_handler(sharded<replica::database>& db);
// Opt-in for compacting the output by passing `compaction_time`, see
// make_streaming_reader() for more details.
flat_mutation_reader_v2 make_multishard_streaming_reader(distributed<replica::database>& db, schema_ptr schema, reader_permit permit,
std::function<std::optional<dht::partition_range>()> range_generator, std::optional<gc_clock::time_point> compaction_time);
std::function<std::optional<dht::partition_range>()> range_generator, gc_clock::time_point compaction_time);

flat_mutation_reader_v2 make_multishard_streaming_reader(distributed<replica::database>& db,
schema_ptr schema, reader_permit permit, const dht::partition_range& range, std::optional<gc_clock::time_point> compaction_time);
schema_ptr schema, reader_permit permit, const dht::partition_range& range, gc_clock::time_point compaction_time);

bool is_internal_keyspace(std::string_view name);
12 changes: 6 additions & 6 deletions replica/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,13 @@ sstables::shared_sstable table::make_streaming_staging_sstable() {
return make_streaming_sstable_for_write(sstables::staging_dir);
}

static flat_mutation_reader_v2 maybe_compact_for_streaming(flat_mutation_reader_v2 underlying, const compaction_manager& cm, std::optional<gc_clock::time_point> compaction_time, bool compaction_enabled) {
if (!compaction_time || !compaction_enabled) {
static flat_mutation_reader_v2 maybe_compact_for_streaming(flat_mutation_reader_v2 underlying, const compaction_manager& cm, gc_clock::time_point compaction_time, bool compaction_enabled) {
if (!compaction_enabled) {
return underlying;
}
return make_compacting_reader(
std::move(underlying),
*compaction_time,
compaction_time,
[] (const dht::decorated_key&) { return api::min_timestamp; }, // disable tombstone purging
cm.get_tombstone_gc_state(),
streamed_mutation::forwarding::no);
Expand All @@ -307,7 +307,7 @@ static flat_mutation_reader_v2 maybe_compact_for_streaming(flat_mutation_reader_
flat_mutation_reader_v2
table::make_streaming_reader(schema_ptr s, reader_permit permit,
const dht::partition_range_vector& ranges,
std::optional<gc_clock::time_point> compaction_time) const {
gc_clock::time_point compaction_time) const {
auto& slice = s->full_slice();

auto source = mutation_source([this] (schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice,
Expand All @@ -328,7 +328,7 @@ table::make_streaming_reader(schema_ptr s, reader_permit permit,
}

flat_mutation_reader_v2 table::make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
const query::partition_slice& slice, mutation_reader::forwarding fwd_mr, std::optional<gc_clock::time_point> compaction_time) const {
const query::partition_slice& slice, mutation_reader::forwarding fwd_mr, gc_clock::time_point compaction_time) const {
auto trace_state = tracing::trace_state_ptr();
const auto fwd = streamed_mutation::forwarding::no;

Expand All @@ -345,7 +345,7 @@ flat_mutation_reader_v2 table::make_streaming_reader(schema_ptr schema, reader_p
}

flat_mutation_reader_v2 table::make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
lw_shared_ptr<sstables::sstable_set> sstables, std::optional<gc_clock::time_point> compaction_time) const {
lw_shared_ptr<sstables::sstable_set> sstables, gc_clock::time_point compaction_time) const {
auto& slice = schema->full_slice();
auto trace_state = tracing::trace_state_ptr();
const auto fwd = streamed_mutation::forwarding::no;
Expand Down

0 comments on commit b599f15

Please sign in to comment.