Skip to content

Commit

Permalink
Merge pull request #41 from pingcap/siddontang/update-enum-msg
Browse files Browse the repository at this point in the history
raftserver: simplify and cleanup
  • Loading branch information
siddontang committed Jan 21, 2016
2 parents 0899bc3 + 702d13e commit fcab78c
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 34 deletions.
1 change: 0 additions & 1 deletion src/raftserver/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ impl Conn {
}

bufs.push(ConnData {
token: self.token,
msg_id: self.last_msg_id,
data: payload.flip(),
});
Expand Down
3 changes: 3 additions & 0 deletions src/raftserver/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

use std::vec::Vec;

use mio::Token;

use raftserver::{Result, ConnData, Sender, TimerMsg};

// ServerHandler is for server logic, we must implement it for our raft server.
Expand All @@ -19,6 +21,7 @@ pub trait ServerHandler :Sized {
// You can use sender to communicate with event loop.
fn handle_read_data(&mut self,
sender: &Sender,
token: Token,
msgs: Vec<ConnData>)
-> Result<(Vec<ConnData>)> {
Ok((msgs))
Expand Down
32 changes: 19 additions & 13 deletions src/raftserver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ pub struct Config {
}

pub struct ConnData {
token: Token,
msg_id: u64,
data: ByteBuf,
}
Expand All @@ -52,22 +51,26 @@ pub enum TimerMsg {
None,
}

pub struct TimerData {
delay: u64,
msg: TimerMsg,
}

pub enum Msg {
// Quit event loop.
Quit,
// Read data from connection.
ReadData(ConnData),
ReadData {
token: Token,
data: ConnData,
},
// Write data to connection.
WriteData(ConnData),
WriteData {
token: Token,
data: ConnData,
},
// Tick is for base internal tick message.
Tick,
// Timer is for custom timeout message.
Timer(TimerData),
Timer {
delay: u64,
msg: TimerMsg,
},
}

#[derive(Debug)]
Expand Down Expand Up @@ -117,17 +120,20 @@ impl Sender {
Ok(())
}

pub fn write_data(&self, data: ConnData) -> Result<()> {
try!(self.send(Msg::WriteData(data)));
pub fn write_data(&self, token: Token, data: ConnData) -> Result<()> {
try!(self.send(Msg::WriteData {
token: token,
data: data,
}));

Ok(())
}

pub fn timeout_ms(&self, delay: u64, m: TimerMsg) -> Result<()> {
try!(self.send(Msg::Timer(TimerData {
try!(self.send(Msg::Timer {
delay: delay,
msg: m,
})));
}));

Ok(())
}
Expand Down
54 changes: 34 additions & 20 deletions src/raftserver/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use mio::{Token, Handler, EventLoop, EventSet, PollOpt};
use mio::tcp::TcpListener;

use raftserver::{SERVER_TOKEN, FIRST_CUSTOM_TOKEN, DEFAULT_BASE_TICK_MS};
use raftserver::{Msg, Sender, Result, ConnData, TimerData};
use raftserver::{Msg, Sender, Result, ConnData, TimerMsg};
use raftserver::conn::Conn;
use raftserver::handler::ServerHandler;

Expand Down Expand Up @@ -38,10 +38,14 @@ impl<T: ServerHandler> Server<T> {
Ok(())
}

fn register_timer(&mut self, event_loop: &mut EventLoop<Server<T>>, data: TimerData) {
// we have already checked when sender.
let delay = data.delay;
let token = Msg::Timer(data);
fn register_timer(&mut self,
event_loop: &mut EventLoop<Server<T>>,
delay: u64,
msg: TimerMsg) {
let token = Msg::Timer {
delay: delay,
msg: msg,
};
event_loop.timeout_ms(token, delay);
}

Expand Down Expand Up @@ -111,17 +115,24 @@ impl<T: ServerHandler> Server<T> {
Some(conn) => msgs = conn.read(event_loop),
}

msgs.and_then(|msgs| {
self.handler
.handle_read_data(&self.sender, msgs)
.and_then(|res| {
msgs.and_then(|msgs| self.handler.handle_read_data(&self.sender, token, msgs))
.and_then(|res| {
if res.len() == 0 {
return Ok(());
}

// append to write buffer here, no need using sender to notify.
if let Some(conn) = self.conns.get_mut(&token) {
for data in res {
try!(self.sender.write_data(data));
conn.append_write_buf(data);
}
Ok(())
})
});
try!(conn.reregister_writeable(event_loop));
}
Ok(())
})
.map_err(|e| warn!("handle read conn err {:?}", e));
}

}
}

Expand All @@ -135,8 +146,11 @@ impl<T: ServerHandler> Server<T> {
};
}

fn handle_writedata(&mut self, event_loop: &mut EventLoop<Server<T>>, data: ConnData) {
if let Some(conn) = self.conns.get_mut(&data.token) {
fn handle_writedata(&mut self,
event_loop: &mut EventLoop<Server<T>>,
token: Token,
data: ConnData) {
if let Some(conn) = self.conns.get_mut(&token) {
conn.append_write_buf(data);
conn.reregister_writeable(event_loop);
}
Expand All @@ -150,9 +164,9 @@ impl<T: ServerHandler> Server<T> {
self.register_tick(event_loop);
}

fn handle_timer(&mut self, _: &mut EventLoop<Server<T>>, data: TimerData) {
fn handle_timer(&mut self, _: &mut EventLoop<Server<T>>, msg: TimerMsg) {
self.handler
.handle_timer(&self.sender, data.msg)
.handle_timer(&self.sender, msg)
.map_err(|e| warn!("handle timer err {:?}", e));
}
}
Expand All @@ -179,16 +193,16 @@ impl<T: ServerHandler> Handler for Server<T> {
fn notify(&mut self, event_loop: &mut EventLoop<Server<T>>, msg: Msg) {
match msg {
Msg::Quit => event_loop.shutdown(),
Msg::WriteData(data) => self.handle_writedata(event_loop, data),
Msg::Timer(data) => self.register_timer(event_loop, data),
Msg::WriteData{token, data} => self.handle_writedata(event_loop, token, data),
Msg::Timer{delay, msg} => self.register_timer(event_loop, delay, msg),
_ => panic!("unexpected msg"),
}
}

fn timeout(&mut self, event_loop: &mut EventLoop<Server<T>>, msg: Msg) {
match msg {
Msg::Tick => self.handle_tick(event_loop),
Msg::Timer(data) => self.handle_timer(event_loop, data),
Msg::Timer{msg, ..} => self.handle_timer(event_loop, msg),
_ => panic!("unexpected msg"),
}
}
Expand Down

0 comments on commit fcab78c

Please sign in to comment.