Skip to content

Commit

Permalink
Merge 'Introduce SELECT MUTATION FRAGMENTS statement' from Botond D…
Browse files Browse the repository at this point in the history
…énes

SELECT MUTATION FRAGMENTS is a new select statement sub-type, which allows dumping the underlying mutations making up the data of a given table. The output of this statement is mutation-fragments presented as CQL rows. Each row corresponds to a mutation-fragment. Consequently, the output of this statement has a schema that is different from that of the underlying table. The output schema is derived from the table's schema, as follows:
* The table's partition key is copied over as-is
* The clustering key is formed from the following columns:
    - mutation_source (text): the kind of the mutation source, one of: memtable, row-cache or sstable; and the identifier of the individual mutation source.
    - partition_region (int): represents the enum with the same name.
    - the copy of the table's clustering columns
    - position_weight (int): -1, 0 or 1, has the same meaning as that in position_in_partition, used to disambiguate range tombstone changes with the same clustering key, from rows and from each other.
* The following regular columns:
    - metadata (text): the JSON representation of the mutation-fragment's metadata.
    - value (text): the JSON representation of the mutation-fragment's value.

Data is always read from the local replica, on which the query is executed. Migrating queries between coordinators is forbidden.

More details in the documentation commit (last commit).

Example:
```cql
cqlsh> CREATE TABLE ks.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));

cqlsh> DELETE FROM ks.tbl WHERE pk = 0;
cqlsh> DELETE FROM ks.tbl WHERE pk = 0 AND ck > 0 AND ck < 2;
cqlsh> INSERT INTO ks.tbl (pk, ck, v) VALUES (0, 0, 0);
cqlsh> INSERT INTO ks.tbl (pk, ck, v) VALUES (0, 1, 0);
cqlsh> INSERT INTO ks.tbl (pk, ck, v) VALUES (0, 2, 0);
cqlsh> INSERT INTO ks.tbl (pk, ck, v) VALUES (1, 0, 0);
cqlsh> SELECT * FROM ks.tbl;

 pk | ck | v
----+----+---
  1 |  0 | 0
  0 |  0 | 0
  0 |  1 | 0
  0 |  2 | 0

(4 rows)
cqlsh> SELECT * FROM MUTATION_FRAGMENTS(ks.tbl);

 pk | mutation_source | partition_region | ck | position_weight | metadata                                                                                                                 | mutation_fragment_kind | value
----+-----------------+------------------+----+-----------------+--------------------------------------------------------------------------------------------------------------------------+------------------------+-----------
  1 |      memtable:0 |                0 |    |                 |                                                                                                         {"tombstone":{}} |        partition start |      null
  1 |      memtable:0 |                2 |  0 |               0 | {"marker":{"timestamp":1688122873341627},"columns":{"v":{"is_live":true,"type":"regular","timestamp":1688122873341627}}} |         clustering row | {"v":"0"}
  1 |      memtable:0 |                3 |    |                 |                                                                                                                     null |          partition end |      null
  0 |      memtable:0 |                0 |    |                 |                                      {"tombstone":{"timestamp":1688122848686316,"deletion_time":"2023-06-30 11:00:48z"}} |        partition start |      null
  0 |      memtable:0 |                2 |  0 |               0 | {"marker":{"timestamp":1688122860037077},"columns":{"v":{"is_live":true,"type":"regular","timestamp":1688122860037077}}} |         clustering row | {"v":"0"}
  0 |      memtable:0 |                2 |  0 |               1 |                                      {"tombstone":{"timestamp":1688122853571709,"deletion_time":"2023-06-30 11:00:53z"}} | range tombstone change |      null
  0 |      memtable:0 |                2 |  1 |               0 | {"marker":{"timestamp":1688122864641920},"columns":{"v":{"is_live":true,"type":"regular","timestamp":1688122864641920}}} |         clustering row | {"v":"0"}
  0 |      memtable:0 |                2 |  2 |              -1 |                                                                                                         {"tombstone":{}} | range tombstone change |      null
  0 |      memtable:0 |                2 |  2 |               0 | {"marker":{"timestamp":1688122868706989},"columns":{"v":{"is_live":true,"type":"regular","timestamp":1688122868706989}}} |         clustering row | {"v":"0"}
  0 |      memtable:0 |                3 |    |                 |                                                                                                                     null |          partition end |      null

(10 rows)
```

Perf simple query:
```
/build/release/scylla perf-simple-query -c1 -m2G --duration=60
```

Before:
```
median 141596.39 tps ( 62.1 allocs/op,  13.1 tasks/op,   43688 insns/op,        0 errors)
median absolute deviation: 137.15
maximum: 142173.32
minimum: 140492.37
```
After:
```
median 141889.95 tps ( 62.1 allocs/op,  13.1 tasks/op,   43692 insns/op,        0 errors)
median absolute deviation: 167.04
maximum: 142380.26
minimum: 141025.51
```

Fixes: #11130

Closes #14347

* github.com:scylladb/scylladb:
  docs/operating-scylla/admin-tools: add documentation for the SELECT * FROM MUTATION_FRAGMENTS() statement
  test/topology_custom: add test_select_from_mutation_fragments.py
  test/boost/database_test: add test for mutation_dump/generate_output_schema_from_underlying_schema
  test/cql-pytest: add test_select_mutation_fragments.py
  test/cql-pytest: move scylla_data_dir fixture to conftest.py
  cql3/statements: wire-in mutation_fragments_select_statement
  cql3/restrictions/statement_restrictions: fix indentation
  cql3/restrictions/statement_restrictions: add check_indexes flag
  cql3/statments/select_statement: add mutation_fragments_select_statement
  cql3: add SELECT MUTATION FRAGMENTS select statement sub-type
  service/pager: allow passing a query functor override
  service/storage_proxy: un-embed coordinator_query_options
  replica: add mutation_dump
  replica: extract query_state into own header
  replica/table: add make_nonpopulating_cache_reader()
  replica/table: add select_memtables_as_mutation_sources()
  tools,mutation: extract the low-level json utilities into mutation/json.hh
  tools/json_writer: fold SstableKey() overloads into callers
  tools/json_writer: allow writing metadata and value separately
  tools/json_writer: split mutation_fragment_json_writer in two classes
  tools/json_writer: allow passing custom std::ostream to json_writer
  • Loading branch information
avikivity committed Jul 19, 2023
2 parents c29e7e4 + 718f57c commit 460b28d
Show file tree
Hide file tree
Showing 33 changed files with 2,341 additions and 417 deletions.
1 change: 1 addition & 0 deletions configure.py
Expand Up @@ -724,6 +724,7 @@ def find_headers(repodir, excluded_dirs):
'replica/memtable.cc',
'replica/exceptions.cc',
'replica/dirty_memory_manager.cc',
'replica/mutation_dump.cc',
'mutation/atomic_cell.cc',
'mutation/canonical_mutation.cc',
'mutation/frozen_mutation.cc',
Expand Down
8 changes: 7 additions & 1 deletion cql3/Cql.g
Expand Up @@ -391,7 +391,10 @@ selectStatement returns [std::unique_ptr<raw::select_statement> expr]
( K_DISTINCT { is_distinct = true; } )?
sclause=selectClause
)
K_FROM cf=columnFamilyName
K_FROM (
cf=columnFamilyName
| K_MUTATION_FRAGMENTS '(' cf=columnFamilyName ')' { statement_subtype = raw::select_statement::parameters::statement_subtype::MUTATION_FRAGMENTS; }
)
( K_WHERE w=whereClause { wclause = std::move(w); } )?
( K_GROUP K_BY gbcolumns=listOfIdentifiers)?
( K_ORDER K_BY orderByClause[orderings] ( ',' orderByClause[orderings] )* )?
Expand Down Expand Up @@ -2067,6 +2070,7 @@ basic_unreserved_keyword returns [sstring str]
| K_DESCRIBE
| K_DESC
| K_EXECUTE
| K_MUTATION_FRAGMENTS
) { $str = $k.text; }
;

Expand Down Expand Up @@ -2275,6 +2279,8 @@ K_PRUNE: P R U N E;

K_EXECUTE: E X E C U T E;

K_MUTATION_FRAGMENTS: M U T A T I O N '_' F R A G M E N T S;

// Case-insensitive alpha characters
fragment A: ('a'|'A');
fragment B: ('b'|'B');
Expand Down
42 changes: 27 additions & 15 deletions cql3/restrictions/statement_restrictions.cc
Expand Up @@ -355,9 +355,11 @@ statement_restrictions::statement_restrictions(data_dictionary::database db,
prepare_context& ctx,
bool selects_only_static_columns,
bool for_view,
bool allow_filtering)
bool allow_filtering,
check_indexes do_check_indexes)
: statement_restrictions(schema, allow_filtering)
{
_check_indexes = do_check_indexes;
for (auto&& relation_expr : boolean_factors(where_clause)) {
const expr::binary_operator* relation_binop = expr::as_if<expr::binary_operator>(&relation_expr);

Expand All @@ -383,18 +385,25 @@ statement_restrictions::statement_restrictions(data_dictionary::database db,
_clustering_prefix_restrictions = extract_clustering_prefix_restrictions(*_where, _schema);
_partition_range_restrictions = extract_partition_range(*_where, _schema);
}
auto cf = db.find_column_family(schema);
auto& sim = cf.get_index_manager();
const expr::allow_local_index allow_local(
!has_partition_key_unrestricted_components()
&& partition_key_restrictions_is_all_eq());
_has_multi_column = find_binop(_clustering_columns_restrictions, expr::is_multi_column);
_has_queriable_ck_index = clustering_columns_restrictions_have_supporting_index(sim, allow_local)
&& !type.is_delete();
_has_queriable_pk_index = parition_key_restrictions_have_supporting_index(sim, allow_local)
&& !type.is_delete();
_has_queriable_regular_index = expr::index_supports_some_column(_nonprimary_key_restrictions, sim, allow_local)
&& !type.is_delete();
if (_check_indexes) {
auto cf = db.find_column_family(schema);
auto& sim = cf.get_index_manager();
const expr::allow_local_index allow_local(
!has_partition_key_unrestricted_components()
&& partition_key_restrictions_is_all_eq());
_has_multi_column = find_binop(_clustering_columns_restrictions, expr::is_multi_column);
_has_queriable_ck_index = clustering_columns_restrictions_have_supporting_index(sim, allow_local)
&& !type.is_delete();
_has_queriable_pk_index = parition_key_restrictions_have_supporting_index(sim, allow_local)
&& !type.is_delete();
_has_queriable_regular_index = expr::index_supports_some_column(_nonprimary_key_restrictions, sim, allow_local)
&& !type.is_delete();
} else {
_has_queriable_ck_index = false;
_has_queriable_pk_index = false;
_has_queriable_regular_index = false;
}

// At this point, the select statement is fully constructed, but we still have a few things to validate
process_partition_key_restrictions(for_view, allow_filtering);
Expand Down Expand Up @@ -571,9 +580,12 @@ bool statement_restrictions::has_eq_restriction_on_column(const column_definitio
std::vector<const column_definition*> statement_restrictions::get_column_defs_for_filtering(data_dictionary::database db) const {
std::vector<const column_definition*> column_defs_for_filtering;
if (need_filtering()) {
auto cf = db.find_column_family(_schema);
auto& sim = cf.get_index_manager();
auto opt_idx = std::get<0>(find_idx(sim));
std::optional<secondary_index::index> opt_idx;
if (_check_indexes) {
auto cf = db.find_column_family(_schema);
auto& sim = cf.get_index_manager();
opt_idx = std::get<0>(find_idx(sim));
}
auto column_uses_indexing = [&opt_idx] (const column_definition* cdef, const expr::expression* single_col_restr) {
return opt_idx && single_col_restr && is_supported_by(*single_col_restr, *opt_idx);
};
Expand Down
9 changes: 8 additions & 1 deletion cql3/restrictions/statement_restrictions.hh
Expand Up @@ -25,6 +25,10 @@ namespace cql3 {

namespace restrictions {

/// In some cases checking if columns have indexes is undesired or even
/// impossible, because e.g. the query runs on a pseudo-table, which does not
/// have an index-manager, or even a table object.
///
/// When check_indexes::no is passed to statement_restrictions, the table's
/// index manager is not looked up and the restrictions are treated as having
/// no queriable partition-key, clustering-key or regular-column index.
using check_indexes = bool_class<class check_indexes_tag>;

/**
* The restrictions corresponding to the relations specified on the where-clause of CQL query.
Expand Down Expand Up @@ -110,6 +114,8 @@ private:

bool _partition_range_is_simple; ///< False iff _partition_range_restrictions imply a Cartesian product.


check_indexes _check_indexes = check_indexes::yes;
public:
/**
* Creates a new empty <code>StatementRestrictions</code>.
Expand All @@ -126,7 +132,8 @@ public:
prepare_context& ctx,
bool selects_only_static_columns,
bool for_view = false,
bool allow_filtering = false);
bool allow_filtering = false,
check_indexes do_check_indexes = check_indexes::yes);

const std::vector<expr::expression>& index_restrictions() const;

Expand Down
13 changes: 6 additions & 7 deletions cql3/statements/raw/select_statement.hh
Expand Up @@ -12,6 +12,7 @@

#include "cql3/statements/raw/cf_statement.hh"
#include "cql3/statements/prepared_statement.hh"
#include "cql3/restrictions/statement_restrictions.hh"
#include "cql3/attributes.hh"
#include "db/config.hh"
#include <seastar/core/shared_ptr.hh>
Expand All @@ -24,10 +25,6 @@ namespace selection {
class prepared_selector;
} // namespace selection

namespace restrictions {
class statement_restrictions;
} // namespace restrictions

namespace statements {

namespace raw {
Expand All @@ -49,7 +46,7 @@ public:
class parameters final {
public:
using orderings_type = std::vector<std::pair<shared_ptr<column_identifier::raw>, ordering>>;
enum class statement_subtype { REGULAR, JSON, PRUNE_MATERIALIZED_VIEW };
enum class statement_subtype { REGULAR, JSON, PRUNE_MATERIALIZED_VIEW, MUTATION_FRAGMENTS };
private:
const orderings_type _orderings;
const bool _is_distinct;
Expand All @@ -69,6 +66,7 @@ public:
bool is_distinct() const;
bool allow_filtering() const;
bool is_json() const;
bool is_mutation_fragments() const;
bool bypass_cache() const;
bool is_prune_materialized_view() const;
orderings_type const& orderings() const;
Expand Down Expand Up @@ -110,13 +108,14 @@ private:
prepare_context& ctx,
::shared_ptr<selection::selection> selection,
bool for_view = false,
bool allow_filtering = false);
bool allow_filtering = false,
restrictions::check_indexes do_check_indexes = restrictions::check_indexes::yes);

/** Returns an expression for the limit or nullopt if no limit is set */
std::optional<expr::expression> prepare_limit(data_dictionary::database db, prepare_context& ctx, const std::optional<expr::expression>& limit);

// Checks whether it is legal to have ORDER BY in this statement
static void verify_ordering_is_allowed(const restrictions::statement_restrictions& restrictions);
static void verify_ordering_is_allowed(const parameters& params, const restrictions::statement_restrictions& restrictions);

void handle_unrecognized_ordering_column(const column_identifier& column) const;

Expand Down

0 comments on commit 460b28d

Please sign in to comment.