diff --git a/executors/src/eoa/store/hydrate.rs b/executors/src/eoa/store/hydrate.rs index 97a118e..ba2aade 100644 --- a/executors/src/eoa/store/hydrate.rs +++ b/executors/src/eoa/store/hydrate.rs @@ -80,11 +80,39 @@ impl EoaExecutorStore { ); } - let results: Vec = pipe.query_async(&mut self.redis.clone()).await?; + let results: Vec> = pipe.query_async(&mut self.redis.clone()).await?; let mut hydrated = Vec::with_capacity(dehydrated.len()); + + let mut deletion_pipe = twmq::redis::pipe(); + for (d, r) in dehydrated.into_iter().zip(results.iter()) { - hydrated.push(d.hydrate(serde_json::from_str::(r)?)); + match r { + Some(r) => { + hydrated.push(d.hydrate(serde_json::from_str::(r)?)) + } + None => { + // delete this transaction entry from pending, borrowed, submitted + deletion_pipe.zrem( + self.keys.pending_transactions_zset_name(), + d.transaction_id(), + ); + deletion_pipe.hdel( + self.keys.borrowed_transactions_hashmap_name(), + d.transaction_id(), + ); + tracing::warn!( + "Transaction {} data was missing, deleting transaction from redis", + d.transaction_id() + ); + } + } + } + + if !deletion_pipe.is_empty() { + deletion_pipe + .query_async::<()>(&mut self.redis.clone()) + .await?; } Ok(hydrated)