Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 48 additions & 25 deletions src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,25 @@ pub enum RedisDataSrcError {
/// if an error occurs during a multi-step process or a transaction involving
/// other external data sources.
pub struct RedisDataConn {
conn: r2d2::PooledConnection<redis::Client>,
pool: r2d2::Pool<redis::Client>,
force_back_vec: Vec<fn(&mut redis::Connection) -> Result<(), Err>>,
}

impl RedisDataConn {
fn new(pool: r2d2::Pool<redis::Client>) -> Self {
Self {
pool,
force_back_vec: Vec::new(),
}
}

/// Returns a mutable reference to the underlying Redis connection.
///
/// This reference allows direct access to Redis commands.
pub fn get_connection(&mut self) -> &mut redis::Connection {
&mut self.conn
pub fn get_connection(&mut self) -> Result<r2d2::PooledConnection<redis::Client>, Err> {
self.pool
.get()
.map_err(|e| Err::with_source(RedisDataSrcError::FailToGetConnectionFromPool, e))
}

/// Adds a function to the list of "force back" operations.
Expand Down Expand Up @@ -95,9 +104,17 @@ impl DataConn for RedisDataConn {
/// This method is intended to be called when a transaction fails to
/// manually revert changes.
fn force_back(&mut self, _ag: &mut AsyncGroup) {
for f in self.force_back_vec.iter() {
let _ = f(&mut self.conn);
}
match self.pool.get() {
Ok(mut conn) => {
for f in self.force_back_vec.iter() {
let _ = f(&mut conn);
}
}
Err(e) => {
// for error notification
let _ = Err::with_source(RedisDataSrcError::FailToGetConnectionFromPool, e);
}
};
}

/// Closes the connection.
Expand Down Expand Up @@ -185,16 +202,7 @@ where
/// `NotSetupYet` error if the data source has not been set up.
fn create_data_conn(&mut self) -> Result<Box<RedisDataConn>, Err> {
if let Some(pool) = self.pool.get() {
return match pool.get() {
Ok(conn) => Ok(Box::new(RedisDataConn {
conn,
force_back_vec: Vec::new(),
})),
Err(e) => Err(Err::with_source(
RedisDataSrcError::FailToGetConnectionFromPool,
e,
)),
};
return Ok(Box::new(RedisDataConn::new(pool.clone())));
} else {
return Err(Err::new(RedisDataSrcError::NotSetupYet));
}
Expand All @@ -219,7 +227,7 @@ mod test_redis {
trait RedisSampleDataAcc: DataAcc {
fn get_sample_key(&mut self) -> Result<Option<String>, Err> {
let redis_dc = self.get_data_conn::<RedisDataConn>("redis")?;
let conn: &mut redis::Connection = redis_dc.get_connection();
let mut conn = redis_dc.get_connection()?;
let rslt: redis::RedisResult<Option<String>> = conn.get("sample");
return match rslt {
Ok(opt) => Ok(opt),
Expand All @@ -228,15 +236,15 @@ mod test_redis {
}
fn set_sample_key(&mut self, val: &str) -> Result<(), Err> {
let redis_dc = self.get_data_conn::<RedisDataConn>("redis")?;
let conn: &mut redis::Connection = redis_dc.get_connection();
let mut conn = redis_dc.get_connection()?;
return match conn.set("sample", val) {
Ok(()) => Ok(()),
Err(e) => Err(Err::with_source(SampleError::FailToSetValue, e)),
};
}
fn del_sample_key(&mut self) -> Result<(), Err> {
let redis_dc = self.get_data_conn::<RedisDataConn>("redis")?;
let conn: &mut redis::Connection = redis_dc.get_connection();
let mut conn = redis_dc.get_connection()?;
return match conn.del("sample") {
Ok(()) => Ok(()),
Err(e) => Err(Err::with_source(SampleError::FailToDelValue, e)),
Expand All @@ -245,9 +253,11 @@ mod test_redis {

fn set_sample_key_with_force_back(&mut self, val: &str) -> Result<(), Err> {
let redis_dc = self.get_data_conn::<RedisDataConn>("redis")?;
let conn: &mut redis::Connection = redis_dc.get_connection();
let mut conn = redis_dc.get_connection()?;

let result = conn.set("sample_force_back", val);
if let Err(e) = conn.set::<&str, &str, ()>("sample_force_back", val) {
return Err(Err::with_source(SampleError::FailToSetValue, e));
}

redis_dc.add_force_back(|conn| {
let r: redis::RedisResult<()> = conn.del("sample_force_back");
Expand All @@ -257,10 +267,19 @@ mod test_redis {
}
});

return match result {
Ok(()) => Ok(()),
Err(e) => Err(Err::with_source(SampleError::FailToSetValue, e)),
};
if let Err(e) = conn.set::<&str, &str, ()>("sample_force_back_2", val) {
return Err(Err::with_source(SampleError::FailToSetValue, e));
}

redis_dc.add_force_back(|conn| {
let r: redis::RedisResult<()> = conn.del("sample_force_back_2");
match r {
Ok(()) => Ok(()),
Err(e) => Err(Err::with_source("fail to force back", e)),
}
});

Ok(())
}
}
impl RedisSampleDataAcc for DataHub {}
Expand Down Expand Up @@ -349,6 +368,8 @@ mod test_redis {
let mut conn = client.get_connection().unwrap();
let s: redis::RedisResult<Option<String>> = conn.get("sample_force_back");
assert_eq!(s.unwrap().unwrap(), "Good Morning");
let s: redis::RedisResult<Option<String>> = conn.get("sample_force_back_2");
assert_eq!(s.unwrap().unwrap(), "Good Morning");

let _: redis::RedisResult<()> = conn.del("sample_force_back");
}
Expand All @@ -367,6 +388,8 @@ mod test_redis {
let mut conn = client.get_connection().unwrap();
let r: redis::RedisResult<Option<String>> = conn.get("sample_force_back");
assert!(r.unwrap().is_none());
let r: redis::RedisResult<Option<String>> = conn.get("sample_force_back_2");
assert!(r.unwrap().is_none());
}

#[test]
Expand Down