Skip to content

Commit

Permalink
Only set timestamp for tracked columns
Browse files Browse the repository at this point in the history
  • Loading branch information
ekmartin committed Oct 27, 2017
1 parent 95f0c57 commit c6eef11
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 30 deletions.
23 changes: 5 additions & 18 deletions src/checktable/mod.rs
Expand Up @@ -6,7 +6,7 @@

use petgraph::graph::NodeIndex;

use std::collections::hash_map::{Entry, HashMap};
use std::collections::HashMap;
use std::fmt;
use std::fmt::Debug;

Expand Down Expand Up @@ -285,11 +285,7 @@ impl CheckTable {
}
}

pub fn recover(
&mut self,
base: NodeIndex,
columns: usize,
) -> (i64, Option<Box<HashMap<domain::Index, i64>>>) {
pub fn recover(&mut self, base: NodeIndex) -> (i64, Option<Box<HashMap<domain::Index, i64>>>) {
// Take timestamp
let ts = self.next_timestamp;
self.next_timestamp += 1;
Expand All @@ -301,19 +297,10 @@ impl CheckTable {
self.last_base = Some(base);
self.toplevel.insert(base, ts);

// Set the timestamp for each column:
let t = &mut self.granular.entry(base).or_default();
for i in 0..columns {
match t.entry(i) {
Entry::Occupied(mut entry) => {
let g = entry.get_mut();
assert!(g.0.is_empty(), "checktable should be empty before recovery");
g.1 = ts;
}
Entry::Vacant(entry) => {
entry.insert((HashMap::new(), ts));
}
}
for (_column, g) in t.iter_mut() {
assert!(g.0.is_empty(), "checktable should be empty before recovery");
g.1 = ts;
}

(ts, prev_times)
Expand Down
11 changes: 3 additions & 8 deletions src/checktable/service.rs
Expand Up @@ -29,7 +29,7 @@ pub struct TimestampReply {

service! {
rpc apply_batch(request: TimestampRequest) -> Option<TimestampReply>;
rpc recover(base: NodeIndex, columns: usize)
rpc recover(base: NodeIndex)
-> (i64, Option<Box<HashMap<domain::Index, i64>>>);
rpc claim_replay_timestamp(tag: Tag) -> (i64, Option<Box<HashMap<domain::Index, i64>>>);
rpc track(token_generator: TokenGenerator);
Expand Down Expand Up @@ -110,12 +110,7 @@ impl FutureService for CheckTableServer {
}

type RecoverFut = Result<(i64, Option<Box<HashMap<domain::Index, i64>>>), Never>;
fn recover(&self, base: NodeIndex, columns: usize) -> Self::RecoverFut {
Ok(
self.checktable
.lock()
.unwrap()
.recover(base, columns),
)
fn recover(&self, base: NodeIndex) -> Self::RecoverFut {
Ok(self.checktable.lock().unwrap().recover(base))
}
}
5 changes: 1 addition & 4 deletions src/flow/persistence/mod.rs
Expand Up @@ -150,10 +150,7 @@ pub fn retrieve_recovery_packets(
.map(|data| {
let link = Link::new(*local_addr, *local_addr);
if node.is_transactional() {
let (ts, prevs) = checktable
.recover(global_addr, node.fields().len())
.unwrap();

let (ts, prevs) = checktable.recover(global_addr).unwrap();
Packet::Transaction {
link,
data,
Expand Down

0 comments on commit c6eef11

Please sign in to comment.