Skip to content

Commit

Permalink
protobuf access logs
Browse files Browse the repository at this point in the history
configuration:
- create configurable log_access_format: ascii or protobuf

new types:
- create protobuf type ProtobufAccessLog, with subtypes
- replace lib::logs::Endpoint with proto::command::ProtobufEndpoint
- use fixed32 for ipv4 addresses

refactor:
- remove log module in sozu_lib
- isolate SessionMetrics::register_end_of_session

transfer types to sozu_command_lib::logging:
- CachedTags
- RequestRecord
- LogDuration
- LogContext

avoid allocation:
- create unsafe trait DuplicateOwnership to display access logs
- intermediate buffer in the logger

Rewrite log macros:
- remove separate TAG RefCell
- move log backends and directives to InnerLogger
- allow borrow of Logger::tag and Logger::inner simultaneously
- check for Logger::enabled as soon as possible
- deduplicate log macros with/without arguments
- use scoped macro names instead of the alphabetic list

Co-Authored-By: Eloi DEMOLIS <eloi.demolis@clever-cloud.com>
  • Loading branch information
Keksoj and Wonshtrum committed Mar 11, 2024
1 parent 80a5c1d commit dfacdb7
Show file tree
Hide file tree
Showing 27 changed files with 913 additions and 647 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions bin/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ log_target = "stdout"
# It supports the same options as log_target
# log_access_target = "file:///var/logs/sozu-access.log"

# format of the access logs. Defaults to ascii.
# - ascii
# - protobuf (defined in [sozu_command_lib::proto::command::ProtobufAccessLog])
# log_access_format = "ascii"

# path to the unix socket file used to send commands to sozu
# default value points to "sozu.sock" file in the current directory
command_socket = "./sozu.sock"
Expand Down
5 changes: 4 additions & 1 deletion bin/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tempfile::tempfile;
use sozu_command_lib::{
channel::{Channel, ChannelError},
config::Config,
logging::setup_logging,
logging::{setup_logging, AccessLogFormat},
proto::command::{ServerConfig, WorkerRequest, WorkerResponse},
ready::Ready,
request::{read_initial_state_from_file, RequestError},
Expand Down Expand Up @@ -110,10 +110,13 @@ pub fn begin_worker_process(

let worker_id = format!("{}-{:02}", "WRK", id);

let access_log_format = AccessLogFormat::from(&worker_config.access_log_format());

// do not try to log anything before this, or the logger will panic
setup_logging(
&worker_config.log_target,
worker_config.log_access_target.as_deref(),
Some(access_log_format),
&worker_config.log_level,
&worker_id,
);
Expand Down
1 change: 1 addition & 0 deletions command/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ nix = { version = "^0.28.0", features = ["socket", "uio"] }
nom = "^7.1.3"
prost = "^0.12.3"
rand = "^0.8.5"
rusty_ulid = "^2.0.0"
serde = { version = "^1.0.195", features = ["derive"] }
serde_json = "^1.0.111"
sha2 = "^0.10.8"
Expand Down
181 changes: 181 additions & 0 deletions command/src/access_logs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
use std::{collections::BTreeMap, mem::ManuallyDrop, net::SocketAddr};

use rusty_ulid::Ulid;
use time::Duration;

use crate::proto::command::{ProtobufAccessLog, ProtobufEndpoint, TcpEndpoint, Uint128};

/// Produces a second, "fake" owner of data that is only borrowed.
///
/// The duplicate is a bitwise copy: for the compiler it is exactly as legitimate
/// as the original owner, so both would free the same memory when dropped.
/// One of the two must be neutralised (with [`std::mem::forget`] or
/// [`std::mem::ManuallyDrop`]) before it is dropped, to avoid a double free.
///
/// Implemented for `&T`, `Option<&T>`, `&str` and `&[T]`.
trait DuplicateOwnership {
    /// Owned type built from the borrow.
    type Target;

    /// Builds an owned value that aliases the borrowed data.
    ///
    /// # Safety
    ///
    /// Either the returned value or the original owner must never be dropped
    /// (use `std::mem::forget` or `ManuallyDrop` on one of them).
    unsafe fn duplicate(self) -> Self::Target;
}

/// Any sized value behind a shared reference is read out bit for bit.
impl<T> DuplicateOwnership for &T {
    type Target = T;

    unsafe fn duplicate(self) -> T {
        let source: *const T = self;
        source.read()
    }
}

/// An optional borrow is duplicated through the implementation of the borrow itself.
impl<'a, T> DuplicateOwnership for Option<&'a T>
where
    T: ?Sized,
    &'a T: DuplicateOwnership + 'a,
{
    type Target = Option<<&'a T as DuplicateOwnership>::Target>;

    unsafe fn duplicate(self) -> Self::Target {
        match self {
            Some(borrowed) => Some(borrowed.duplicate()),
            None => None,
        }
    }
}

/// A string slice becomes a `String` pointing at the very same bytes,
/// with a capacity equal to its length.
impl DuplicateOwnership for &str {
    type Target = String;

    unsafe fn duplicate(self) -> Self::Target {
        let length = self.len();
        String::from_raw_parts(self.as_ptr() as *mut u8, length, length)
    }
}

/// A slice becomes a `Vec` pointing at the very same elements,
/// with a capacity equal to its length.
impl<T> DuplicateOwnership for &[T] {
    type Target = Vec<T>;

    unsafe fn duplicate(self) -> Self::Target {
        let length = self.len();
        Vec::from_raw_parts(self.as_ptr() as *mut T, length, length)
    }
}

/// Normalizes a User-Agent value for the access logs.
///
/// Every space is replaced with an underscore and a trailing comma, if any,
/// is replaced with an exclamation mark. An empty input yields an empty string.
pub fn prepare_user_agent(user_agent: &str) -> String {
    let mut sanitized = user_agent.replace(' ', "_");
    // `,` and `!` are both a single byte in UTF-8, so swapping the last
    // character with safe String operations gives the same result as
    // patching the raw bytes, without any `unsafe`.
    if sanitized.ends_with(',') {
        sanitized.pop();
        sanitized.push('!');
    }
    sanitized
}

/// Newtype around an optional [`Duration`].
/// NOTE(review): presumably exists to give durations a dedicated formatting in the logs —
/// confirm against the trait implementations in the logging module.
pub struct LogDuration(pub Option<Duration>);

/// Identifiers attached to a request: request id, cluster id and backend id.
#[derive(Debug)]
pub struct LogContext<'a> {
/// id of the request, a ULID
pub request_id: Ulid,
/// id of the cluster (set of frontend, backend, routing rules), if any
pub cluster_id: Option<&'a str>,
/// id of the backend (the server to which the traffic is redirected), if any
pub backend_id: Option<&'a str>,
}

/// Protocol-specific part of an access log: either HTTP details or a TCP context.
/// Every field is optional and borrowed.
pub enum EndpointRecord<'a> {
/// HTTP endpoint
Http {
method: Option<&'a str>,
authority: Option<&'a str>,
path: Option<&'a str>,
/// status code, widened to `u32` when converted to protobuf
status: Option<u16>,
reason: Option<&'a str>,
},
/// TCP endpoint
Tcp {
context: Option<&'a str>,
},
}

/// Tags of a session, kept both as a map and as a pre-rendered string.
#[derive(Debug)]
pub struct CachedTags {
    /// the tags, as key-values
    pub tags: BTreeMap<String, String>,
    /// all the tags rendered as `key=value`, separated by `, `, in key order
    pub concatenated: String,
}

impl CachedTags {
    /// Takes ownership of the tags and renders their concatenated form once.
    pub fn new(tags: BTreeMap<String, String>) -> Self {
        let mut concatenated = String::new();
        for (index, (key, value)) in tags.iter().enumerate() {
            if index > 0 {
                concatenated.push_str(", ");
            }
            concatenated.push_str(key);
            concatenated.push('=');
            concatenated.push_str(value);
        }
        Self { tags, concatenated }
    }
}

/// Intermediate representation of an access log agnostic of the final format.
/// Every field is a reference to avoid capturing ownership (as a logger should).
pub struct RequestRecord<'a> {
/// error message, if any
pub error: &'a Option<&'a str>,
/// request id, cluster id and backend id
pub context: &'a LogContext<'a>,
/// ip and port of the client
pub session_address: &'a Option<SocketAddr>,
/// socket address of the backend server
pub backend_address: &'a Option<SocketAddr>,
/// the protocol, with SSL/TLS version, for instance "HTTPS-TLS1.1"
pub protocol: &'a str,
/// TCP or HTTP endpoint (method, path, context...)
pub endpoint: &'a EndpointRecord<'a>,
/// custom tags as key-values, if any
pub tags: &'a Option<&'a CachedTags>,
/// round trip time for the client
pub client_rtt: &'a Option<Duration>,
/// round trip time for the backend
pub server_rtt: &'a Option<Duration>,
/// value of the User-Agent header, if any
pub user_agent: &'a Option<String>,
/// time spent on a session
pub service_time: &'a Duration,
/// time for the backend to respond
pub response_time: &'a Duration,
/// number of bytes received from the client
pub bytes_in: &'a usize,
/// number of bytes written to the client
pub bytes_out: &'a usize,
}

impl RequestRecord<'_> {
/// Converts the RequestRecord into its protobuf representation.
/// Prost needs ownership over all the fields but we don't want to take it from the user
/// or clone them, so we use the unsafe DuplicateOwnership.
///
/// `time` and `tag` are stored in the fields of the same name of the resulting log.
///
/// # Safety
///
/// The strings and the tag map of the returned `ProtobufAccessLog` are bitwise duplicates
/// that alias the data borrowed by `self` and `tag`. The `ManuallyDrop` wrapper is what
/// prevents a double free: the caller must never drop the inner value, and must stop using
/// it before the borrowed data is released, since the return type carries no lifetime
/// to enforce it.
pub unsafe fn into_protobuf_access_log(
self,
time: i128,
tag: &str,
) -> ManuallyDrop<ProtobufAccessLog> {
// the request id is converted into a pair of u64 that fills the two halves of a Uint128
let (low, high) = self.context.request_id.into();
let request_id = Uint128 { low, high };
let time: Uint128 = time.into();

// The endpoint is matched by reference, so each binding is a `&Option<&str>`:
// the first duplicate() yields an `Option<&str>`, the second one an `Option<String>`
// pointing at the original bytes.
let endpoint = match self.endpoint {
EndpointRecord::Http {
method,
authority,
path,
status,
reason,
} => crate::proto::command::protobuf_endpoint::Inner::Http(
crate::proto::command::HttpEndpoint {
method: method.duplicate().duplicate(),
authority: authority.duplicate().duplicate(),
path: path.duplicate().duplicate(),
// protobuf has no 16-bit integer, the status is widened to u32
status: status.map(|s| s as u32),
reason: reason.duplicate().duplicate(),
},
),
EndpointRecord::Tcp { context } => {
crate::proto::command::protobuf_endpoint::Inner::Tcp(TcpEndpoint {
context: context.duplicate().duplicate(),
})
}
};

// ManuallyDrop: the duplicated fields must never be freed through this value
ManuallyDrop::new(ProtobufAccessLog {
backend_address: self.backend_address.map(Into::into),
backend_id: self.context.backend_id.duplicate(),
bytes_in: *self.bytes_in as u64,
bytes_out: *self.bytes_out as u64,
// durations are exported as whole microseconds
// NOTE(review): the `as u64` casts do not check for negative or oversized values
client_rtt: self.client_rtt.map(|t| t.whole_microseconds() as u64),
cluster_id: self.context.cluster_id.duplicate(),
endpoint: ProtobufEndpoint {
inner: Some(endpoint),
},
error: self.error.duplicate().duplicate(),
protocol: self.protocol.duplicate(),
request_id,
response_time: self.response_time.whole_microseconds() as u64,
server_rtt: self.server_rtt.map(|t| t.whole_microseconds() as u64),
service_time: self.service_time.whole_microseconds() as u64,
session_address: self.session_address.map(Into::into),
// the tag map is duplicated, not cloned; without tags the default (empty) map is used
tags: self
.tags
.map(|tags| tags.tags.duplicate())
.unwrap_or_default(),
user_agent: self.user_agent.duplicate(),
tag: tag.duplicate(),
// `time` is a local Uint128 at this point, duplicating it is a plain bitwise copy
time: time.duplicate(),
})
}
}
72 changes: 69 additions & 3 deletions command/src/command.proto
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ message SocketAddress {

message IpAddress {
oneof inner {
uint32 v4 = 1;
fixed32 v4 = 1;
Uint128 v6 = 2;
}
}
Expand All @@ -632,7 +632,6 @@ message Uint128 {
required uint64 low = 1;
// lower value, last 8 bytes of the ip
required uint64 high = 2;

}

// This is sent only from Sōzu to Sōzu
Expand Down Expand Up @@ -675,6 +674,12 @@ message ServerConfig {
required uint64 command_buffer_size = 13 [default = 1000000];
required uint64 max_command_buffer_size = 14 [default = 2000000];
optional ServerMetricsConfig metrics = 15;
required ProtobufAccessLogFormat access_log_format = 16;
}

// format of the access logs, set with log_access_format in the configuration
enum ProtobufAccessLogFormat {
// the default format
Ascii = 1;
// access logs are written as ProtobufAccessLog messages
Protobuf = 2;
}

// Addresses of listeners, passed to new workers
Expand All @@ -691,4 +696,65 @@ message ListenersCount {
// Consists in a collection of worker requests
message InitialState {
repeated WorkerRequest requests = 1;
}
}

// An access log, meant to be passed to another agent
// NOTE(review): field number 11 is not used below (10 is followed by 12);
// consider declaring `reserved 11;` if the gap is intentional.
message ProtobufAccessLog {
// error message if any
optional string error = 1;
// LogContext = request_id + cluster_id + backend_id
required Uint128 request_id = 2;
// id of the cluster (set of frontend, backend, routing rules)
optional string cluster_id = 3;
// id of the backend (the server to which the traffic is redirected)
optional string backend_id = 4;
// ip and port of the client
optional SocketAddress session_address = 5;
// socket address of the backend server
optional SocketAddress backend_address = 6;
// the protocol, with SSL/TLS version, for instance "HTTPS-TLS1.1"
required string protocol = 7;
// TCP or HTTP endpoint (method, path, context...)
required ProtobufEndpoint endpoint = 8;
// round trip time for the client (microseconds)
optional uint64 client_rtt = 9;
// round trip time for the backend (microseconds)
optional uint64 server_rtt = 10;
// time for the backend to respond (microseconds)
required uint64 response_time = 12;
// time spent on a session (microseconds)
required uint64 service_time = 13;
// number of bytes received from the client
required uint64 bytes_in = 14;
// number of bytes written to the client
required uint64 bytes_out = 15;
// value of the User-Agent header, if any
optional string user_agent = 16;
// custom tags as key-values, for instance owner_id: MyOrganisation
map<string, string> tags = 17;
// short description of which process sends the log, for instance: "WRK-02"
required string tag = 18;
// POSIX timestamp, nanoseconds
required Uint128 time = 19;
}

// Endpoint of an access log: either an HTTP endpoint or a TCP endpoint
message ProtobufEndpoint {
oneof inner {
HttpEndpoint http = 1;
TcpEndpoint tcp = 2;
}
}

// HTTP variant of ProtobufEndpoint, all fields are optional.
// Built from EndpointRecord::Http in access_logs.rs, field by field.
message HttpEndpoint {
optional string method = 1;
optional string authority = 2;
optional string path = 3;
// warning: this should be a u16 but protobuf only has uint32.
// Make sure the value never exceeds u16 bounds.
optional uint32 status = 4;
optional string reason = 5;
}
message TcpEndpoint {
optional string context = 1;
}
13 changes: 10 additions & 3 deletions command/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ use toml;

use crate::{
certificate::split_certificate_chain,
logging::AccessLogFormat,
proto::command::{
request::RequestType, ActivateListener, AddBackend, AddCertificate, CertificateAndKey,
Cluster, HttpListenerConfig, HttpsListenerConfig, ListenerType, LoadBalancingAlgorithms,
LoadBalancingParams, LoadMetric, MetricsConfiguration, PathRule, ProxyProtocolConfig,
Request, RequestHttpFrontend, RequestTcpFrontend, RulePosition, ServerConfig,
ServerMetricsConfig, SocketAddress, TcpListenerConfig, TlsVersion, WorkerRequest,
LoadBalancingParams, LoadMetric, MetricsConfiguration, PathRule, ProtobufAccessLogFormat,
ProxyProtocolConfig, Request, RequestHttpFrontend, RequestTcpFrontend, RulePosition,
ServerConfig, ServerMetricsConfig, SocketAddress, TcpListenerConfig, TlsVersion,
WorkerRequest,
},
ObjectKind,
};
Expand Down Expand Up @@ -1090,6 +1092,7 @@ pub struct FileConfig {
pub log_target: Option<String>,
#[serde(default)]
pub log_access_target: Option<String>,
pub log_access_format: Option<AccessLogFormat>,
pub worker_count: Option<u16>,
pub worker_automatic_restart: Option<bool>,
pub metrics: Option<MetricsConfig>,
Expand Down Expand Up @@ -1201,6 +1204,7 @@ impl ConfigBuilder {
front_timeout: file_config.front_timeout.unwrap_or(DEFAULT_FRONT_TIMEOUT),
handle_process_affinity: file_config.handle_process_affinity.unwrap_or(false),
log_access_target: file_config.log_access_target.clone(),
log_access_format: file_config.log_access_format.clone(),
log_level: file_config
.log_level
.clone()
Expand Down Expand Up @@ -1447,6 +1451,7 @@ pub struct Config {
pub log_target: String,
#[serde(default)]
pub log_access_target: Option<String>,
pub log_access_format: Option<AccessLogFormat>,
pub worker_count: u16,
pub worker_automatic_restart: bool,
pub metrics: Option<MetricsConfig>,
Expand Down Expand Up @@ -1777,6 +1782,8 @@ impl From<&Config> for ServerConfig {
command_buffer_size: config.command_buffer_size,
max_command_buffer_size: config.max_command_buffer_size,
metrics,
access_log_format: ProtobufAccessLogFormat::from(&config.log_access_format) as i32,
// log_colored: config.log_colored,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions command/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ extern crate serde;
#[macro_use]
/// custom made logging macros
pub mod logging;
pub mod access_logs;
/// Custom buffer used for parsing within the Sōzu codebase.
pub mod buffer;
/// TLS certificates
Expand Down

0 comments on commit dfacdb7

Please sign in to comment.