Recovery #37

Merged · 33 commits · Nov 13, 2017

Commits
d1d7914
Initial recovery implementation
ekmartin Oct 18, 2017
0a8e44b
Handle recovery in domains
ekmartin Oct 18, 2017
976c1a9
Add helper function for log paths
ekmartin Oct 18, 2017
59b889e
Add a recovery test
ekmartin Oct 18, 2017
5707c08
Remove sleep after .recovery() calls
ekmartin Oct 18, 2017
4de6523
Handle transactional log entries
ekmartin Oct 22, 2017
ce3f847
Use group_commit_queues in handle_recovery
ekmartin Oct 22, 2017
d69815c
Move recovery log to persistence
ekmartin Oct 22, 2017
a4003ef
Use a cow for records
ekmartin Oct 22, 2017
8889c75
Add tcp support to retrieve_recovery_packets
ekmartin Oct 22, 2017
455ddee
Ignore all log files
ekmartin Oct 22, 2017
8335cb4
Use is_transactional instead of LogEntry.packet_type
ekmartin Oct 22, 2017
b81f15f
Move retrieve_recovery_packets out of GroupCommitQueueSet
ekmartin Oct 22, 2017
2f95ebb
Don't use path.exists()
ekmartin Oct 22, 2017
5971ca3
Start base node recoveries in parallel
ekmartin Oct 22, 2017
a71c73c
Ignore JSON parsing errors during recovery
ekmartin Oct 24, 2017
1ff66e9
Merge recovery packets into batches
ekmartin Oct 25, 2017
7486556
Add a checktable method for recovery
ekmartin Oct 25, 2017
4bacd40
Only set timestamp for tracked columns
ekmartin Oct 27, 2017
6daa9d4
Move retrieve_recovery_packets into handle_recovery
ekmartin Nov 6, 2017
f79d6ef
Remove unused PacketType enum
ekmartin Nov 6, 2017
57130b3
Fix ControllerBuilder compatibility
ekmartin Nov 7, 2017
ce619f9
Remove indices vec in recover()
ekmartin Nov 8, 2017
5fed8cd
Properly chunk based on individual records
ekmartin Nov 8, 2017
25c6425
Add sleep() after recovery for Travis
ekmartin Nov 8, 2017
768e968
Add a warning for non-existant log files
ekmartin Nov 8, 2017
0084c57
Add a recovery test with multiple nodes
ekmartin Nov 10, 2017
5a2d135
Suffix test log files with a timestamp
ekmartin Nov 10, 2017
3b76e91
Drop original graph before recovering in tests
ekmartin Nov 10, 2017
03774c5
Clean up persistent log files using Drop in tests
ekmartin Nov 11, 2017
4a5dad0
Remove index.clone() and use expect() for BufReader
ekmartin Nov 12, 2017
a618597
Use SystemTime instead of time
ekmartin Nov 12, 2017
8126b20
Recover each node synchronously
ekmartin Nov 12, 2017
2 changes: 1 addition & 1 deletion .gitignore
@@ -1,4 +1,4 @@
soup-log-*.json
*-log-*.json
*.png
*.log
plotting/*.png
7 changes: 7 additions & 0 deletions Cargo.lock

Generated lockfile; diff not shown.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -14,7 +14,7 @@ binaries = ["default"]
generate_mysql_tests = ["default"]

[dependencies]
arccstr = "1.0.2"
arccstr = "1.0.3"
arrayvec = "0.4.0"
bincode = "0.9.0"
buf_redux = "0.6.1"
@@ -78,6 +78,7 @@ default-features = false
backtrace = { version = "0.3.2", features = ["serialize-serde"] }
toml = "0.4.1"
diff = "0.1.10"
glob = "0.2.11"

[profile.release]
debug=true
8 changes: 6 additions & 2 deletions benchmarks/vote/vote-server.rs
@@ -67,8 +67,12 @@ fn main() {

println!("Attempting to start soup on {}", addr);

let persistence_params =
distributary::PersistenceParameters::new(durability, 512, time::Duration::from_millis(1));
let persistence_params = distributary::PersistenceParameters::new(
durability,
512,
time::Duration::from_millis(1),
Some(String::from("vote")),
);

let sock_addr: SocketAddr = addr.parse()
.expect("ADDR must be a valid HOST:PORT combination");
7 changes: 6 additions & 1 deletion benchmarks/vote/vote.rs
@@ -249,7 +249,12 @@ fn main() {
DurabilityMode::MemoryOnly
};

let persistence_params = PersistenceParameters::new(mode, queue_length, flush_timeout);
let persistence_params = PersistenceParameters::new(
mode,
queue_length,
flush_timeout,
Some(String::from("vote")),
);

// setup db
let mut s = graph::Setup::default();
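Both benchmark call sites now pass a fourth argument, an Option<String> that appears to act as a prefix for persistent log filenames (compare the .gitignore change from soup-log-*.json to *-log-*.json). The updated constructor itself is not part of this section, so the following is only a sketch under assumed names: the parameter name log_prefix and the field layout are placeholders, with only the argument order and types taken from the call sites above.

use std::time;

// Sketch only: field names such as `queue_capacity` and `log_prefix` are
// placeholders; only the argument order and types come from the call sites
// above. `DurabilityMode` is the crate's existing enum (see
// `DurabilityMode::MemoryOnly` in vote.rs above).
pub struct PersistenceParameters {
    pub mode: DurabilityMode,
    pub queue_capacity: usize,
    pub flush_timeout: time::Duration,
    pub log_prefix: Option<String>,
}

impl PersistenceParameters {
    pub fn new(
        mode: DurabilityMode,
        queue_capacity: usize,
        flush_timeout: time::Duration,
        log_prefix: Option<String>,
    ) -> Self {
        PersistenceParameters { mode, queue_capacity, flush_timeout, log_prefix }
    }
}

With a shape like this, callers that pass None would presumably fall back to a default prefix, which would explain why the ignore pattern was widened rather than replaced.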
23 changes: 23 additions & 0 deletions dataflow/src/checktable/mod.rs
@@ -285,6 +285,29 @@ impl CheckTable {
}
}

// Reserve a timestamp for the given base node, and update each column to said timestamp.
// This should be called for each batch of recovery updates.
pub fn recover(&mut self, base: NodeIndex) -> (i64, Option<Box<HashMap<domain::Index, i64>>>) {
// Take timestamp
let ts = self.next_timestamp;
self.next_timestamp += 1;

// Compute the previous timestamp that each domain will see before getting this one
let prev_times = self.compute_previous_timestamps(Some(base));

// Update checktables
self.last_base = Some(base);
self.toplevel.insert(base, ts);

let t = &mut self.granular.entry(base).or_default();
for (_column, g) in t.iter_mut() {
assert!(g.0.is_empty(), "checktable should be empty before recovery");
g.1 = ts;
}

(ts, prev_times)
}

pub fn apply_unconditional(
&mut self,
base: NodeIndex,
6 changes: 6 additions & 0 deletions dataflow/src/checktable/service.rs
@@ -29,6 +29,7 @@ pub struct TimestampReply {

service! {
rpc apply_batch(request: TimestampRequest) -> Option<TimestampReply>;
rpc recover(base: NodeIndex) -> (i64, Option<Box<HashMap<domain::Index, i64>>>);
rpc claim_replay_timestamp(tag: Tag) -> (i64, Option<Box<HashMap<domain::Index, i64>>>);
rpc track(token_generator: TokenGenerator);
rpc perform_migration(deps: HashMap<domain::Index, (IngressFromBase, EgressForBase)>)
@@ -106,4 +107,9 @@ impl FutureService for CheckTableServer {
fn validate_token(&self, token: Token) -> Self::ValidateTokenFut {
Ok(self.checktable.lock().unwrap().validate_token(&token))
}

type RecoverFut = Result<(i64, Option<Box<HashMap<domain::Index, i64>>>), Never>;
fn recover(&self, base: NodeIndex) -> Self::RecoverFut {
Ok(self.checktable.lock().unwrap().recover(base))
}
}
83 changes: 83 additions & 0 deletions dataflow/src/domain/mod.rs
@@ -5,6 +5,8 @@ use std::thread;
use std::time;
use std::collections::hash_map::Entry;
use std::rc::Rc;
use std::io::{BufRead, BufReader, ErrorKind};
use std::fs::File;

use std::net::SocketAddr;

@@ -18,6 +20,8 @@ use transactions;
use persistence;
use debug;
use checktable;
use serde_json;
use itertools::Itertools;
use slog::Logger;
use timekeeper::{RealTime, SimpleTracker, ThreadTime, Timer, TimerSet};
use tarpc::sync::client::{self, ClientExt};
@@ -30,6 +34,7 @@ pub struct Config {
}

const BATCH_SIZE: usize = 256;
const RECOVERY_BATCH_SIZE: usize = 512;
Contributor: Why not larger? We know nothing else is happening in the graph until it has recovered anyway.

Member Author: @fintelia suggested 512 - I'm not completely sure what the trade-offs are here. What do you think would be a good number?

const NANOS_PER_SEC: u64 = 1_000_000_000;
macro_rules! dur_to_ns {
@@ -803,6 +808,9 @@ impl Domain {
Packet::ReplayPiece { .. } => {
self.handle_replay(m);
}
Packet::StartRecovery { .. } => {
self.handle_recovery();
}
consumed => {
match consumed {
// workaround #16223
@@ -1566,6 +1574,81 @@ impl Domain {
}
}

fn handle_recovery(&mut self) {
let checktable = self.transaction_state.get_checktable();
let node_info: Vec<_> = self.nodes
.iter()
.map(|(index, node)| {
let n = node.borrow();
(index, n.global_addr(), n.is_transactional())
})
.collect();

for (local_addr, global_addr, is_transactional) in node_info {
let path = self.persistence_parameters.log_path(
&local_addr,
self.index,
self.shard.unwrap_or(0),
);

let file = match File::open(&path) {
Ok(f) => f,
Err(ref e) if e.kind() == ErrorKind::NotFound => {
warn!(
self.log,
"No log file found for node {}, starting out empty",
local_addr
);

continue;
}
Err(e) => panic!("Could not open log file {:?}: {}", path, e),
};

BufReader::new(file)
.lines()
.filter_map(|line| {
let line = line
.expect(&format!("Failed to read line from log file: {:?}", path));
let entries: Result<Vec<Records>, _> = serde_json::from_str(&line);
entries.ok()
})
// Parsing each individual line gives us an iterator over Vec<Records>.
// We're interested in chunking each record, so let's flat_map twice:
// Iter<Vec<Records>> -> Iter<Records> -> Iter<Record>
.flat_map(|r| r)
.flat_map(|r| r)
// Merge individual records into batches of RECOVERY_BATCH_SIZE:
.chunks(RECOVERY_BATCH_SIZE)
.into_iter()
// Then create Packet objects from the data:
.map(|chunk| {
let data: Records = chunk.collect();
let link = Link::new(local_addr, local_addr);
if is_transactional {
let (ts, prevs) = checktable.recover(global_addr).unwrap();
Contributor: Hmm, shouldn't it be possible to just claim a single timestamp for the entire recovery (across all base nodes)? @fintelia may be able to shed some light.

Member: This is a limitation of our transaction logic. Every timestamped message must originate from only a single base node. Further, every message from a base must also have its own distinct timestamp (which is why there can't be one timestamp per base either).

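// Note (from the review discussion above): each recovered batch claims its own
// timestamp here because every timestamped message must originate from a single
// base node and carry a distinct timestamp, so one timestamp cannot cover the
// whole recovery.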
Packet::Transaction {
link,
data,
tracer: None,
state: TransactionState::Committed(ts, global_addr, prevs),
}
} else {
Packet::Message {
link,
data,
tracer: None,
}
}
})
.for_each(|packet| self.handle(box packet));
Contributor: I'm pretty sure this could deadlock if you had an A-B-A domain assignment. But we already disallow that elsewhere, so probably not an issue.

}

self.control_reply_tx
.send(ControlReplyPacket::ack())
.unwrap();
}

fn handle_replay(&mut self, m: Box<Packet>) {
let tag = m.tag().unwrap();
let mut finished = None;
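handle_recovery locates each node's log through the log_path helper added in commit 976c1a9 and moved into the persistence module in d69815c; the helper itself is outside this section. Purely as an illustration, a version consistent with the call above (node address, domain index, shard) and with the widened *-log-*.json ignore pattern might look like the following; the filename layout and the simplified usize parameters are assumptions, not quotes from the codebase.

use std::path::PathBuf;

// Hypothetical sketch, not the PR's actual helper: parameter types are
// simplified to usize, and the filename layout is inferred from the
// "*-log-*.json" ignore pattern rather than taken from the real code.
impl PersistenceParameters {
    fn log_path(&self, node: usize, domain: usize, shard: usize) -> PathBuf {
        // Fall back to a generic prefix when none is configured; "soup" matches
        // the filename pattern this PR replaces in .gitignore.
        let prefix = self.log_prefix.clone().unwrap_or_else(|| String::from("soup"));
        PathBuf::from(format!("{}-log-{}-{}-{}.json", prefix, node, domain, shard))
    }
}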
3 changes: 3 additions & 0 deletions dataflow/src/payload.rs
@@ -223,6 +223,9 @@ pub enum Packet {
/// A packet used solely to drive the event loop forward.
Spin,

/// Signal that a base node's domain should start replaying logs.
StartRecovery,

// Transaction time messages
//
/// Instruct domain to flush pending transactions and notify upon completion. `prev_ts` is the