From c6eef11315b60c5e7ac29b3c536573f9a27cfa9b Mon Sep 17 00:00:00 2001 From: Martin Ek Date: Fri, 27 Oct 2017 15:42:36 +0200 Subject: [PATCH] Only set timestamp for tracked columns --- src/checktable/mod.rs | 23 +++++------------------ src/checktable/service.rs | 11 +++-------- src/flow/persistence/mod.rs | 5 +---- 3 files changed, 9 insertions(+), 30 deletions(-) diff --git a/src/checktable/mod.rs b/src/checktable/mod.rs index ceb0a50ef..1d7a0159d 100644 --- a/src/checktable/mod.rs +++ b/src/checktable/mod.rs @@ -6,7 +6,7 @@ use petgraph::graph::NodeIndex; -use std::collections::hash_map::{Entry, HashMap}; +use std::collections::HashMap; use std::fmt; use std::fmt::Debug; @@ -285,11 +285,7 @@ impl CheckTable { } } - pub fn recover( - &mut self, - base: NodeIndex, - columns: usize, - ) -> (i64, Option>>) { + pub fn recover(&mut self, base: NodeIndex) -> (i64, Option>>) { // Take timestamp let ts = self.next_timestamp; self.next_timestamp += 1; @@ -301,19 +297,10 @@ impl CheckTable { self.last_base = Some(base); self.toplevel.insert(base, ts); - // Set the timestamp for each column: let t = &mut self.granular.entry(base).or_default(); - for i in 0..columns { - match t.entry(i) { - Entry::Occupied(mut entry) => { - let g = entry.get_mut(); - assert!(g.0.is_empty(), "checktable should be empty before recovery"); - g.1 = ts; - } - Entry::Vacant(entry) => { - entry.insert((HashMap::new(), ts)); - } - } + for (_column, g) in t.iter_mut() { + assert!(g.0.is_empty(), "checktable should be empty before recovery"); + g.1 = ts; } (ts, prev_times) diff --git a/src/checktable/service.rs b/src/checktable/service.rs index 4864df548..6c99fa985 100644 --- a/src/checktable/service.rs +++ b/src/checktable/service.rs @@ -29,7 +29,7 @@ pub struct TimestampReply { service! { rpc apply_batch(request: TimestampRequest) -> Option; - rpc recover(base: NodeIndex, columns: usize) + rpc recover(base: NodeIndex) -> (i64, Option>>); rpc claim_replay_timestamp(tag: Tag) -> (i64, Option>>); rpc track(token_generator: TokenGenerator); @@ -110,12 +110,7 @@ impl FutureService for CheckTableServer { } type RecoverFut = Result<(i64, Option>>), Never>; - fn recover(&self, base: NodeIndex, columns: usize) -> Self::RecoverFut { - Ok( - self.checktable - .lock() - .unwrap() - .recover(base, columns), - ) + fn recover(&self, base: NodeIndex) -> Self::RecoverFut { + Ok(self.checktable.lock().unwrap().recover(base)) } } diff --git a/src/flow/persistence/mod.rs b/src/flow/persistence/mod.rs index 51237619e..df43968bf 100644 --- a/src/flow/persistence/mod.rs +++ b/src/flow/persistence/mod.rs @@ -150,10 +150,7 @@ pub fn retrieve_recovery_packets( .map(|data| { let link = Link::new(*local_addr, *local_addr); if node.is_transactional() { - let (ts, prevs) = checktable - .recover(global_addr, node.fields().len()) - .unwrap(); - + let (ts, prevs) = checktable.recover(global_addr).unwrap(); Packet::Transaction { link, data,