Skip to content

Commit

Permalink
Add e2e test max_connections, add accept timeout on e2e sync_backend
Browse files Browse the repository at this point in the history
Signed-off-by: Eloi DEMOLIS <eloi.demolis@clever-cloud.com>
  • Loading branch information
Wonshtrum committed Jun 5, 2023
1 parent b8fb52e commit cd55235
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 21 deletions.
19 changes: 19 additions & 0 deletions e2e/src/mock/sync_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ use std::{
collections::HashMap,
io::{Read, Write},
net::{SocketAddr, TcpListener, TcpStream},
os::fd::{FromRawFd, IntoRawFd},
str::from_utf8,
time::Duration,
};

use libc::setsockopt;

use crate::BUFFER_SIZE;

/// A mock backend whose actions are all synchronous (accepting, receiving, responding...)
Expand Down Expand Up @@ -42,6 +45,22 @@ impl Backend {
/// Binds itself to its address, stores the yielded TCP listener
pub fn connect(&mut self) {
let listener = TcpListener::bind(self.address).expect("could not bind");
let timeout = Duration::from_millis(100);
let timeout = libc::timeval {
tv_sec: timeout.as_secs() as i64,
tv_usec: timeout.subsec_micros() as i64,
};
let listener = unsafe {
let fd = listener.into_raw_fd();
setsockopt(
fd,
libc::SOL_SOCKET,
libc::SO_RCVTIMEO,
&timeout as *const libc::timeval as *const _,
std::mem::size_of::<libc::timeval>() as libc::socklen_t,
);
TcpListener::from_raw_fd(fd)
};
self.listener = Some(listener);
self.clients = HashMap::new();
}
Expand Down
135 changes: 135 additions & 0 deletions e2e/src/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1202,6 +1202,132 @@ pub fn try_stick() -> State {
State::Success
}

fn try_max_connections() -> State {
let front_address = "127.0.0.1:2001"
.parse()
.expect("could not parse front address");

let (mut config, listeners, state) = Worker::empty_config();
config.max_connections = 15;
let (mut worker, mut backends) =
setup_sync_test("MAXCONN", config, listeners, state, front_address, 1, false);

let mut backend = backends.pop().unwrap();
backend.connect();
let expected_response_start = String::from("HTTP/1.1 200 OK\r\nContent-Length: 5");

let mut clients = Vec::new();
for i in 0..20 {
let mut client = Client::new(
format!("client{i}"),
front_address,
http_request("GET", "/api", format!("ping{i}"), "localhost"),
);
client.connect();
client.send();
if backend.accept(i) {
assert!(i < 15);
let request = backend.receive(i);
println!("request {i}: {request:?}");
backend.send(i);
} else {
assert!(i >= 15);
}
let response = client.receive();
println!("response {i}: {response:?}");
if i < 15 {
assert!(response.unwrap().starts_with(&expected_response_start));
} else {
assert_eq!(response, None);
}
clients.push(client);
}

for i in 0..20 {
let client = &mut clients[i];
if i < 15 {
client.send();
let request = backend.receive(i);
println!("request {i}: {request:?}");
backend.send(i);
let response = client.receive();
println!("response {i}: {response:?}");
assert!(client.is_connected());
assert!(response.unwrap().starts_with(&expected_response_start));
} else {
// assert!(!client.is_connected());
}
}

for i in 0..5 {
let new_client = &mut clients[15 + i];
new_client.set_request(format!(
"GET /api-{i} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"
));
new_client.connect();
new_client.send();

let client = &mut clients[i];
client.set_request("GET /api HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n");
client.send();
let request = backend.receive(i);
println!("request {i}: {request:?}");
backend.send(i);
let response = client.receive();
println!("response {i}: {response:?}");
assert!(!client.is_connected());
assert!(response.unwrap().starts_with(&expected_response_start));

if backend.accept(15 + i) {
assert!(i >= 2);
} else {
assert!(i < 2);
}
}

assert!(!backend.accept(100));

for i in 15..20 {
let request = backend.receive(i);
backend.send(i);
println!("request {i}: {request:?}");
}
for i in 15..20 {
let client = &mut clients[i];
client.is_connected();
let response = client.receive();
println!("response {i}: {response:?}");
client.is_connected();
// assert!(response.unwrap().starts_with(&expected_response_start));
}

for i in 15..20 {
let client = &mut clients[i];
client.is_connected();
let response = client.receive();
println!("response: {response:?}");
}

worker.hard_stop();
worker.wait_for_server_stop();

for client in clients {
println!(
"{} sent: {}, received: {}",
client.name, client.requests_sent, client.responses_received
);
}
println!(
"{} sent: {}, received: {}",
backend.name, backend.responses_sent, backend.requests_received
);

assert_eq!(backend.requests_received, 38);
assert_eq!(backend.responses_sent, 38);

State::Success
}

#[serial]
#[test]
fn test_sync() {
Expand Down Expand Up @@ -1368,3 +1494,12 @@ fn test_stick() {
State::Success
);
}

#[serial]
#[test]
fn test_max_connections() {
assert_eq!(
repeat_until_error_or(2, "Max connections reached", try_max_connections),
State::Success
);
}
2 changes: 1 addition & 1 deletion lib/src/protocol/kawa_h1/editor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl HttpContext {
header.val = kawa::Store::from_string(format!(
"{}, {}",
unsafe { from_utf8_unchecked(header.val.data(buf)) },
peer_ip.to_string()
peer_ip
));
}
if let Some(header) = &mut forwarded {
Expand Down
4 changes: 2 additions & 2 deletions lib/src/protocol/kawa_h1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
let was_initial = self.request_stream.is_initial();

kawa::h1::parse(&mut self.request_stream, &mut self.context);
kawa::debug_kawa(&self.request_stream);
// kawa::debug_kawa(&self.request_stream);

if was_initial && !self.request_stream.is_initial() {
gauge_add!("http.active_requests", 1);
Expand Down Expand Up @@ -630,7 +630,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
let was_initial = self.response_stream.is_initial();

kawa::h1::parse(&mut self.response_stream, &mut self.context);
kawa::debug_kawa(&self.response_stream);
// kawa::debug_kawa(&self.response_stream);

if self.response_stream.is_error() {
incr!("http.backend_parse_errors");
Expand Down
25 changes: 8 additions & 17 deletions lib/src/protocol/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,6 @@ impl<Front: SocketHandler, L: ListenerHandler> Pipe<Front, L> {
session
}

fn debug_tokens(&self) -> Option<(Token, Token)> {
if let Some(back) = self.backend_token {
return Some((self.frontend_token, back));
}
None
}

pub fn front_socket(&self) -> &TcpStream {
self.frontend.socket_ref()
}
Expand Down Expand Up @@ -541,16 +534,14 @@ impl<Front: SocketHandler, L: ListenerHandler> Pipe<Front, L> {
metrics.bout += sz;
}

if let Some((front, back)) = self.debug_tokens() {
debug!(
"{}\tFRONT [{}<-{}]: wrote {} bytes of {}",
self.log_ctx,
front.0,
back.0,
sz,
self.backend_buffer.available_data()
);
}
debug!(
"{}\tFRONT [{}<-{:?}]: wrote {} bytes of {}",
self.log_ctx,
self.frontend_token.0,
self.backend_token,
sz,
self.backend_buffer.available_data()
);

match res {
SocketResult::Error => {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ impl SocketHandler for FrontRustls {
let mut socket_state = SocketResult::Continue;
let mut size;
for buf in bufs {
(size, socket_state) = self.socket_write(&buf);
(size, socket_state) = self.socket_write(buf);
total_size += size;
if socket_state != SocketResult::Continue {
break;
Expand Down

0 comments on commit cd55235

Please sign in to comment.