diff --git a/Cargo.toml b/Cargo.toml index 2bcf7272..1b3704f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,13 +19,13 @@ members = ["proto", "harness", "datadriven"] default = ["protobuf-codec", "default-logger"] # Enable failpoints failpoints = ["fail/failpoints"] -protobuf-codec = ["raft-proto/protobuf-codec", "bytes"] +protobuf-codec = ["raft-proto/protobuf-codec"] prost-codec = ["raft-proto/prost-codec"] default-logger = ["slog-stdlog", "slog-envlogger", "slog-term"] # Make sure to synchronize updates with Harness. [dependencies] -bytes = { version = "1", optional = true } +bytes = "1.0" fxhash = "0.2.1" fail = { version = "0.4", optional = true } getset = "0.1.1" @@ -37,6 +37,7 @@ slog = "2.2" slog-envlogger = { version = "2.1.0", optional = true } slog-stdlog = { version = "4", optional = true } slog-term = { version = "2.4.0", optional = true } +lazy_static = "1.4" [dev-dependencies] criterion = "0.3" diff --git a/src/derializer.rs b/src/derializer.rs new file mode 100644 index 00000000..0e1060a7 --- /dev/null +++ b/src/derializer.rs @@ -0,0 +1,147 @@ +use lazy_static::lazy_static; +use raft_proto::prelude::{ConfChange, ConfChangeV2, Message, Snapshot}; +use std::sync::{Arc, RwLock}; + +lazy_static! { + static ref DESERIALIZER: RwLock> = + RwLock::new(Arc::new(DefaultDeserializer)); +} + +use crate::prelude::Entry; + +#[derive(Debug)] +pub enum Bytes { + Prost(Vec), + Protobuf(bytes::Bytes), +} + +impl From> for Bytes { + fn from(vec: Vec) -> Self { + Bytes::Prost(vec) + } +} + +impl From for Bytes { + fn from(bytes: bytes::Bytes) -> Self { + Bytes::Protobuf(bytes) + } +} + +/// Sample Docs +pub trait CustomDeserializer { + /// Sample Docs + fn entry_context_deserialize(&self, v: &Bytes) -> String { + format!("{:?}", v) + } + + /// Sample Docs + fn entry_data_deserialize(&self, v: &Bytes) -> String { + format!("{:?}", v) + } + + /// Sample Docs + fn confchangev2_context_deserialize(&self, v: &Bytes) -> String { + format!("{:?}", v) + } + + /// Sample Docs + fn confchange_context_deserialize(&self, v: &Bytes) -> String { + format!("{:?}", v) + } + + /// Sample Docs + fn message_context_deserializer(&self, v: &Bytes) -> String { + format!("{:?}", v) + } + + /// Sample Docs + fn snapshot_data_deserializer(&self, v: &Bytes) -> String { + format!("{:?}", v) + } +} + +struct DefaultDeserializer; +impl CustomDeserializer for DefaultDeserializer {} + +/// Sample Docs +pub fn format_entry(entry: &Entry) -> String { + let derializer = DESERIALIZER.read().unwrap(); + + format!( + "Entry {{ context: {context:}, data: {data:}, entry_type: {entry_type:?}, index: {index:}, sync_log: {sync_log:}, term: {term:} }}", + data=derializer.entry_data_deserialize(&entry.data.clone().into()), + context=derializer.entry_context_deserialize(&entry.context.clone().into()), + entry_type=entry.get_entry_type(), + index=entry.get_index(), + sync_log=entry.get_sync_log(), + term=entry.get_term(), + ) +} + +/// Sample Docs +pub fn format_confchange(cc: &ConfChange) -> String { + let derializer = DESERIALIZER.read().unwrap(); + + format!( + "ConfChange {{ change_type: {change_type:?}, node_id: {node_id:}, context: {context:}, id: {id:} }}", + change_type = cc.get_change_type(), + node_id = cc.get_node_id(), + id = cc.get_id(), + context = derializer.confchange_context_deserialize(&cc.context.clone().into()) + ) +} + +/// Sample Docs +pub fn format_confchangev2(cc: &ConfChangeV2) -> String { + let derializer = DESERIALIZER.read().unwrap(); + + format!( + "ConfChangeV2 {{ transition: {transition:?}, changes: {changes:?}, context: {context:} }}", + transition = cc.transition, + changes = cc.changes, + context = derializer.confchangev2_context_deserialize(&cc.context.clone().into()) + ) +} + +/// Sample Docs +pub fn format_snapshot(snapshot: &Snapshot) -> String { + let derializer = DESERIALIZER.read().unwrap(); + + format!( + "Snapshot {{ data: {data:}, metadata: {metadata:?} }}", + data = derializer.snapshot_data_deserializer(&snapshot.data.clone().into()), + metadata = snapshot.metadata, + ) +} + +/// Sample Docs +pub fn format_message(msg: &Message) -> String { + let derializer = DESERIALIZER.read().unwrap(); + + format!( + "Message {{ msg_type: {msg_type:?}, to: {to:}, from: {from:}, term: {term:}, log_term: {log_term:}, index: {index:}, entries: [{entries:}], commit: {commit:}, commit_term: {commit_term:}, snapshot: {snapshot:}, request_snapshot: {request_snapshot:}, reject: {reject:}, reject_hint: {reject_hint:}, context: {context:}, deprecated_priority: {deprecated_priority:}, priority: {priority:} }}", + msg_type=msg.get_msg_type(), + to=msg.get_to(), + from=msg.get_from(), + term=msg.get_term(), + log_term=msg.get_log_term(), + index=msg.get_index(), + entries=msg.get_entries().iter().map(|e| format_entry(e)).collect::>().join(", "), + commit=msg.get_commit(), + commit_term=msg.get_commit_term(), + snapshot=format_snapshot(msg.get_snapshot()), + request_snapshot=msg.get_request_snapshot(), + reject=msg.get_reject(), + reject_hint=msg.get_reject_hint(), + context=derializer.message_context_deserializer(&msg.context.clone().into()), + deprecated_priority=msg.get_deprecated_priority(), + priority=msg.get_priority(), + ) +} + +/// Sample Docs +pub fn set_custom_deserializer(deserializer: D) { + let deserializer = Arc::new(deserializer); + let mut global_deserializer = DESERIALIZER.write().unwrap(); + *global_deserializer = deserializer; +} diff --git a/src/lib.rs b/src/lib.rs index 131d716d..9d8f6fcc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -504,6 +504,8 @@ macro_rules! fatal { mod confchange; mod config; +#[allow(missing_docs)] +pub mod derializer; mod errors; mod log_unstable; mod quorum; diff --git a/src/raft.rs b/src/raft.rs index 5af1eca4..78a1d786 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -18,6 +18,7 @@ use std::cmp; use std::convert::TryFrom; use std::ops::{Deref, DerefMut}; +use crate::derializer::{format_message, format_confchangev2}; use crate::eraftpb::{ ConfChange, ConfChangeV2, ConfState, Entry, EntryType, HardState, Message, MessageType, Snapshot, @@ -609,7 +610,7 @@ impl RaftCore { "Sending from {from} to {to}", from = self.id, to = m.to; - "msg" => ?m, + "msg" => format_message(&m), ); if m.from == INVALID_ID { m.from = self.id; @@ -2074,7 +2075,7 @@ impl Raft { info!( self.logger, "ignoring conf change"; - "conf change" => ?cc, + "conf change" => format_confchangev2(&cc), "reason" => reason, "config" => ?self.prs.conf(), "index" => self.pending_conf_index, diff --git a/src/raft_log.rs b/src/raft_log.rs index fcfcf32e..d0fe2b32 100644 --- a/src/raft_log.rs +++ b/src/raft_log.rs @@ -19,6 +19,7 @@ use std::cmp; use slog::warn; use slog::Logger; +use crate::derializer::format_entry; use crate::eraftpb::{Entry, Snapshot}; use crate::errors::{Error, Result, StorageError}; use crate::log_unstable::Unstable; @@ -359,7 +360,7 @@ impl RaftLog { trace!( self.unstable.logger, "Entries being appended to unstable list"; - "ents" => ?ents, + "ents" => ents.iter().map(|e| format_entry(e)).collect::>().join(", "), ); if ents.is_empty() { return self.last_index();