Merge "lwt: for each statement in cas_request provide a row in CAS result set" from Pavel Solodovnikov

Previously, the batch statement result set included rows only for
those updates that had prefetch data present (i.e. there was an
"old" (pre-existing) row for the key).

Also, these rows were sorted not in the order in which statements
appear in the batch, but in the order of updated clustering keys.

If a batch updates a few non-existent keys, it is impossible to
figure out which update inserted a new key by looking at the query
response: the rows may not follow the order of statements in the
batch, and some rows may not show up in the result set at all.

Please see #7113 on GitHub for a detailed description of the problem.

The patch set proposes the following fix:

For conditional batch statements the result set now always
includes a row for each LWT statement, in the same order
in which individual statements appear in the batch.

This way we can always tell which update did actually insert
a new key or update the existing one.
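
The ordering guarantee can be illustrated with a minimal, self-contained sketch. This is not Scylla code: `update`, `result_row` and `build_result_rows` are hypothetical names, and the prefetched data is modeled as a plain map from clustering key to old value. It only shows the idea of emitting one row per statement, in statement order, with the old value left empty when no pre-existing row was found.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <vector>

// Hypothetical, simplified stand-ins for illustration only;
// none of these names exist in the Scylla code base.
struct update {
    int ck; // clustering key targeted by the statement
};

struct result_row {
    bool applied;
    std::optional<std::string> old_value; // empty when there was no old row
};

// Emit exactly one row per statement, in the order the statements appear.
std::vector<result_row> build_result_rows(const std::vector<update>& updates,
        const std::map<int, std::string>& prefetched, bool is_applied) {
    std::vector<result_row> rows;
    rows.reserve(updates.size());
    for (const update& u : updates) {
        auto it = prefetched.find(u.ck);
        if (it == prefetched.end()) {
            // No old row: keep the row, but leave the old value null.
            rows.push_back({is_applied, std::nullopt});
        } else {
            rows.push_back({is_applied, it->second});
        }
    }
    return rows;
}
```

With updates targeting keys 3, 1 and 2 and only key 1 prefetched, the sketch yields three rows in that order, and only the second one carries an old value.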

Technically, the following changes were made:
 * `update_parameters::prefetch_data::row::is_in_cas_result_set`
   member removed as well as the supporting code in
   `cas_request::applies_to` which iterated through cas updates
   and marked individual `prefetch_data` rows as "need to be in
   cas result set".
 * `cas_request::applies_to` is substantially simplified, since it
   now only checks `stmt.applies_to()` for each statement in a
   short-circuiting manner.
 * `modification_statement::build_cas_result_set` method moved
   to `cas_request`. This makes it easy to iterate through
   individual `cas_row_update` instances and preserve the order
   of the rows in the result set.
 * A little helper `cas_request::find_old_row`
   is introduced to find a row in `prefetch_data` based on the
   (pk, ck) combination obtained from the current `cas_request`
   and a given `cas_row_update`.
 * A few tests for issue #7113 are added, and other lwt-batch-related
   tests are adjusted accordingly.
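
The short-circuiting check mentioned in the list above can be sketched in isolation. This is an illustrative assumption, not the actual implementation: `statement` and `all_conditions_apply` are hypothetical names, and the condition is modeled as a plain callable rather than a check against a prefetched row.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical stand-in for one statement of a conditional batch.
struct statement {
    bool has_conditions;
    std::function<bool()> condition; // real code would inspect the old row
};

// Return false at the first failing condition; conditions of later
// statements are not evaluated at all.
bool all_conditions_apply(const std::vector<statement>& stmts) {
    for (const statement& s : stmts) {
        if (!s.has_conditions) {
            continue; // unconditional statements never block the batch
        }
        if (!s.condition()) {
            return false;
        }
    }
    return true;
}
```

Stopping early is safe in this model because the result rows no longer depend on flags set while checking conditions; they are built separately, one per statement.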
tgrabiec committed Sep 4, 2020
2 parents e88b8a9 + 92fd515 commit bcdcf06
Showing 9 changed files with 426 additions and 110 deletions.
2 changes: 1 addition & 1 deletion cql3/statements/batch_statement.cc
@@ -388,7 +388,7 @@ future<shared_ptr<cql_transport::messages::result_message>> batch_statement::exe
     return proxy.cas(schema, request, request->read_command(proxy), request->key(),
             {read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()},
             cl_for_paxos, cl_for_learn, batch_timeout, cas_timeout).then([this, request] (bool is_applied) {
-        return modification_statement::build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied, request->rows());
+        return request->build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied);
     });
 }

144 changes: 95 additions & 49 deletions cql3/statements/cas_request.cc
@@ -42,6 +42,9 @@
 #include "modification_statement.hh"
 #include "cas_request.hh"
 #include <seastar/core/sleep.hh>
+#include "cql3/result_set.hh"
+#include "transport/messages/result_message.hh"
+#include "types/map.hh"

 namespace cql3::statements {

@@ -120,60 +123,16 @@ lw_shared_ptr<query::read_command> cas_request::read_command(service::storage_pr
 }

 bool cas_request::applies_to() const {
-
-    const partition_key& pkey = _key.front().start()->value().key().value();
-    const clustering_key empty_ckey = clustering_key::make_empty();
-    bool applies = true;
-    bool is_cas_result_set_empty = true;
-    bool has_static_column_conditions = false;
     for (const cas_row_update& op: _updates) {
-        if (op.statement.has_conditions() == false) {
+        if (!op.statement.has_conditions()) {
             continue;
         }
-        if (op.statement.has_static_column_conditions()) {
-            has_static_column_conditions = true;
-        }
-        // If a statement has only static columns conditions, we must ignore its clustering columns
-        // restriction when choosing a row to check the conditions, i.e. choose any partition row,
-        // because any of them must have static columns and that's all we need to know if the
-        // statement applies. For example, the following update must successfully apply (effectively
-        // turn into INSERT), because, although the table doesn't have any regular rows matching the
-        // statement clustering column restriction, the static row matches the statement condition:
-        // CREATE TABLE t(p int, c int, s int static, v int, PRIMARY KEY(p, c));
-        // INSERT INTO t(p, s) VALUES(1, 1);
-        // UPDATE t SET v=1 WHERE p=1 AND c=1 IF s=1;
-        // Another case when we pass an empty clustering key prefix is apparently when the table
-        // doesn't have any clustering key columns and the clustering key range is empty (open
-        // ended on both sides).
-        const auto& ckey = !op.statement.has_only_static_column_conditions() && op.ranges.front().start() ?
-            op.ranges.front().start()->value() : empty_ckey;
-        const auto* row = _rows.find_row(pkey, ckey);
-        if (row) {
-            row->is_in_cas_result_set = true;
-            is_cas_result_set_empty = false;
+        // No need to check subsequent conditions as we have already failed the current one.
+        if (!op.statement.applies_to(find_old_row(op), op.options)) {
+            return false;
         }
-        if (!applies) {
-            // No need to check this condition as we have already failed a previous one.
-            // Continuing the loop just to set is_in_cas_result_set flag for all involved
-            // statements, which is necessary to build the CAS result set.
-            continue;
-        }
-        applies = op.statement.applies_to(row, op.options);
     }
-    if (has_static_column_conditions && is_cas_result_set_empty) {
-        // If none of the fetched rows matches clustering key restrictions and hence none of them is
-        // included into the CAS result set, but there is a static column condition in the CAS batch,
-        // we must still include the static row into the result set. Consider the following example:
-        // CREATE TABLE t(p int, c int, s int static, v int, PRIMARY KEY(p, c));
-        // INSERT INTO t(p, s) VALUES(1, 1);
-        // DELETE v FROM t WHERE p=1 AND c=1 IF v=1 AND s=1;
-        // In this case the conditional DELETE must return [applied=False, v=null, s=1].
-        const auto* row = _rows.find_row(pkey, empty_ckey);
-        if (row) {
-            row->is_in_cas_result_set = true;
-        }
-    }
-    return applies;
+    return true;
 }

 std::optional<mutation> cas_request::apply(foreign_ptr<lw_shared_ptr<query::result>> qr,
@@ -186,4 +145,91 @@ std::optional<mutation> cas_request::apply(foreign_ptr<lw_shared_ptr<query::resu
     }
 }

+const update_parameters::prefetch_data::row* cas_request::find_old_row(const cas_row_update& op) const {
+    static const clustering_key empty_ckey = clustering_key::make_empty();
+    const partition_key& pkey = _key.front().start()->value().key().value();
+    // If a statement has only static columns conditions, we must ignore its clustering columns
+    // restriction when choosing a row to check the conditions, i.e. choose any partition row,
+    // because any of them must have static columns and that's all we need to know if the
+    // statement applies. For example, the following update must successfully apply (effectively
+    // turn into INSERT), because, although the table doesn't have any regular rows matching the
+    // statement clustering column restriction, the static row matches the statement condition:
+    // CREATE TABLE t(p int, c int, s int static, v int, PRIMARY KEY(p, c));
+    // INSERT INTO t(p, s) VALUES(1, 1);
+    // UPDATE t SET v=1 WHERE p=1 AND c=1 IF s=1;
+    // Another case when we pass an empty clustering key prefix is apparently when the table
+    // doesn't have any clustering key columns and the clustering key range is empty (open
+    // ended on both sides).
+    const clustering_key& ckey = !op.statement.has_only_static_column_conditions() && op.ranges.front().start() ?
+        op.ranges.front().start()->value() : empty_ckey;
+    return _rows.find_row(pkey, ckey);
+}
+
+seastar::shared_ptr<cql_transport::messages::result_message>
+cas_request::build_cas_result_set(seastar::shared_ptr<cql3::metadata> metadata,
+        const column_set& columns,
+        bool is_applied) const {
+    const partition_key& pkey = _key.front().start()->value().key().value();
+    const clustering_key empty_ckey = clustering_key::make_empty();
+    auto result_set = std::make_unique<cql3::result_set>(metadata);
+
+    for (const cas_row_update& op: _updates) {
+        // Construct the result set row
+        std::vector<bytes_opt> rs_row;
+        rs_row.reserve(metadata->value_count());
+        rs_row.emplace_back(boolean_type->decompose(is_applied));
+        // Get old row from prefetched data for the row update
+        const auto* old_row = find_old_row(op);
+        if (!old_row) {
+            if (!op.statement.has_static_column_conditions()) {
+                // In case there is no old row, leave all other columns null
+                // so that we can infer whether the update attempts to insert a
+                // non-existing row.
+                rs_row.resize(metadata->value_count());
+                result_set->add_row(std::move(rs_row));
+                continue;
+            }
+            // If none of the fetched rows matches clustering key restrictions,
+            // but there is a static column condition in the CAS batch,
+            // we must still include the static row into the result set. Consider the following example:
+            // CREATE TABLE t(p int, c int, s int static, v int, PRIMARY KEY(p, c));
+            // INSERT INTO t(p, s) VALUES(1, 1);
+            // DELETE v FROM t WHERE p=1 AND c=1 IF v=1 AND s=1;
+            // In this case the conditional DELETE must return [applied=False, v=null, s=1].
+            old_row = _rows.find_row(pkey, empty_ckey);
+            if (!old_row) {
+                // In case there is no old row, leave all other columns null
+                // so that we can infer whether the update attempts to insert a
+                // non-existing row.
+                rs_row.resize(metadata->value_count());
+                result_set->add_row(std::move(rs_row));
+                continue;
+            }
+        }
+        // Fill in the cells from prefetch data (old row) into the result set row
+        for (ordinal_column_id id = columns.find_first(); id != column_set::npos; id = columns.find_next(id)) {
+            const auto it = old_row->cells.find(id);
+            if (it == old_row->cells.end()) {
+                rs_row.emplace_back(bytes_opt{});
+                continue;
+            }
+            const data_value& cell = it->second;
+            const abstract_type& cell_type = *cell.type();
+            const abstract_type& column_type = *_rows.schema->column_at(id).type;
+
+            if (column_type.is_listlike() && cell_type.is_map()) {
+                // List/sets are fetched as maps, but need to be stored as sets.
+                const listlike_collection_type_impl& list_type = static_cast<const listlike_collection_type_impl&>(column_type);
+                const map_type_impl& map_type = static_cast<const map_type_impl&>(cell_type);
+                rs_row.emplace_back(list_type.serialize_map(map_type, cell));
+            } else {
+                rs_row.emplace_back(cell_type.decompose(cell));
+            }
+        }
+        result_set->add_row(std::move(rs_row));
+    }
+    cql3::result result(std::move(result_set));
+    return seastar::make_shared<cql_transport::messages::result_message::rows>(std::move(result));
+}
+
 } // end of namespace "cql3::statements"
12 changes: 12 additions & 0 deletions cql3/statements/cas_request.hh
@@ -98,9 +98,21 @@ public:
     virtual std::optional<mutation> apply(foreign_ptr<lw_shared_ptr<query::result>> qr,
             const query::partition_slice& slice, api::timestamp_type ts) override;

+    /// Build a result set with prefetched rows, but return only
+    /// the columns required by CAS.
+    ///
+    /// Each cas_row_update provides a row in the result set.
+    /// Rows are ordered the same way as the individual statements appear
+    /// in case of batch statement.
+    seastar::shared_ptr<cql_transport::messages::result_message>
+    build_cas_result_set(seastar::shared_ptr<cql3::metadata> metadata,
+            const column_set& mask, bool is_applied) const;
+
 private:
     bool applies_to() const;
     std::optional<mutation> apply_updates(api::timestamp_type t) const;
+    /// Find a row in prefetch_data which matches primary key identifying a given `cas_row_update`
+    const update_parameters::prefetch_data::row* find_old_row(const cas_row_update& op) const;
 };

 } // end of namespace "cql3::statements"
49 changes: 1 addition & 48 deletions cql3/statements/modification_statement.cc
@@ -355,57 +355,10 @@ modification_statement::execute_with_condition(service::storage_proxy& proxy, se
     return proxy.cas(s, request, request->read_command(proxy), request->key(),
             {read_timeout, qs.get_permit(), qs.get_client_state(), qs.get_trace_state()},
             cl_for_paxos, cl_for_learn, statement_timeout, cas_timeout).then([this, request] (bool is_applied) {
-        return build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied, request->rows());
+        return request->build_cas_result_set(_metadata, _columns_of_cas_result_set, is_applied);
     });
 }

-seastar::shared_ptr<cql_transport::messages::result_message>
-modification_statement::build_cas_result_set(seastar::shared_ptr<cql3::metadata> metadata,
-        const column_set& columns,
-        bool is_applied,
-        const update_parameters::prefetch_data& rows) {
-
-    auto result_set = std::make_unique<cql3::result_set>(metadata);
-    for (const auto& it : rows.rows) {
-        const update_parameters::prefetch_data::row& cell_map = it.second;
-        if (!cell_map.is_in_cas_result_set) {
-            continue;
-        }
-        std::vector<bytes_opt> row;
-        row.reserve(metadata->value_count());
-        row.emplace_back(boolean_type->decompose(is_applied));
-        for (ordinal_column_id id = columns.find_first(); id != column_set::npos; id = columns.find_next(id)) {
-            const auto it = cell_map.cells.find(id);
-            if (it == cell_map.cells.end()) {
-                row.emplace_back(bytes_opt{});
-            } else {
-                const data_value& cell = it->second;
-                const abstract_type& cell_type = *cell.type();
-                const abstract_type& column_type = *rows.schema->column_at(id).type;
-
-                if (column_type.is_listlike() && cell_type.is_map()) {
-                    // List/sets are fetched as maps, but need to be stored as sets.
-                    const listlike_collection_type_impl& list_type = static_cast<const listlike_collection_type_impl&>(column_type);
-                    const map_type_impl& map_type = static_cast<const map_type_impl&>(cell_type);
-                    row.emplace_back(list_type.serialize_map(map_type, cell));
-                } else {
-                    row.emplace_back(cell_type.decompose(cell));
-                }
-            }
-        }
-        result_set->add_row(std::move(row));
-    }
-    if (result_set->empty()) {
-        // Is the case when, e.g., IF EXISTS or IF NOT EXISTS finds no row.
-        std::vector<bytes_opt> row;
-        row.emplace_back(boolean_type->decompose(is_applied));
-        row.resize(metadata->value_count());
-        result_set->add_row(std::move(row));
-    }
-    cql3::result result(std::move(result_set));
-    return seastar::make_shared<cql_transport::messages::result_message::rows>(std::move(result));
-}
-
 void modification_statement::build_cas_result_set_metadata() {

     std::vector<lw_shared_ptr<column_specification>> columns;
8 changes: 1 addition & 7 deletions cql3/statements/modification_statement.hh
@@ -207,13 +207,7 @@ public:
     // CAS statement returns a result set. Prepare result set metadata
     // so that get_result_metadata() returns a meaningful value.
     void build_cas_result_set_metadata();
-    // Build a result set with prefetched rows, but return only
-    // the columns required by CAS. Static since reused by BATCH
-    // CAS.
-    static seastar::shared_ptr<cql_transport::messages::result_message>
-    build_cas_result_set(seastar::shared_ptr<cql3::metadata> metadata,
-            const column_set& mask, bool is_applied,
-            const update_parameters::prefetch_data& rows);
+
 public:
     virtual dht::partition_range_vector build_partition_keys(const query_options& options, const json_cache_opt& json_cache) const;
     virtual query::clustering_row_ranges create_clustering_ranges(const query_options& options, const json_cache_opt& json_cache) const;
4 changes: 0 additions & 4 deletions cql3/update_parameters.hh
@@ -96,10 +96,6 @@ public:
     struct row {
         // Order CAS columns by ordinal column id.
         std::map<ordinal_column_id, data_value> cells;
-        // Set if the statement is used for checking conditions of a CAS request.
-        // Only those statements that have this flag set should be included into
-        // the CAS result set.
-        mutable bool is_in_cas_result_set = false;
         // Return true if this row has at least one static column set.
         bool has_static_columns(const schema& schema) const {
             if (!schema.has_static_columns()) {
