Skip to content

Commit

Permalink
Add socketstats unittest
Browse files Browse the repository at this point in the history
Signed-off-by: Eloi DEMOLIS <eloi.demolis@clever-cloud.com>
  • Loading branch information
Wonshtrum committed Jun 5, 2023
1 parent 72f200b commit f6e011f
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 19 deletions.
2 changes: 1 addition & 1 deletion e2e/src/sozu/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub fn set_no_close_exec(fd: i32) {
unsafe {
let old_flags = libc::fcntl(fd, libc::F_GETFD);
let new_flags = old_flags & !1;
println!("flags: {old_flags} -> {new_flags}");
// println!("flags: {old_flags} -> {new_flags}");
libc::fcntl(fd, libc::F_SETFD, new_flags);
}
}
Expand Down
33 changes: 18 additions & 15 deletions lib/src/protocol/kawa_h1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
retry::RetryPolicy,
router::Route,
server::{push_event, CONN_RETRIES},
socket::{stat::socket_rtt, SocketHandler, SocketResult, TransportProtocol},
socket::{stats::socket_rtt, SocketHandler, SocketResult, TransportProtocol},
sozu_command::ready::Ready,
timer::TimeoutContainer,
AcceptError, Backend, BackendConnectAction, BackendConnectionStatus, L7ListenerHandler,
Expand Down Expand Up @@ -205,7 +205,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L

/// Reset the connection in case of keep-alive to be ready for the next request
pub fn reset(&mut self) {
println!("==============reset");
trace!("==============reset");
self.context.keep_alive_frontend = true;
self.context.keep_alive_backend = true;
self.context.sticky_session_found = None;
Expand All @@ -230,7 +230,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
}

pub fn readable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
println!("==============readable");
trace!("==============readable");
if !self.container_frontend_timeout.reset() {
error!(
"could not reset front timeout {:?}",
Expand Down Expand Up @@ -315,7 +315,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
}

pub fn readable_parse(&mut self, _metrics: &mut SessionMetrics) -> StateResult {
println!("==============readable_parse");
trace!("==============readable_parse");
let was_initial = self.request_stream.is_initial();

kawa::h1::parse(&mut self.request_stream, &mut self.context);
Expand Down Expand Up @@ -354,15 +354,15 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L

if was_initial {
if let kawa::StatusLine::Request { .. } = self.request_stream.detached.status_line {
println!("============== HANDLE CONNECTION!");
trace!("============== HANDLE CONNECTION!");
return StateResult::ConnectBackend;
}
}
StateResult::Continue
}

pub fn writable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
println!("==============writable");
trace!("==============writable");
//handle default answers
if let SessionStatus::DefaultAnswer(_, _, _) = self.status {
return self.writable_default_answer(metrics);
Expand Down Expand Up @@ -426,11 +426,11 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L

match self.response_stream.detached.status_line {
kawa::StatusLine::Response { code: 101, .. } => {
println!("============== HANDLE UPGRADE!");
trace!("============== HANDLE UPGRADE!");
return StateResult::Upgrade;
}
kawa::StatusLine::Response { code: 100, .. } => {
println!("============== HANDLE CONTINUE!");
trace!("============== HANDLE CONTINUE!");
self.response_stream.clear();
return StateResult::Continue;
}
Expand All @@ -441,9 +441,10 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
// with no keepalive on backend, we could open a new backend ConnectionError
// with no keepalive on front but keepalive on backend, we could have
// a pool of connections
println!(
trace!(
"============== HANDLE KEEP-ALIVE: {} {}",
self.context.keep_alive_frontend, self.context.keep_alive_backend,
self.context.keep_alive_frontend,
self.context.keep_alive_backend
);
return match (
self.context.keep_alive_frontend,
Expand All @@ -469,7 +470,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
}

pub fn backend_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
println!("==============backend_writable");
trace!("==============backend_writable");
if let SessionStatus::DefaultAnswer(_, _, _) = self.status {
error!(
"{}\tsending default answer, should not write to back",
Expand Down Expand Up @@ -547,7 +548,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L

// Read content from cluster
pub fn backend_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
println!("==============backend_readable");
trace!("==============backend_readable");
if !self.container_backend_timeout.reset() {
error!(
"could not reset back timeout {:?}",
Expand Down Expand Up @@ -629,7 +630,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
}

pub fn backend_readable_parse(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
println!("==============backend_readable_parse");
trace!("==============backend_readable_parse");
let was_initial = self.response_stream.is_initial();

kawa::h1::parse(&mut self.response_stream, &mut self.context);
Expand Down Expand Up @@ -1138,9 +1139,11 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
.cluster_id_from_request(proxy.clone())
.with_context(|| "Could not get cluster id from request")?;

println!(
trace!(
"connect_to_backend: {:?} {:?} {:?}",
self.cluster_id, cluster_id, self.backend_connection_status
self.cluster_id,
cluster_id,
self.backend_connection_status
);
// check if we can reuse the backend connection
if (self.cluster_id.as_ref()) == Some(&cluster_id)
Expand Down
2 changes: 1 addition & 1 deletion lib/src/protocol/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
logs::{Endpoint, LogContext, RequestRecord},
pool::Checkout,
protocol::SessionState,
socket::{stat::socket_rtt, SocketHandler, SocketResult, TransportProtocol},
socket::{stats::socket_rtt, SocketHandler, SocketResult, TransportProtocol},
sozu_command::ready::Ready,
timer::TimeoutContainer,
Backend, L7Proxy, ListenerHandler, Protocol, Readiness, SessionMetrics, SessionResult,
Expand Down
15 changes: 14 additions & 1 deletion lib/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,14 @@ pub fn server_bind(addr: String) -> anyhow::Result<TcpListener> {
Ok(TcpListener::from_std(sock.into()))
}

pub mod stat {
/// Socket statistics
pub mod stats {
use std::os::fd::AsRawFd;
use time::Duration;

use internal::{TcpInfo, OPT_LEVEL, OPT_NAME};

/// Round trip time for a TCP socket
pub fn socket_rtt<A: AsRawFd>(socket: &A) -> Option<Duration> {
socket_info(socket.as_raw_fd()).map(|info| Duration::microseconds(info.rtt() as i64))
}
Expand Down Expand Up @@ -519,4 +521,15 @@ pub mod stat {
#[cfg(not(unix))]
#[derive(Clone, Debug)]
struct TcpInfo {}

#[test]
#[serial_test::serial]
fn test_rtt() {
let sock = std::net::TcpStream::connect("google.com:80").unwrap();
let fd = sock.as_raw_fd();
let info = socket_info(fd);
assert!(info.is_some());
println!("{:#?}", info);
println!("rtt: {}", crate::logs::LogDuration(socket_rtt(&sock)));
}
}
2 changes: 1 addition & 1 deletion lib/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::{
push_event, ListenSession, ListenToken, ProxyChannel, Server, SessionManager, CONN_RETRIES,
TIMER,
},
socket::{server_bind, stat::socket_rtt},
socket::{server_bind, stats::socket_rtt},
sozu_command::{
logging,
proto::command::{
Expand Down

0 comments on commit f6e011f

Please sign in to comment.