Recovery #37
Conversation
src/flow/persistence/mod.rs (outdated):

    base: global_addr,
};

let reply = self.checktable.apply_batch(request).unwrap().unwrap();
Not sure if this should handle None as well, like in merge_transactional_packets. I thought maybe it didn't have to since all the transactions are already committed, but that might be a false assumption.
It should not be possible to get back None since the packet doesn't have a token (and thus is guaranteed to commit).
However, we may not want to use apply_batch since it is actually a somewhat expensive operation. In this specific case, it should be possible to write something simpler that doesn't painstakingly simulate all the granular checktables since the beginning of time. Plus, then you wouldn't have to deal with the TransactionId stuff.
I used checktable.apply_unconditional originally - not sure if that's better?
This is a good start!
I recommend not using your own GroupCommitQueueSet for dealing with recovery. Pretty much all it does is collect packets from multiple base nodes and batch them together to be released into the graph all in a single packet. However, the base logs are already grouped by base node, so that packet is unnecessary. I think it should be enough to just do the batch assembly yourself using the chunks method.
Also, I need to think through the interaction between this change and normal domain transaction processing. I think everything is fine, but it can be somewhat subtle.
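The suggested per-base batch assembly can be sketched with std alone; `Record` and `assemble_batches` below are stand-in names (the PR itself uses itertools' `chunks` over an iterator, not a slice):

```rust
// Sketch (assumed names) of batching a base node's replayed records without
// GroupCommitQueueSet. `Record` stands in for distributary's record type.
type Record = Vec<i64>;

const RECOVERY_BATCH_SIZE: usize = 3; // 512 in the PR discussion

// Group a base node's replayed records into fixed-size batches, one per packet.
fn assemble_batches(entries: Vec<Record>) -> Vec<Vec<Record>> {
    entries
        .chunks(RECOVERY_BATCH_SIZE) // std's slice::chunks
        .map(|chunk| chunk.to_vec())
        .collect()
}

fn main() {
    let entries: Vec<Record> = (0..7).map(|i| vec![i]).collect();
    let batches = assemble_batches(entries);
    // 7 records in batches of 3 -> batch sizes 3, 3, 1
    assert_eq!(
        batches.iter().map(|b| b.len()).collect::<Vec<_>>(),
        vec![3, 3, 1]
    );
}
```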
src/flow/mod.rs (outdated):

for index in indices {
    let domain = self.domains.get_mut(&index).unwrap();
    domain.send(box payload::Packet::StartRecovery).unwrap();
    domain.wait_for_ack().unwrap();
Is it necessary to have domains recover sequentially? Could they all do so in parallel?
Yep! (EDIT: as in, doing so in parallel sounds like a good idea.)
src/flow/persistence/mod.rs (outdated):

Packet::Message { ref data, .. } => data,
Packet::Transaction { ref data, .. } => LogEntry {
    records: Cow::Borrowed(data),
    packet_type: PacketType::Transaction,
I don't think it is necessary to store the packet type on a per-message basis. All writes to a base node should be of the same type (and you can check which it is by seeing if the base node is transactional).
Ah, didn't think about that. Good point!
src/flow/persistence/mod.rs (outdated):

    continue;
}

let file = File::open(path).unwrap();
Instead of checking whether the file exists and then unwrapping, it may be better to just try opening the log and catching any errors that triggers.
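The open-and-catch pattern might look like this (`open_log` is a hypothetical helper name, not code from the PR):

```rust
use std::fs::File;
use std::io::ErrorKind;

// Open the log directly and treat a missing file as "nothing to recover",
// instead of a racy exists-then-unwrap check.
fn open_log(path: &str) -> Option<File> {
    match File::open(path) {
        Ok(f) => Some(f),
        Err(ref e) if e.kind() == ErrorKind::NotFound => None,
        Err(e) => panic!("could not open log file {:?}: {}", path, e),
    }
}

fn main() {
    // No log yet (e.g. first startup): recovery is simply skipped.
    assert!(open_log("/no/such/dir/soup-log-0-0.json").is_none());
}
```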
Regarding …

Force-pushed from e944aa0 to 62cc763 (Compare).
src/flow/persistence/mod.rs (outdated):

.lines()
.flat_map(|line| {
    let line = line.unwrap();
    let entries: Vec<Records> = serde_json::from_str(&line).unwrap();
If the system was part way through writing a line when it crashed, then deserialization may fail during recovery. It is worth handling this case.
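One way to tolerate a torn final line is to drop anything that fails to deserialize. In this std-only sketch, integer parsing stands in for the real serde_json::from_str call, and the names are illustrative:

```rust
// Recover entries from a log, skipping any line that fails to parse - for
// example a final line the process was midway through writing when it
// crashed. `parse::<i64>` stands in for serde_json deserialization here.
fn recover_entries(log: &str) -> Vec<i64> {
    log.lines()
        .filter_map(|line| line.trim().parse::<i64>().ok())
        .collect()
}

fn main() {
    // The crash left a truncated fourth line; it is silently skipped.
    let log = "1\n2\n3\n{\"trunc";
    assert_eq!(recover_entries(log), vec![1, 2, 3]);
}
```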
Done!
src/flow/persistence/mod.rs (outdated):

    base: global_addr,
};

let reply = checktable.apply_batch(request).unwrap().unwrap();
It would be better to avoid getting a separate timestamp for every packet. Instead, merge packets from the same base node and assign the combination a single timestamp. Based on other experiments, ~512 packets to a batch will probably provide good efficiency.
Sounds good. I guess it doesn't hurt to merge regular packets as well, and not just transactional ones?
Fixed in 57cd580
src/flow/domain/mod.rs (outdated):

    self.transaction_state.get_checktable().clone(),
);

packets.into_iter().for_each(|packet| self.handle(packet));
It is kind of wasteful to construct a Vec if you are going to immediately call into_iter on it. Instead, you could just return an iterator of all the packets.
Agreed - but I couldn't find a clean way of returning an iterator because of the borrow on node. Tried something like this:
nodes
    .iter()
    .filter_map(|(_index, node)| {
        let node = node.borrow();
        let local_addr = node.local_addr();
        let global_addr = node.global_addr();
        let path = params.log_path(&local_addr, domain_index, domain_shard);
        let file = match File::open(&path) {
            Ok(f) => f,
            Err(ref e) if e.kind() == ErrorKind::NotFound => return None,
            Err(e) => panic!("Could not open log file {:?}: {}", path, e),
        };
        let iter = BufReader::new(file)
            .lines()
            .filter_map(|line| {
                let line = line.unwrap();
                let entries: Result<Vec<Records>, _> = serde_json::from_str(&line);
                entries.ok()
            })
            .flat_map(|r| r)
            // Merge packets into batches of RECOVERY_BATCH_SIZE:
            .chunks(RECOVERY_BATCH_SIZE)
            .into_iter()
            .map(|chunk| chunk.fold(Records::default(), |mut acc, ref mut data| {
                acc.append(data);
                acc
            }))
            // Then create Packet objects from the data:
            .map(|data| {
                let link = Link::new(*local_addr, *local_addr);
                if node.is_transactional() {
                    let (ts, prevs) = checktable
                        .recover(global_addr, node.fields().len())
                        .unwrap();
                    box Packet::Transaction {
                        link,
                        data,
                        tracer: None,
                        state: TransactionState::Committed(ts, global_addr, prevs),
                    }
                } else {
                    box Packet::Message {
                        link,
                        data,
                        tracer: None,
                    }
                }
            });
        Some(iter)
    })
    .flat_map(|i| i)
I see the issue. Could something like the following work?
fn handle_recovery(&mut self) {
    let addresses: Vec<_> = nodes.iter().map(|n| (n.local_addr(), n.global_addr())).collect();
    for (local, global) in addresses {
        for packet in persistence::retrieve_recovery_packets(...) {
            self.handle(packet);
        }
    }
}
@fintelia: I ended up moving the retrieve_recovery_packets logic up to handle_recovery - some of the arguments ended up looking a bit weird, so not sure if it was a very good abstraction anyway. Let me know if you disagree though and I'll take a second look at it.

For reference, I also bumped into a separate issue when I tried returning an iterator from retrieve_recovery_packets: .chunks() returns IntoChunks - not an iterator - which meant that the value here didn't last as long as 'a.
Force-pushed from c6eef11 to d3dbd8a (Compare).
src/checktable/mod.rs (outdated):

self.toplevel.insert(base, ts);

let t = &mut self.granular.entry(base).or_default();
for (_column, g) in t.iter_mut() {
@fintelia: I updated this so it only sets timestamps for tracked columns, like we talked about.
Force-pushed from d3dbd8a to f6c89e8 (Compare).
@ekmartin There was just a rather large refactor that landed on master. Could you try to rebase/merge the changes into your branch? A lot of things moved around, so let me know if you need a hand finding something. Notable moves: src/flow/domain/mod.rs -> dataflow/src/domain/mod.rs
Force-pushed from 47832b1 to 6fbae6f (Compare).
@fintelia: I rebased and added a …
LGTM in principle; will leave some higher-level comments in PR.
dataflow/src/domain/mod.rs (outdated):

// Merge packets into batches of RECOVERY_BATCH_SIZE:
.chunks(RECOVERY_BATCH_SIZE)
.into_iter()
.map(|chunk| chunk.fold(Records::default(), |mut acc, ref mut data| {
OOI, why is this step required? AIUI, the original Vec<Records> exists because each line can contain several records (one originally-written batch per line?), so the real type is Vec<Vec<Vec<DataType>>>. The flat_map removes the outermost layer, so we now basically have a single, large Vec<Vec<DataType>>, which we then chunk into batches. This turns into a new iterator over these batched chunks (an iterator over type &[Vec<DataType>]), within which each chunk is converted into a single Records structure (= Vec<Vec<DataType>>). Is that correct?

(In other words, the extra map/fold is required to deal with the slice type and get back to Records.)
After taking a second look at this I'm pretty sure that the original implementation is actually wrong - thanks for asking about it. Since it only called flat_map once, it would essentially chunk based on Records - and not Record:
.filter_map(|line| {
    let line = line.unwrap();
    let entries: Result<Vec<Records>, _> = serde_json::from_str(&line);
    entries.ok()
})
// At this point we have an iterator over `Vec<Records>` (lines in the log)
.flat_map(|r| r)
// We've now gone from an iterator over `Vec<Records>` to an iterator over `Records`
.chunks(...)
// Each chunk now has a list of `Records`, which are flattened into one by the `.fold`.
It should be fixed in cc996f2 - which adds a second flat_map, and collects each chunk into a Records before passing it on to the packet.
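The difference can be seen in a std-only sketch (plain integers stand in for individual records): flattening the lines only once would chunk whole Records groups, while flattening twice chunks individual records.

```rust
// Integers stand in for individual records; a `Records` is a Vec of them,
// and each log line deserializes to a Vec<Records>.
type Record = i64;
type Records = Vec<Record>;

// Flatten twice (line -> Records -> Record) before chunking, so batches are
// made of individual records rather than whole Records groups.
fn batches(lines: Vec<Vec<Records>>, batch_size: usize) -> Vec<Records> {
    let flat: Vec<Record> = lines.into_iter().flatten().flatten().collect();
    flat.chunks(batch_size).map(|c| c.to_vec()).collect()
}

fn main() {
    let lines = vec![vec![vec![1, 2], vec![3]], vec![vec![4, 5]]];
    // Five records in batches of two -> [1, 2], [3, 4], [5]
    assert_eq!(batches(lines, 2), vec![vec![1, 2], vec![3, 4], vec![5]]);
}
```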
dataflow/src/domain/mod.rs (outdated):

let file = match File::open(&path) {
    Ok(f) => f,
    Err(ref e) if e.kind() == ErrorKind::NotFound => continue,
If the log file doesn't exist, we skip recovering this node -- is this the right behavior? It seems like you could easily end up with a partially recovered database. Perhaps we should at least print a warning here.
My reasoning here was to avoid crashing when .recovery() is called the first time a Soup application starts up (since there are no logs at that point). A warning might be better, but that would mean outputting it when the application starts up the very first time as well, which might be slightly confusing.
I'm happy to accept a warning ("No logs for node {} found, starting out empty") on first run, I think!
After all, the common case for startup of a useful database is that it already has data in it ;-)
Indeed! Added a warning in 9bf5859.
Looks good! A few questions to check that I got the right understanding: …

Some funky subtleties might exist if the write that raced ahead of the recovery is a deletion for something that exists in the to-be-recovered state. In practice, this can't happen currently, because a restart/recovery invalidates client mutators and getters (thus no deletions can be in flight), and because a post-restart deletion racing ahead of the recovery would fail on an empty post-restart base node.
Force-pushed from 8c0d498 to cc996f2 (Compare).
@ms705: That sounds about right! I left a comment regarding 1 here.
Yep, that's how I understand it as well. Since …

Making sure that no other packets can arrive before recovery starts altogether sounds like a good future improvement, perhaps by moving from an explicit …
Force-pushed from a2cd7db to 9bf5859 (Compare).
@ekmartin I think we're happy to merge this now 👍 I'll fix the new conflicts later tonight (and merge) unless you get there first.
Force-pushed from 9bf5859 to bf19ca7 (Compare).
@ms705: rebased 👍

The recovery tests started failing randomly at this assertion after rebasing (repeated the tests successfully 100 times pre-rebase to compare) - I'll investigate.
src/tests.rs (outdated):

@@ -53,6 +54,12 @@ fn get_log_name(base: &str) -> String {
    format!("{}-{}-{}", base, current_time.sec, current_time.nsec)
}

fn delete_log_files(log_name: String) {
    for log_path in glob::glob(&format!("./{}-*.json", log_name)).unwrap() {
I feel like this is going to come back to bite us.
I'm still trying to figure out the test bug I mentioned above, but I realized that I probably wasn't actually taking down the graph before running .recover() in the tests. Now that I do, DeleteOnExit naturally won't work, since that means the logs are deleted when the first graph is dropped.

I'm currently prefixing the logs with each test's name, and added a timestamp in 458f854 to ensure that panics wouldn't break future test runs (since the test files wouldn't get cleaned up).

Why do you feel like this might cause issues in the future? Definitely open to improvement suggestions!
I just get nervous when I see anything that does rm and contains a *. I'm sure it's fine, especially given that you both prefix and suffix the glob.
Where does delete_log_files get called from in the new world order? Is it just important for tests (where timestamps in the file names would avoid clashes, even though the developer would have to manually clean the logs), or does it serve a more general purpose?
Merging #43 should have fixed the failures that you saw.
This ensures that subsequent test runs always use new log files, even if the previous test run caused a panic.
Force-pushed from 70cbe9b to 3b76e91 (Compare).
Wraps the log file name in a LogName struct that implements Drop. This ensures that created log files are deleted when the struct goes out of scope (i.e. when the test ends).
👍 @jonhoo?
dataflow/src/checktable/mod.rs (outdated):

@@ -285,6 +285,27 @@ impl CheckTable {
    }
}

pub fn recover(&mut self, base: NodeIndex) -> (i64, Option<Box<HashMap<domain::Index, i64>>>) {
I'd like a comment over this function explaining exactly what it does.
I think what it does is reserve a single timestamp for the recovery, and then make it the latest timestamp for every base, essentially to bootstrap the timestamping logic. However, I don't know why it is necessary? Aren't the bases all synchronized at the beginning with ts=0? When the system crashes and comes back up, there are no writes anywhere in the graph, so taking a time seems unnecessary?
It actually reserves a timestamp for the recovery of a specific base, and then marks that time as the latest timestamp for all columns in that base. This is necessary because at ts=0 all base nodes are empty, and the recovery messages flowing through the graph are implemented as writes (and thus each must have some distinct timestamp assigned to them).
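A minimal sketch of that behavior, with an assumed structure (the real CheckTable also updates per-column granular timestamps and returns the prior-timestamp map):

```rust
use std::collections::HashMap;

// Assumed, simplified CheckTable: recover() claims one fresh timestamp for
// a base's recovery write and records it as that base's latest.
struct CheckTable {
    next_ts: i64,
    toplevel: HashMap<usize, i64>, // base -> latest timestamp
}

impl CheckTable {
    fn recover(&mut self, base: usize) -> i64 {
        let ts = self.next_ts;
        self.next_ts += 1;
        self.toplevel.insert(base, ts);
        ts
    }
}

fn main() {
    let mut ct = CheckTable { next_ts: 1, toplevel: HashMap::new() };
    let a = ct.recover(0);
    let b = ct.recover(1);
    assert_ne!(a, b); // each base's recovery write gets a distinct timestamp
    assert_eq!(ct.toplevel[&1], b);
}
```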
Cargo.toml (outdated):

@@ -78,6 +78,8 @@ default-features = false
backtrace = { version = "0.3.2", features = ["serialize-serde"] }
toml = "0.4.1"
diff = "0.1.10"
time = "0.1.38"
This seems like an unfortunate extra dependency. Still reading, so don't know yet why it's there. Maybe there's a good reason. But if there isn't, we should try to remove it.
@@ -30,6 +34,7 @@ pub struct Config {
}

const BATCH_SIZE: usize = 256;
const RECOVERY_BATCH_SIZE: usize = 512;
Why not larger? We know nothing else is happening in the graph until it has recovered anyway.
@fintelia suggested 512 - I'm not completely sure what the trade-offs are here. What do you think would be a good number?
dataflow/src/domain/mod.rs (outdated):

.iter()
.map(|(index, node)| {
    let n = node.borrow();
    (index.clone(), n.global_addr(), n.is_transactional())
I don't think you need the clone here, because index is Copy. I think you can just do *index, or match against &index in the map pattern.
Fixed! Didn't need the .clone at all, like you said.
dataflow/src/domain/mod.rs (outdated):

    let n = node.borrow();
    (index.clone(), n.global_addr(), n.is_transactional())
})
.collect();
Why collect if you're directly iterating over it below anyway?
Lifetime issues: self.handle() can't be called while self.nodes is borrowed.
dataflow/src/domain/mod.rs (outdated):

    .collect();

for (local_addr, global_addr, is_transactional) in node_info {
    let checktable = self.transaction_state.get_checktable();
Move outside the loop?
Done!
dataflow/src/domain/mod.rs (outdated):

BufReader::new(file)
    .lines()
    .filter_map(|line| {
        let line = line.unwrap();
Can you make this an expect instead, and then state why we're expecting it to not fail?
Alternatively, given that you're already using filter_map, just return None if line.is_err(). Something like:

line.ok().and_then(|line| -> Option<Vec<Records>> { serde_json::from_str(&line).ok() })
I changed it to an .expect as I thought it might be good to blow up in the case that this ever happens (IO errors, or invalid UTF-8 in the file, if I understand it correctly). Let me know if you disagree though.
https://github.com/mit-pdos/distributary/pull/37/files#diff-f3207323d6f34ad3c124fbcb4cb87a40R1612
let data: Records = chunk.collect();
let link = Link::new(local_addr, local_addr);
if is_transactional {
    let (ts, prevs) = checktable.recover(global_addr).unwrap();
Hmm.. Shouldn't it be possible to just claim a single timestamp for the entire recovery (across all base nodes)? @fintelia may be able to shed some light.
This is a limitation of our transaction logic. Every timestamped message must originate from only a single base node. Further, every message from a base must also have its own distinct timestamp (which is why there can't be one timestamp per base either).
        }
    }
})
.for_each(|packet| self.handle(box packet));
I'm pretty sure this could deadlock if you had an A-B-A domain assignment. But we already disallow that elsewhere, so probably not an issue.
examples/basic-recipe.rs (outdated):

// Then create a new vote:
println!("Casting vote...");
let uid = time::get_time().sec;
Why is the time crate needed for this? Why is SystemTime::now().duration_since(UNIX_EPOCH).as_secs() not sufficient, using std::time?
Good idea - fixed in a618597.
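The std::time equivalent suggested above might look like this (the helper name is ours, not from the PR):

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Seconds since the Unix epoch via std alone, replacing time::get_time().sec.
fn unix_seconds() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the Unix epoch")
        .as_secs()
}

fn main() {
    let uid = unix_seconds();
    assert!(uid > 1_000_000_000); // sanity check: sometime after 2001
}
```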
src/controller/mod.rs (outdated):

let node = &self.ingredients[*index];
let domain = self.domains.get_mut(&node.domain()).unwrap();
domain.wait_for_ack().unwrap();
}
I'm pretty sure this can deadlock. Imagine two domains that both have egresses to the other. Both are told to recover at the same time. If there's enough data, they will eventually fill up the channels between one another, and deadlock.
I think you'll have to wait for an ack from each node recovery sequentially unfortunately. At least until we can come up with a cleverer scheme.
That's unfortunate :/ changed back to sequential recovery in 8126b20.
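The sequential scheme can be sketched with std channels standing in for the real domain handles (`recover_all` and the channel setup are illustrative, not the PR's code): tell one domain to recover, then block on its ack before starting the next, so two mutually connected domains can never fill each other's channels mid-recovery.

```rust
use std::sync::mpsc;
use std::thread;

// One (command, ack) channel pair per simulated "domain".
fn recover_all(n: usize) -> Vec<usize> {
    let mut domains = Vec::new();
    for i in 0..n {
        let (cmd_tx, cmd_rx) = mpsc::channel::<&'static str>();
        let (ack_tx, ack_rx) = mpsc::channel::<usize>();
        thread::spawn(move || {
            // "Domain" thread: wait for the recovery command, then ack.
            let _start = cmd_rx.recv().unwrap();
            ack_tx.send(i).unwrap();
        });
        domains.push((cmd_tx, ack_rx));
    }
    let mut order = Vec::new();
    for (cmd, ack) in &domains {
        cmd.send("StartRecovery").unwrap();
        order.push(ack.recv().unwrap()); // block on this domain's ack first
    }
    order
}

fn main() {
    // Acks arrive strictly in the order the domains were told to recover.
    assert_eq!(recover_all(3), vec![0, 1, 2]);
}
```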
I agree that sequential recovery is the safe option. Thinking about it, though, I don't think there is (currently) any way that the described deadlock would occur: it would require that a new domain adds a node into an existing domain below its own nodes: i.e., an existing A -> B relation turns into either A -> B -> A (not permitted) or independent A -> B, B -> A paths (which require B to add an edge, and thus a node, into A). I don't think our current assignment logic can ever do this?
I'm hesitant to say that the current logic can't do that actually. It does funky things when there are joins (which I think essentially ends up as placing the join in the same domain as one of its ancestors).
src/tests.rs (outdated):

// Suffixes the given log prefix with a timestamp, ensuring that
// subsequent test runs do not reuse log files in the case of failures.
fn get_log_name(prefix: &str) -> String {
    let current_time = time::get_time();
See note further up about using std::time for this.
@@ -376,6 +409,181 @@ fn it_works_with_arithmetic_aliases() {
}

#[test]
fn it_recovers_persisted_logs() {
I like this test.
See inline comments. Overall looks good though!
This was done to avoid deadlocks in situations where two recovering domains have egresses to each other. See mit-pdos#37 (comment)
I think we're ready to merge this, so I will unless I hear any objections :-)

My only hesitation is #37 (comment)
That sounds like a broader limitation of the transactional logic, though, so not something we would want to fix within the scope of this PR. I agree that it would be nice to recover at a single timestamp or a single timestamp per base, but I suspect that doing so would involve some invasive changes unrelated to the basic recovery implemented here. Let's merge this first, and then improve it.
@ms705 It is not clear to me that we'd even want recovery to use only a single timestamp. Recall that our buffering logic means that there can be no pipeline parallelism in the processing of messages for a single timestamp.
@fintelia Good point! This would also cause the timestamp-per-base scheme to buffer all writes from a single base until another base (with a "prior" timestamp) has finished recovering, which would in effect serialize the recovery at joins.
This adds a Blender.recover() method that goes through and replays the log entries it finds for its base nodes.

- persistence::Parameters now has an optional log_prefix that'll be used to replace soup in e.g. soup-log_0_0-0.json. Without this, parallel tests would sometimes log to the same file - at the same time.
- Recovered entries are handled through Domain.handle. Transactional packets are first passed through checktable.apply_batch to retrieve a timestamp. I'm not sure if the latter is completely correct though - cc @fintelia.
to retrieve a timestamp. I'm not sure if the latter is completely correct though - cc @fintelia.I've added a couple of tests, but let me know if there's anything else I should cover.
I also changed the basic example so that it calls .recover() and votes with a unique timestamp (so that the count increments each time it's run). Let me know if I should move it to another example or something though.