Skip to content

Commit

Permalink
fix: "recycle" the user object in update_token_route (#621)
Browse files Browse the repository at this point in the history
needed for bigtable's optimistic locking via version

Closes: SYNC-4148
  • Loading branch information
pjenvey committed Feb 20, 2024
1 parent 1ff88e7 commit 4bd4827
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 59 deletions.
10 changes: 6 additions & 4 deletions autoendpoint/src/extractors/registration_path_args_with_uaid.rs
Expand Up @@ -9,12 +9,14 @@ use futures::future::LocalBoxFuture;
use futures::FutureExt;
use uuid::Uuid;

use autopush_common::db::User;

/// An extension of `RegistrationPathArgs` which requires a `uaid` path arg.
/// The `uaid` is verified by checking if the user exists in the database.
pub struct RegistrationPathArgsWithUaid {
pub router_type: RouterType,
pub app_id: String,
pub uaid: Uuid,
pub user: User,
}

impl FromRequest for RegistrationPathArgsWithUaid {
Expand All @@ -37,14 +39,14 @@ impl FromRequest for RegistrationPathArgsWithUaid {
.map_err(|_| ApiErrorKind::NoUser)?;

// Verify that the user exists
if app_state.db.get_user(&uaid).await?.is_none() {
let Some(user) = app_state.db.get_user(&uaid).await? else {
return Err(ApiErrorKind::NoUser.into());
}
};

Ok(Self {
router_type: path_args.router_type,
app_id: path_args.app_id,
uaid,
user,
})
}
.boxed_local()
Expand Down
65 changes: 29 additions & 36 deletions autoendpoint/src/routes/registration.rs
Expand Up @@ -84,8 +84,9 @@ pub async fn unregister_user_route(
path_args: RegistrationPathArgsWithUaid,
app_state: Data<AppState>,
) -> ApiResult<HttpResponse> {
debug!("🌍 Unregistering UAID {}", path_args.uaid);
app_state.db.remove_user(&path_args.uaid).await?;
let uaid = path_args.user.uaid;
debug!("🌍 Unregistering UAID {uaid}");
app_state.db.remove_user(&uaid).await?;
Ok(HttpResponse::Ok().finish())
}

Expand All @@ -98,29 +99,28 @@ pub async fn update_token_route(
app_state: Data<AppState>,
) -> ApiResult<HttpResponse> {
// Re-register with router
debug!(
"🌍 Updating the token of UAID {} with the {} router",
path_args.uaid, path_args.router_type
);
let RegistrationPathArgsWithUaid {
router_type,
app_id,
mut user,
} = path_args;
let uaid = user.uaid;
debug!("🌍 Updating the token of UAID {uaid} with the {router_type} router");
trace!("token = {}", router_data_input.token);
let router = routers.get(path_args.router_type);
let router_data = router.register(&router_data_input, &path_args.app_id)?;
let router_data = router.register(&router_data_input, &app_id)?;

// Update the user in the database
let mut user = User {
uaid: path_args.uaid,
router_type: path_args.router_type.to_string(),
router_data: Some(router_data),
..Default::default()
};
trace!("🌍 Updating user with UAID {}", user.uaid);
trace!("🌍 user = {:?}", user);
user.router_type = path_args.router_type.to_string();
user.router_data = Some(router_data);
trace!("🌍 Updating user with UAID {uaid}");
trace!("🌍 user = {user:?}");
if !app_state.db.update_user(&mut user).await? {
// Unlikely to occur on mobile records
return Err(ApiErrorKind::General("Conditional update failed".to_owned()).into());
}

trace!("🌍 Finished updating token for UAID {}", user.uaid);
trace!("🌍 Finished updating token for UAID {uaid}");
Ok(HttpResponse::Ok().finish())
}

Expand All @@ -132,26 +132,24 @@ pub async fn new_channel_route(
app_state: Data<AppState>,
) -> ApiResult<HttpResponse> {
// Add the channel
debug!("🌍 Adding a channel to UAID {}", path_args.uaid);
let uaid = path_args.user.uaid;
debug!("🌍 Adding a channel to UAID {uaid}");
let channel_data = channel_data.map(Json::into_inner).unwrap_or_default();
let channel_id = channel_data.channel_id.unwrap_or_else(Uuid::new_v4);
trace!("🌍 channel_id = {}", channel_id);
app_state
.db
.add_channel(&path_args.uaid, &channel_id)
.await?;
trace!("🌍 channel_id = {channel_id}");
app_state.db.add_channel(&uaid, &channel_id).await?;

// Make the endpoint URL
trace!("🌍 Creating endpoint for the new channel");
let endpoint_url = make_endpoint(
&path_args.uaid,
&uaid,
&channel_id,
channel_data.key.as_deref(),
app_state.settings.endpoint_url().as_str(),
&app_state.fernet,
)
.map_err(ApiErrorKind::EndpointUrl)?;
trace!("endpoint = {}", endpoint_url);
trace!("endpoint = {endpoint_url}");

Ok(HttpResponse::Ok().json(serde_json::json!({
"channelID": channel_id,
Expand All @@ -165,11 +163,12 @@ pub async fn get_channels_route(
path_args: RegistrationPathArgsWithUaid,
app_state: Data<AppState>,
) -> ApiResult<HttpResponse> {
debug!("🌍 Getting channel IDs for UAID {}", path_args.uaid);
let channel_ids = app_state.db.get_channels(&path_args.uaid).await?;
let uaid = path_args.user.uaid;
debug!("🌍 Getting channel IDs for UAID {uaid}");
let channel_ids = app_state.db.get_channels(&uaid).await?;

Ok(HttpResponse::Ok().json(serde_json::json!({
"uaid": path_args.uaid,
"uaid": uaid,
"channelIDs": channel_ids
})))
}
Expand All @@ -187,17 +186,11 @@ pub async fn unregister_channel_route(
.expect("{chid} must be part of the path")
.parse::<Uuid>()
.map_err(|_| ApiErrorKind::NoSubscription)?;

debug!(
"🌍 Unregistering CHID {} for UAID {}",
channel_id, path_args.uaid
);
let uaid = path_args.user.uaid;
debug!("🌍 Unregistering CHID {channel_id} for UAID {uaid}");

incr_metric("ua.command.unregister", &app_state.metrics, &request);
let channel_did_exist = app_state
.db
.remove_channel(&path_args.uaid, &channel_id)
.await?;
let channel_did_exist = app_state.db.remove_channel(&uaid, &channel_id).await?;

if channel_did_exist {
Ok(HttpResponse::Ok().finish())
Expand Down
40 changes: 21 additions & 19 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
Expand Up @@ -23,6 +23,7 @@ use crate::db::{
client::{DbClient, FetchMessageResponse},
error::{DbError, DbResult},
DbSettings, Notification, NotificationRecord, User, MAX_CHANNEL_TTL, MAX_ROUTER_TTL,
USER_RECORD_VERSION,
};

pub use self::metadata::MetadataBuilder;
Expand Down Expand Up @@ -581,6 +582,22 @@ impl BigTableClientImpl {
timestamp: expiry,
..Default::default()
},
cell::Cell {
qualifier: "record_version".to_owned(),
value: user
.record_version
.unwrap_or(USER_RECORD_VERSION)
.to_be_bytes()
.to_vec(),
timestamp: expiry,
..Default::default()
},
cell::Cell {
qualifier: "version".to_owned(),
value: (*version).into(),
timestamp: expiry,
..Default::default()
},
];

if let Some(router_data) = &user.router_data {
Expand All @@ -607,21 +624,6 @@ impl BigTableClientImpl {
..Default::default()
});
};
if let Some(record_version) = user.record_version {
cells.push(cell::Cell {
qualifier: "record_version".to_owned(),
value: record_version.to_be_bytes().to_vec(),
timestamp: expiry,
..Default::default()
});
};

cells.push(cell::Cell {
qualifier: "version".to_owned(),
value: (*version).into(),
timestamp: expiry,
..Default::default()
});

row.add_cells(ROUTER_FAMILY, cells);
row
Expand Down Expand Up @@ -727,6 +729,10 @@ impl DbClient for BigTableClientImpl {
"connected_at",
)?,
router_type: to_string(row.take_required_cell("router_type")?.value, "router_type")?,
record_version: Some(to_u64(
row.take_required_cell("record_version")?.value,
"record_version",
)?),
version: Some(
row.take_required_cell("version")?
.value
Expand All @@ -748,10 +754,6 @@ impl DbClient for BigTableClientImpl {
result.node_id = Some(to_string(cell.value, "node_id")?);
}

if let Some(cell) = row.take_cell("record_version") {
result.record_version = Some(to_u64(cell.value, "record_version")?)
}

if let Some(cell) = row.take_cell("current_timestamp") {
result.current_timestamp = Some(to_u64(cell.value, "current_timestamp")?)
}
Expand Down
2 changes: 2 additions & 0 deletions autopush-common/src/db/dual/mod.rs
Expand Up @@ -190,6 +190,8 @@ impl DbClient for DualClientImpl {
user.version = Some(Uuid::new_v4());
self.primary.add_user(&user).await?;
let channels = self.secondary.get_channels(uaid).await?;
// NOTE: add_channels doesn't write a new version:
// user.version is still valid
self.primary.add_channels(uaid, channels).await?;
return Ok(Some(user));
}
Expand Down

0 comments on commit 4bd4827

Please sign in to comment.