Skip to content

Commit

Permalink
create type WebSocketContext
Browse files Browse the repository at this point in the history
the point is to use a structure instead of a text,
for an easier parsing of the access log of the access logss
  • Loading branch information
Keksoj committed Mar 11, 2024
1 parent 5f55561 commit 756b78b
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 29 deletions.
4 changes: 1 addition & 3 deletions command/src/command.proto
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,4 @@ message HttpEndpoint {
optional string reason = 5;
}

message TcpEndpoint {
optional string context = 1;
}
message TcpEndpoint {}
8 changes: 2 additions & 6 deletions command/src/logging/access_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ pub enum EndpointRecord<'a> {
status: Option<u16>,
reason: Option<&'a str>,
},
Tcp {
context: Option<&'a str>,
},
Tcp,
}

/// used to aggregate tags in a session
Expand Down Expand Up @@ -153,9 +151,7 @@ impl RequestRecord<'_> {
status: status.map(|s| s as u32),
reason: reason.duplicate(),
}),
EndpointRecord::Tcp { context } => protobuf_endpoint::Inner::Tcp(TcpEndpoint {
context: context.duplicate(),
}),
EndpointRecord::Tcp => protobuf_endpoint::Inner::Tcp(TcpEndpoint {}),
};

ManuallyDrop::new(ProtobufAccessLog {
Expand Down
4 changes: 2 additions & 2 deletions command/src/logging/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ impl fmt::Display for EndpointRecord<'_> {
path.as_str_or("-"),
display_status(*status, f.alternate()),
),
Self::Tcp { context } => {
write!(f, "{}", context.as_str_or("-"))
Self::Tcp => {
write!(f, "-")
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions command/src/proto/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -970,8 +970,8 @@ impl Display for ProtobufEndpoint {
path.as_string_or("-"),
status.as_string_or("-"),
),
Some(protobuf_endpoint::Inner::Tcp(TcpEndpoint { context })) => {
write!(f, "{}", context.as_string_or("-"))
Some(protobuf_endpoint::Inner::Tcp(_)) => {
write!(f, "-")
}
None => Ok(()),
}
Expand Down
2 changes: 1 addition & 1 deletion lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ impl HttpSession {
Protocol::HTTP,
http.context.id,
http.context.session_address,
Some(ws_context),
ws_context,
);

pipe.frontend_readiness.event = http.frontend_readiness.event;
Expand Down
2 changes: 1 addition & 1 deletion lib/src/https.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ impl HttpsSession {
Protocol::HTTP,
http.context.id,
http.context.session_address,
Some(ws_context),
ws_context,
);

pipe.frontend_readiness.event = http.frontend_readiness.event;
Expand Down
12 changes: 10 additions & 2 deletions lib/src/protocol/kawa_h1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ use crate::{
RetrieveClusterError, SessionIsToBeClosed, SessionMetrics, SessionResult, StateResult,
};

use super::pipe::WebSocketContext;

/// Generic Http representation using the Kawa crate using the Checkout of Sozu as buffer
type GenericHttpStream = kawa::Kawa<Checkout>;

Expand Down Expand Up @@ -812,8 +814,14 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
}

/// Format the context of the websocket into a loggable String
pub fn websocket_context(&self) -> String {
format!("{}", self.log_endpoint())
pub fn websocket_context(&self) -> WebSocketContext {
WebSocketContext::Http {
method: self.context.method.clone(),
authority: self.context.authority.clone(),
path: self.context.path.clone(),
reason: self.context.reason.clone(),
status: self.context.status,
}
}

pub fn log_request(&mut self, metrics: &SessionMetrics, message: Option<&str>) {
Expand Down
35 changes: 31 additions & 4 deletions lib/src/protocol/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use crate::{
L7Proxy, ListenerHandler, Protocol, Readiness, SessionMetrics, SessionResult, StateResult,
};

use super::http::parser::Method;

#[derive(PartialEq, Eq)]
pub enum SessionStatus {
Normal,
Expand All @@ -31,6 +33,18 @@ enum ConnectionStatus {
Closed,
}

/// matches sozu_command_lib::logging::access_logs::EndpointRecords
pub enum WebSocketContext {
Http {
method: Option<Method>,
authority: Option<String>,
path: Option<String>,
status: Option<u16>,
reason: Option<String>,
},
Tcp,
}

pub struct Pipe<Front: SocketHandler, L: ListenerHandler> {
backend_buffer: Checkout,
backend_id: Option<String>,
Expand All @@ -51,7 +65,7 @@ pub struct Pipe<Front: SocketHandler, L: ListenerHandler> {
protocol: Protocol,
request_id: Ulid,
session_address: Option<SocketAddr>,
websocket_context: Option<String>,
websocket_context: WebSocketContext,
}

impl<Front: SocketHandler, L: ListenerHandler> Pipe<Front, L> {
Expand All @@ -77,7 +91,7 @@ impl<Front: SocketHandler, L: ListenerHandler> Pipe<Front, L> {
protocol: Protocol,
request_id: Ulid,
session_address: Option<SocketAddr>,
websocket_context: Option<String>,
websocket_context: WebSocketContext,
) -> Pipe<Front, L> {
let frontend_status = ConnectionStatus::Normal;
let backend_status = if backend_socket.is_none() {
Expand Down Expand Up @@ -622,8 +636,21 @@ impl<Front: SocketHandler, L: ListenerHandler> Pipe<Front, L> {
}

fn log_endpoint(&self) -> EndpointRecord {
EndpointRecord::Tcp {
context: self.websocket_context.as_deref(),
match &self.websocket_context {
WebSocketContext::Http {
method,
authority,
path,
status,
reason,
} => EndpointRecord::Http {
method: method.as_deref(),
authority: authority.as_deref(),
path: path.as_deref(),
status: status.to_owned(),
reason: reason.as_deref(),
},
WebSocketContext::Tcp => EndpointRecord::Tcp,
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions lib/src/protocol/proxy_protocol/expect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use sozu_command::{config::MAX_LOOP_ITERATIONS, logging::LogContext};

use crate::{
pool::Checkout,
protocol::{pipe::Pipe, SessionResult, SessionState},
protocol::{pipe::{Pipe, WebSocketContext}, SessionResult, SessionState},
socket::{SocketHandler, SocketResult},
sozu_command::ready::Ready,
tcp::TcpListener,
Expand Down Expand Up @@ -182,7 +182,7 @@ impl<Front: SocketHandler> ExpectProxyProtocol<Front> {
Protocol::TCP,
self.request_id,
addr,
None,
WebSocketContext::Tcp,
);

pipe.frontend_readiness.event = self.frontend_readiness.event;
Expand Down
4 changes: 2 additions & 2 deletions lib/src/protocol/proxy_protocol/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use rusty_ulid::Ulid;

use crate::{
pool::Checkout,
protocol::{pipe::Pipe, proxy_protocol::parser::parse_v2_header},
protocol::{pipe::{Pipe, WebSocketContext}, proxy_protocol::parser::parse_v2_header},
socket::{SocketHandler, SocketResult},
sozu_command::ready::Ready,
tcp::TcpListener,
Expand Down Expand Up @@ -192,7 +192,7 @@ impl<Front: SocketHandler> RelayProxyProtocol<Front> {
Protocol::TCP,
self.request_id,
addr,
None,
WebSocketContext::Tcp,
);

pipe.frontend_readiness.event = self.frontend_readiness.event;
Expand Down
4 changes: 2 additions & 2 deletions lib/src/protocol/proxy_protocol/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use rusty_ulid::Ulid;
use crate::{
pool::Checkout,
protocol::{
pipe::Pipe,
pipe::{Pipe, WebSocketContext},
proxy_protocol::header::{Command, HeaderV2, ProxyProtocolHeader},
},
socket::SocketHandler,
Expand Down Expand Up @@ -176,7 +176,7 @@ impl<Front: SocketHandler> SendProxyProtocol<Front> {
Protocol::TCP,
self.request_id,
addr,
None,
WebSocketContext::Tcp,
);

pipe.frontend_readiness = self.frontend_readiness;
Expand Down
5 changes: 3 additions & 2 deletions lib/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::{
backends::{Backend, BackendMap},
pool::{Checkout, Pool},
protocol::{
pipe::WebSocketContext,
proxy_protocol::{
expect::ExpectProxyProtocol, relay::RelayProxyProtocol, send::SendProxyProtocol,
},
Expand Down Expand Up @@ -159,7 +160,7 @@ impl TcpSession {
Protocol::TCP,
request_id,
frontend_address,
None,
WebSocketContext::Tcp,
);
pipe.set_cluster_id(cluster_id.clone());
TcpStateMachine::Pipe(pipe)
Expand Down Expand Up @@ -202,7 +203,7 @@ impl TcpSession {
session_address: self.frontend_address,
backend_address: None,
protocol: "TCP",
endpoint: EndpointRecord::Tcp { context: None },
endpoint: EndpointRecord::Tcp,
tags: listener.get_tags(&listener.get_addr().to_string()),
client_rtt: socket_rtt(self.state.front_socket()),
server_rtt: None,
Expand Down

0 comments on commit 756b78b

Please sign in to comment.