Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: further special case another form of an incomplete router record #655

Merged
merged 1 commit into from
Mar 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
56 changes: 41 additions & 15 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,25 @@ pub fn retryable_error(metrics: Arc<StatsdClient>) -> impl Fn(&grpcio::Error) ->
}
}

/// Determine if a router record is "incomplete" (doesn't include [User]
/// columns):
///
/// They can be incomplete for a couple reasons:
///
/// 1) A migration code bug caused a few incomplete migrations where
/// `add_channels` and `increment_storage` calls occurred when the migration's
/// initial `add_user` was never completed:
/// https://github.com/mozilla-services/autopush-rs/pull/640
///
/// 2) When router TTLs are eventually enabled: `add_channel` and
/// `increment_storage` can write cells with later expiry times than the other
/// router cells
fn is_incomplete_router_record(cells: &HashMap<String, Vec<cell::Cell>>) -> bool {
cells
.keys()
.all(|k| ["current_timestamp", "version"].contains(&k.as_str()) || k.starts_with("chid:"))
}

fn call_opts(metadata: Metadata) -> ::grpcio::CallOption {
::grpcio::CallOption::default().headers(metadata)
}
Expand Down Expand Up @@ -849,23 +868,20 @@ impl DbClient for BigTableClientImpl {
let connected_at_cell = match row.take_required_cell("connected_at") {
Ok(cell) => cell,
Err(_) => {
if !row.cells.keys().all(|k| k.starts_with("chid:")) {
if !is_incomplete_router_record(&row.cells) {
return Err(DbError::Integrity(
"Expected column: connected_at".to_owned(),
Some(format!("{row:#?}")),
));
}
// Special case for:
// 1) A migration code bug caused some channels to be migrated
// after a user record failed to be added. Returning None will
// eventually re-trigger their migration:
// Special case incomplete records: they're equivalent to no
// user exists. Incompletes caused by the migration bug in #640
// will have their migration re-triggered by returning None:
// https://github.com/mozilla-services/autopush-rs/pull/640
// 2) When router TTLs are enabled: add_channel can write chid
// cells with later expiry times than the other router cells
trace!("🉑 Dropping a chid-only user record for {}", row_key);
trace!("🉑 Dropping an incomplete user record for {}", row_key);
self.metrics
.incr_with_tags("database.drop_user")
.with_tag("reason", "chid_only")
.with_tag("reason", "incomplete_record")
.send();
self.remove_user(uaid).await?;
return Ok(None);
Expand Down Expand Up @@ -1708,6 +1724,21 @@ mod tests {
client.remove_user(&uaid).await.unwrap();
}

#[actix_rt::test]
async fn lingering_current_timestamp() {
let client = new_client().unwrap();
let uaid = gen_test_uaid();
client.remove_user(&uaid).await.unwrap();

client
.increment_storage(&uaid, ms_since_epoch())
.await
.unwrap();
assert!(client.get_user(&uaid).await.unwrap().is_none());

client.remove_user(&uaid).await.unwrap();
}

#[actix_rt::test]
async fn lingering_chid_w_version_record() {
let client = new_client().unwrap();
Expand All @@ -1717,12 +1748,7 @@ mod tests {

client.add_channel(&uaid, &chid).await.unwrap();
assert!(client.remove_channel(&uaid, &chid).await.unwrap());

let e = client.get_user(&uaid).await.unwrap_err();
let DbError::Integrity(_, Some(row)) = e else {
panic!("Expected DbError::Integrity");
};
assert!(row.contains("version"));
assert!(client.get_user(&uaid).await.unwrap().is_none());

client.remove_user(&uaid).await.unwrap();
}
Expand Down