Skip to content

Commit

Permalink
Implement buffer limits
Browse files Browse the repository at this point in the history
- ChunkVecBuffer now has a limit, by default 0 (no limit)
- The pending-plaintext and pending-records buffers can have their
  respective limits set.
- There are tests for these limits pre- and post-handshake.
  • Loading branch information
ctz committed Apr 30, 2017
1 parent bf41f9e commit 1a1904e
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 15 deletions.
11 changes: 9 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,10 @@ impl ClientSessionImpl {
!self.common.traffic
}

pub fn set_buffer_limit(&mut self, len: usize) {
self.common.set_buffer_limit(len)
}

pub fn process_msg(&mut self, mut msg: Message) -> Result<(), TLSError> {
// Decrypt if demanded by current state.
if self.common.peer_encrypting {
Expand Down Expand Up @@ -536,6 +540,10 @@ impl Session for ClientSession {
self.imp.is_handshaking()
}

fn set_buffer_limit(&mut self, len: usize) {
self.imp.set_buffer_limit(len)
}

fn send_close_notify(&mut self) {
self.imp.common.send_close_notify()
}
Expand Down Expand Up @@ -573,8 +581,7 @@ impl io::Write for ClientSession {
/// writing much data before it can be sent will
/// cause excess memory usage.
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.imp.common.send_plain(buf);
Ok(buf.len())
self.imp.common.send_some_plaintext(buf)
}

fn flush(&mut self) -> io::Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion src/client_hs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ static EXPECT_TLS13_CERTIFICATE: State = State {
handle: handle_certificate_tls13,
};

fn handle_certificate_tls12(sess: &mut ClientSessionImpl, m: Message) -> StateResult {
fn handle_certificate_tls12(sess: &mut ClientSessionImpl, m: Message) -> StateResult {
let cert_chain = extract_handshake!(m, HandshakePayload::Certificate).unwrap();
sess.handshake_data.transcript.add_message(&m);

Expand Down
11 changes: 9 additions & 2 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,10 @@ impl ServerSessionImpl {
!self.common.traffic
}

pub fn set_buffer_limit(&mut self, len: usize) {
self.common.set_buffer_limit(len)
}

pub fn process_msg(&mut self, mut msg: Message) -> Result<(), TLSError> {
// Decrypt if demanded by current state.
if self.common.peer_encrypting {
Expand Down Expand Up @@ -541,6 +545,10 @@ impl Session for ServerSession {
self.imp.is_handshaking()
}

fn set_buffer_limit(&mut self, len: usize) {
self.imp.set_buffer_limit(len)
}

fn send_close_notify(&mut self) {
self.imp.common.send_close_notify()
}
Expand Down Expand Up @@ -578,8 +586,7 @@ impl io::Write for ServerSession {
/// writing much data before it can be sent will
/// cause excess memory usage.
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.imp.common.send_plain(buf);
Ok(buf.len())
self.imp.common.send_some_plaintext(buf)
}

fn flush(&mut self) -> io::Result<()> {
Expand Down
58 changes: 50 additions & 8 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ pub trait Session: Read + Write + Send {
/// session is buffered in memory.
fn is_handshaking(&self) -> bool;

/// Sets a limit on the internal buffers used to buffer
/// unsent plaintext (prior to completing the TLS handshake)
/// and unsent TLS records.
///
/// By default, there is no limit. The limit can be set
/// at any time, even if the current buffer use is higher.
fn set_buffer_limit(&mut self, limit: usize);

/// Queues a close_notify fatal alert to be sent in the next
/// `write_tls` call. This informs the peer that the
/// connection is being closed.
Expand Down Expand Up @@ -232,6 +240,11 @@ impl SessionSecrets {
static SEQ_SOFT_LIMIT: u64 = 0xffff_ffff_ffff_0000u64;
static SEQ_HARD_LIMIT: u64 = 0xffff_ffff_ffff_fffeu64;

enum Limit {
Yes,
No
}

pub struct SessionCommon {
pub negotiated_version: Option<ProtocolVersion>,
pub is_client: bool,
Expand Down Expand Up @@ -324,6 +337,11 @@ impl SessionCommon {
!self.received_plaintext.is_empty()
}

pub fn set_buffer_limit(&mut self, limit: usize) {
self.sendable_plaintext.set_limit(limit);
self.sendable_tls.set_limit(limit);
}

pub fn encrypt_outgoing(&mut self, plain: BorrowMessage) -> Message {
let seq = self.write_seq;
self.write_seq += 1;
Expand Down Expand Up @@ -411,20 +429,32 @@ impl SessionCommon {

/// Like send_msg_encrypt, but operate on an appdata directly.
fn send_appdata_encrypt(&mut self,
payload: &[u8]) {
payload: &[u8],
limit: Limit) -> usize {
if self.want_write_key_update {
self.do_write_key_update();
}

// Here, the limit on sendable_tls applies to encrypted data,
// but we're respecting it for plaintext data -- so we'll
// be out by whatever the cipher+record overhead is. That's a
// constant and predictable amount, so it's not a terrible issue.
let len = match limit {
Limit::Yes => self.sendable_tls.apply_limit(payload.len()),
Limit::No => payload.len()
};

let mut plain_messages = VecDeque::new();
self.message_fragmenter.fragment_borrow(ContentType::ApplicationData,
ProtocolVersion::TLSv1_2,
payload,
&payload[..len],
&mut plain_messages);

for m in plain_messages {
self.send_single_fragment(m);
}

len
}

fn send_single_fragment(&mut self, m: BorrowMessage) {
Expand Down Expand Up @@ -464,22 +494,33 @@ impl SessionCommon {

/// Send plaintext application data, fragmenting and
/// encrypting it as it goes out.
pub fn send_plain(&mut self, data: &[u8]) {
///
/// If internal buffers are too small, this function will not accept
/// all the data.
pub fn send_some_plaintext(&mut self, data: &[u8]) -> io::Result<usize> {
self.send_plain(data, Limit::Yes)
}


fn send_plain(&mut self, data: &[u8], limit: Limit) -> io::Result<usize> {
if !self.traffic {
// If we haven't completed handshaking, buffer
// plaintext to send once we do.
self.sendable_plaintext.append(data.to_vec());
return;
let len = match limit {
Limit::Yes => self.sendable_plaintext.append_limited_copy(data),
Limit::No => self.sendable_plaintext.append(data.to_vec())
};
return Ok(len);
}

debug_assert!(self.we_encrypting);

if data.len() == 0 {
// Don't send empty fragments.
return;
return Ok(0);
}

self.send_appdata_encrypt(data);
Ok(self.send_appdata_encrypt(data, limit))
}

pub fn start_traffic(&mut self) {
Expand All @@ -496,7 +537,8 @@ impl SessionCommon {

while !self.sendable_plaintext.is_empty() {
let buf = self.sendable_plaintext.take_one();
self.send_plain(&buf);
self.send_plain(&buf, Limit::No)
.unwrap();
}
}

Expand Down
78 changes: 76 additions & 2 deletions src/vecbuf.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,85 @@
use std::io::Read;
use std::io;
use std::cmp;

/// This is a byte buffer that is built from a vector
/// of byte vectors. This avoids extra copies when
/// appending a new byte vector, at the expense of
/// more complexity when reading out.
pub struct ChunkVecBuffer {
chunks: Vec<Vec<u8>>,
limit: usize,
}

impl ChunkVecBuffer {
pub fn new() -> ChunkVecBuffer {
ChunkVecBuffer { chunks: Vec::new() }
ChunkVecBuffer { chunks: Vec::new(), limit: 0 }
}

/// Sets the upper limit on how many bytes this
/// object can store.
///
/// Setting a lower limit than the currently stored
/// data is not an error.
///
/// A zero limit is interpreted as no limit.
pub fn set_limit(&mut self, new_limit: usize) {
self.limit = new_limit;
}

/// If we're empty
pub fn is_empty(&self) -> bool {
self.chunks.is_empty()
}

pub fn append(&mut self, bytes: Vec<u8>) {
/// How many bytes we're storing
pub fn len(&self) -> usize {
let mut len = 0;
for ch in &self.chunks {
len += ch.len();
}
len
}

/// For a proposed append of `len` bytes, how many
/// bytes should we actually append to adhere to the
/// currently set `limit`?
pub fn apply_limit(&self, len: usize) -> usize {
if self.limit == 0 {
len
} else {
let space =self.limit.saturating_sub(self.len());
cmp::min(len, space)
}
}

/// Append a copy of `bytes`, perhaps a prefix if
/// we're near the limit.
pub fn append_limited_copy(&mut self, bytes: &[u8]) -> usize {
let take = self.apply_limit(bytes.len());
self.append(bytes[..take].to_vec());
take
}

/// Take and append the given `bytes`.
pub fn append(&mut self, bytes: Vec<u8>) -> usize {
let len = bytes.len();

if !bytes.is_empty() {
self.chunks.push(bytes);
}

len
}

/// Take one of the chunks from this object. This
/// function panics if the object `is_empty`.
pub fn take_one(&mut self) -> Vec<u8> {
self.chunks.remove(0)
}

/// Read data out of this object, writing it into `buf`
/// and returning how many bytes were written there.
pub fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let mut offs = 0;

Expand All @@ -46,6 +98,7 @@ impl ChunkVecBuffer {
Ok(offs)
}

/// Read data of this object, passing it `wr`
pub fn write_to(&mut self, wr: &mut io::Write) -> io::Result<usize> {
// would desperately like writev support here!
if self.is_empty() {
Expand All @@ -63,3 +116,24 @@ impl ChunkVecBuffer {
Ok(used)
}
}

#[cfg(test)]
mod test {
use super::ChunkVecBuffer;

#[test]
fn short_append_copy_with_limit()
{
let mut cvb = ChunkVecBuffer::new();
cvb.set_limit(12);
assert_eq!(cvb.append_limited_copy(b"hello"), 5);
assert_eq!(cvb.append_limited_copy(b"world"), 5);
assert_eq!(cvb.append_limited_copy(b"hello"), 2);
assert_eq!(cvb.append_limited_copy(b"world"), 0);

let mut buf = [0u8; 12];
assert_eq!(cvb.read(&mut buf).unwrap(), 12);
assert_eq!(buf.to_vec(),
b"helloworldhe".to_vec());
}
}
69 changes: 69 additions & 0 deletions tests/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,3 +489,72 @@ fn client_is_send() {
let client = ClientSession::new(&Arc::new(client_config), "localhost");
&client as &Send;
}

#[test]
fn server_respects_buffer_limit_pre_handshake() {
let mut client = ClientSession::new(&Arc::new(make_client_config()), "localhost");
let mut server = ServerSession::new(&Arc::new(make_server_config()));

server.set_buffer_limit(32);

assert_eq!(server.write(b"01234567890123456789").unwrap(), 20);
assert_eq!(server.write(b"01234567890123456789").unwrap(), 12);

do_handshake(&mut client, &mut server);
transfer(&mut server, &mut client);
client.process_new_packets().unwrap();

check_read(&mut client, b"01234567890123456789012345678901");
}

#[test]
fn server_respects_buffer_limit_post_handshake() {
let mut client = ClientSession::new(&Arc::new(make_client_config()), "localhost");
let mut server = ServerSession::new(&Arc::new(make_server_config()));

// this test will vary in behaviour depending on the default suites
do_handshake(&mut client, &mut server);
server.set_buffer_limit(48);

assert_eq!(server.write(b"01234567890123456789").unwrap(), 20);
assert_eq!(server.write(b"01234567890123456789").unwrap(), 6);

transfer(&mut server, &mut client);
client.process_new_packets().unwrap();

check_read(&mut client, b"01234567890123456789012345");
}

#[test]
fn client_respects_buffer_limit_pre_handshake() {
let mut client = ClientSession::new(&Arc::new(make_client_config()), "localhost");
let mut server = ServerSession::new(&Arc::new(make_server_config()));

client.set_buffer_limit(32);

assert_eq!(client.write(b"01234567890123456789").unwrap(), 20);
assert_eq!(client.write(b"01234567890123456789").unwrap(), 12);

do_handshake(&mut client, &mut server);
transfer(&mut client, &mut server);
server.process_new_packets().unwrap();

check_read(&mut server, b"01234567890123456789012345678901");
}

#[test]
fn client_respects_buffer_limit_post_handshake() {
let mut client = ClientSession::new(&Arc::new(make_client_config()), "localhost");
let mut server = ServerSession::new(&Arc::new(make_server_config()));

do_handshake(&mut client, &mut server);
client.set_buffer_limit(48);

assert_eq!(client.write(b"01234567890123456789").unwrap(), 20);
assert_eq!(client.write(b"01234567890123456789").unwrap(), 6);

transfer(&mut client, &mut server);
server.process_new_packets().unwrap();

check_read(&mut server, b"01234567890123456789012345");
}

0 comments on commit 1a1904e

Please sign in to comment.