Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raftserver: simplify and cleanup #41

Merged
merged 1 commit into from
Jan 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/raftserver/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ impl Conn {
}

bufs.push(ConnData {
token: self.token,
msg_id: self.last_msg_id,
data: payload.flip(),
});
Expand Down
3 changes: 3 additions & 0 deletions src/raftserver/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

use std::vec::Vec;

use mio::Token;

use raftserver::{Result, ConnData, Sender, TimerMsg};

// ServerHandler is for server logic, we must implement it for our raft server.
Expand All @@ -19,6 +21,7 @@ pub trait ServerHandler :Sized {
// You can use sender to communicate with event loop.
fn handle_read_data(&mut self,
sender: &Sender,
token: Token,
msgs: Vec<ConnData>)
-> Result<(Vec<ConnData>)> {
Ok((msgs))
Expand Down
32 changes: 19 additions & 13 deletions src/raftserver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ pub struct Config {
}

pub struct ConnData {
token: Token,
msg_id: u64,
data: ByteBuf,
}
Expand All @@ -52,22 +51,26 @@ pub enum TimerMsg {
None,
}

pub struct TimerData {
delay: u64,
msg: TimerMsg,
}

pub enum Msg {
// Quit event loop.
Quit,
// Read data from connection.
ReadData(ConnData),
ReadData {
token: Token,
data: ConnData,
},
// Write data to connection.
WriteData(ConnData),
WriteData {
token: Token,
data: ConnData,
},
// Tick is for base internal tick message.
Tick,
// Timer is for custom timeout message.
Timer(TimerData),
Timer {
delay: u64,
msg: TimerMsg,
},
}

#[derive(Debug)]
Expand Down Expand Up @@ -117,17 +120,20 @@ impl Sender {
Ok(())
}

pub fn write_data(&self, data: ConnData) -> Result<()> {
try!(self.send(Msg::WriteData(data)));
pub fn write_data(&self, token: Token, data: ConnData) -> Result<()> {
try!(self.send(Msg::WriteData {
token: token,
data: data,
}));

Ok(())
}

pub fn timeout_ms(&self, delay: u64, m: TimerMsg) -> Result<()> {
try!(self.send(Msg::Timer(TimerData {
try!(self.send(Msg::Timer {
delay: delay,
msg: m,
})));
}));

Ok(())
}
Expand Down
54 changes: 34 additions & 20 deletions src/raftserver/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use mio::{Token, Handler, EventLoop, EventSet, PollOpt};
use mio::tcp::TcpListener;

use raftserver::{SERVER_TOKEN, FIRST_CUSTOM_TOKEN, DEFAULT_BASE_TICK_MS};
use raftserver::{Msg, Sender, Result, ConnData, TimerData};
use raftserver::{Msg, Sender, Result, ConnData, TimerMsg};
use raftserver::conn::Conn;
use raftserver::handler::ServerHandler;

Expand Down Expand Up @@ -38,10 +38,14 @@ impl<T: ServerHandler> Server<T> {
Ok(())
}

fn register_timer(&mut self, event_loop: &mut EventLoop<Server<T>>, data: TimerData) {
// we have already checked when sender.
let delay = data.delay;
let token = Msg::Timer(data);
fn register_timer(&mut self,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why mut here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the function in server, we can treat all as mut, no need to distinguish mut or not.

event_loop: &mut EventLoop<Server<T>>,
delay: u64,
msg: TimerMsg) {
let token = Msg::Timer {
delay: delay,
msg: msg,
};
event_loop.timeout_ms(token, delay);
}

Expand Down Expand Up @@ -111,17 +115,24 @@ impl<T: ServerHandler> Server<T> {
Some(conn) => msgs = conn.read(event_loop),
}

msgs.and_then(|msgs| {
self.handler
.handle_read_data(&self.sender, msgs)
.and_then(|res| {
msgs.and_then(|msgs| self.handler.handle_read_data(&self.sender, token, msgs))
.and_then(|res| {
if res.len() == 0 {
return Ok(());
}

// append to write buffer here, no need using sender to notify.
if let Some(conn) = self.conns.get_mut(&token) {
for data in res {
try!(self.sender.write_data(data));
conn.append_write_buf(data);
}
Ok(())
})
});
try!(conn.reregister_writeable(event_loop));
}
Ok(())
})
.map_err(|e| warn!("handle read conn err {:?}", e));
}

}
}

Expand All @@ -135,8 +146,11 @@ impl<T: ServerHandler> Server<T> {
};
}

fn handle_writedata(&mut self, event_loop: &mut EventLoop<Server<T>>, data: ConnData) {
if let Some(conn) = self.conns.get_mut(&data.token) {
fn handle_writedata(&mut self,
event_loop: &mut EventLoop<Server<T>>,
token: Token,
data: ConnData) {
if let Some(conn) = self.conns.get_mut(&token) {
conn.append_write_buf(data);
conn.reregister_writeable(event_loop);
}
Expand All @@ -150,9 +164,9 @@ impl<T: ServerHandler> Server<T> {
self.register_tick(event_loop);
}

fn handle_timer(&mut self, _: &mut EventLoop<Server<T>>, data: TimerData) {
fn handle_timer(&mut self, _: &mut EventLoop<Server<T>>, msg: TimerMsg) {
self.handler
.handle_timer(&self.sender, data.msg)
.handle_timer(&self.sender, msg)
.map_err(|e| warn!("handle timer err {:?}", e));
}
}
Expand All @@ -179,16 +193,16 @@ impl<T: ServerHandler> Handler for Server<T> {
fn notify(&mut self, event_loop: &mut EventLoop<Server<T>>, msg: Msg) {
match msg {
Msg::Quit => event_loop.shutdown(),
Msg::WriteData(data) => self.handle_writedata(event_loop, data),
Msg::Timer(data) => self.register_timer(event_loop, data),
Msg::WriteData{token, data} => self.handle_writedata(event_loop, token, data),
Msg::Timer{delay, msg} => self.register_timer(event_loop, delay, msg),
_ => panic!("unexpected msg"),
}
}

fn timeout(&mut self, event_loop: &mut EventLoop<Server<T>>, msg: Msg) {
match msg {
Msg::Tick => self.handle_tick(event_loop),
Msg::Timer(data) => self.handle_timer(event_loop, data),
Msg::Timer{msg, ..} => self.handle_timer(event_loop, msg),
_ => panic!("unexpected msg"),
}
}
Expand Down