Skip to content

Commit

Permalink
fix: fix add/update_user version field handling on migrations (#619)
Browse files Browse the repository at this point in the history
- explicitly add a version (which there is None from DynamoDB) in dual
mode migration
- Bigtable::add_user now writes the initial version passed in
- Bigtable::update_user continues always writing a new version but now
modifies the &mut User upon success

Closes: SYNC-4147
  • Loading branch information
pjenvey committed Feb 16, 2024
1 parent d41db4b commit 1ff88e7
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl UnidentifiedClient {
}
user.connected_at = connected_at;
user.set_last_connect();
if !self.app_state.db.update_user(&user).await? {
if !self.app_state.db.update_user(&mut user).await? {
let _ = self.app_state.metrics.incr("ua.already_connected");
return Err(SMErrorKind::AlreadyConnected.into());
}
Expand Down
2 changes: 1 addition & 1 deletion autoendpoint/src/routers/adm/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl Router for AdmRouter {
);
user.router_data = Some(router_data);

if !self.db.update_user(&user).await? {
if !self.db.update_user(&mut user).await? {
// Unlikely to occur on mobile records
return Err(ApiErrorKind::General("Conditional update failed".to_owned()).into());
}
Expand Down
4 changes: 2 additions & 2 deletions autoendpoint/src/routes/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,15 @@ pub async fn update_token_route(
let router_data = router.register(&router_data_input, &path_args.app_id)?;

// Update the user in the database
let user = User {
let mut user = User {
uaid: path_args.uaid,
router_type: path_args.router_type.to_string(),
router_data: Some(router_data),
..Default::default()
};
trace!("🌍 Updating user with UAID {}", user.uaid);
trace!("🌍 user = {:?}", user);
if !app_state.db.update_user(&user).await? {
if !app_state.db.update_user(&mut user).await? {
// Unlikely to occur on mobile records
return Err(ApiErrorKind::General("Conditional update failed".to_owned()).into());
}
Expand Down
52 changes: 36 additions & 16 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,12 @@ impl BigTableClientImpl {
Ok(notif)
}

fn user_to_row(&self, user: &User) -> Row {
/// Return a Row for writing from a [User] and a `version`
///
/// `version` is specified as an argument (ignoring [User::version]) so
/// that [update_user] may specify a new version to write before modifying
/// the [User] struct
fn user_to_row(&self, user: &User, version: &Uuid) -> Row {
let row_key = user.uaid.simple().to_string();
let mut row = Row::new(row_key);
let expiry = std::time::SystemTime::now() + Duration::from_secs(MAX_ROUTER_TTL);
Expand Down Expand Up @@ -611,8 +616,12 @@ impl BigTableClientImpl {
});
};

// Always write a newly generated version
cells.push(new_version_cell(expiry));
cells.push(cell::Cell {
qualifier: "version".to_owned(),
value: (*version).into(),
timestamp: expiry,
..Default::default()
});

row.add_cells(ROUTER_FAMILY, cells);
row
Expand Down Expand Up @@ -663,7 +672,12 @@ impl DbClient for BigTableClientImpl {
/// add user to the database
async fn add_user(&self, user: &User) -> DbResult<()> {
trace!("🉑 Adding user");
let row = self.user_to_row(user);
let Some(ref version) = user.version else {
return Err(DbError::General(
"add_user expected a user version field".to_owned(),
));
};
let row = self.user_to_row(user, version);

// Only add when the user doesn't already exist
let mut row_key_filter = RowFilter::default();
Expand All @@ -679,18 +693,24 @@ impl DbClient for BigTableClientImpl {
/// BigTable doesn't really have the concept of an "update". You simply write the data and
/// the individual cells create a new version. Depending on the garbage collection rules for
/// the family, these can either persist or be automatically deleted.
async fn update_user(&self, user: &User) -> DbResult<bool> {
async fn update_user(&self, user: &mut User) -> DbResult<bool> {
let Some(ref version) = user.version else {
return Err(DbError::General("Expected a user version field".to_owned()));
return Err(DbError::General(
"update_user expected a user version field".to_owned(),
));
};

let mut filters = vec![router_gc_policy_filter()];
filters.extend(version_filter(version));
let filter = filter_chain(filters);

Ok(self
.check_and_mutate_row(self.user_to_row(user), filter, true)
.await?)
let new_version = Uuid::new_v4();
// Always write a newly generated version
let row = self.user_to_row(user, &new_version);

let predicate_matched = self.check_and_mutate_row(row, filter, true).await?;
user.version = Some(new_version);
Ok(predicate_matched)
}

async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>> {
Expand Down Expand Up @@ -1286,11 +1306,11 @@ mod tests {
// now ensure that we can update a user that's after the time we set
// prior. first ensure that we can't update a user that's before the
// time we set prior to the last write
let updated = User {
let mut updated = User {
connected_at,
..test_user.clone()
};
let result = client.update_user(&updated).await;
let result = client.update_user(&mut updated).await;
assert!(result.is_ok());
assert!(!result.unwrap());

Expand All @@ -1299,11 +1319,11 @@ mod tests {
assert_eq!(fetched.connected_at, fetched2.connected_at);

// and make sure we can update a record with a later connected_at time.
let updated = User {
let mut updated = User {
connected_at: fetched.connected_at + 300,
..fetched2
};
let result = client.update_user(&updated).await;
let result = client.update_user(&mut updated).await;
assert!(result.is_ok());
assert!(result.unwrap());
assert_ne!(
Expand Down Expand Up @@ -1477,13 +1497,13 @@ mod tests {
client.remove_user(&uaid).await.unwrap();

client.add_user(&user).await.unwrap();
let user = client.get_user(&uaid).await.unwrap().unwrap();
assert!(client.update_user(&user).await.unwrap());
let mut user = client.get_user(&uaid).await.unwrap().unwrap();
assert!(client.update_user(&mut user.clone()).await.unwrap());

let fetched = client.get_user(&uaid).await.unwrap().unwrap();
assert_ne!(user.version, fetched.version);
// should now fail w/ a stale version
assert!(!client.update_user(&user).await.unwrap());
assert!(!client.update_user(&mut user).await.unwrap());

client.remove_user(&uaid).await.unwrap();
}
Expand Down
2 changes: 1 addition & 1 deletion autopush-common/src/db/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub trait DbClient: Send + Sync {
/// update will not occur if the user does not already exist, has a
/// different router type, or has a newer `connected_at` timestamp.
// TODO: make the bool a #[must_use]
async fn update_user(&self, user: &User) -> DbResult<bool>;
async fn update_user(&self, user: &mut User) -> DbResult<bool>;

/// Read a user from the database
async fn get_user(&self, uaid: &Uuid) -> DbResult<Option<User>>;
Expand Down
8 changes: 6 additions & 2 deletions autopush-common/src/db/dual/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl DbClient for DualClientImpl {
Ok(result)
}

async fn update_user(&self, user: &User) -> DbResult<bool> {
async fn update_user(&self, user: &mut User) -> DbResult<bool> {
// If the UAID is in the allowance, move them to the new data store
let (target, is_primary) = self.allot(&user.uaid).await?;
let result = target.update_user(user).await?;
Expand All @@ -181,9 +181,13 @@ impl DbClient for DualClientImpl {
Ok(None) => {
if is_primary {
// The user wasn't in the current primary, so fetch them from the secondary.
if let Ok(Some(user)) = self.secondary.get_user(uaid).await {
if let Ok(Some(mut user)) = self.secondary.get_user(uaid).await {
// copy the user record over to the new data store.
debug!("⚖ Found user record in secondary, moving to primary");
// Users read from DynamoDB lack a version field needed
// for Bigtable
debug_assert!(user.version.is_none());
user.version = Some(Uuid::new_v4());
self.primary.add_user(&user).await?;
let channels = self.secondary.get_channels(uaid).await?;
self.primary.add_channels(uaid, channels).await?;
Expand Down
2 changes: 1 addition & 1 deletion autopush-common/src/db/dynamodb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl DbClient for DdbClientImpl {
Ok(())
}

async fn update_user(&self, user: &User) -> DbResult<bool> {
async fn update_user(&self, user: &mut User) -> DbResult<bool> {
let mut user_map = serde_dynamodb::to_hashmap(&user)?;
user_map.remove("uaid");
let input = UpdateItemInput {
Expand Down
2 changes: 1 addition & 1 deletion autopush-common/src/db/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl DbClient for Arc<MockDbClient> {
Arc::as_ref(self).add_user(user).await
}

async fn update_user(&self, user: &User) -> DbResult<bool> {
async fn update_user(&self, user: &mut User) -> DbResult<bool> {
Arc::as_ref(self).update_user(user).await
}

Expand Down

0 comments on commit 1ff88e7

Please sign in to comment.