Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions orion-error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ impl Error {
self.0
}

pub fn inner(&self) -> &(impl ErrorTrait + Send + Sync + 'static) {
&self.0
}

pub fn get_context_data<T: 'static>(&self) -> Option<&T> {
if let ErrorImpl::Context(ErrorInfo { message: _, any: Some(val) }, _) = &self.0 {
val.downcast_ref::<T>()
Expand Down Expand Up @@ -274,8 +278,7 @@ enum ErrorImpl {
impl ErrorTrait for ErrorImpl {
fn source(&self) -> Option<&(dyn ErrorTrait + 'static)> {
match self {
Self::Error(err) => err.source(),
Self::Context(_, err) => Some(err.as_ref()),
Self::Error(err) | Self::Context(_, err) => Some(err.as_ref()),
}
}
}
Expand Down
28 changes: 20 additions & 8 deletions orion-format/benches/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ fn benchmark_rust_format(c: &mut Criterion) {
bytes_received: 128,
bytes_sent: 256,
response_flags: ResponseFlags::empty(),
upstream_failure: None,
response_code_details: None,
connection_termination_details: None,
};

let default_header_value = HeaderValue::from_static("");
Expand Down Expand Up @@ -174,6 +177,9 @@ fn benchmark_log_formatter(c: &mut Criterion) {
bytes_received: 128,
bytes_sent: 256,
response_flags: ResponseFlags::empty(),
upstream_failure: None,
response_code_details: None,
connection_termination_details: None,
};

let fmt = LogFormatter::try_new(DEFAULT_ACCESS_LOG_FORMAT, false).stealth_unwrap();
Expand All @@ -183,7 +189,7 @@ fn benchmark_log_formatter(c: &mut Criterion) {
b.iter(|| {
let mut fmt = fmt.local_clone();
black_box(eval_format(
&DownstreamContext { request: &request, trace_id: None, request_head_size: 0 },
&DownstreamContext { request: &request, trace_id: None, request_head_size: 0, server_name: None },
&DownstreamResponse { response: &response, response_head_size: 0 },
&start,
&end,
Expand All @@ -196,7 +202,7 @@ fn benchmark_log_formatter(c: &mut Criterion) {
b.iter(|| {
let mut fmt = fmt.local_clone();
black_box(eval_format(
&DownstreamContext { request: &request, trace_id: None, request_head_size: 0 },
&DownstreamContext { request: &request, trace_id: None, request_head_size: 0, server_name: None },
&DownstreamResponse { response: &response, response_head_size: 0 },
&start,
&end,
Expand All @@ -214,7 +220,7 @@ fn benchmark_log_formatter(c: &mut Criterion) {

let mut formatted = fmt.local_clone();
eval_format(
&DownstreamContext { request: &request, trace_id: None, request_head_size: 0 },
&DownstreamContext { request: &request, trace_id: None, request_head_size: 0, server_name: None },
&DownstreamResponse { response: &response, response_head_size: 0 },
&start,
&end,
Expand Down Expand Up @@ -244,14 +250,17 @@ fn benchmark_request_parts(c: &mut Criterion) {
bytes_received: 128,
bytes_sent: 256,
response_flags: ResponseFlags::empty(),
upstream_failure: None,
response_code_details: None,
connection_termination_details: None,
};

let fmt = LogFormatter::try_new("%START_TIME%", false).stealth_unwrap();
c.bench_function("%START_TIME%", |b| {
b.iter(|| {
let mut fmt = fmt.local_clone();
black_box(eval_format(
&DownstreamContext { request: &request, trace_id: None, request_head_size: 0 },
&DownstreamContext { request: &request, trace_id: None, request_head_size: 0, server_name: None },
&DownstreamResponse { response: &response, response_head_size: 0 },
&start,
&end,
Expand All @@ -265,7 +274,7 @@ fn benchmark_request_parts(c: &mut Criterion) {
b.iter(|| {
let mut fmt = fmt.local_clone();
black_box(eval_format(
&DownstreamContext { request: &request, trace_id: None, request_head_size: 0 },
&DownstreamContext { request: &request, trace_id: None, request_head_size: 0, server_name: None },
&DownstreamResponse { response: &response, response_head_size: 0 },
&start,
&end,
Expand All @@ -279,7 +288,7 @@ fn benchmark_request_parts(c: &mut Criterion) {
b.iter(|| {
let mut fmt = fmt.local_clone();
black_box(eval_format(
&DownstreamContext { request: &request, trace_id: None, request_head_size: 0 },
&DownstreamContext { request: &request, trace_id: None, request_head_size: 0, server_name: None },
&DownstreamResponse { response: &response, response_head_size: 0 },
&start,
&end,
Expand All @@ -293,7 +302,7 @@ fn benchmark_request_parts(c: &mut Criterion) {
b.iter(|| {
let mut fmt = fmt.local_clone();
black_box(eval_format(
&DownstreamContext { request: &request, trace_id: None, request_head_size: 0 },
&DownstreamContext { request: &request, trace_id: None, request_head_size: 0, server_name: None },
&DownstreamResponse { response: &response, response_head_size: 0 },
&start,
&end,
Expand Down Expand Up @@ -357,6 +366,9 @@ fn benchmark_log_headers(c: &mut Criterion) {
bytes_received: 128,
bytes_sent: 256,
response_flags: ResponseFlags::empty(),
upstream_failure: None,
response_code_details: None,
connection_termination_details: None,
};

let fmt = LogFormatter::try_new(ENVOY_FORMAT, false).stealth_unwrap();
Expand All @@ -365,7 +377,7 @@ fn benchmark_log_headers(c: &mut Criterion) {
b.iter(|| {
let mut fmt = fmt.local_clone();
black_box(eval_format(
&DownstreamContext { request: &request, trace_id: None, request_head_size: 0 },
&DownstreamContext { request: &request, trace_id: None, request_head_size: 0, server_name: None },
&DownstreamResponse { response: &response, response_head_size: 0 },
&start,
&end,
Expand Down
5 changes: 4 additions & 1 deletion orion-format/src/bin/format-test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ fn main() -> Result<(), BoxError> {
bytes_received: 128,
bytes_sent: 256,
response_flags: ResponseFlags::empty(),
upstream_failure: None,
response_code_details: None,
connection_termination_details: None,
};

let fmt = LogFormatter::try_new(DEF_FMT, false)?;
Expand All @@ -71,7 +74,7 @@ fn main() -> Result<(), BoxError> {
for _ in 0..TOTAL {
let mut fmt = black_box(fmt.local_clone());
black_box(eval_format(
&DownstreamContext { request: &request, request_head_size: 0, trace_id: None },
&DownstreamContext { request: &request, request_head_size: 0, trace_id: None, server_name: None },
&DownstreamResponse { response: &response, response_head_size: 0 },
&start,
&end,
Expand Down
124 changes: 63 additions & 61 deletions orion-format/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,56 +52,44 @@ impl Context for TcpContext<'_> {
}
fn eval_part(&self, op: &Operator) -> StringType {
match op {
Operator::UpstreamHost | Operator::UpstreamRemoteAddress => match self.upstream_peer_addr {
Some(addr) => StringType::Smol(addr.to_smolstr()),
None => StringType::None,
Operator::UpstreamHost | Operator::UpstreamRemoteAddress => {
self.upstream_peer_addr.map_or(StringType::None, |addr| StringType::Smol(addr.to_smolstr()))
},
Operator::UpstreamRemoteAddressWithoutPort => match self.upstream_peer_addr {
None => StringType::None,
Some(addr) => StringType::Smol(addr.ip().to_smolstr()),
Operator::UpstreamRemoteAddressWithoutPort => {
self.upstream_peer_addr.map_or(StringType::None, |addr| StringType::Smol(addr.ip().to_smolstr()))
},
Operator::UpstreamRemotePort => match self.upstream_peer_addr {
None => StringType::None,
Some(addr) => StringType::Smol(addr.port().to_smolstr()),
Operator::UpstreamRemotePort => {
self.upstream_peer_addr.map_or(StringType::None, |addr| StringType::Smol(addr.port().to_smolstr()))
},
Operator::UpstreamLocalAddress => match self.upstream_local_addr {
Some(addr) => StringType::Smol(addr.to_smolstr()),
None => StringType::None,
Operator::UpstreamLocalAddress => {
self.upstream_local_addr.map_or(StringType::None, |addr| StringType::Smol(addr.to_smolstr()))
},
Operator::UpstreamLocalAddressWithoutPort => match self.upstream_local_addr {
Some(addr) => StringType::Smol(addr.ip().to_smolstr()),
None => StringType::None,
Operator::UpstreamLocalAddressWithoutPort => {
self.upstream_local_addr.map_or(StringType::None, |addr| StringType::Smol(addr.ip().to_smolstr()))
},
Operator::UpstreamLocalPort => match self.upstream_local_addr {
Some(addr) => StringType::Smol(addr.port().to_smolstr()),
None => StringType::None,
Operator::UpstreamLocalPort => {
self.upstream_local_addr.map_or(StringType::None, |addr| StringType::Smol(addr.port().to_smolstr()))
},
Operator::DownstreamLocalAddress => match self.downstream_local_addr {
Some(addr) => StringType::Smol(addr.to_smolstr()),
None => StringType::None,
Operator::DownstreamLocalAddress => {
self.downstream_local_addr.map_or(StringType::None, |addr| StringType::Smol(addr.to_smolstr()))
},
Operator::DownstreamLocalAddressWithoutPort => match self.downstream_local_addr {
Some(addr) => StringType::Smol(addr.ip().to_smolstr()),
None => StringType::None,
Operator::DownstreamLocalAddressWithoutPort => {
self.downstream_local_addr.map_or(StringType::None, |addr| StringType::Smol(addr.ip().to_smolstr()))
},
Operator::DownstreamLocalPort => match self.downstream_local_addr {
Some(addr) => StringType::Smol(addr.port().to_smolstr()),
None => StringType::None,
Operator::DownstreamLocalPort => {
self.downstream_local_addr.map_or(StringType::None, |addr| StringType::Smol(addr.port().to_smolstr()))
},

Operator::DownstreamRemoteAddress => match self.downstream_peer_addr {
Some(addr) => StringType::Smol(addr.to_smolstr()),
None => StringType::None,
Operator::DownstreamRemoteAddress => {
self.downstream_peer_addr.map_or(StringType::None, |addr| StringType::Smol(addr.to_smolstr()))
},

Operator::DownstreamRemoteAddressWithoutPort => match self.downstream_peer_addr {
Some(addr) => StringType::Smol(addr.ip().to_smolstr()),
None => StringType::None,
Operator::DownstreamRemoteAddressWithoutPort => {
self.downstream_peer_addr.map_or(StringType::None, |addr| StringType::Smol(addr.ip().to_smolstr()))
},

Operator::DownstreamRemotePort => match self.downstream_peer_addr {
Some(addr) => StringType::Smol(addr.port().to_smolstr()),
None => StringType::None,
Operator::DownstreamRemotePort => {
self.downstream_peer_addr.map_or(StringType::None, |addr| StringType::Smol(addr.port().to_smolstr()))
},

Operator::UpstreamCluster | Operator::UpstreamClusterRaw => {
Expand Down Expand Up @@ -142,8 +130,9 @@ fn hash_connection(local: Option<&SocketAddr>, peer: Option<&SocketAddr>, protoc

#[derive(Clone, Debug)]
pub struct UpstreamContext<'a> {
pub authority: &'a Authority,
pub cluster_name: &'a str,
pub authority: Option<&'a Authority>,
pub cluster_name: Option<&'a str>,
pub route_name: &'a str,
}

impl Context for UpstreamContext<'_> {
Expand All @@ -152,10 +141,13 @@ impl Context for UpstreamContext<'_> {
}
fn eval_part(&self, op: &Operator) -> StringType {
match op {
Operator::UpstreamHost => StringType::Smol(SmolStr::new(self.authority.as_str())),
Operator::UpstreamHost => {
self.authority.map_or(StringType::None, |name| StringType::Smol(SmolStr::new(name)))
},
Operator::UpstreamCluster | Operator::UpstreamClusterRaw => {
StringType::Smol(SmolStr::new(self.cluster_name))
self.cluster_name.map_or(StringType::None, |cluster_name| StringType::Smol(SmolStr::new(cluster_name)))
},
Operator::RouteName => StringType::Smol(SmolStr::new(self.route_name)),
_ => StringType::None,
}
}
Expand Down Expand Up @@ -184,6 +176,7 @@ pub struct InitHttpContext<'a, T> {
pub downstream_request: &'a Request<T>,
pub request_head_size: usize,
pub trace_id: Option<u128>,
pub server_name: Option<&'a str>,
}

impl<T> Context for InitHttpContext<'_, T> {
Expand All @@ -197,6 +190,7 @@ impl<T> Context for InitHttpContext<'_, T> {
request: self.downstream_request,
trace_id: self.trace_id,
request_head_size: self.request_head_size,
server_name: self.server_name,
}
.eval_part(op),
}
Expand Down Expand Up @@ -259,6 +253,9 @@ pub struct FinishContext {
pub bytes_received: u64,
pub bytes_sent: u64,
pub response_flags: ResponseFlags,
pub upstream_failure: Option<&'static str>,
pub response_code_details: Option<&'static str>,
pub connection_termination_details: Option<&'static str>,
}

impl Context for FinishContext {
Expand All @@ -281,6 +278,15 @@ impl Context for FinishContext {
let mut buffer = itoa::Buffer::new();
StringType::Smol(SmolStr::new(buffer.format(self.bytes_sent)))
},
Operator::UpstreamTransportFailureReason => {
self.upstream_failure.map_or(StringType::None, |msg| StringType::Smol(SmolStr::new_static(msg)))
},
Operator::ResponseCodeDetails => {
self.response_code_details.map_or(StringType::None, |msg| StringType::Smol(SmolStr::new_static(msg)))
},
Operator::ConnectionTerminationDetails => self
.connection_termination_details
.map_or(StringType::None, |msg| StringType::Smol(SmolStr::new_static(msg))),
_ => StringType::None,
}
}
Expand All @@ -290,6 +296,7 @@ pub struct DownstreamContext<'a, T> {
pub request: &'a Request<T>,
pub request_head_size: usize,
pub trace_id: Option<u128>,
pub server_name: Option<&'a str>,
}

pub struct DownstreamResponse<'a, T> {
Expand Down Expand Up @@ -336,21 +343,18 @@ impl<T> Context for DownstreamContext<'_, T> {
StringType::None
}
},
Operator::Request(h) => {
let hv = self.request.headers().get(h.0.as_str());
match hv {
Some(hv) => StringType::Bytes(hv.as_bytes().into()),
None => StringType::None,
}
},
Operator::TraceId => {
if let Some(trace_id) = self.trace_id {
StringType::Smol(format_smolstr!("{:032x}", trace_id))
} else {
StringType::None
}
},
Operator::Request(h) => self
.request
.headers()
.get(h.0.as_str())
.map_or(StringType::None, |hv| StringType::Bytes(hv.as_bytes().into())),
Operator::TraceId => self
.trace_id
.map_or(StringType::None, |trace_id| StringType::Smol(format_smolstr!("{:032x}", trace_id))),
Operator::Protocol => StringType::Smol(SmolStr::new_static(self.request.version().to_static_str())),
Operator::RequestedServerName => {
self.server_name.map_or(StringType::None, |sni| StringType::Smol(SmolStr::new(sni)))
},
_ => StringType::None,
}
}
Expand Down Expand Up @@ -394,13 +398,11 @@ impl<T> Context for DownstreamResponse<'_, T> {
Operator::ResponseStatus | Operator::ResponseCode => {
StringType::Smol(SmolStr::new_inline(self.response.status().as_str()))
},
Operator::Response(header_name) => {
let hv = self.response.headers().get(header_name.0.as_str());
match hv {
Some(hv) => StringType::Bytes(hv.as_bytes().into()),
None => StringType::None,
}
},
Operator::Response(header_name) => self
.response
.headers()
.get(header_name.0.as_str())
.map_or(StringType::None, |hv| StringType::Bytes(hv.as_bytes().into())),
_ => StringType::None,
}
}
Expand Down
15 changes: 10 additions & 5 deletions orion-format/src/grammar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,13 @@ static ENVOY_PATTERNS: LazyLock<Trie<u8, (Operator, Category, usize, bool)>> = L
trie_mapstr!(trie, "PROTOCOL", Operator::Protocol, Category::DOWNSTREAM_REQUEST);
trie_mapstr!(trie, "UPSTREAM_PROTOCOL", Operator::UpstreamProtocol, Category::UPSTREAM_REQUEST);
trie_mapstr!(trie, "RESPONSE_CODE", Operator::ResponseCode, Category::DOWNSTREAM_RESPONSE);
trie_mapstr!(trie, "RESPONSE_CODE_DETAILS", Operator::ResponseCodeDetails, Category::UNSUPPORTED);
trie_mapstr!(trie, "CONNECTION_TERMINATION_DETAILS", Operator::ConnectionTerminationDetails, Category::UNSUPPORTED);
trie_mapstr!(trie, "RESPONSE_CODE_DETAILS", Operator::ResponseCodeDetails, Category::FINISH_CONTEXT);
trie_mapstr!(
trie,
"CONNECTION_TERMINATION_DETAILS",
Operator::ConnectionTerminationDetails,
Category::FINISH_CONTEXT
);
trie_mapstr!(trie, "BYTES_SENT", Operator::BytesSent, Category::FINISH_CONTEXT);
trie_mapstr!(trie, "UPSTREAM_WIRE_BYTES_SENT", Operator::UpstreamWireBytesSent, Category::UNSUPPORTED);
trie_mapstr!(trie, "UPSTREAM_HEADER_BYTES_SENT", Operator::UpstreamHeaderBytesSent, Category::UNSUPPORTED);
Expand Down Expand Up @@ -164,8 +169,8 @@ static ENVOY_PATTERNS: LazyLock<Trie<u8, (Operator, Category, usize, bool)>> = L
trie_mapstr!(trie, "CONNECTION_ID", Operator::ConnectionId, Category::DOWNSTREAM_CONTEXT);
trie_mapstr!(trie, "REQUEST_HEADERS_BYTES", Operator::RequestHeadersBytes, Category::DOWNSTREAM_REQUEST);
trie_mapstr!(trie, "RESPONSE_HEADERS_BYTES", Operator::ResponseHeadersBytes, Category::DOWNSTREAM_RESPONSE);
trie_mapstr!(trie, "REQUESTED_SERVER_NAME", Operator::RequestedServerName, Category::UNSUPPORTED);
trie_mapstr!(trie, "ROUTE_NAME", Operator::RouteName, Category::UNSUPPORTED);
trie_mapstr!(trie, "REQUESTED_SERVER_NAME", Operator::RequestedServerName, Category::DOWNSTREAM_REQUEST);
trie_mapstr!(trie, "ROUTE_NAME", Operator::RouteName, Category::UPSTREAM_CONTEXT);
trie_mapstr!(trie, "UPSTREAM_PEER_URI_SAN", Operator::UpstreamPeerUriSan, Category::UNSUPPORTED);
trie_mapstr!(trie, "UPSTREAM_PEER_DNS_SAN", Operator::UpstreamPeerDnsSan, Category::UNSUPPORTED);
trie_mapstr!(trie, "UPSTREAM_PEER_IP_SAN", Operator::UpstreamPeerIpSan, Category::UNSUPPORTED);
Expand Down Expand Up @@ -220,7 +225,7 @@ static ENVOY_PATTERNS: LazyLock<Trie<u8, (Operator, Category, usize, bool)>> = L
trie,
"UPSTREAM_TRANSPORT_FAILURE_REASON",
Operator::UpstreamTransportFailureReason,
Category::UNSUPPORTED
Category::FINISH_CONTEXT
);
trie_mapstr!(trie, "HOSTNAME", Operator::Hostname, Category::UNSUPPORTED);
trie_mapstr!(trie, "FILTER_CHAIN_NAME", Operator::FilterChainName, Category::UNSUPPORTED);
Expand Down
Loading
Loading