Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce end_median for dual. #663

Merged
merged 5 commits into from
Mar 19, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
75 changes: 59 additions & 16 deletions autopush-common/src/db/dual/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub struct DualClientImpl {
/// Hex value to use to specify the first byte of the median offset.
/// e.g. "0a" will start from include all UUIDs upto and including "0a"
median: Option<u8>,
/// Ending octet to use for more distributed account migration
end_median: Option<u8>,
metrics: Arc<StatsdClient>,
}

Expand All @@ -54,10 +56,15 @@ pub struct DualDbSettings {
/// as well as account and channel additions/deletions.
#[serde(default = "default_true")]
write_to_secondary: bool,
/// Hex value to use to specify the first byte of the median offset.
/// Hex value to specify the first byte of the median offset.
/// e.g. "0a" will start from include all UUIDs upto and including "0a"
#[serde(default)]
median: Option<String>,
/// Hex value to specify the last byte of the median offset to include.
/// this value is "OR"ed withe "median" to produce a more distributed set of
/// uaids to migrate
#[serde(default)]
end_median: Option<String>,
}

impl DualClientImpl {
Expand Down Expand Up @@ -96,6 +103,25 @@ impl DualClientImpl {
} else {
None
};
// determine which uaids to move based on the last byte of their UAID.
// This should reduce the hot table problem.
let end_median = if let Some(end_median) = db_settings.end_median {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also emit end_median in the startup metric as another tag alongside median

let end_median = hex::decode(end_median).map_err(|e| {
DbError::General(format!(
"Could not parse end_median string. Please use a valid Hex identifier: {:?}",
e,
))
})?[0];
debug!(
"⚖ Setting end_median to {:02} ({})",
hex::encode([end_median]),
&end_median
);
Some(end_median)
} else {
None
};

let primary = BigTableClientImpl::new(metrics.clone(), &db_settings.primary)?;
let secondary = DdbClientImpl::new(metrics.clone(), &db_settings.secondary)?;
debug!("⚖ Got primary and secondary");
Expand All @@ -110,6 +136,7 @@ impl DualClientImpl {
primary,
secondary: secondary.clone(),
median,
end_median,
write_to_secondary: db_settings.write_to_secondary,
metrics,
})
Expand All @@ -119,19 +146,26 @@ impl DualClientImpl {
/// Wrapper functions to allow us to change which data store system actually manages the
/// user allocation routing table.
impl DualClientImpl {
fn should_migrate(&self, uaid: &Uuid) -> bool {
let bytes = uaid.as_bytes();
let mut result: bool = false;
if let Some(median) = self.median {
result |= bytes.first() <= Some(&median);
};
if let Some(end_median) = self.end_median {
result |= bytes.last() <= Some(&end_median);
}
result
}
/// Route and assign a user to the appropriate back end based on the defined
/// allowance
/// Returns the dbclient to use and whether or not it's the primary database.
async fn allot<'a>(&'a self, uaid: &Uuid) -> DbResult<(Box<&'a dyn DbClient>, bool)> {
let target: (Box<&'a dyn DbClient>, bool) = if let Some(median) = self.median {
if uaid.as_bytes()[0] <= median {
debug!("⚖ Routing user to Bigtable");
(Box::new(&self.primary), true)
} else {
(Box::new(&self.secondary), false)
}
} else {
let target: (Box<&'a dyn DbClient>, bool) = if self.should_migrate(uaid) {
debug!("⚖ Routing user to Bigtable");
(Box::new(&self.primary), true)
} else {
(Box::new(&self.secondary), false)
};
debug!("⚖ alloting to {}", target.0.name());
Ok(target)
Expand Down Expand Up @@ -472,7 +506,7 @@ mod test {
use serde_json::json;
use std::str::FromStr;

fn test_args(median: Option<&str>) -> String {
fn test_args(median: Option<&str>, end_median: Option<&str>) -> String {
json!({
"primary": {
"dsn": "grpc://bigtable.googleapis.com", // Note that this is the general endpoint.
Expand All @@ -490,6 +524,7 @@ mod test {
}).to_string(),
},
"median": median.to_owned(),
"end_median": end_median.to_owned(),
"write_to_secondary": false,
})
.to_string()
Expand All @@ -499,7 +534,7 @@ mod test {
/// documentation for how the db_settings argument should be structured
#[test]
fn arg_parsing() -> DbResult<()> {
let arg_str = test_args(None);
let arg_str = test_args(None, None);
// the output string looks like:
/*
"{\"primary\":{\"db_settings\":\"{\\\"message_family\\\":\\\"message\\\",\\\"router_family\\\":\\\"router\\\",\\\"table_name\\\":\\\"projects/some-project/instances/some-instance/tables/some-table\\\"}\",\"dsn\":\"grpc://bigtable.googleapis.com\"},\"secondary\":{\"db_settings\":\"{\\\"message_table\\\":\\\"test_message\\\",\\\"router_table\\\":\\\"test_router\\\"}\",\"dsn\":\"http://localhost:8000/\"}}"
Expand Down Expand Up @@ -527,24 +562,32 @@ mod test {

#[actix_rt::test]
async fn allocation() -> DbResult<()> {
let arg_str = test_args(Some("0A"));
let arg_str = test_args(Some("0A"), Some("88"));
let metrics = Arc::new(StatsdClient::builder("", NopMetricSink).build());
let dual_settings = DbSettings {
dsn: Some("dual".to_owned()),
db_settings: arg_str,
};
let dual = DualClientImpl::new(metrics, &dual_settings)?;

// Should be included.
let low_uaid = Uuid::from_str("04DDDDDD-2040-4b4d-be3d-a340fc2d15a6").unwrap();
// Should be excluded.
let hi_uaid = Uuid::from_str("0BDDDDDD-2040-4b4d-be3d-a340fc2d15a6").unwrap();
// Should be included (Note: high end median)
let low_uaid = Uuid::from_str("04DDDDDD-1234-1234-1234-0000000000CC").unwrap();
let (result, is_primary) = dual.allot(&low_uaid).await?;
assert_eq!(result.name(), dual.primary.name());
assert!(is_primary);

// Should be excluded (Note: high end_median)
let hi_uaid = Uuid::from_str("0BDDDDDD-1234-1234-1234-0000000000CC").unwrap();
let (result, is_primary) = dual.allot(&hi_uaid).await?;
assert_eq!(result.name(), dual.secondary.name());
assert!(!is_primary);

// Should be included (Note: high median with low end median)
let hi_end_uaid = Uuid::from_str("0BDDDDDD-1234-1234-1234-000000000080").unwrap();
let (result, is_primary) = dual.allot(&hi_end_uaid).await?;
assert_eq!(result.name(), dual.primary.name());
assert!(is_primary);

Ok(())
}
}