[Feature]: Curp Refactoring #169

Closed
markcty opened this issue Feb 6, 2023 · 6 comments · Fixed by #178
Labels
enhancement New feature or request

Comments

@markcty
Contributor

markcty commented Feb 6, 2023

Motivations

Currently, our curp server has the following pain points that might slow down later development:

  1. Poorly structured state: the curp server's state currently lumps many unrelated things together. This will become hard to maintain once we add persistence support, and a single big lock over the whole state is inefficient.
  2. Poor ergonomics for developers and reviewers: anyone who has written async Rust has probably struggled with LockGuard scoping. Because a LockGuard must be released before an .await point, we have to structure the code very carefully, which produces opaque, nested code that is hard for developers to write and for reviewers to read. Things would go much more smoothly if we split the code into two parts: a non-async part (updating curp's state) and an async part (transport and I/O).
  3. Duplicated code and long functions: we need to deduplicate the code and break it into shorter, more readable functions.
  4. Preparation for the upcoming persistence and snapshot support: we should add an abstract storage layer and store an index in each log entry.
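Item 4 asks for an index inside each log entry. The sketch below is a minimal illustration of why that helps once snapshots exist: the log may no longer start at index 1, so a `Vec` position and a log index differ. It assumes a hypothetical `base_index` field (the last index covered by a snapshot) and a placeholder `Cmd` type standing in for `C: Command`; neither is taken from the actual code.

```rust
use std::sync::Arc;

/// Placeholder for the real generic `C: Command` (hypothetical).
#[derive(Debug, PartialEq)]
pub struct Cmd(pub String);

/// One command per entry, and the entry carries its own log index.
#[derive(Debug)]
pub struct LogEntry {
    pub term: u64,
    pub index: usize,
    pub cmd: Arc<Cmd>,
}

/// A log that may start after a snapshot, so entries[0] need not be index 1.
pub struct Log {
    /// Hypothetical: index of the last entry covered by a snapshot (0 if none)
    pub base_index: usize,
    pub entries: Vec<LogEntry>,
}

impl Log {
    /// Index of the newest entry, or the snapshot boundary if the Vec is empty.
    pub fn last_index(&self) -> usize {
        self.entries.last().map_or(self.base_index, |e| e.index)
    }

    /// Appends a command and returns the log index it was assigned.
    pub fn push(&mut self, term: u64, cmd: Cmd) -> usize {
        let index = self.last_index() + 1;
        self.entries.push(LogEntry { term, index, cmd: Arc::new(cmd) });
        index
    }

    /// Looks an entry up by log index rather than by Vec position.
    pub fn get(&self, index: usize) -> Option<&LogEntry> {
        if index <= self.base_index {
            return None; // compacted into the snapshot
        }
        let entry = self.entries.get(index - self.base_index - 1)?;
        debug_assert_eq!(entry.index, index); // the stored index lets us check the mapping
        Some(entry)
    }
}
```

With `base_index: 10`, the first pushed command gets index 11 even though it sits at position 0 of the `Vec`.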

Approach

As a solution, I propose introducing a new abstraction layer called RawCurp. RawCurp can be viewed as a purely non-async state machine: all receiving and sending of requests and responses happens outside of it.

It looks like this:

pub(super) struct RawCurp<C: Command> {
    /// Curp state
    st: RwLock<State>,
    /// Additional leader state
    lst: RwLock<LeaderState>,
    /// Additional candidate state
    cst: Mutex<CandidateState<C>>,
    /// Curp logs
    log: RwLock<Log<C>>,
    /// Relevant context
    ctx: Context<C>,
}

impl<C: 'static + Command> RawCurp<C> {
    // Signatures only; bodies omitted

    /// Handle `append_entries`
    /// Return Ok(term) if it succeeds
    /// Return Err((term, hint_next_index_for_leader)) if it fails
    pub(super) fn handle_append_entries(
        &self,
        term: u64,
        leader_id: String,
        prev_log_index: usize,
        prev_log_term: u64,
        entries: Vec<LogEntry<C>>,
        leader_commit: usize,
    ) -> Result<u64, (u64, usize)>;

    /// Handle `append_entries` response
    /// Return Ok(append_entries_succeeded)
    /// Return Err(()) if self is no longer the leader
    pub(super) fn handle_append_entries_resp(
        &self,
        follower_id: &ServerId,
        last_sent_index: Option<usize>, // None means the ae is a heartbeat
        term: u64,
        success: bool,
        hint_index: usize,
    ) -> Result<bool, ()>;

    // other handlers ...

    pub(super) fn tick(&self) -> TickAction;
}
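To make the `Ok(term)` / `Err((term, hint_next_index_for_leader))` contract concrete, here is a minimal sketch of the follower side using the standard Raft rules. It assumes a single `&mut self` struct instead of RawCurp's separate locks, `String` instead of `ServerId`, and a stripped-down `LogEntry`; `RawState` is a hypothetical name. It also simplifies truncation (real Raft only drops the suffix on an actual term conflict).

```rust
#[derive(Clone, Debug)]
pub struct LogEntry {
    pub term: u64,
    pub index: usize,
}

/// Hypothetical flattened state; entries[i] has log index i + 1.
pub struct RawState {
    pub term: u64,
    pub leader_id: Option<String>,
    pub entries: Vec<LogEntry>,
    pub commit_index: usize,
}

impl RawState {
    /// Ok(term) on success, Err((term, hint_next_index)) on rejection.
    pub fn handle_append_entries(
        &mut self,
        term: u64,
        leader_id: String,
        prev_log_index: usize,
        prev_log_term: u64,
        entries: Vec<LogEntry>,
        leader_commit: usize,
    ) -> Result<u64, (u64, usize)> {
        // Stale leader: reject and report our newer term so it steps down
        if term < self.term {
            return Err((self.term, self.entries.len() + 1));
        }
        // Accept the sender as leader (role/voted_for updates omitted here)
        self.term = term;
        self.leader_id = Some(leader_id);
        // Missing entries: ask the leader to resume right after our last one
        if prev_log_index > self.entries.len() {
            return Err((self.term, self.entries.len() + 1));
        }
        // Term mismatch at prev_log_index: ask the leader to back off by one
        if prev_log_index > 0 && self.entries[prev_log_index - 1].term != prev_log_term {
            return Err((self.term, prev_log_index));
        }
        // Simplification: always replace everything after prev_log_index
        self.entries.truncate(prev_log_index);
        self.entries.extend(entries);
        // commit_index = min(leader_commit, index of last new entry)
        if leader_commit > self.commit_index {
            self.commit_index = leader_commit.min(self.entries.len());
        }
        Ok(self.term)
    }
}
```

Returning a hint index lets the leader jump `next_index` directly instead of probing one entry per round trip when the follower is far behind.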

Before refactoring: the protocol, state, and bg_tasks are tangled together.
[Figure: architecture before refactoring]

After refactoring:
[Figure: architecture after refactoring]

CurpNode will handle all transport-related functionality and error handling, while RawCurp only needs to take care of its internal structures and keep them consistent.
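The split can be sketched as follows: the sync half takes its lock, decides what to do, and returns a plain value; the async half only does I/O with that value, so no guard is alive across an `.await`. This is a minimal sketch, not the real code. `TickAction`'s variants, `send_heartbeat`, and the tiny `block_on` executor (a stand-in for tokio so the example runs with std only) are assumptions. With `tokio::spawn`, holding a `std::sync::MutexGuard` across an `.await` makes the future `!Send` and fails to compile, which is exactly the friction this design avoids.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical action RawCurp asks CurpNode to perform.
#[derive(Debug, PartialEq)]
pub enum TickAction {
    Heartbeat { term: u64 },
    Nothing,
}

/// Sync half (RawCurp-like): owns the state and never awaits.
pub struct RawCurp {
    term: Mutex<u64>,
    is_leader: bool,
}

impl RawCurp {
    pub fn new(term: u64, is_leader: bool) -> Self {
        Self { term: Mutex::new(term), is_leader }
    }

    pub fn tick(&self) -> TickAction {
        // The guard is a temporary, dropped at the end of this statement
        let term = *self.term.lock().unwrap();
        if self.is_leader { TickAction::Heartbeat { term } } else { TickAction::Nothing }
    }
}

/// Async half (CurpNode-like): stand-in for a network RPC.
async fn send_heartbeat(peer: &str, term: u64) -> String {
    format!("hb(term={term}) -> {peer}")
}

pub async fn bg_tick(curp: Arc<RawCurp>, peers: Vec<String>) -> Vec<String> {
    let mut sent = Vec::new();
    // curp.tick() has already released its lock before the .await below
    if let TickAction::Heartbeat { term } = curp.tick() {
        for p in &peers {
            sent.push(send_heartbeat(p, term).await);
        }
    }
    sent
}

/// Minimal single-threaded executor so the sketch runs without tokio.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

For example, `block_on(bg_tick(Arc::new(RawCurp::new(3, true)), vec!["n1".into()]))` yields `["hb(term=3) -> n1"]`, while a follower produces no messages.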

I plan to deliver this refactor in 4 PRs.

  1. A refactor of the command workers: currently, bg_tasks starts the execute workers and the after-sync workers; this can be encapsulated in the cmd workers. The cmd workers should expose only a channel through which commands can be sent.
  2. RawCurp without removing bg_tasks: I will try my best to preserve the current bg_tasks interface and replace its internals with RawCurp. This will help reduce the burden on reviewers.
  3. Final enhancements: rewrite the tests and remove bg_tasks; refactor LogEntry so that each entry holds exactly one cmd and contains its index; improve error handling.
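Item 1 says the cmd workers should expose only a channel. A minimal sketch of that shape, assuming `std::sync::mpsc` and a plain thread instead of the real async workers; `Task`, `CmdExeSender`, and `start_cmd_worker` are hypothetical names (the actual code uses `CmdExeSenderInterface<C>`), and commands are plain strings.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical work items; the real workers are generic over `C: Command`.
pub enum Task {
    Execute(String),
    AfterSync(String),
}

/// The only thing callers see: a cloneable handle that can enqueue work.
#[derive(Clone)]
pub struct CmdExeSender {
    tx: Sender<Task>,
}

impl CmdExeSender {
    pub fn send_exe(&self, cmd: &str) {
        // Ignore the error case where the worker has already stopped
        let _ = self.tx.send(Task::Execute(cmd.to_owned()));
    }

    pub fn send_after_sync(&self, cmd: &str) {
        let _ = self.tx.send(Task::AfterSync(cmd.to_owned()));
    }
}

/// Starts the worker; its receive loop stays private to this function.
pub fn start_cmd_worker() -> (CmdExeSender, JoinHandle<Vec<String>>) {
    let (tx, rx): (Sender<Task>, Receiver<Task>) = mpsc::channel();
    let handle = thread::spawn(move || {
        let mut done = Vec::new();
        // The loop ends once every CmdExeSender clone has been dropped
        for task in rx {
            match task {
                Task::Execute(c) => done.push(format!("exe {c}")),
                Task::AfterSync(c) => done.push(format!("as {c}")),
            }
        }
        done
    });
    (CmdExeSender { tx }, handle)
}
```

Because both task kinds travel through one channel, the worker sees them in the order they were sent, and callers never touch the worker's internals.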

P.S. Although bg_tasks will eventually be removed, that doesn't mean CurpNode will have no background tasks. There will still be bg_tick and bg_leader_calibrate_follower, but since most of the logic is handled in the curp module, they will contain little code.

About Locks in RawCurp

RawCurp has 4 locks:

//! Lock order should be:
//!     1. self.st
//!     2. self.cst || self.lst (there is no need to grab both)
//!     3. self.log

pub(super) struct RawCurp<C: Command> {
    /// Curp state
    st: RwLock<State>,
    /// Additional leader state
    lst: RwLock<LeaderState>,
    /// Additional candidate state
    cst: Mutex<CandidateState<C>>,
    /// Curp logs
    log: RwLock<Log<C>>,
    /// Relevant context
    ctx: Context<C>,
}


pub(super) struct State {
    /* persisted state */
    /// Current term
    pub(super) term: u64,
    /// Candidate id that received vote in current term
    pub(super) voted_for: Option<ServerId>,

    /* volatile state */
    /// Role of the server
    pub(super) role: Role,
    /// Cached id of the leader.
    pub(super) leader_id: Option<ServerId>,
}

pub(super) struct CandidateState<C> {
    /// Collected speculative pools, used for recovery
    pub(super) sps: HashMap<ServerId, Vec<Arc<C>>>,
    /// Votes received in the election
    pub(super) votes_received: u64,
}

pub(super) struct LeaderState {
    /// For each server, index of the next log entry to send to that server
    pub(super) next_index: HashMap<ServerId, usize>,
    /// For each server, index of highest log entry known to be replicated on server
    pub(super) match_index: HashMap<ServerId, usize>,
}


pub(super) struct Log<C: Command> {
    /// Log entries, should be persisted
    pub(super) entries: Vec<LogEntry<C>>,
    /// Index of highest log entry known to be committed
    pub(super) commit_index: usize,
    /// Index of highest log entry applied to state machine
    pub(super) last_applied: usize,
}
Lock usage per handler (1 = the handler acquires the lock, 0 = it does not):

| Function | st | lst | cst | log | Notes |
|---|---|---|---|---|---|
| tick (leader) | 1 | 0 | 0 | 1 | Grab and release the state read lock to check the role; grab the log read lock to generate heartbeats |
| tick (candidate/follower) | 1 | 0 | 1 | 1 | Grab and release the state read lock to check the role; grab the candidate state lock and the log read lock to become a candidate and generate vote requests |
| handle_append_entries | 1 | 0 | 0 | 1 | Grab an upgradable read lock on the state to check the term (upgrading to a write lock if the term is updated); while still holding the state lock, grab the log write lock to append new entries |
| handle_append_entries_resp | 1 | 1 | 0 | 1 | Grab and release the state read lock to check the role; grab and release the leader state write lock to update the follower's match_index; grab and release the leader state read lock and the log read lock to check whether commit_index can advance; if so, grab the log write lock to update it |
| handle_vote | 1 | 0 | 0 | 1 | Grab the state write lock to vote; grab the log read lock to check whether the candidate's log is up to date |
| handle_vote_resp | 1 | 1 | 1 | 1 | Grab the state write lock and the candidate state lock to handle the vote response. If the election succeeds, release the candidate state lock, then grab the leader state write lock (to reset next_index) and the log write lock (to recover commands) |
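The handle_append_entries_resp row can be sketched with std `RwLock`s to show the st → lst → log order and the commit check. This is a minimal sketch, not the real implementation. `String` replaces `ServerId`, `Log` keeps only each entry's term (`entry_terms[i]` is the term of index `i + 1`), stepping down on a newer term is left to the caller, and the commit rule is the standard Raft one (highest index replicated on a majority whose entry is from the current term).

```rust
use std::collections::HashMap;
use std::sync::RwLock;

#[derive(PartialEq)]
pub enum Role { Leader, Follower }

pub struct State { pub term: u64, pub role: Role }
pub struct LeaderState {
    pub next_index: HashMap<String, usize>,
    pub match_index: HashMap<String, usize>,
}
/// Simplified: entry_terms[i] is the term of the entry at index i + 1.
pub struct Log { pub entry_terms: Vec<u64>, pub commit_index: usize }

pub struct RawCurp { pub st: RwLock<State>, pub lst: RwLock<LeaderState>, pub log: RwLock<Log> }

impl RawCurp {
    /// Ok(success), or Err(()) if self is no longer the leader.
    pub fn handle_append_entries_resp(&self, follower: &str, last_sent_index: Option<usize>,
        term: u64, success: bool, hint_index: usize) -> Result<bool, ()> {
        // 1. st: grab and release a read lock to check role and term
        let cur_term = {
            let st = self.st.read().unwrap();
            if st.role != Role::Leader || term > st.term {
                return Err(());
            }
            st.term
        };
        // 2. lst: grab and release the write lock to update follower progress
        {
            let mut lst = self.lst.write().unwrap();
            if !success {
                lst.next_index.insert(follower.to_owned(), hint_index);
                return Ok(false);
            }
            if let Some(idx) = last_sent_index {
                // max() guards against reordered (stale) responses
                let m = lst.match_index.entry(follower.to_owned()).or_insert(0);
                *m = (*m).max(idx);
                let next = *m + 1;
                lst.next_index.insert(follower.to_owned(), next);
            }
        }
        // 3. lst (read) then log (read): highest index replicated on a majority
        let lst = self.lst.read().unwrap();
        let log = self.log.read().unwrap();
        let cluster_size = lst.match_index.len() + 1; // followers + the leader
        let new_commit = (log.commit_index + 1..=log.entry_terms.len()).rev().find(|&n| {
            let replicas = 1 + lst.match_index.values().filter(|&&m| m >= n).count();
            log.entry_terms[n - 1] == cur_term && replicas * 2 > cluster_size
        });
        // Release both read locks before taking the log write lock
        drop(log);
        drop(lst);
        // 4. log (write): advance commit_index if it is still behind
        if let Some(n) = new_commit {
            let mut log = self.log.write().unwrap();
            if n > log.commit_index {
                log.commit_index = n;
            }
        }
        Ok(true)
    }
}
```

Std's `RwLock` has no upgradable read, so the handle_append_entries row (upgradable read on `st`) would need something like parking_lot's `RwLock::upgradable_read`; that is not shown here.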

@markcty markcty added the enhancement label Feb 6, 2023
@markcty
Contributor Author

markcty commented Feb 6, 2023

Close #73

@Phoenix500526
Collaborator

What's the meaning of the relevant context?

@markcty
Contributor Author

markcty commented Feb 7, 2023

> What's the meaning of the relevant context?

It's just miscellaneous data used by RawCurp:

/// Relevant context for Curp
struct Context<C: Command> {
    /// Id of the server
    id: ServerId,
    /// Ids of the other servers
    others: HashSet<ServerId>,
    /// Timeout
    timeout: Arc<ServerTimeout>,
    /// Cmd board for tracking the cmd sync results
    cb: CmdBoardRef<C>,
    /// Speculative pool
    sp: SpecPoolRef<C>,
    /// Tx to send leader changes
    leader_tx: broadcast::Sender<Option<ServerId>>,
    /// Election tick
    election_tick: AtomicU8,
    /// Heartbeat opt out flag
    hb_opt: AtomicBool,
    /// Tx to send cmds to execute and do after sync
    cmd_tx: Box<dyn CmdExeSenderInterface<C>>,
}
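One reason `election_tick` can live in `Context` as an `AtomicU8` rather than inside a locked struct is that the tick path can count without taking any write lock. A minimal sketch of such a counter, assuming a hypothetical threshold `ELECTION_TIMEOUT_TICKS` and hypothetical `tick`/`reset` methods (the real timeout comes from `ServerTimeout`); `hb_opt` is not modeled.

```rust
use std::sync::atomic::{AtomicU8, Ordering};

/// Hypothetical number of ticks without hearing from a leader before an election.
pub const ELECTION_TIMEOUT_TICKS: u8 = 5;

#[derive(Default)]
pub struct ElectionTimer {
    election_tick: AtomicU8,
}

impl ElectionTimer {
    /// Called on every tick; returns true when a follower should become a candidate.
    pub fn tick(&self) -> bool {
        // Relaxed is enough: this counter does not guard any other data
        let ticks = self.election_tick.fetch_add(1, Ordering::Relaxed).saturating_add(1);
        if ticks >= ELECTION_TIMEOUT_TICKS {
            self.election_tick.store(0, Ordering::Relaxed); // restart the countdown
            true
        } else {
            false
        }
    }

    /// Called when a valid append_entries (or heartbeat) arrives from the leader.
    pub fn reset(&self) {
        self.election_tick.store(0, Ordering::Relaxed);
    }
}
```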

@Phoenix500526
Collaborator

What's the difference between LeaderState and CandidateState? How come the CandidateState is wrapped in a Mutex while the other is in a RwLock?

@markcty
Contributor Author

markcty commented Feb 7, 2023

> What's the difference between LeaderState and CandidateState? How come the CandidateState is wrapped in a Mutex while the other is in a RwLock?

pub(super) struct State {
    /* persisted state */
    /// Current term
    pub(super) term: u64,
    /// Candidate id that received vote in current term
    pub(super) voted_for: Option<ServerId>,

    /* volatile state */
    /// Role of the server
    pub(super) role: Role,
    /// Cached id of the leader.
    pub(super) leader_id: Option<ServerId>,
}

pub(super) struct CandidateState<C> {
    /// Collected speculative pools, used for recovery
    pub(super) sps: HashMap<ServerId, Vec<Arc<C>>>,
    /// Votes received in the election
    pub(super) votes_received: u64,
}

pub(super) struct LeaderState {
    /// For each server, index of the next log entry to send to that server
    pub(super) next_index: HashMap<ServerId, usize>,
    /// For each server, index of highest log entry known to be replicated on server
    pub(super) match_index: HashMap<ServerId, usize>,
}

Why a Mutex for the candidate state?

  1. CandidateState is only used during elections, so it is idle most of the time. State and LeaderState, by contrast, are accessed all the time, so an RwLock lets concurrent readers proceed in parallel and improves performance.
  2. Every access to CandidateState needs to modify it, so an RwLock's shared read mode would never be used.
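Point 2 can be seen in a vote-response handler: every call writes both fields, so a plain `Mutex` fits. A minimal sketch, assuming `String` in place of `ServerId`, a hypothetical `Election` wrapper holding the cluster size, and that the candidate starts with its own vote counted.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

pub struct CandidateState<C> {
    /// Collected speculative pools, used for recovery
    pub sps: HashMap<String, Vec<Arc<C>>>,
    /// Votes received in the election
    pub votes_received: u64,
}

/// Hypothetical wrapper; in the issue the Mutex lives in RawCurp as `cst`.
pub struct Election<C> {
    cst: Mutex<CandidateState<C>>,
    cluster_size: u64,
}

impl<C> Election<C> {
    pub fn new(cluster_size: u64) -> Self {
        // The candidate has already voted for itself
        let cst = CandidateState { sps: HashMap::new(), votes_received: 1 };
        Self { cst: Mutex::new(cst), cluster_size }
    }

    /// Returns true once a majority has granted its vote.
    pub fn handle_vote_resp(&self, voter: &str, granted: bool, sp: Vec<Arc<C>>) -> bool {
        if !granted {
            return false;
        }
        // Always a write: record the voter's speculative pool and bump the count
        let mut cst = self.cst.lock().unwrap();
        cst.sps.insert(voter.to_owned(), sp);
        cst.votes_received += 1;
        cst.votes_received * 2 > self.cluster_size
    }
}
```

In a 5-node cluster the candidate wins on the second granted vote it receives (1 self vote + 2 = 3 of 5).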

@markcty
Contributor Author

markcty commented Feb 7, 2023

FYI: the Log struct

pub(super) struct Log<C: Command> {
    /// Log entries
    pub(super) entries: Vec<LogEntry<C>>,
    /// Index of highest log entry known to be committed
    pub(super) commit_index: usize,
    /// Index of highest log entry applied to state machine
    pub(super) last_applied: usize,
}
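`commit_index` and `last_applied` together tell the node which entries are committed but not yet applied. A minimal sketch of draining that gap, assuming a simplified `LogEntry` with a string command and a hypothetical `take_to_apply` method; in the real design the commands would presumably be handed to the cmd workers for after-sync rather than returned.

```rust
pub struct LogEntry {
    pub term: u64,
    pub cmd: String,
}

pub struct Log {
    /// Log entries; entries[i] has log index i + 1
    pub entries: Vec<LogEntry>,
    /// Index of highest log entry known to be committed
    pub commit_index: usize,
    /// Index of highest log entry applied to state machine
    pub last_applied: usize,
}

impl Log {
    /// Returns (index, cmd) for every committed but unapplied entry, in order,
    /// and advances last_applied. Assumes last_applied <= commit_index <= entries.len().
    pub fn take_to_apply(&mut self) -> Vec<(usize, String)> {
        let mut out = Vec::new();
        while self.last_applied < self.commit_index {
            self.last_applied += 1;
            let entry = &self.entries[self.last_applied - 1];
            out.push((self.last_applied, entry.cmd.clone()));
        }
        out
    }
}
```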

@Phoenix500526 Phoenix500526 self-assigned this Feb 7, 2023
This was referenced Feb 8, 2023
markcty added a commit to markcty/Xline that referenced this issue Feb 20, 2023
move propose logic from curp_node to raw_curp
markcty added a commit to markcty/Xline that referenced this issue Feb 23, 2023
move propose logic from curp_node to raw_curp
markcty added a commit to markcty/Xline that referenced this issue Feb 23, 2023
move propose logic from curp_node to raw_curp
markcty added a commit to markcty/Xline that referenced this issue Feb 24, 2023
move propose logic from curp_node to raw_curp
@markcty markcty linked a pull request Feb 27, 2023 that will close this issue
markcty added a commit that referenced this issue Mar 5, 2023
move propose logic from curp_node to raw_curp