Skip to content
This repository has been archived by the owner on Mar 11, 2021. It is now read-only.

Commit

Permalink
fixed encoding panic due to a fixed sized buffer (issue #20 in tender…
Browse files Browse the repository at this point in the history
…mint/rust-tsp)

* added a mock stream to test the code and wrote a test to isolate the issue

* regenerated protobuf code with the latest rust-protobuf crate

* fixed the panic issue by checking the needed space and reserving more (if needed) before writing into the buffer
  • Loading branch information
Tomas Tauber authored and melekes committed Nov 30, 2018
1 parent e7d4d70 commit dde7566
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 9 deletions.
9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ include = ["src/**/*", "Cargo.toml"]

[dependencies]
bytes = "0.4"
protobuf = "2.0.1"
byteorder = "1.2.4"
protobuf = "2.2.0"
byteorder = "1.2.7"
integer-encoding = "1.0.5"
mockstream = "0.0.3"

[build-dependencies]
protoc = "1.4.1"
protobuf-codegen-pure = "2.0.1"
protoc = "2.2.0"
protobuf-codegen-pure = "2.2.0"
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! Here's a simple example that communicates with Tendermint. Defaults callbacks are handled by
//! the Trait. The app doesn't do any actual processing on a transaction.
//!
//! ```
//! ```rust,no_run
//! struct EmptyApp;
//!
//! impl abci::Application for EmptyApp {}
Expand All @@ -24,6 +24,7 @@ use std::net::SocketAddr;
extern crate bytes;
extern crate integer_encoding;
extern crate protobuf;
extern crate mockstream;

pub mod common;
mod server;
Expand Down
81 changes: 77 additions & 4 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,50 @@ use integer_encoding::VarInt;
use protobuf;
use protobuf::Message;

use mockstream::SharedMockStream;
use std::fmt::{Formatter, Debug, self};

enum NetStream {
Mocked(SharedMockStream),
Tcp(TcpStream)
}

impl Debug for NetStream {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match *self {
NetStream::Mocked(ref s) => {
Ok(f.debug_struct("SharedMockStream").finish()?)
},
NetStream::Tcp(ref s) => s.fmt(f),
}
}
}

impl io::Read for NetStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match *self {
NetStream::Mocked(ref mut s) => s.read(buf),
NetStream::Tcp(ref mut s) => s.read(buf),
}
}
}

impl io::Write for NetStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match *self {
NetStream::Mocked(ref mut s) => s.write(buf),
NetStream::Tcp(ref mut s) => s.write(buf),
}
}

fn flush(&mut self) -> io::Result<()> {
match *self {
NetStream::Mocked(ref mut s) => s.flush(),
NetStream::Tcp(ref mut s) => s.flush(),
}
}
}

const BUFFER_SIZE: usize = 4096;

/// Creates the TCP server and listens for connections from Tendermint
Expand All @@ -31,7 +75,7 @@ where
match new_connection {
Ok(stream) => {
println!("Got connection! {:?}", stream);
thread::spawn(move || handle_stream(stream, &app_instance));
thread::spawn(move || handle_stream(NetStream::Tcp(stream), &app_instance));
}
Err(err) => {
// We need all 3 connections...
Expand All @@ -43,7 +87,7 @@ where
Ok(())
}

fn handle_stream<A>(mut stream: TcpStream, app: &Arc<Mutex<A>>)
fn handle_stream<A>(mut stream: NetStream, app: &Arc<Mutex<A>>)
where
A: Application + 'static + Send + Sync,
{
Expand All @@ -65,7 +109,7 @@ where
println!("Connection closed on {:?}", stream);
}

fn respond<A>(stream: &mut TcpStream, app: &mut A, request: &Request) -> io::Result<()>
fn respond<A>(stream: &mut NetStream, app: &mut A, request: &Request) -> io::Result<()>
where
A: Application + 'static + Send + Sync,
{
Expand Down Expand Up @@ -99,7 +143,7 @@ where
response.set_commit(app.commit(request.get_commit()));
response
} else if request.has_echo() {
let echo_msg = response.get_echo().get_message().to_string();
let echo_msg = request.get_echo().get_message().to_string();
response.set_echo({
let mut echo = ResponseEcho::new();
echo.set_message(echo_msg);
Expand Down Expand Up @@ -144,9 +188,38 @@ fn encode(msg: &Response, buf: &mut BytesMut) {
msg.write_to_vec(&mut msg_to_vec).unwrap();
let msg_len = msg_to_vec.len() as i64;
let varint = i64::encode_var_vec(msg_len);
let remaining = buf.remaining_mut();
let needed = msg_to_vec.len() + varint.len();
if remaining < needed {
buf.reserve(needed);
}

{
let mut writer = buf.writer();
writer.write_all(&varint).unwrap();
writer.write_all(&msg_to_vec).unwrap();
}
}

#[cfg(test)]
mod tests {
use super::*;

struct EmptyApp;
impl Application for EmptyApp {}

#[test]
fn respond_should_not_crash_over_4mb() {
let mut app = EmptyApp {};
let s = SharedMockStream::new();
let mut e = NetStream::Mocked(s.clone());
let mut r = Request::new();
let mut echo = RequestEcho::new();
let st = (0..2*BUFFER_SIZE).map(|_| "X").collect::<String>();
echo.set_message(st);

r.set_echo(echo);
let resp = respond(&mut e, &mut app, &r);
assert!(resp.is_ok())
}
}

0 comments on commit dde7566

Please sign in to comment.