Skip to content

Commit

Permalink
fixed encoding panic due to a fixed sized buffer (issue tendermint#20
Browse files Browse the repository at this point in the history
…in tendermint/rust-tsp)

* added a mock stream to test the code and wrote a test to isolate the issue

* regenerated protobuf code with the latest rust-protobuf crate

* fixed the panic issue by checking the needed space and reserving more (if needed) before writing into the buffer
  • Loading branch information
Tomas Tauber committed Nov 21, 2018
1 parent 90dad48 commit c15fd9a
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 93 deletions.
9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ include = ["src/**/*", "Cargo.toml"]

[dependencies]
bytes = "0.4"
protobuf = "2.0.1"
byteorder = "1.2.4"
protobuf = "2.2.0"
byteorder = "1.2.7"
integer-encoding = "1.0.5"
mockstream = "0.0.3"

[build-dependencies]
protoc = "1.4.1"
protobuf-codegen-pure = "2.0.1"
protoc = "2.2.0"
protobuf-codegen-pure = "2.2.0"
14 changes: 7 additions & 7 deletions src/common.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// This file is generated by rust-protobuf 2.0.4. Do not edit
// This file is generated by rust-protobuf 2.2.0. Do not edit
// @generated

// https://github.com/Manishearth/rust-clippy/issues/702
Expand Down Expand Up @@ -27,8 +27,8 @@ pub struct KVPair {
pub key: ::std::vec::Vec<u8>,
pub value: ::std::vec::Vec<u8>,
// special fields
unknown_fields: ::protobuf::UnknownFields,
cached_size: ::protobuf::CachedSize,
pub unknown_fields: ::protobuf::UnknownFields,
pub cached_size: ::protobuf::CachedSize,
}

impl KVPair {
Expand Down Expand Up @@ -232,8 +232,8 @@ pub struct KI64Pair {
pub key: ::std::vec::Vec<u8>,
pub value: i64,
// special fields
unknown_fields: ::protobuf::UnknownFields,
cached_size: ::protobuf::CachedSize,
pub unknown_fields: ::protobuf::UnknownFields,
pub cached_size: ::protobuf::CachedSize,
}

impl KI64Pair {
Expand Down Expand Up @@ -429,8 +429,8 @@ static file_descriptor_proto_data: &'static [u8] = b"\
on\",\n\x06KVPair\x12\x0f\n\x03key\x18\x01\x20\x01(\x0cB\x02\x18\0\x12\
\x11\n\x05value\x18\x02\x20\x01(\x0cB\x02\x18\0\".\n\x08KI64Pair\x12\x0f\
\n\x03key\x18\x01\x20\x01(\x0cB\x02\x18\0\x12\x11\n\x05value\x18\x02\x20\
\x01(\x03B\x02\x18\0B\x1c\xb8\xe2\x1e\x01\xd0\xe2\x1e\x01\xa8\xe2\x1e\
\x01\xf8\xe1\x1e\x01\xc8\xe2\x1e\x01\xe0\xe2\x1e\x01\xc0\xe3\x1e\x01b\
\x01(\x03B\x02\x18\0B\x1c\xe0\xe2\x1e\x01\xb8\xe2\x1e\x01\xa8\xe2\x1e\
\x01\xd0\xe2\x1e\x01\xf8\xe1\x1e\x01\xc0\xe3\x1e\x01\xc8\xe2\x1e\x01b\
\x06proto3\
";

Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! Here's a simple example that communicates with Tendermint. Defaults callbacks are handled by
//! the Trait. The app doesn't do any actual processing on a transaction.
//!
//! ```
//! ```rust,no_run
//! struct EmptyApp;
//!
//! impl abci::Application for EmptyApp {}
Expand All @@ -24,6 +24,7 @@ use std::net::SocketAddr;
extern crate bytes;
extern crate integer_encoding;
extern crate protobuf;
extern crate mockstream;

pub mod common;
mod server;
Expand Down
81 changes: 77 additions & 4 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,50 @@ use integer_encoding::VarInt;
use protobuf;
use protobuf::Message;

use mockstream::SharedMockStream;
use std::fmt::{Formatter, Debug, self};

enum NetStream {
Mocked(SharedMockStream),
Tcp(TcpStream)
}

impl Debug for NetStream {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match *self {
NetStream::Mocked(ref s) => {
Ok(f.debug_struct("SharedMockStream").finish()?)
},
NetStream::Tcp(ref s) => s.fmt(f),
}
}
}

impl io::Read for NetStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match *self {
NetStream::Mocked(ref mut s) => s.read(buf),
NetStream::Tcp(ref mut s) => s.read(buf),
}
}
}

impl io::Write for NetStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match *self {
NetStream::Mocked(ref mut s) => s.write(buf),
NetStream::Tcp(ref mut s) => s.write(buf),
}
}

fn flush(&mut self) -> io::Result<()> {
match *self {
NetStream::Mocked(ref mut s) => s.flush(),
NetStream::Tcp(ref mut s) => s.flush(),
}
}
}

const BUFFER_SIZE: usize = 4096;

/// Creates the TCP server and listens for connections from Tendermint
Expand All @@ -31,7 +75,7 @@ where
match new_connection {
Ok(stream) => {
println!("Got connection! {:?}", stream);
thread::spawn(move || handle_stream(stream, &app_instance));
thread::spawn(move || handle_stream(NetStream::Tcp(stream), &app_instance));
}
Err(err) => {
// We need all 3 connections...
Expand All @@ -43,7 +87,7 @@ where
Ok(())
}

fn handle_stream<A>(mut stream: TcpStream, app: &Arc<Mutex<A>>)
fn handle_stream<A>(mut stream: NetStream, app: &Arc<Mutex<A>>)
where
A: Application + 'static + Send + Sync,
{
Expand All @@ -65,7 +109,7 @@ where
println!("Connection closed on {:?}", stream);
}

fn respond<A>(stream: &mut TcpStream, app: &mut A, request: &Request) -> io::Result<()>
fn respond<A>(stream: &mut NetStream, app: &mut A, request: &Request) -> io::Result<()>
where
A: Application + 'static + Send + Sync,
{
Expand Down Expand Up @@ -99,7 +143,7 @@ where
response.set_commit(app.commit(request.get_commit()));
response
} else if request.has_echo() {
let echo_msg = response.get_echo().get_message().to_string();
let echo_msg = request.get_echo().get_message().to_string();
response.set_echo({
let mut echo = ResponseEcho::new();
echo.set_message(echo_msg);
Expand Down Expand Up @@ -144,9 +188,38 @@ fn encode(msg: &Response, buf: &mut BytesMut) {
msg.write_to_vec(&mut msg_to_vec).unwrap();
let msg_len = msg_to_vec.len() as i64;
let varint = i64::encode_var_vec(msg_len);
let remaining = buf.remaining_mut();
let needed = msg_to_vec.len() + varint.len();
if remaining < needed {
buf.reserve(needed);
}

{
let mut writer = buf.writer();
writer.write_all(&varint).unwrap();
writer.write_all(&msg_to_vec).unwrap();
}
}

#[cfg(test)]
mod tests {
use super::*;

struct EmptyApp;
impl Application for EmptyApp {}

#[test]
fn respond_should_not_crash_over_4mb() {
let mut app = EmptyApp {};
let s = SharedMockStream::new();
let mut e = NetStream::Mocked(s.clone());
let mut r = Request::new();
let mut echo = RequestEcho::new();
let st = (0..2*BUFFER_SIZE).map(|_| "X").collect::<String>();
echo.set_message(st);

r.set_echo(echo);
let resp = respond(&mut e, &mut app, &r);
assert!(resp.is_ok())
}
}
Loading

0 comments on commit c15fd9a

Please sign in to comment.