Skip to content

Commit

Permalink
server: reuse recv buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
BusyJay committed Jul 22, 2016
1 parent 5eefcae commit 38df4fb
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 87 deletions.
114 changes: 50 additions & 64 deletions src/server/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
// limitations under the License.

use std::cmp;
use std::io::Read;

use mio::{Token, EventLoop, EventSet, PollOpt};
use mio::tcp::TcpStream;
use bytes::{MutBuf, MutByteBuf};
use protobuf::Message as PbMessage;

use kvproto::msgpb::Message;
Expand All @@ -27,7 +27,7 @@ use super::transport::RaftStoreRouter;
use super::resolve::StoreAddrResolver;
use super::snap::Task as SnapTask;
use util::worker::Scheduler;
use util::buf::{TryRead, create_mem_buf, PipeBuffer};
use util::buf::PipeBuffer;


#[derive(PartialEq)]
Expand All @@ -39,6 +39,7 @@ enum ConnType {

const SNAPSHOT_PAYLOAD_BUF: usize = 4 * 1024 * 1024;
const DEFAULT_SEND_BUFFER_SIZE: usize = 8 * 1024;
const DEFAULT_RECV_BUFFER_SIZE: usize = 8 * 1024;

pub struct Conn {
pub sock: TcpStream,
Expand All @@ -52,31 +53,15 @@ pub struct Conn {
pub store_id: Option<u64>,

// message header
last_msg_id: u64,
header: MutByteBuf,
// message
payload: Option<MutByteBuf>,
last_msg_id: Option<u64>,

file_size: usize,
expect_size: usize,
read_size: usize,

snap_scheduler: Scheduler<SnapTask>,

send_buffer: PipeBuffer,
}

fn try_read_data<T: TryRead, B: MutBuf>(r: &mut T, buf: &mut B) -> Result<()> {
if buf.remaining() == 0 {
return Ok(());
}

if let Some(n) = try!(r.try_read_buf(buf)) {
if n == 0 {
// 0 means remote has closed the socket.
return Err(box_err!("remote has closed the connection"));
}
}

Ok(())
recv_buffer: Option<PipeBuffer>,
}

impl Conn {
Expand All @@ -90,15 +75,14 @@ impl Conn {
token: token,
interest: EventSet::readable() | EventSet::hup(),
conn_type: ConnType::Handshake,
header: create_mem_buf(rpc::MSG_HEADER_LEN),
read_size: 0,
file_size: 0,
payload: None,
last_msg_id: 0,
expect_size: 0,
last_msg_id: None,
snap_scheduler: snap_scheduler,
store_id: store_id,
// TODO: Maybe we should need max size to shrink later.
send_buffer: PipeBuffer::new(DEFAULT_SEND_BUFFER_SIZE),
recv_buffer: Some(PipeBuffer::new(DEFAULT_RECV_BUFFER_SIZE)),
}
}

Expand Down Expand Up @@ -150,8 +134,10 @@ impl Conn {
let mut snap_data = RaftSnapshotData::new();
try!(snap_data.merge_from_bytes(
data.msg.get_raft().get_message().get_snapshot().get_data()));
self.file_size = snap_data.get_file_size() as usize;
self.payload = Some(create_mem_buf(cmp::min(SNAPSHOT_PAYLOAD_BUF, self.file_size)));
self.expect_size = snap_data.get_file_size() as usize;
let expect_cap = cmp::min(SNAPSHOT_PAYLOAD_BUF, self.expect_size);
// no need to shrink, the connection will be closed soon.
self.recv_buffer.as_mut().unwrap().ensure(expect_cap);

let register_task = SnapTask::Register(self.token, data.msg.take_raft());
box_try!(self.snap_scheduler.schedule(register_task));
Expand All @@ -168,68 +154,68 @@ impl Conn {
S: StoreAddrResolver
{
// all content should be read, ignore any read operation.
if self.payload.is_none() {
if self.recv_buffer.is_none() {
return Ok(());
}
// TODO: limit rate
while try!(self.read_payload()) {
let payload = self.payload.take().unwrap();
let cap = payload.capacity();
self.read_size += cap;
loop {
{
let recv_buffer = self.recv_buffer.as_mut().unwrap();
try!(recv_buffer.read_from(&mut self.sock));
// if the snapshot is too small, the default buffer may be not filled.
if !recv_buffer.is_full() && recv_buffer.len() != self.expect_size {
break;
}
}
let recv_buffer = self.recv_buffer.take().unwrap();
let len = recv_buffer.len();
self.read_size += len;

let task = SnapTask::Write(self.token, payload.flip());
let task = SnapTask::Write(self.token, recv_buffer);
box_try!(self.snap_scheduler.schedule(task));

if self.read_size == self.file_size {
if self.read_size == self.expect_size {
// last chunk
box_try!(self.snap_scheduler.schedule(SnapTask::Close(self.token)));
// let snap_scheduler to close the connection.
break;
} else if self.read_size + cap >= self.file_size {
self.payload = Some(create_mem_buf(self.file_size - self.read_size))
} else if self.read_size + len >= self.expect_size {
self.recv_buffer = Some(PipeBuffer::new(self.expect_size - self.read_size));
} else {
self.payload = Some(create_mem_buf(cap))
};
self.recv_buffer = Some(PipeBuffer::new(len));
}
}
Ok(())
}

fn read_payload(&mut self) -> Result<bool> {
let payload = self.payload.as_mut().unwrap();
try!(try_read_data(&mut self.sock, payload));
let ret = payload.remaining() == 0;
Ok(ret)
}

fn read_one_message(&mut self) -> Result<Option<ConnData>> {
if self.payload.is_none() {
try!(try_read_data(&mut self.sock, &mut self.header));
if self.header.remaining() > 0 {
let recv_buffer = self.recv_buffer.as_mut().unwrap();
if self.last_msg_id.is_none() {
recv_buffer.ensure(rpc::MSG_HEADER_LEN);
if recv_buffer.len() < rpc::MSG_HEADER_LEN {
try!(recv_buffer.read_from(&mut self.sock));
}
if recv_buffer.len() < rpc::MSG_HEADER_LEN {
// we need to read more data for header
return Ok(None);
}

// we have already read whole header, parse it and begin to read payload.
let (msg_id, payload_len) = try!(rpc::decode_msg_header(self.header
.bytes()));
self.last_msg_id = msg_id;
self.payload = Some(create_mem_buf(payload_len));
let (msg_id, payload_len) = try!(rpc::decode_msg_header(recv_buffer));
self.last_msg_id = Some(msg_id);
self.expect_size = payload_len;
}

// payload here can't be None.
let mut payload = self.payload.take().unwrap();
try!(try_read_data(&mut self.sock, &mut payload));
if payload.remaining() > 0 {
recv_buffer.ensure(self.expect_size);
try!(recv_buffer.read_from(&mut self.sock));
if recv_buffer.len() < self.expect_size {
// we need to read more data for payload
self.payload = Some(payload);
return Ok(None);
}

let mut msg = Message::new();
try!(rpc::decode_body(payload.bytes(), &mut msg));
self.header.clear();
try!(rpc::decode_body(&mut recv_buffer.take(self.expect_size as u64), &mut msg));
let msg_id = self.last_msg_id.unwrap();
self.last_msg_id = None;
Ok(Some(ConnData {
msg_id: self.last_msg_id,
msg_id: msg_id,
msg: msg,
}))
}
Expand Down
33 changes: 23 additions & 10 deletions src/server/snap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,30 @@ use std::fmt::{self, Formatter, Display};
use std::io;
use std::fs::File;
use std::net::{SocketAddr, TcpStream};
use std::io::{Read, Write};
use std::io::Read;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::boxed::FnBox;
use std::sync::{Arc, RwLock};
use std::time::Instant;
use std::time::{Instant, Duration};
use threadpool::ThreadPool;
use mio::Token;
use bytes::{Buf, ByteBuf};

use super::{Result, ConnData, SendCh, Msg};
use super::transport::RaftStoreRouter;
use raftstore::store::{SnapFile, SnapManager, SnapKey, SnapEntry};
use util::worker::Runnable;
use util::codec::rpc;
use util::buf::PipeBuffer;
use util::HandyRwLock;

use kvproto::raft_serverpb::RaftMessage;

pub type Callback = Box<FnBox(Result<()>) + Send>;

const DEFAULT_SENDER_POOL_SIZE: usize = 3;
const DEFAULT_READ_TIMEOUT: u64 = 30;
const DEFAULT_WRITE_TIMEOUT: u64 = 30;

/// `Task` that `Runner` can handle.
///
Expand All @@ -46,7 +49,7 @@ const DEFAULT_SENDER_POOL_SIZE: usize = 3;
/// `SendTo` send the snapshot file to specified address.
pub enum Task {
Register(Token, RaftMessage),
Write(Token, ByteBuf),
Write(Token, PipeBuffer),
Close(Token),
Discard(Token),
SendTo {
Expand Down Expand Up @@ -92,6 +95,8 @@ fn send_snap(mgr: SnapManager, addr: SocketAddr, data: ConnData) -> Result<()> {
let mut f = try!(File::open(snap_file.path()));
let mut conn = try!(TcpStream::connect(&addr));
try!(conn.set_nodelay(true));
try!(conn.set_read_timeout(Some(Duration::from_secs(DEFAULT_READ_TIMEOUT))));
try!(conn.set_write_timeout(Some(Duration::from_secs(DEFAULT_WRITE_TIMEOUT))));

let res = rpc::encode_msg(&mut conn, data.msg_id, &data.msg)
.and_then(|_| io::copy(&mut f, &mut conn).map_err(From::from))
Expand Down Expand Up @@ -158,14 +163,22 @@ impl<R: RaftStoreRouter + 'static> Runnable<Task> for Runner<R> {
Err(e) => error!("failed to create snap file for {:?}: {:?}", token, e),
}
}
Task::Write(token, data) => {
match self.files.get_mut(&token) {
Some(&mut (ref mut writer, _)) => {
if let Err(e) = writer.write_all(Buf::bytes(&data)) {
error!("failed to write data to {:?}: {:?}", token, e);
Task::Write(token, mut data) => {
let mut should_close = false;
match self.files.entry(token) {
Entry::Occupied(mut e) => {
if let Err(err) = data.write_all_to(&mut e.get_mut().0) {
error!("failed to write data to {:?}: {:?}", token, err);
let (_, msg) = e.remove();
let key = SnapKey::from_snap(msg.get_message().get_snapshot()).unwrap();
self.snap_mgr.wl().deregister(&key, &SnapEntry::Receiving);
should_close = true;
}
}
None => error!("invalid snap token {:?}", token),
Entry::Vacant(_) => error!("invalid snap token {:?}", token),
}
if should_close {
self.close(token);
}
}
Task::Close(token) => {
Expand Down
30 changes: 17 additions & 13 deletions src/util/codec/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
// all use bigendian.
// Now the version is always 1.
// Payload can be any arbitrary data, but we use Protobuf in our program default.
use std::io;
use std::io::{self, BufRead};
use std::vec::Vec;

use byteorder::{ByteOrder, BigEndian};
use protobuf;
use byteorder::{ByteOrder, BigEndian, ReadBytesExt};
use protobuf::{self, CodedInputStream};

use super::{Result, Error};

Expand Down Expand Up @@ -52,7 +52,8 @@ pub fn encode_msg<T: io::Write, M: protobuf::Message + ?Sized>(w: &mut T,
// Decodes encoded message, returns message ID.
pub fn decode_msg<T: io::Read, M: protobuf::Message>(r: &mut T, m: &mut M) -> Result<u64> {
let (message_id, payload) = try!(decode_data(r));
try!(decode_body(&payload, m));
let mut reader = payload.as_slice();
try!(decode_body(&mut reader, m));

Ok(message_id)
}
Expand Down Expand Up @@ -83,38 +84,40 @@ pub fn encode_msg_header(msg_id: u64, payload_len: usize) -> Vec<u8> {
pub fn decode_data<T: io::Read>(r: &mut T) -> Result<(u64, Vec<u8>)> {
let mut header = vec![0;MSG_HEADER_LEN];
try!(r.read_exact(&mut header));
let (msg_id, payload_len) = try!(decode_msg_header(&header));
let mut reader = header.as_slice();
let (msg_id, payload_len) = try!(decode_msg_header(&mut reader));
let mut payload = vec![0;payload_len];
try!(r.read_exact(&mut payload));

Ok((msg_id, payload))
}

// Decodes msg header in header buffer, the buffer length size must be equal MSG_HEADER_LEN;
pub fn decode_msg_header(header: &[u8]) -> Result<(u64, usize)> {
let magic = BigEndian::read_u16(&header[0..2]);
pub fn decode_msg_header<R: io::Read>(header: &mut R) -> Result<(u64, usize)> {
let magic = try!(header.read_u16::<BigEndian>());
if MSG_MAGIC != magic {
return Err(other_err(format!("invalid magic {}, not {}", magic, MSG_MAGIC)));
}

let version = BigEndian::read_u16(&header[2..4]);
let version = try!(header.read_u16::<BigEndian>());
if MSG_VERSION_V1 != version {
return Err(other_err(format!("unsupported version {}, we need {} now",
version,
MSG_VERSION_V1)));
}

let payload_len = BigEndian::read_u32(&header[4..8]) as usize;
let payload_len = try!(header.read_u32::<BigEndian>()) as usize;
// TODO: check max payload

let message_id = BigEndian::read_u64(&header[8..16]);
let message_id = try!(header.read_u64::<BigEndian>());

Ok((message_id, payload_len))
}

// Decodes only body.
pub fn decode_body<M: protobuf::Message>(payload: &[u8], m: &mut M) -> Result<()> {
try!(m.merge_from_bytes(&payload));
/// Decodes only a message body (no RPC header) from `payload` into `m`.
///
/// The reader must already be positioned at the start of the payload bytes;
/// header parsing/validation is the caller's job (see `decode_msg_header`).
/// NOTE(review): `merge_from` merges into fields already set on `m`; callers
/// should pass a freshly created message — confirm at call sites.
pub fn decode_body<R: BufRead, M: protobuf::Message>(payload: &mut R, m: &mut M) -> Result<()> {
    // Stream straight from the buffered reader instead of requiring a
    // contiguous `&[u8]`, so callers need not copy the payload out first.
    let mut is = CodedInputStream::from_buffered_reader(payload);
    try!(m.merge_from(&mut is));
    Ok(())
}

Expand Down Expand Up @@ -154,7 +157,8 @@ mod tests {
#[test]
fn test_header_codec() {
let m1 = encode_msg_header(1, 1);
let (msg_id, payload_len) = decode_msg_header(&m1).unwrap();
let mut m1_r = m1.as_slice();
let (msg_id, payload_len) = decode_msg_header(&mut m1_r).unwrap();
assert_eq!(msg_id, 1);
assert_eq!(payload_len, 1);
}
Expand Down

0 comments on commit 38df4fb

Please sign in to comment.