diff --git a/src/standalone.rs b/src/standalone.rs index 3f8e874..40e058c 100644 --- a/src/standalone.rs +++ b/src/standalone.rs @@ -45,13 +45,17 @@ pub enum RedisDataSrcError { /// other external data sources. pub struct RedisDataConn { pool: r2d2::Pool, - force_back_vec: Vec Result<(), Err>>, + pre_commit_vec: Vec Result<(), Err>>>, + post_commit_vec: Vec Result<(), Err>>>, + force_back_vec: Vec Result<(), Err>>>, } impl RedisDataConn { fn new(pool: r2d2::Pool) -> Self { Self { pool, + pre_commit_vec: Vec::new(), + post_commit_vec: Vec::new(), force_back_vec: Vec::new(), } } @@ -65,16 +69,58 @@ impl RedisDataConn { .map_err(|e| Err::with_source(RedisDataSrcError::FailToGetConnectionFromPool, e)) } + /// Adds a function to the list of "pre commit" operations. + /// + /// The provided function will be called after all other database update processes have + /// finished, and right before their commit processes are made, + pub fn add_pre_commit(&mut self, f: F) + where + F: FnMut(&mut redis::Connection) -> Result<(), Err> + 'static + { + self.pre_commit_vec.push(Box::new(f)); + } + + /// Adds a function to the list of "post commit" operations. + /// + /// The provided function will be called as post-transaction processes. + pub fn add_post_commit(&mut self, f: F) + where + F: FnMut(&mut redis::Connection) -> Result<(), Err> + 'static + { + self.post_commit_vec.push(Box::new(f)); + } + /// Adds a function to the list of "force back" operations. /// /// The provided function will be called if the transaction needs to be reverted /// due to a failure in a subsequent operation. - pub fn add_force_back(&mut self, f: fn(&mut redis::Connection) -> Result<(), Err>) { - self.force_back_vec.push(f); + pub fn add_force_back(&mut self, f: F) + where + F: FnMut(&mut redis::Connection) -> Result<(), Err> + 'static + { + self.force_back_vec.push(Box::new(f)); } } impl DataConn for RedisDataConn { + /// Executes all functions registered as "pre commit" processes. + /// + /// This method is called after all other database update processes have finished and right + /// before their commit processes are made, Since Redis does not have a rollback feature, + /// it is possible to maintain transactional consistency by performing updates at this timing, + /// as long as the updated data are not searched for within the transaction. + fn pre_commit(&mut self, _ag: &mut AsyncGroup) -> Result<(), Err> { + match self.pool.get() { + Ok(mut conn) => { + for f in self.pre_commit_vec.iter_mut() { + f(&mut conn)?; + } + Ok(()) + } + Err(e) => Err(Err::with_source(RedisDataSrcError::FailToGetConnectionFromPool, e)), + } + } + /// Commits the transaction. /// /// Note that Redis does not have a native rollback mechanism. All changes are committed @@ -84,6 +130,25 @@ impl DataConn for RedisDataConn { Ok(()) } + /// Executes all functions registered as "post commit" processes. + /// + /// This method is called as post-transaction processes. Since Redis does not have a rollback + /// feature, it's acceptable to perform update processes at this point that can be manually + /// recovered later, even if they fail. + fn post_commit(&mut self, _ag: &mut AsyncGroup) { + match self.pool.get() { + Ok(mut conn) => { + for f in self.post_commit_vec.iter_mut() { + let _ = f(&mut conn); + } + } + Err(e) => { + // for error notification + let _ = Err::with_source(RedisDataSrcError::FailToGetConnectionFromPool, e); + } + }; + } + /// Rolls back the transaction. /// /// This method is provided to satisfy the `DataConn` trait but does not perform any action @@ -106,7 +171,7 @@ impl DataConn for RedisDataConn { fn force_back(&mut self, _ag: &mut AsyncGroup) { match self.pool.get() { Ok(mut conn) => { - for f in self.force_back_vec.iter() { + for f in self.force_back_vec.iter_mut() { let _ = f(&mut conn); } } @@ -226,8 +291,8 @@ mod test_redis { #[overridable] trait RedisSampleDataAcc: DataAcc { fn get_sample_key(&mut self) -> Result, Err> { - let redis_dc = self.get_data_conn::("redis")?; - let mut conn = redis_dc.get_connection()?; + let data_conn = self.get_data_conn::("redis")?; + let mut conn = data_conn.get_connection()?; let rslt: redis::RedisResult> = conn.get("sample"); return match rslt { Ok(opt) => Ok(opt), @@ -235,16 +300,16 @@ mod test_redis { }; } fn set_sample_key(&mut self, val: &str) -> Result<(), Err> { - let redis_dc = self.get_data_conn::("redis")?; - let mut conn = redis_dc.get_connection()?; + let data_conn = self.get_data_conn::("redis")?; + let mut conn = data_conn.get_connection()?; return match conn.set("sample", val) { Ok(()) => Ok(()), Err(e) => Err(Err::with_source(SampleError::FailToSetValue, e)), }; } fn del_sample_key(&mut self) -> Result<(), Err> { - let redis_dc = self.get_data_conn::("redis")?; - let mut conn = redis_dc.get_connection()?; + let data_conn = self.get_data_conn::("redis")?; + let mut conn = data_conn.get_connection()?; return match conn.del("sample") { Ok(()) => Ok(()), Err(e) => Err(Err::with_source(SampleError::FailToDelValue, e)), @@ -252,14 +317,14 @@ mod test_redis { } fn set_sample_key_with_force_back(&mut self, val: &str) -> Result<(), Err> { - let redis_dc = self.get_data_conn::("redis")?; - let mut conn = redis_dc.get_connection()?; + let data_conn = self.get_data_conn::("redis")?; + let mut conn = data_conn.get_connection()?; if let Err(e) = conn.set::<&str, &str, ()>("sample_force_back", val) { return Err(Err::with_source(SampleError::FailToSetValue, e)); } - redis_dc.add_force_back(|conn| { + data_conn.add_force_back(|conn| { let r: redis::RedisResult<()> = conn.del("sample_force_back"); match r { Ok(()) => Ok(()), @@ -271,7 +336,7 @@ mod test_redis { return Err(Err::with_source(SampleError::FailToSetValue, e)); } - redis_dc.add_force_back(|conn| { + data_conn.add_force_back(|conn| { let r: redis::RedisResult<()> = conn.del("sample_force_back_2"); match r { Ok(()) => Ok(()), @@ -281,6 +346,36 @@ mod test_redis { Ok(()) } + + fn set_sample_key_in_pre_commit(&mut self, val: &str) -> Result<(), Err> { + let data_conn = self.get_data_conn::("redis")?; + + let val_owned = val.to_string(); + + data_conn.add_pre_commit(move |conn| { + if let Err(e) = conn.set::<&str, &str, ()>("sample_pre_commit", &val_owned) { + return Err(Err::with_source(SampleError::FailToSetValue, e)); + } + Ok(()) + }); + + Ok(()) + } + + fn set_sample_key_in_post_commit(&mut self, val: &str) -> Result<(), Err> { + let data_conn = self.get_data_conn::("redis")?; + + let val_owned = val.to_string(); + + data_conn.add_post_commit(move |conn| { + if let Err(e) = conn.set::<&str, &str, ()>("sample_post_commit", &val_owned) { + return Err(Err::with_source(SampleError::FailToSetValue, e)); + } + Ok(()) + }); + + Ok(()) + } } impl RedisSampleDataAcc for DataHub {} @@ -290,6 +385,8 @@ mod test_redis { fn set_sample_key(&mut self, value: &str) -> Result<(), Err>; fn del_sample_key(&mut self) -> Result<(), Err>; fn set_sample_key_with_force_back(&mut self, val: &str) -> Result<(), Err>; + fn set_sample_key_in_pre_commit(&mut self, val: &str) -> Result<(), Err>; + fn set_sample_key_in_post_commit(&mut self, val: &str) -> Result<(), Err>; } #[override_with(RedisSampleDataAcc)] impl SampleData for DataHub {} @@ -365,20 +462,28 @@ mod test_redis { } } - fn sample_logic_with_force_back_and_commit(data: &mut impl SampleData) -> Result<(), Err> { + fn sample_logic_in_txn_and_commit(data: &mut impl SampleData) -> Result<(), Err> { data.set_sample_key_with_force_back("Good Morning")?; Ok(()) } - fn sample_logic_with_force_back_and_force_back(data: &mut impl SampleData) -> Result<(), Err> { + fn sample_logic_in_txn_and_force_back(data: &mut impl SampleData) -> Result<(), Err> { data.set_sample_key_with_force_back("Good Afternoon")?; Err(Err::new("XXX")) } + fn sample_logic_in_txn_and_pre_commit(data: &mut impl SampleData) -> Result<(), Err> { + data.set_sample_key_in_pre_commit("Good Evening")?; + Ok(()) + } + fn sample_logic_in_txn_and_post_commit(data: &mut impl SampleData) -> Result<(), Err> { + data.set_sample_key_in_post_commit("Good Night")?; + Ok(()) + } #[test] fn test_txn_and_commit() { let mut data = DataHub::new(); data.uses("redis", RedisDataSrc::new("redis://127.0.0.1:6379/2")); - if let Err(err) = sabi::txn!(sample_logic_with_force_back_and_commit, data) { + if let Err(err) = sabi::txn!(sample_logic_in_txn_and_commit, data) { panic!("{:?}", err); } @@ -392,12 +497,42 @@ mod test_redis { let _: redis::RedisResult<()> = conn.del("sample_force_back"); } + #[test] + fn tst_txn_and_pre_commit() { + let mut data = DataHub::new(); + data.uses("redis", RedisDataSrc::new("redis://127.0.0.1:6379/4")); + + if let Err(err) = sabi::txn!(sample_logic_in_txn_and_pre_commit, data) { + panic!("{:?}", err); + } + + let client = redis::Client::open("redis://127.0.0.1:6379/4").unwrap(); + let mut conn = client.get_connection().unwrap(); + let s: redis::RedisResult> = conn.get("sample_pre_commit"); + assert_eq!(s.unwrap().unwrap(), "Good Evening"); + } + + #[test] + fn tst_txn_and_post_commit() { + let mut data = DataHub::new(); + data.uses("redis", RedisDataSrc::new("redis://127.0.0.1:6379/5")); + + if let Err(err) = sabi::txn!(sample_logic_in_txn_and_post_commit, data) { + panic!("{:?}", err); + } + + let client = redis::Client::open("redis://127.0.0.1:6379/5").unwrap(); + let mut conn = client.get_connection().unwrap(); + let s: redis::RedisResult> = conn.get("sample_post_commit"); + assert_eq!(s.unwrap().unwrap(), "Good Night"); + } + #[test] fn test_txn_and_force_back() { let mut data = DataHub::new(); data.uses("redis", RedisDataSrc::new("redis://127.0.0.1:6379/3")); - if let Err(err) = sabi::txn!(sample_logic_with_force_back_and_force_back, data) { + if let Err(err) = sabi::txn!(sample_logic_in_txn_and_force_back, data) { assert_eq!(err.reason::<&str>().unwrap(), &"XXX"); } else { panic!();