Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 153 additions & 18 deletions src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,17 @@ pub enum RedisDataSrcError {
/// other external data sources.
pub struct RedisDataConn {
pool: r2d2::Pool<redis::Client>,
force_back_vec: Vec<fn(&mut redis::Connection) -> Result<(), Err>>,
pre_commit_vec: Vec<Box<dyn FnMut(&mut redis::Connection) -> Result<(), Err>>>,
post_commit_vec: Vec<Box<dyn FnMut(&mut redis::Connection) -> Result<(), Err>>>,
force_back_vec: Vec<Box<dyn FnMut(&mut redis::Connection) -> Result<(), Err>>>,
}

impl RedisDataConn {
fn new(pool: r2d2::Pool<redis::Client>) -> Self {
Self {
pool,
pre_commit_vec: Vec::new(),
post_commit_vec: Vec::new(),
force_back_vec: Vec::new(),
}
}
Expand All @@ -65,16 +69,58 @@ impl RedisDataConn {
.map_err(|e| Err::with_source(RedisDataSrcError::FailToGetConnectionFromPool, e))
}

/// Adds a function to the list of "pre commit" operations.
///
/// The provided function will be called after all other database update processes have
/// finished, and right before their commit processes are made,
pub fn add_pre_commit<F>(&mut self, f: F)
where
F: FnMut(&mut redis::Connection) -> Result<(), Err> + 'static
{
self.pre_commit_vec.push(Box::new(f));
}

/// Adds a function to the list of "post commit" operations.
///
/// The provided function will be called as post-transaction processes.
pub fn add_post_commit<F>(&mut self, f: F)
where
F: FnMut(&mut redis::Connection) -> Result<(), Err> + 'static
{
self.post_commit_vec.push(Box::new(f));
}

/// Adds a function to the list of "force back" operations.
///
/// The provided function will be called if the transaction needs to be reverted
/// due to a failure in a subsequent operation.
pub fn add_force_back(&mut self, f: fn(&mut redis::Connection) -> Result<(), Err>) {
self.force_back_vec.push(f);
pub fn add_force_back<F>(&mut self, f: F)
where
F: FnMut(&mut redis::Connection) -> Result<(), Err> + 'static
{
self.force_back_vec.push(Box::new(f));
}
}

impl DataConn for RedisDataConn {
/// Executes all functions registered as "pre commit" processes.
///
/// This method is called after all other database update processes have finished and right
/// before their commit processes are made, Since Redis does not have a rollback feature,
/// it is possible to maintain transactional consistency by performing updates at this timing,
/// as long as the updated data are not searched for within the transaction.
fn pre_commit(&mut self, _ag: &mut AsyncGroup) -> Result<(), Err> {
match self.pool.get() {
Ok(mut conn) => {
for f in self.pre_commit_vec.iter_mut() {
f(&mut conn)?;
}
Ok(())
}
Err(e) => Err(Err::with_source(RedisDataSrcError::FailToGetConnectionFromPool, e)),
}
}

/// Commits the transaction.
///
/// Note that Redis does not have a native rollback mechanism. All changes are committed
Expand All @@ -84,6 +130,25 @@ impl DataConn for RedisDataConn {
Ok(())
}

/// Executes all functions registered as "post commit" processes.
///
/// This method is called as post-transaction processes. Since Redis does not have a rollback
/// feature, it's acceptable to perform update processes at this point that can be manually
/// recovered later, even if they fail.
fn post_commit(&mut self, _ag: &mut AsyncGroup) {
match self.pool.get() {
Ok(mut conn) => {
for f in self.post_commit_vec.iter_mut() {
let _ = f(&mut conn);
}
}
Err(e) => {
// for error notification
let _ = Err::with_source(RedisDataSrcError::FailToGetConnectionFromPool, e);
}
};
}

/// Rolls back the transaction.
///
/// This method is provided to satisfy the `DataConn` trait but does not perform any action
Expand All @@ -106,7 +171,7 @@ impl DataConn for RedisDataConn {
fn force_back(&mut self, _ag: &mut AsyncGroup) {
match self.pool.get() {
Ok(mut conn) => {
for f in self.force_back_vec.iter() {
for f in self.force_back_vec.iter_mut() {
let _ = f(&mut conn);
}
}
Expand Down Expand Up @@ -226,40 +291,40 @@ mod test_redis {
#[overridable]
trait RedisSampleDataAcc: DataAcc {
fn get_sample_key(&mut self) -> Result<Option<String>, Err> {
let redis_dc = self.get_data_conn::<RedisDataConn>("redis")?;
let mut conn = redis_dc.get_connection()?;
let data_conn = self.get_data_conn::<RedisDataConn>("redis")?;
let mut conn = data_conn.get_connection()?;
let rslt: redis::RedisResult<Option<String>> = conn.get("sample");
return match rslt {
Ok(opt) => Ok(opt),
Err(e) => Err(Err::with_source(SampleError::FailToGetValue, e)),
};
}
fn set_sample_key(&mut self, val: &str) -> Result<(), Err> {
let redis_dc = self.get_data_conn::<RedisDataConn>("redis")?;
let mut conn = redis_dc.get_connection()?;
let data_conn = self.get_data_conn::<RedisDataConn>("redis")?;
let mut conn = data_conn.get_connection()?;
return match conn.set("sample", val) {
Ok(()) => Ok(()),
Err(e) => Err(Err::with_source(SampleError::FailToSetValue, e)),
};
}
fn del_sample_key(&mut self) -> Result<(), Err> {
let redis_dc = self.get_data_conn::<RedisDataConn>("redis")?;
let mut conn = redis_dc.get_connection()?;
let data_conn = self.get_data_conn::<RedisDataConn>("redis")?;
let mut conn = data_conn.get_connection()?;
return match conn.del("sample") {
Ok(()) => Ok(()),
Err(e) => Err(Err::with_source(SampleError::FailToDelValue, e)),
};
}

fn set_sample_key_with_force_back(&mut self, val: &str) -> Result<(), Err> {
let redis_dc = self.get_data_conn::<RedisDataConn>("redis")?;
let mut conn = redis_dc.get_connection()?;
let data_conn = self.get_data_conn::<RedisDataConn>("redis")?;
let mut conn = data_conn.get_connection()?;

if let Err(e) = conn.set::<&str, &str, ()>("sample_force_back", val) {
return Err(Err::with_source(SampleError::FailToSetValue, e));
}

redis_dc.add_force_back(|conn| {
data_conn.add_force_back(|conn| {
let r: redis::RedisResult<()> = conn.del("sample_force_back");
match r {
Ok(()) => Ok(()),
Expand All @@ -271,7 +336,7 @@ mod test_redis {
return Err(Err::with_source(SampleError::FailToSetValue, e));
}

redis_dc.add_force_back(|conn| {
data_conn.add_force_back(|conn| {
let r: redis::RedisResult<()> = conn.del("sample_force_back_2");
match r {
Ok(()) => Ok(()),
Expand All @@ -281,6 +346,36 @@ mod test_redis {

Ok(())
}

fn set_sample_key_in_pre_commit(&mut self, val: &str) -> Result<(), Err> {
let data_conn = self.get_data_conn::<RedisDataConn>("redis")?;

let val_owned = val.to_string();

data_conn.add_pre_commit(move |conn| {
if let Err(e) = conn.set::<&str, &str, ()>("sample_pre_commit", &val_owned) {
return Err(Err::with_source(SampleError::FailToSetValue, e));
}
Ok(())
});

Ok(())
}

fn set_sample_key_in_post_commit(&mut self, val: &str) -> Result<(), Err> {
let data_conn = self.get_data_conn::<RedisDataConn>("redis")?;

let val_owned = val.to_string();

data_conn.add_post_commit(move |conn| {
if let Err(e) = conn.set::<&str, &str, ()>("sample_post_commit", &val_owned) {
return Err(Err::with_source(SampleError::FailToSetValue, e));
}
Ok(())
});

Ok(())
}
}
impl RedisSampleDataAcc for DataHub {}

Expand All @@ -290,6 +385,8 @@ mod test_redis {
fn set_sample_key(&mut self, value: &str) -> Result<(), Err>;
fn del_sample_key(&mut self) -> Result<(), Err>;
fn set_sample_key_with_force_back(&mut self, val: &str) -> Result<(), Err>;
fn set_sample_key_in_pre_commit(&mut self, val: &str) -> Result<(), Err>;
fn set_sample_key_in_post_commit(&mut self, val: &str) -> Result<(), Err>;
}
#[override_with(RedisSampleDataAcc)]
impl SampleData for DataHub {}
Expand Down Expand Up @@ -365,20 +462,28 @@ mod test_redis {
}
}

fn sample_logic_with_force_back_and_commit(data: &mut impl SampleData) -> Result<(), Err> {
fn sample_logic_in_txn_and_commit(data: &mut impl SampleData) -> Result<(), Err> {
data.set_sample_key_with_force_back("Good Morning")?;
Ok(())
}
fn sample_logic_with_force_back_and_force_back(data: &mut impl SampleData) -> Result<(), Err> {
fn sample_logic_in_txn_and_force_back(data: &mut impl SampleData) -> Result<(), Err> {
data.set_sample_key_with_force_back("Good Afternoon")?;
Err(Err::new("XXX"))
}
fn sample_logic_in_txn_and_pre_commit(data: &mut impl SampleData) -> Result<(), Err> {
data.set_sample_key_in_pre_commit("Good Evening")?;
Ok(())
}
fn sample_logic_in_txn_and_post_commit(data: &mut impl SampleData) -> Result<(), Err> {
data.set_sample_key_in_post_commit("Good Night")?;
Ok(())
}

#[test]
fn test_txn_and_commit() {
let mut data = DataHub::new();
data.uses("redis", RedisDataSrc::new("redis://127.0.0.1:6379/2"));
if let Err(err) = sabi::txn!(sample_logic_with_force_back_and_commit, data) {
if let Err(err) = sabi::txn!(sample_logic_in_txn_and_commit, data) {
panic!("{:?}", err);
}

Expand All @@ -392,12 +497,42 @@ mod test_redis {
let _: redis::RedisResult<()> = conn.del("sample_force_back");
}

#[test]
fn tst_txn_and_pre_commit() {
let mut data = DataHub::new();
data.uses("redis", RedisDataSrc::new("redis://127.0.0.1:6379/4"));

if let Err(err) = sabi::txn!(sample_logic_in_txn_and_pre_commit, data) {
panic!("{:?}", err);
}

let client = redis::Client::open("redis://127.0.0.1:6379/4").unwrap();
let mut conn = client.get_connection().unwrap();
let s: redis::RedisResult<Option<String>> = conn.get("sample_pre_commit");
assert_eq!(s.unwrap().unwrap(), "Good Evening");
}

#[test]
fn tst_txn_and_post_commit() {
let mut data = DataHub::new();
data.uses("redis", RedisDataSrc::new("redis://127.0.0.1:6379/5"));

if let Err(err) = sabi::txn!(sample_logic_in_txn_and_post_commit, data) {
panic!("{:?}", err);
}

let client = redis::Client::open("redis://127.0.0.1:6379/5").unwrap();
let mut conn = client.get_connection().unwrap();
let s: redis::RedisResult<Option<String>> = conn.get("sample_post_commit");
assert_eq!(s.unwrap().unwrap(), "Good Night");
}

#[test]
fn test_txn_and_force_back() {
let mut data = DataHub::new();
data.uses("redis", RedisDataSrc::new("redis://127.0.0.1:6379/3"));

if let Err(err) = sabi::txn!(sample_logic_with_force_back_and_force_back, data) {
if let Err(err) = sabi::txn!(sample_logic_in_txn_and_force_back, data) {
assert_eq!(err.reason::<&str>().unwrap(), &"XXX");
} else {
panic!();
Expand Down