Skip to content

Commit

Permalink
Upgrade raft to: version="0.6" stable version
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Oct 5, 2022
1 parent 91009fa commit 7adbd57
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 52 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Expand Up @@ -13,11 +13,11 @@ futures = "0.3.16"
async-trait = "0.1.50"
bincode = "1.3"
log = { version = "0.4", features = ["std"] }
raft = { git = "https://github.com/tikv/raft-rs.git", rev = "eec5ead3ebe8f790154c94cb5f5adb0063a674e9", features = ["protobuf-codec"], default-features = false }
raft = { version = "0.6", features = ["protobuf-codec"], default-features = false }
serde = { version = "1.0", features = ["derive"] }
slog = "2"
thiserror = "1.0"
tokio = { version = "1.10", features = ["full"] }
tokio = { version = "1", features = ["full"] }
tonic = "0.6"
prost = "0.9"
protobuf = { version = "2.14.0", features = ["with-bytes", "with-serde"] }
Expand Down
4 changes: 2 additions & 2 deletions src/raft.rs
Expand Up @@ -382,8 +382,8 @@ impl<S: Store + Send + Sync + 'static> Raft<S> {
let mut change = ConfChange::default();
change.set_node_id(node_id);
change.set_change_type(ConfChangeType::AddNode);
// change.set_context(prost::bytes::Bytes::from(serialize(&self.addr)?));
change.set_context(serialize(&self.addr)?);
change.set_context(prost::bytes::Bytes::from(serialize(&self.addr)?));
// change.set_context(serialize(&self.addr)?);

let change = RiteraftConfChange {
inner: protobuf::Message::write_to_bytes(&change)?,
Expand Down
102 changes: 56 additions & 46 deletions src/raft_node.rs
Expand Up @@ -802,7 +802,6 @@ impl<S: Store + 'static> RaftNode<S> {
}
}

#[inline]
async fn on_ready(
&mut self,
) -> Result<()> {
Expand All @@ -812,6 +811,21 @@ impl<S: Store + 'static> RaftNode<S> {

let mut ready = self.ready();

if !ready.messages().is_empty() {
// Send out the messages.
self.send_messages(ready.take_messages());
}

if *ready.snapshot() != Snapshot::default() {
let snapshot = ready.snapshot();
self.store.restore(snapshot.get_data()).await?;
let store = self.mut_store();
store.apply_snapshot(snapshot.clone())?;
}

self.handle_committed_entries(ready.take_committed_entries())
.await?;

if !ready.entries().is_empty() {
let entries = ready.entries();
let store = self.mut_store();
Expand All @@ -824,8 +838,29 @@ impl<S: Store + 'static> RaftNode<S> {
store.set_hard_state(hs)?;
}

//for message in ready.take_messages() {
for message in ready.messages.drain(..) {
if !ready.persisted_messages().is_empty() {
// Send out the persisted messages come from the node.
self.send_messages(ready.take_persisted_messages());
}
let mut light_rd = self.advance(ready);

if let Some(commit) = light_rd.commit_index() {
let store = self.mut_store();
store.set_hard_state_comit(commit)?;
}
// Send out the messages.
self.send_messages(light_rd.take_messages());
// Apply all committed entries.
self.handle_committed_entries(light_rd.take_committed_entries())
.await?;
self.advance_apply();

Ok(())
}

fn send_messages(&mut self, msgs: Vec<RaftMessage>) {
for message in msgs {
// for message in ready.messages.drain(..) {
let client_id = message.get_to();
let client = match self.peer(client_id) {
Some(peer) => peer,
Expand All @@ -845,50 +880,25 @@ impl<S: Store + 'static> RaftNode<S> {
// }
tokio::spawn(message_sender.send());
}
}

if !ready.snapshot().is_empty() {
let snapshot = ready.snapshot();
self.store.restore(snapshot.get_data()).await?;
let store = self.mut_store();
store.apply_snapshot(snapshot.clone())?;
}

if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
let store = self.mut_store();
store.set_hard_state(hs)?;
}

if let Some(committed_entries) = ready.committed_entries.take() {
// if let Some(committed_entries) = ready.take_committed_entries() {
//log::info!("on_ready, committed_entries: {}", committed_entries.len());
// let committed_entries_len = committed_entries.len();
// let mut _last_apply_index = 0;
for entry in committed_entries {
//for entry in ready.take_committed_entries() {
// Mostly, you need to save the last apply index to resume applying
// after restart. Here we just ignore this because we use a Memory storage.
// _last_apply_index = entry.get_index();
debug!(
"entry.get_entry_type(): {:?}, entry.get_data().is_empty():{}",
entry.get_entry_type(),
entry.get_data().is_empty(),
);
if entry.get_data().is_empty() {
// Emtpy entry, when the peer becomes Leader it will send an empty entry.
continue;
}
match entry.get_entry_type() {
EntryType::EntryNormal => self.handle_normal(&entry).await?,
EntryType::EntryConfChange => {
self.handle_config_change(&entry).await?
}
EntryType::EntryConfChangeV2 => unimplemented!(),
}
async fn handle_committed_entries(
&mut self,
committed_entries: Vec<Entry>,
) -> Result<()> {
// Fitler out empty entries produced by new elected leaders.
for entry in committed_entries {
if entry.data.is_empty() {
// From new elected leaders.
continue;
}
if let EntryType::EntryConfChange = entry.get_entry_type() {
self.handle_config_change(&entry).await?;
} else {
self.handle_normal(&entry).await?;
}
}

self.advance(ready);
Ok(())
}

Expand Down Expand Up @@ -923,12 +933,12 @@ impl<S: Store + 'static> RaftNode<S> {

if let Ok(cs) = self.apply_conf_change(&change) {
let last_applied = self.raft.raft_log.applied;
let snapshot = self.store.snapshot().await?;
let snapshot = prost::bytes::Bytes::from(self.store.snapshot().await?);
{
let store = self.mut_store();
store.set_conf_state(&cs)?;
store.compact(last_applied)?;
let _ = store.create_snapshot(snapshot)?;
store.create_snapshot(snapshot)?;
}
}

Expand Down Expand Up @@ -1022,7 +1032,7 @@ impl<S: Store + 'static> RaftNode<S> {
//@TODO 600secs
self.last_snap_time = Instant::now();
let last_applied = self.raft.raft_log.applied;
let snapshot = self.store.snapshot().await?;
let snapshot = prost::bytes::Bytes::from(self.store.snapshot().await?);
let store = self.mut_store();
store.compact(last_applied)?;
let first_index = store.first_index().unwrap_or(0);
Expand Down
15 changes: 13 additions & 2 deletions src/storage.rs
Expand Up @@ -6,8 +6,9 @@ use crate::error::Result;
pub trait LogStore: Storage {
fn append(&mut self, entries: &[Entry]) -> Result<()>;
fn set_hard_state(&mut self, hard_state: &HardState) -> Result<()>;
fn set_hard_state_comit(&mut self, comit: u64) -> Result<()>;
fn set_conf_state(&mut self, conf_state: &ConfState) -> Result<()>;
fn create_snapshot(&mut self, data: Vec<u8>) -> Result<()>;
fn create_snapshot(&mut self, data: prost::bytes::Bytes) -> Result<()>;
fn apply_snapshot(&mut self, snapshot: Snapshot) -> Result<()>;
fn compact(&mut self, index: u64) -> Result<()>;
}
Expand Down Expand Up @@ -41,6 +42,15 @@ impl LogStore for MemStorage {
Ok(())
}

#[inline]
fn set_hard_state_comit(&mut self, comit: u64) -> Result<()> {
let mut store = self.core.wl();
let mut hard_state = store.hard_state().clone();
hard_state.set_commit(comit);
store.set_hardstate(hard_state);
Ok(())
}

#[inline]
fn set_conf_state(&mut self, conf_state: &ConfState) -> Result<()> {
let mut store = self.core.wl();
Expand All @@ -49,7 +59,7 @@ impl LogStore for MemStorage {
}

#[inline]
fn create_snapshot(&mut self, data: Vec<u8>) -> Result<()> {
fn create_snapshot(&mut self, data: prost::bytes::Bytes) -> Result<()> {
let mut snapshot = self.core.snapshot(0)?;
snapshot.set_data(data);
self.snapshot = snapshot;
Expand All @@ -69,6 +79,7 @@ impl LogStore for MemStorage {
store.compact(index)?;
Ok(())
}

}

impl Storage for MemStorage {
Expand Down

0 comments on commit 7adbd57

Please sign in to comment.