Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add temporary sentry tags for the mutation limit issue #373

Merged
merged 1 commit into the base branch from the source branch on
Dec 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
113 changes: 109 additions & 4 deletions src/db/spanner/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,107 @@ pub fn delete(db: &SpannerDb, params: params::DeleteBatch) -> Result<()> {
Ok(())
}

/// Pass-through audit hook for batch-commit results.
///
/// Returns `result` unchanged so callers can keep chaining `?` on it.
/// When the result is an `Err` whose kind is a Spanner gRPC failure,
/// forwards the gRPC error to [`_audit_commit_err2`] for Sentry
/// reporting first.
pub fn _audit_commit_err<T>(
    db: &SpannerDb,
    params: &params::CommitBatch,
    result: Result<T>,
) -> Result<T> {
    match &result {
        // Only gRPC-kind errors are audited; any other error (or a
        // success) flows straight back to the caller.
        Err(e) => {
            if let DbErrorKind::SpannerGrpc(grpce) = e.kind() {
                _audit_commit_err2(db, params, grpce)?;
            }
            result
        }
        Ok(_) => result,
    }
}

/// Report diagnostic context to Sentry when a batch commit fails with a
/// Spanner `INVALID_ARGUMENT` whose message mentions "mutation"
/// (presumably the per-transaction mutation limit -- see the issue this
/// instrumentation was added for).
///
/// Gathers the size of the offending batch (row count in `batch_bsos`)
/// and a summary of the mutations queued on the current session, then
/// attaches everything as `extra` tags on a Sentry event built from the
/// gRPC error. Any other gRPC error is ignored and `Ok(())` is returned.
///
/// # Errors
/// Propagates failures from the diagnostic SQL query itself, from
/// collection-id lookup, or from parsing the `COUNT(*)` result.
pub fn _audit_commit_err2(
    db: &SpannerDb,
    params: &params::CommitBatch,
    grpce: &grpcio::Error,
) -> Result<()> {
    match grpce {
        // Only audit INVALID_ARGUMENT failures whose message mentions
        // "mutation"; everything else falls through to the no-op arm.
        grpcio::Error::RpcFailure(ref status) | grpcio::Error::RpcFinished(Some(ref status))
            if status.status == grpcio::RpcStatusCode::INVALID_ARGUMENT
                && grpce.to_string().to_lowercase().contains("mutation") =>
        {
            // How many bsos were staged for this batch?
            let result = db
                .sql(
                    "SELECT COUNT(*)
               FROM batch_bsos
              WHERE fxa_uid = @fxa_uid
                AND fxa_kid = @fxa_kid
                AND collection_id = @collection_id
                AND batch_id = @batch_id",
                )?
                .params(params! {
                    "fxa_uid" => params.user_id.fxa_uid.clone(),
                    "fxa_kid" => params.user_id.fxa_kid.clone(),
                    "collection_id" => db.get_collection_id(&params.collection)?.to_string(),
                    "batch_id" => params.batch.id.clone(),
                })
                .execute(&db.conn)?
                .one()?;
            // Spanner returns COUNT(*) as a string value; a parse failure
            // is surfaced as an integrity error rather than a panic.
            let batch_bsos_len = result[0]
                .get_string_value()
                .parse::<i64>()
                .map_err(|e| DbErrorKind::Integrity(e.to_string()))?;

            // Snapshot the session's queued mutations (clippy: prefer
            // Option::clone + unwrap_or_default over as_ref().cloned()
            // and unwrap_or_else(|| vec![])).
            let mutations = db.session.borrow().mutations.clone().unwrap_or_default();
            let mut mutations_total_values_len = 0;
            let mut mutations_insert_values_len = 0;
            for mutation in &mutations {
                if mutation.has_insert() {
                    let values_len = mutation.get_insert().values.len();
                    mutations_insert_values_len += values_len;
                    mutations_total_values_len += values_len;
                } else {
                    // assumes non-insert mutations queued here are updates
                    // -- TODO confirm no other mutation kinds are enqueued
                    mutations_total_values_len += mutation.get_update().values.len();
                }
            }

            // Attach the gathered context as Sentry "extra" tags and ship it.
            let mut event = sentry::integrations::failure::event_from_fail(grpce);
            event.extra.insert(
                "legacy_uid".to_owned(),
                params.user_id.legacy_id.to_string().into(),
            );
            event
                .extra
                .insert("ua".to_owned(), params.user_id.ua.clone().into());
            event
                .extra
                .insert("collection".to_owned(), params.collection.clone().into());
            event
                .extra
                .insert("batch_id".to_owned(), params.batch.id.clone().into());
            event.extra.insert(
                "batch_bsos_len".to_owned(),
                batch_bsos_len.to_string().into(),
            );
            event.extra.insert(
                "mutations_len".to_owned(),
                mutations.len().to_string().into(),
            );
            event.extra.insert(
                "mutations_total_values_len".to_owned(),
                mutations_total_values_len.to_string().into(),
            );
            event.extra.insert(
                "mutations_insert_values_len".to_owned(),
                mutations_insert_values_len.to_string().into(),
            );
            sentry::capture_event(event);
        }
        _ => (),
    }
    Ok(())
}

pub fn commit(db: &SpannerDb, params: params::CommitBatch) -> Result<results::CommitBatch> {
let mut metrics = db.metrics.clone();
metrics.start_timer("storage.spanner.apply_batch", None);
Expand All @@ -164,7 +265,8 @@ pub fn commit(db: &SpannerDb, params: params::CommitBatch) -> Result<results::Co
let as_rfc3339 = timestamp.as_rfc3339()?;
// First, UPDATE existing rows in the bsos table with any new values
// supplied in this batch
db.sql(include_str!("batch_commit_update.sql"))?
let result = db
.sql(include_str!("batch_commit_update.sql"))?
.params(params! {
"fxa_uid" => params.user_id.fxa_uid.clone(),
"fxa_kid" => params.user_id.fxa_kid.clone(),
Expand All @@ -175,11 +277,13 @@ pub fn commit(db: &SpannerDb, params: params::CommitBatch) -> Result<results::Co
.param_types(param_types! {
"timestamp" => TypeCode::TIMESTAMP,
})
.execute(&db.conn)?;
.execute(&db.conn);
_audit_commit_err(db, &params, result)?;

// Then INSERT INTO SELECT remaining rows from this batch into the bsos
// table (that didn't already exist there)
db.sql(include_str!("batch_commit_insert.sql"))?
let result = db
.sql(include_str!("batch_commit_insert.sql"))?
.params(params! {
"fxa_uid" => params.user_id.fxa_uid.clone(),
"fxa_kid" => params.user_id.fxa_kid.clone(),
Expand All @@ -192,7 +296,8 @@ pub fn commit(db: &SpannerDb, params: params::CommitBatch) -> Result<results::Co
"timestamp" => TypeCode::TIMESTAMP,
"default_bso_ttl" => TypeCode::INT64,
})
.execute(&db.conn)?;
.execute(&db.conn);
_audit_commit_err(db, &params, result)?;

delete(
db,
Expand Down
6 changes: 3 additions & 3 deletions src/db/spanner/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub const MAX_SPANNER_LOAD_SIZE: usize = 100_000_000;

/// Per session Db metadata
#[derive(Debug, Default)]
struct SpannerDbSession {
pub(super) struct SpannerDbSession {
/// CURRENT_TIMESTAMP() from Spanner, used for timestamping this session's
/// operations
timestamp: Option<SyncTimestamp>,
Expand All @@ -78,7 +78,7 @@ struct SpannerDbSession {
transaction: Option<TransactionSelector>,
/// Behind Vec so commit can take() it (maybe commit() should consume self
/// instead?)
mutations: Option<Vec<Mutation>>,
pub(super) mutations: Option<Vec<Mutation>>,
in_write_transaction: bool,
execute_sql_count: u64,
}
Expand All @@ -97,7 +97,7 @@ pub struct SpannerDbInner {
pub(super) conn: Conn,

thread_pool: Arc<::tokio_threadpool::ThreadPool>,
session: RefCell<SpannerDbSession>,
pub(super) session: RefCell<SpannerDbSession>,
}

impl fmt::Debug for SpannerDbInner {
Expand Down
12 changes: 11 additions & 1 deletion src/web/extractors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,7 @@ pub struct HawkIdentifier {
/// For NoSQL database backends that require randomly distributed primary keys
pub fxa_uid: String,
pub fxa_kid: String,
pub ua: String,
}

impl HawkIdentifier {
Expand All @@ -891,6 +892,7 @@ impl HawkIdentifier {
legacy_id: 0,
fxa_uid: "cmd".to_owned(),
fxa_kid: "cmd".to_owned(),
ua: "".to_owned(),
}
}

Expand Down Expand Up @@ -938,7 +940,13 @@ impl HawkIdentifier {
.ok_or_else(|| -> ApiError { HawkErrorKind::MissingHeader.into() })?
.to_str()
.map_err(|e| -> ApiError { HawkErrorKind::Header(e).into() })?;
let identifier = Self::generate(&state.secrets, method, auth_header, ci, uri)?;
let ua = match msg.headers().get("user-agent") {
Some(value) => value
.to_str()
.map_err(|e| -> ApiError { HawkErrorKind::Header(e).into() })?,
_ => "",
};
let identifier = Self::generate(&state.secrets, method, auth_header, ua, ci, uri)?;
msg.extensions_mut().insert(identifier.clone());
Ok(identifier)
}
Expand All @@ -947,6 +955,7 @@ impl HawkIdentifier {
secrets: &Secrets,
method: &str,
header: &str,
ua: &str,
connection_info: &ConnectionInfo,
uri: &Uri,
) -> Result<Self, Error> {
Expand All @@ -965,6 +974,7 @@ impl HawkIdentifier {
legacy_id: payload.user_id,
fxa_uid: payload.fxa_uid,
fxa_kid: payload.fxa_kid,
ua: ua.to_owned(),
};
Ok(user_id)
}
Expand Down