Skip to content

Commit

Permalink
fix: convert user_id into bigint
Browse files Browse the repository at this point in the history
The MySQL schema defines userid columns as 32 bit integers inteded for use by the token server which expects 64 bit integers.
This change creates a new migration to convert the columns and to modify the `src/db/mysql/schema.rs` to reflect those changes.

Closes #470
  • Loading branch information
Emmanuel-Melon committed Apr 3, 2020
1 parent 75ba0e1 commit ab2606d
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 37 deletions.
10 changes: 5 additions & 5 deletions src/db/mysql/batch.rs
Expand Up @@ -15,7 +15,7 @@ use super::{
use crate::db::{params, results, DbError, DbErrorKind, BATCH_LIFETIME};

pub fn create(db: &MysqlDb, params: params::CreateBatch) -> Result<results::CreateBatch> {
let user_id = params.user_id.legacy_id as i32;
let user_id = params.user_id.legacy_id as i64;
let collection_id = db.get_collection_id(&params.collection)?;
let timestamp = db.timestamp().as_i64();
let bsos = bsos_to_batch_string(&params.bsos)?;
Expand All @@ -40,7 +40,7 @@ pub fn create(db: &MysqlDb, params: params::CreateBatch) -> Result<results::Crea

pub fn validate(db: &MysqlDb, params: params::ValidateBatch) -> Result<bool> {
let id = decode_id(&params.id)?;
let user_id = params.user_id.legacy_id as i32;
let user_id = params.user_id.legacy_id as i64;
let collection_id = db.get_collection_id(&params.collection)?;
let exists = batches::table
.select(sql::<Integer>("1"))
Expand All @@ -55,7 +55,7 @@ pub fn validate(db: &MysqlDb, params: params::ValidateBatch) -> Result<bool> {

pub fn append(db: &MysqlDb, params: params::AppendToBatch) -> Result<()> {
let id = decode_id(&params.id)?;
let user_id = params.user_id.legacy_id as i32;
let user_id = params.user_id.legacy_id as i64;
let collection_id = db.get_collection_id(&params.collection)?;
let bsos = bsos_to_batch_string(&params.bsos)?;
let affected_rows = update(batches::table)
Expand All @@ -81,7 +81,7 @@ pub struct Batch {

pub fn get(db: &MysqlDb, params: params::GetBatch) -> Result<Option<results::GetBatch>> {
let id = decode_id(&params.id)?;
let user_id = params.user_id.legacy_id as i32;
let user_id = params.user_id.legacy_id as i64;
let collection_id = db.get_collection_id(&params.collection)?;
Ok(batches::table
.select((batches::id, batches::bsos, batches::expiry))
Expand All @@ -100,7 +100,7 @@ pub fn get(db: &MysqlDb, params: params::GetBatch) -> Result<Option<results::Get

pub fn delete(db: &MysqlDb, params: params::DeleteBatch) -> Result<()> {
let id = decode_id(&params.id)?;
let user_id = params.user_id.legacy_id as i32;
let user_id = params.user_id.legacy_id as i64;
let collection_id = db.get_collection_id(&params.collection)?;
diesel::delete(batches::table)
.filter(batches::user_id.eq(&user_id))
Expand Down
64 changes: 32 additions & 32 deletions src/db/mysql/models.rs
Expand Up @@ -141,7 +141,7 @@ impl MysqlDb {
/// than explicit locking, but our ops team have expressed concerns about
/// the efficiency of that approach at scale.
pub fn lock_for_read_sync(&self, params: params::LockCollection) -> Result<()> {
let user_id = params.user_id.legacy_id as u32;
let user_id = params.user_id.legacy_id as i64;
let collection_id =
self.get_collection_id(&params.collection)
.or_else(|e| match e.kind() {
Expand All @@ -156,7 +156,7 @@ impl MysqlDb {
.session
.borrow()
.coll_locks
.get(&(user_id, collection_id))
.get(&(user_id as u32, collection_id))
.is_some()
{
return Ok(());
Expand All @@ -166,7 +166,7 @@ impl MysqlDb {
self.begin(false)?;
let modified = user_collections::table
.select(user_collections::modified)
.filter(user_collections::user_id.eq(user_id as i32))
.filter(user_collections::user_id.eq(user_id))
.filter(user_collections::collection_id.eq(collection_id))
.lock_in_share_mode()
.first(&self.conn)
Expand All @@ -176,24 +176,24 @@ impl MysqlDb {
self.session
.borrow_mut()
.coll_modified_cache
.insert((user_id, collection_id), modified);
.insert((user_id as u32, collection_id), modified); // why does it still expect a u32 int?
}
// XXX: who's responsible for unlocking (removing the entry)
self.session
.borrow_mut()
.coll_locks
.insert((user_id, collection_id), CollectionLock::Read);
.insert((user_id as u32, collection_id), CollectionLock::Read);
Ok(())
}

pub fn lock_for_write_sync(&self, params: params::LockCollection) -> Result<()> {
let user_id = params.user_id.legacy_id as u32;
let user_id = params.user_id.legacy_id as i64;
let collection_id = self.get_or_create_collection_id(&params.collection)?;
if let Some(CollectionLock::Read) = self
.session
.borrow()
.coll_locks
.get(&(user_id, collection_id))
.get(&(user_id as u32, collection_id))
{
Err(DbError::internal("Can't escalate read-lock to write-lock"))?
}
Expand All @@ -202,7 +202,7 @@ impl MysqlDb {
self.begin(true)?;
let modified = user_collections::table
.select(user_collections::modified)
.filter(user_collections::user_id.eq(user_id as i32))
.filter(user_collections::user_id.eq(user_id))
.filter(user_collections::collection_id.eq(collection_id))
.for_update()
.first(&self.conn)
Expand All @@ -216,12 +216,12 @@ impl MysqlDb {
self.session
.borrow_mut()
.coll_modified_cache
.insert((user_id, collection_id), modified);
.insert((user_id as u32, collection_id), modified);
}
self.session
.borrow_mut()
.coll_locks
.insert((user_id, collection_id), CollectionLock::Write);
.insert((user_id as u32, collection_id), CollectionLock::Write);
Ok(())
}

Expand Down Expand Up @@ -264,15 +264,15 @@ impl MysqlDb {
collection_id = COLLECTION_ID,
modified = LAST_MODIFIED
))
.bind::<Integer, _>(user_id)
.bind::<BigInt, _>(user_id as i64)
.bind::<Integer, _>(TOMBSTONE)
.bind::<BigInt, _>(self.timestamp().as_i64())
.execute(&self.conn)?;
Ok(())
}

pub fn delete_storage_sync(&self, user_id: HawkIdentifier) -> Result<()> {
let user_id = user_id.legacy_id as i32;
let user_id = user_id.legacy_id as i64;
self.begin(true)?;
// Delete user data.
delete(bso::table)
Expand All @@ -292,14 +292,14 @@ impl MysqlDb {
&self,
params: params::DeleteCollection,
) -> Result<SyncTimestamp> {
let user_id = params.user_id.legacy_id;
let user_id = params.user_id.legacy_id as i64;
let collection_id = self.get_collection_id(&params.collection)?;
let mut count = delete(bso::table)
.filter(bso::user_id.eq(user_id as i32))
.filter(bso::user_id.eq(user_id))
.filter(bso::collection_id.eq(&collection_id))
.execute(&self.conn)?;
count += delete(user_collections::table)
.filter(user_collections::user_id.eq(user_id as i32))
.filter(user_collections::user_id.eq(user_id))
.filter(user_collections::collection_id.eq(&collection_id))
.execute(&self.conn)?;
if count == 0 {
Expand Down Expand Up @@ -433,7 +433,7 @@ impl MysqlDb {
);

sql_query(q)
.bind::<Integer, _>(user_id as i32) // XXX:
.bind::<BigInt, _>(user_id as i64) // XXX:
.bind::<Integer, _>(&collection_id)
.bind::<Text, _>(&bso.id)
.bind::<Nullable<Integer>, _>(sortindex)
Expand All @@ -447,7 +447,7 @@ impl MysqlDb {
}

pub fn get_bsos_sync(&self, params: params::GetBsos) -> Result<results::GetBsos> {
let user_id = params.user_id.legacy_id as i32;
let user_id = params.user_id.legacy_id as i64;
let collection_id = self.get_collection_id(&params.collection)?;
let BsoQueryParams {
newer,
Expand Down Expand Up @@ -524,7 +524,7 @@ impl MysqlDb {
}

pub fn get_bso_ids_sync(&self, params: params::GetBsos) -> Result<results::GetBsoIds> {
let user_id = params.user_id.legacy_id as i32;
let user_id = params.user_id.legacy_id as i64;
let collection_id = self.get_collection_id(&params.collection)?;
let BsoQueryParams {
newer,
Expand Down Expand Up @@ -593,7 +593,7 @@ impl MysqlDb {
}

pub fn get_bso_sync(&self, params: params::GetBso) -> Result<Option<results::GetBso>> {
let user_id = params.user_id.legacy_id;
let user_id = params.user_id.legacy_id as i64;
let collection_id = self.get_collection_id(&params.collection)?;
Ok(bso::table
.select((
Expand All @@ -603,7 +603,7 @@ impl MysqlDb {
bso::sortindex,
bso::expiry,
))
.filter(bso::user_id.eq(user_id as i32))
.filter(bso::user_id.eq(user_id))
.filter(bso::collection_id.eq(&collection_id))
.filter(bso::id.eq(&params.id))
.filter(bso::expiry.ge(self.timestamp().as_i64()))
Expand All @@ -615,7 +615,7 @@ impl MysqlDb {
let user_id = params.user_id.legacy_id;
let collection_id = self.get_collection_id(&params.collection)?;
let affected_rows = delete(bso::table)
.filter(bso::user_id.eq(user_id as i32))
.filter(bso::user_id.eq(user_id as i64))
.filter(bso::collection_id.eq(&collection_id))
.filter(bso::id.eq(params.id))
.filter(bso::expiry.gt(&self.timestamp().as_i64()))
Expand All @@ -627,10 +627,10 @@ impl MysqlDb {
}

pub fn delete_bsos_sync(&self, params: params::DeleteBsos) -> Result<results::DeleteBsos> {
let user_id = params.user_id.legacy_id;
let user_id = params.user_id.legacy_id as i64;
let collection_id = self.get_collection_id(&params.collection)?;
delete(bso::table)
.filter(bso::user_id.eq(user_id as i32))
.filter(bso::user_id.eq(user_id))
.filter(bso::collection_id.eq(&collection_id))
.filter(bso::id.eq_any(params.ids))
.execute(&self.conn)?;
Expand Down Expand Up @@ -671,7 +671,7 @@ impl MysqlDb {
}

pub fn get_storage_timestamp_sync(&self, user_id: HawkIdentifier) -> Result<SyncTimestamp> {
let user_id = user_id.legacy_id as i32;
let user_id = user_id.legacy_id as i64;
let modified = user_collections::table
.select(max(user_collections::modified))
.filter(user_collections::user_id.eq(user_id))
Expand All @@ -696,19 +696,19 @@ impl MysqlDb {
}
user_collections::table
.select(user_collections::modified)
.filter(user_collections::user_id.eq(user_id as i32))
.filter(user_collections::user_id.eq(user_id as i64))
.filter(user_collections::collection_id.eq(collection_id))
.first(&self.conn)
.optional()?
.ok_or_else(|| DbErrorKind::CollectionNotFound.into())
}

pub fn get_bso_timestamp_sync(&self, params: params::GetBsoTimestamp) -> Result<SyncTimestamp> {
let user_id = params.user_id.legacy_id;
let user_id = params.user_id.legacy_id as i64;
let collection_id = self.get_collection_id(&params.collection)?;
let modified = bso::table
.select(bso::modified)
.filter(bso::user_id.eq(user_id as i32))
.filter(bso::user_id.eq(user_id))
.filter(bso::collection_id.eq(&collection_id))
.filter(bso::id.eq(&params.id))
.first::<i64>(&self.conn)
Expand All @@ -730,7 +730,7 @@ impl MysqlDb {
user_id = USER_ID,
modified = LAST_MODIFIED
))
.bind::<Integer, _>(user_id.legacy_id as i32)
.bind::<BigInt, _>(user_id.legacy_id as i64)
.bind::<Integer, _>(TOMBSTONE)
.load::<UserCollectionsResult>(&self.conn)?
.into_iter()
Expand Down Expand Up @@ -806,7 +806,7 @@ impl MysqlDb {
modified = LAST_MODIFIED
);
sql_query(upsert)
.bind::<Integer, _>(user_id as i32)
.bind::<BigInt, _>(user_id as i64)
.bind::<Integer, _>(&collection_id)
.bind::<BigInt, _>(&self.timestamp().as_i64())
.bind::<BigInt, _>(&self.timestamp().as_i64())
Expand All @@ -820,7 +820,7 @@ impl MysqlDb {
) -> Result<results::GetStorageUsage> {
let total_size = bso::table
.select(sql::<Nullable<BigInt>>("SUM(LENGTH(payload))"))
.filter(bso::user_id.eq(user_id.legacy_id as i32))
.filter(bso::user_id.eq(user_id.legacy_id as i64))
.filter(bso::expiry.gt(&self.timestamp().as_i64()))
.get_result::<Option<i64>>(&self.conn)?;
Ok(total_size.unwrap_or_default() as u64)
Expand All @@ -832,7 +832,7 @@ impl MysqlDb {
) -> Result<results::GetCollectionUsage> {
let counts = bso::table
.select((bso::collection_id, sql::<BigInt>("SUM(LENGTH(payload))")))
.filter(bso::user_id.eq(user_id.legacy_id as i32))
.filter(bso::user_id.eq(user_id.legacy_id as i64))
.filter(bso::expiry.gt(&self.timestamp().as_i64()))
.group_by(bso::collection_id)
.load(&self.conn)?
Expand All @@ -853,7 +853,7 @@ impl MysqlDb {
collection_id = COLLECTION_ID
)),
))
.filter(bso::user_id.eq(user_id.legacy_id as i32))
.filter(bso::user_id.eq(user_id.legacy_id as i64))
.filter(bso::expiry.gt(&self.timestamp().as_i64()))
.group_by(bso::collection_id)
.load(&self.conn)?
Expand Down

0 comments on commit ab2606d

Please sign in to comment.