Skip to content

Commit

Permalink
Add a checktable method for recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
ekmartin committed Oct 30, 2017
1 parent b2e4af4 commit 1ad6b46
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 17 deletions.
36 changes: 35 additions & 1 deletion src/checktable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use petgraph::graph::NodeIndex;

use std::collections::HashMap;
use std::collections::hash_map::{Entry, HashMap};
use std::fmt;
use std::fmt::Debug;

Expand Down Expand Up @@ -285,6 +285,40 @@ impl CheckTable {
}
}

pub fn recover(
&mut self,
base: NodeIndex,
columns: usize,
) -> (i64, Option<Box<HashMap<domain::Index, i64>>>) {
// Take timestamp
let ts = self.next_timestamp;
self.next_timestamp += 1;

// Compute the previous timestamp that each domain will see before getting this one
let prev_times = self.compute_previous_timestamps(Some(base));

// Update checktables
self.last_base = Some(base);
self.toplevel.insert(base, ts);

// Set the timestamp for each column:
let t = &mut self.granular.entry(base).or_default();
for i in 0..columns {
match t.entry(i) {
Entry::Occupied(mut entry) => {
let g = entry.get_mut();
assert!(g.0.is_empty(), "checktable should be empty before recovery");
g.1 = ts;
}
Entry::Vacant(entry) => {
entry.insert((HashMap::new(), ts));
}
}
}

(ts, prev_times)
}

pub fn apply_unconditional(
&mut self,
base: NodeIndex,
Expand Down
12 changes: 12 additions & 0 deletions src/checktable/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub struct TimestampReply {

service! {
rpc apply_batch(request: TimestampRequest) -> Option<TimestampReply>;
rpc recover(base: NodeIndex, columns: usize)
-> (i64, Option<Box<HashMap<domain::Index, i64>>>);
rpc claim_replay_timestamp(tag: Tag) -> (i64, Option<Box<HashMap<domain::Index, i64>>>);
rpc track(token_generator: TokenGenerator);
rpc perform_migration(deps: HashMap<domain::Index, (IngressFromBase, EgressForBase)>)
Expand Down Expand Up @@ -106,4 +108,14 @@ impl FutureService for CheckTableServer {
fn validate_token(&self, token: Token) -> Self::ValidateTokenFut {
Ok(self.checktable.lock().unwrap().validate_token(&token))
}

type RecoverFut = Result<(i64, Option<Box<HashMap<domain::Index, i64>>>), Never>;
fn recover(&self, base: NodeIndex, columns: usize) -> Self::RecoverFut {
Ok(
self.checktable
.lock()
.unwrap()
.recover(base, columns),
)
}
}
19 changes: 5 additions & 14 deletions src/flow/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,27 +147,18 @@ pub fn retrieve_recovery_packets(
acc
}))
// Then create Packet objects from the data:
.enumerate()
.map(|(i, data)| {
.map(|data| {
let link = Link::new(*local_addr, *local_addr);
if node.is_transactional() {
let id = checktable::TransactionId(i as u64);
let transactions = vec![(id, data.clone(), None)];
let request = checktable::service::TimestampRequest {
transactions,
base: global_addr,
};
let (ts, prevs) = checktable
.recover(global_addr, node.fields().len())
.unwrap();

let reply = checktable.apply_batch(request).unwrap().unwrap();
Packet::Transaction {
link,
data,
tracer: None,
state: TransactionState::Committed(
reply.timestamp,
global_addr,
reply.prevs,
),
state: TransactionState::Committed(ts, global_addr, prevs),
}
} else {
Packet::Message {
Expand Down
4 changes: 2 additions & 2 deletions tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,11 +424,11 @@ fn it_recovers_persisted_logs_w_transactions() {
g.recover();

for i in 1..10 {
let price = i * 10;
let b = i * 10;
let (result, _token) = getter.transactional_lookup(&i.into()).unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result[0][0], i.into());
assert_eq!(result[0][1], price.into());
assert_eq!(result[0][1], b.into());
}
}

Expand Down

0 comments on commit 1ad6b46

Please sign in to comment.