Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add warning to rewritten queries with WHERE #909

Merged
merged 5 commits into from
Nov 11, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
69 changes: 60 additions & 9 deletions shotover-proxy/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use cassandra_protocol::frame::{Opcode, Version};
use cassandra_protocol::query::QueryParams;
use cassandra_protocol::types::CBytesShort;
use cql3_parser::cassandra_statement::CassandraStatement;
use cql3_parser::common::{FQName, Identifier};
use cql3_parser::select::SelectElement;
use cql3_parser::common::{FQName, Identifier, Operand, RelationElement, RelationOperator};
use cql3_parser::select::{Select, SelectElement};
use futures::future::try_join_all;
use futures::stream::FuturesOrdered;
use futures::StreamExt;
Expand Down Expand Up @@ -604,25 +604,37 @@ impl CassandraSinkCluster {
))
}

/// Returns any information required to correctly rewrite the response.
/// Will also perform minor modifications to the query required for the rewrite.
fn get_rewrite_table(&self, request: &mut Message, index: usize) -> Option<TableToRewrite> {
if let Some(Frame::Cassandra(cassandra)) = request.frame() {
// No need to handle Batch as selects can only occur on Query
if let CassandraOperation::Query { query, .. } = &cassandra.operation {
if let CassandraStatement::Select(select) = query.as_ref() {
if let CassandraOperation::Query { query, .. } = &mut cassandra.operation {
if let CassandraStatement::Select(select) = query.as_mut() {
let ty = if self.local_table == select.table_name {
RewriteTableTy::Local
} else if self.peers_table == select.table_name
|| self.peers_v2_table == select.table_name
{
// TODO: fail if WHERE exists
RewriteTableTy::Peers
} else {
return None;
};

let warnings = if Self::has_no_where_clause(ty, select) {
vec![]
} else {
select.where_clause.clear();
vec![format!(
"WHERE clause on the query was ignored. Shotover does not support WHERE clauses on queries against {}",
rukai marked this conversation as resolved.
Show resolved Hide resolved
select.table_name
)]
};

return Some(TableToRewrite {
index,
ty,
warnings,
version: cassandra.version,
selects: select.columns.clone(),
});
Expand All @@ -632,29 +644,62 @@ impl CassandraSinkCluster {
None
}

/// Reports whether `select` should be treated as having no WHERE clause for warning purposes.
///
/// Returns `true` when the clause is empty, or when `ty` is [`RewriteTableTy::Local`] and the
/// whole clause is exactly the single relation `key = 'local'`. Any other clause yields `false`.
fn has_no_where_clause(ty: RewriteTableTy, select: &Select) -> bool {
    if select.where_clause.is_empty() {
        return true;
    }

    // The special case below only applies to queries against the local table.
    if ty != RewriteTableTy::Local {
        return false;
    }

    // Most drivers issue `FROM system.local WHERE key = 'local'` when determining the topology.
    // It is unclear why they do that: it seems to have no effect, as there is only ever one row
    // and its key is always 'local'.
    // Maybe it was a workaround for an old version of cassandra that got copied around?
    // To keep warning noise down, that exact clause is considered to be no WHERE clause at all.
    let local_key_filter = RelationElement {
        obj: Operand::Column(Identifier::Unquoted("key".to_string())),
        oper: RelationOperator::Equal,
        value: Operand::Const("'local'".to_owned()),
    };
    select.where_clause == [local_key_filter]
}

async fn rewrite_table(
&mut self,
table: TableToRewrite,
responses: &mut Vec<Message>,
) -> Result<()> {
/// Returns a copy of the warnings carried by `message` when it holds a Cassandra frame,
/// or an empty list when it does not.
fn get_warnings(message: &mut Message) -> Vec<String> {
    match message.frame() {
        Some(Frame::Cassandra(cassandra_frame)) => cassandra_frame.warnings.clone(),
        _ => Vec::new(),
    }
}

if table.index + 1 < responses.len() {
let peers_response = responses.remove(table.index + 1);
let mut peers_response = responses.remove(table.index + 1);

// Include warnings from every message that gets combined into the final message + any extra warnings noted in the TableToRewrite
let mut warnings = get_warnings(&mut peers_response);
warnings.extend(table.warnings.clone());

match table.ty {
RewriteTableTy::Local => {
if let Some(local_response) = responses.get_mut(table.index) {
self.rewrite_table_local(table, local_response, peers_response)
warnings.extend(get_warnings(local_response));

self.rewrite_table_local(table, local_response, peers_response, warnings)
.await?;
local_response.invalidate_cache();
}
}
RewriteTableTy::Peers => {
if table.index + 1 < responses.len() {
let local_response = responses.remove(table.index + 1);
let mut local_response = responses.remove(table.index + 1);
if let Some(client_peers_response) = responses.get_mut(table.index) {
warnings.extend(get_warnings(&mut local_response));
warnings.extend(get_warnings(client_peers_response));

let mut nodes = parse_system_nodes(peers_response)?;
nodes.extend(parse_system_nodes(local_response)?);

self.rewrite_table_peers(table, client_peers_response, nodes)
self.rewrite_table_peers(table, client_peers_response, nodes, warnings)
.await?;
client_peers_response.invalidate_cache();
}
Expand All @@ -671,6 +716,7 @@ impl CassandraSinkCluster {
table: TableToRewrite,
peers_response: &mut Message,
nodes: Vec<NodeInfo>,
warnings: Vec<String>,
) -> Result<()> {
let mut data_center_alias = "data_center";
let mut rack_alias = "rack";
Expand Down Expand Up @@ -724,6 +770,7 @@ impl CassandraSinkCluster {
}

if let Some(Frame::Cassandra(frame)) = peers_response.frame() {
frame.warnings = warnings;
if let CassandraOperation::Result(CassandraResult::Rows { rows, metadata }) =
&mut frame.operation
{
Expand Down Expand Up @@ -820,6 +867,7 @@ impl CassandraSinkCluster {
table: TableToRewrite,
local_response: &mut Message,
peers_response: Message,
warnings: Vec<String>,
) -> Result<()> {
let mut peers = parse_system_nodes(peers_response)?;
peers.retain(|node| {
Expand Down Expand Up @@ -867,6 +915,7 @@ impl CassandraSinkCluster {
if let CassandraOperation::Result(CassandraResult::Rows { rows, metadata }) =
&mut frame.operation
{
frame.warnings = warnings;
// The local_response message is guaranteed to come from a node that is in our configured data_center/rack.
// That means we can leave fields like rack and data_center alone and get exactly what we want.
for row in rows {
Expand Down Expand Up @@ -950,8 +999,10 @@ struct TableToRewrite {
ty: RewriteTableTy,
version: Version,
selects: Vec<SelectElement>,
warnings: Vec<String>,
}

#[derive(PartialEq, Clone, Copy)]
enum RewriteTableTy {
Local,
Peers,
Expand Down