Skip to content

Commit

Permalink
fix(redis): delete leftovers after reading them to prevent race condi…
Browse files Browse the repository at this point in the history
…tions
  • Loading branch information
gakonst committed Oct 3, 2019
1 parent e93f790 commit 02a9718
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 34 deletions.
65 changes: 32 additions & 33 deletions crates/interledger-store-redis/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1992,9 +1992,12 @@ impl LeftoversStore for RedisStore {
) -> Box<dyn Future<Item = (Self::AssetType, u8), Error = ()> + Send> {
let mut pipe = redis::pipe();
pipe.atomic();
// get the amounts and instantly delete them
pipe.lrange(uncredited_amount_key(account_id.to_string()), 0, -1);
pipe.del(uncredited_amount_key(account_id.to_string()))
.ignore();
Box::new(
pipe.query_async(self.connection.as_ref().clone())
pipe.query_async(self.connection.clone())
.map_err(move |err| error!("Error getting uncredited_settlement_amount {:?}", err))
.and_then(move |(_, amounts): (_, Vec<AmountWithScale>)| {
// this call will only return 1 element
Expand All @@ -2014,22 +2017,18 @@ impl LeftoversStore for RedisStore {
account_id,
uncredited_settlement_amount
);
let mut pipe = redis::pipe();
pipe.atomic();
// We store these amounts as lists of strings
// because we cannot do BigNumber arithmetic in the store
// When loading the amounts, we convert them to the appropriate data
// type and sum them up.
pipe.rpush(
uncredited_amount_key(account_id),
AmountWithScale {
num: uncredited_settlement_amount.0,
scale: uncredited_settlement_amount.1,
},
)
.ignore();
Box::new(
pipe.query_async(self.connection.as_ref().clone())
// We store these amounts as lists of strings
// because we cannot do BigNumber arithmetic in the store
// When loading the amounts, we convert them to the appropriate data
// type and sum them up.
cmd("RPUSH")
.arg(uncredited_amount_key(account_id))
.arg(AmountWithScale {
num: uncredited_settlement_amount.0,
scale: uncredited_settlement_amount.1,
})
.query_async(self.connection.clone())
.map_err(move |err| error!("Error saving uncredited_settlement_amount: {:?}", err))
.and_then(move |(_conn, _ret): (_, Value)| Ok(())),
)
Expand All @@ -2049,23 +2048,23 @@ impl LeftoversStore for RedisStore {
// save any potential leftovers to the store
let (scaled_amount, precision_loss) =
scale_with_precision_loss(amount.0, local_scale, amount.1);
let mut pipe = redis::pipe();
pipe.atomic();
pipe.del(uncredited_amount_key(account_id)).ignore();
pipe.rpush(
uncredited_amount_key(account_id),
AmountWithScale {
num: precision_loss,
scale: std::cmp::max(local_scale, amount.1),
},
)
.ignore();

pipe.query_async(connection.as_ref().clone())
.map_err(move |err| {
error!("Error saving uncredited_settlement_amount: {:?}", err)
})
.and_then(move |(_conn, _ret): (_, Value)| Ok(scaled_amount))
if precision_loss > BigUint::from(0u32) {
Either::A(
cmd("RPUSH")
.arg(uncredited_amount_key(account_id))
.arg(AmountWithScale {
num: precision_loss,
scale: std::cmp::max(local_scale, amount.1),
})
.query_async(connection.clone())
.map_err(move |err| {
error!("Error saving uncredited_settlement_amount: {:?}", err)
})
.and_then(move |(_conn, _ret): (_, Value)| Ok(scaled_amount)),
)
} else {
Either::B(ok(scaled_amount))
}
}),
)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/interledger-store-redis/tests/settlement_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use bytes::Bytes;
use common::*;
use futures::future::join_all;
use http::StatusCode;
use interledger_service::{Account, AccountStore};
use interledger_api::NodeStore;
use interledger_service::{Account, AccountStore};
use interledger_settlement::{IdempotentStore, LeftoversStore, SettlementAccount, SettlementStore};
use interledger_store_redis::AccountId;
use lazy_static::lazy_static;
Expand Down

0 comments on commit 02a9718

Please sign in to comment.