Skip to content

Commit

Permalink
fix: optimize batch commit mutations
Browse files Browse the repository at this point in the history
write the pending bsos immediately when committing a batch instead of
queueing them in the batch_bsos, so we don't pay twice for their
mutations

also

- optimize batch append to insert all items in one write
- interleave BsoExpiry w/ user_collections

Closes #333, #318
  • Loading branch information
pjenvey committed Nov 26, 2019
1 parent 3de7f41 commit 5dd3c65
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 50 deletions.
3 changes: 2 additions & 1 deletion spanner-2019-10-01.ddl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ CREATE TABLE bsos (
INTERLEAVE IN user_collections;

CREATE INDEX BsoExpiry
ON bsos(expiry);
ON bsos(expiry),
INTERLEAVE IN user_collections;

CREATE TABLE collections (
collection_id INT64 NOT NULL,
Expand Down
116 changes: 81 additions & 35 deletions src/db/spanner/batch.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::str::FromStr;
use std::{collections::HashMap, str::FromStr};

use googleapis_raw::spanner::v1::type_pb::TypeCode;
use googleapis_raw::spanner::v1::type_pb::{StructType, Type, TypeCode};
use protobuf::{
well_known_types::{ListValue, Value},
RepeatedField,
};
use uuid::Uuid;

use super::support::null_value;
use super::support::{null_value, struct_type_field};
use super::{
models::{Result, SpannerDb, DEFAULT_BSO_TTL, PRETOUCH_TS},
support::as_value,
Expand Down Expand Up @@ -213,41 +217,83 @@ pub fn do_append(
batch_id: String,
bsos: Vec<params::PostCollectionBso>,
) -> Result<()> {
for bso in bsos {
let mut sqlparams = params! {
"fxa_uid" => user_id.fxa_uid.clone(),
"fxa_kid" => user_id.fxa_kid.clone(),
"collection_id" => collection_id.to_string(),
"batch_id" => batch_id.clone(),
"batch_bso_id" => bso.id,
};

sqlparams.insert(
"sortindex".to_string(),
bso.sortindex
// Pass an array of struct objects as @values (for UNNEST), e.g.:
// [("<fxa_uid>", "<fxa_kid>", 101, "ba1", "bso1", NULL, "payload1", NULL),
// ("<fxa_uid>", "<fxa_kid>", 101, "ba1", "bso2", NULL, "payload2", NULL)]
// https://cloud.google.com/spanner/docs/structs#creating_struct_objects
let rows: Vec<_> = bsos
.into_iter()
.map(|bso| {
let sortindex = bso
.sortindex
.map(|sortindex| as_value(sortindex.to_string()))
.unwrap_or_else(null_value),
);
sqlparams.insert(
"payload".to_string(),
bso.payload.map(as_value).unwrap_or_else(null_value),
);
sqlparams.insert(
"ttl".to_string(),
bso.ttl
.unwrap_or_else(null_value);
let payload = bso.payload.map(as_value).unwrap_or_else(null_value);
let ttl = bso
.ttl
.map(|ttl| as_value(ttl.to_string()))
.unwrap_or_else(null_value),
);
.unwrap_or_else(null_value);

let mut row = ListValue::new();
row.set_values(RepeatedField::from_vec(vec![
as_value(user_id.fxa_uid.clone()),
as_value(user_id.fxa_kid.clone()),
as_value(collection_id.to_string()),
as_value(batch_id.clone()),
as_value(bso.id),
sortindex,
payload,
ttl,
]));
let mut value = Value::new();
value.set_list_value(row);
value
})
.collect();

let mut list_values = ListValue::new();
list_values.set_values(RepeatedField::from_vec(rows));
let mut values = Value::new();
values.set_list_value(list_values);

// values' type is an ARRAY of STRUCTs
let mut param_type = Type::new();
param_type.set_code(TypeCode::ARRAY);
let mut array_type = Type::new();
array_type.set_code(TypeCode::STRUCT);

// STRUCT requires definition of all its field types
let mut struct_type = StructType::new();
let fields = vec![
("fxa_uid", TypeCode::STRING),
("fxa_kid", TypeCode::STRING),
("collection_id", TypeCode::INT64),
("batch_id", TypeCode::STRING),
("batch_bso_id", TypeCode::STRING),
("sortindex", TypeCode::INT64),
("payload", TypeCode::STRING),
("ttl", TypeCode::INT64),
]
.into_iter()
.map(|(name, field_type)| struct_type_field(name, field_type))
.collect();
struct_type.set_fields(RepeatedField::from_vec(fields));
array_type.set_struct_type(struct_type);
param_type.set_array_element_type(array_type);

let mut sqlparams = HashMap::new();
sqlparams.insert("values".to_owned(), values);
let mut sqlparam_types = HashMap::new();
sqlparam_types.insert("values".to_owned(), param_type);
db.sql(
"INSERT INTO batch_bsos (fxa_uid, fxa_kid, collection_id, batch_id, batch_bso_id,
sortindex, payload, ttl)
SELECT * FROM UNNEST(@values)",
)?
.params(sqlparams)
.param_types(sqlparam_types)
.execute(&db.conn)?;

db.sql(
"INSERT INTO batch_bsos (fxa_uid, fxa_kid, collection_id, batch_id, batch_bso_id,
sortindex, payload, ttl)
VALUES (@fxa_uid, @fxa_kid, @collection_id, @batch_id, @batch_bso_id,
@sortindex, @payload, @ttl)",
)?
.params(sqlparams)
.execute(&db.conn)?;
}
Ok(())
}

Expand Down
10 changes: 9 additions & 1 deletion src/db/spanner/support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
web::extractors::HawkIdentifier,
};

use googleapis_raw::spanner::v1::type_pb::{Type, TypeCode};
use googleapis_raw::spanner::v1::type_pb::{StructType_Field, Type, TypeCode};

type ParamValue = protobuf::well_known_types::Value;

Expand All @@ -39,6 +39,14 @@ pub fn as_type(v: TypeCode) -> Type {
t.set_code(v);
t
}

/// Builds a named Spanner `StructType_Field` whose type is the given
/// `TypeCode`, for declaring the fields of a STRUCT query parameter.
pub fn struct_type_field(name: &str, field_type: TypeCode) -> StructType_Field {
    let mut f = StructType_Field::new();
    f.set_field_type(as_type(field_type));
    f.set_name(name.to_owned());
    f
}

pub fn as_list_value(
string_values: impl Iterator<Item = String>,
) -> protobuf::well_known_types::Value {
Expand Down
60 changes: 47 additions & 13 deletions src/web/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ pub fn post_collection_batch(
}))
};

let commit = breq.commit;
let db = coll.db.clone();
let user_id = coll.user_id.clone();
let collection = coll.collection.clone();
Expand All @@ -252,23 +253,56 @@ pub fn post_collection_batch(
let mut failed = coll.bsos.invalid.clone();
let bso_ids: Vec<_> = coll.bsos.valid.iter().map(|bso| bso.id.clone()).collect();

coll.db
.append_to_batch(params::AppendToBatch {
if commit && !coll.bsos.valid.is_empty() {
// There's pending items to append to the batch but since we're
// committing, write them to bsos immediately. Otherwise under
// Spanner we would pay twice the mutations for those pending
// items (once writing them to to batch_bsos, then again
// writing them to bsos)

// NOTE: Unfortunately this means we make two calls to
// touch_collection (in post_bsos and then commit_batch). The
// second touch is redundant, writing the same timestamp
Either::A(
coll.db
.post_bsos(params::PostBsos {
user_id: coll.user_id.clone(),
collection: coll.collection.clone(),
// XXX: why does BatchBsoBody exist (it's the same struct
// as PostCollectionBso)?
bsos: coll
.bsos
.valid
.into_iter()
.map(|batch_bso| params::PostCollectionBso {
id: batch_bso.id,
sortindex: batch_bso.sortindex,
payload: batch_bso.payload,
ttl: batch_bso.ttl,
})
.collect(),
failed: Default::default(),
})
.and_then(|_| future::ok(())),
)
} else {
Either::B(coll.db.append_to_batch(params::AppendToBatch {
user_id: coll.user_id.clone(),
collection: coll.collection.clone(),
id: id.clone(),
bsos: coll.bsos.valid.into_iter().map(From::from).collect(),
})
.then(move |result| {
match result {
Ok(_) => success.extend(bso_ids),
Err(e) if e.is_conflict() => return future::err(e),
Err(_) => {
failed.extend(bso_ids.into_iter().map(|id| (id, "db error".to_owned())))
}
};
future::ok((id, success, failed))
})
}))
}
.then(move |result| {
match result {
Ok(_) => success.extend(bso_ids),
Err(e) if e.is_conflict() => return future::err(e),
Err(_) => {
failed.extend(bso_ids.into_iter().map(|id| (id, "db error".to_owned())))
}
};
future::ok((id, success, failed))
})
})
.map_err(From::from)
.and_then(move |(id, success, failed)| {
Expand Down

0 comments on commit 5dd3c65

Please sign in to comment.