Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update metrics and pktparse #151

Merged
merged 12 commits into from
Aug 30, 2021
8 changes: 4 additions & 4 deletions shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ serde_yaml = "0.8"
bincode = "1.3.1"

#Observability
metrics = "0.12.1"
metrics-core = "0.5.2"
metrics-runtime = "0.13.1"
metrics = "0.17.0"
metrics-exporter-prometheus = "0.6.0"
tracing = { version = "0.1.15", features = ["release_max_level_info"]}
tracing-subscriber = { version = "0.2.11", features = ["env-filter"]}
tracing-futures = "0.2.4"
Expand Down Expand Up @@ -72,14 +71,15 @@ rusoto_signature = "0.47.0"
criterion = { version = "0.3", features = ["async_tokio", "html_reports"] }
redis = { version = "0.21.0", features = ["tokio-comp", "cluster"] }
pcap = "0.8.1"
pktparse = {version = "0.4.0", features = ["derive"]}
pktparse = {version = "0.6.1", features = ["serde"]}
dns-parser = "0.8"
tls-parser = "0.10.0"
threadpool = "1.0"
num_cpus = "1.0"
serial_test = "0.5.1"
test-helpers = { path = "../test-helpers" }
hex-literal = "0.3.3"
reqwest = { version = "0.11.4", features = ["blocking"] }

[[bench]]
name = "redis_benches"
Expand Down
38 changes: 13 additions & 25 deletions shotover-proxy/src/admin/httpserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@ use hyper::{
};

use bytes::Bytes;
use metrics_core::{Builder, Drain, Observe};
use metrics_runtime::observers::{JsonBuilder, PrometheusBuilder};
use metrics_exporter_prometheus::PrometheusHandle;
use std::convert::Infallible;
use std::{net::SocketAddr, sync::Arc};
use tracing::{error, trace};
use tracing_subscriber::reload::Handle;
use tracing_subscriber::EnvFilter;

/// Exports metrics over HTTP.
pub struct LogFilterHttpExporter<C, S> {
controller: C,
pub struct LogFilterHttpExporter<S> {
recorder_handle: PrometheusHandle,
address: SocketAddr,
handle: Handle<EnvFilter, S>,
}
Expand All @@ -24,7 +23,7 @@ where
S: tracing::Subscriber + 'static,
{
use std::str;
let body = str::from_utf8(&bytes.as_ref()).map_err(|e| format!("{}", e))?;
let body = str::from_utf8(bytes.as_ref()).map_err(|e| format!("{}", e))?;
trace!(request.body = ?body);
let new_filter = body
.parse::<tracing_subscriber::filter::EnvFilter>()
Expand All @@ -39,17 +38,20 @@ fn rsp(status: StatusCode, body: impl Into<Body>) -> Response<Body> {
.expect("builder with known status code must not fail")
}

impl<C, S> LogFilterHttpExporter<C, S>
impl<S> LogFilterHttpExporter<S>
where
C: Observe + Send + Sync + 'static,
S: tracing::Subscriber + 'static,
{
/// Creates a new [`HttpExporter`] that listens on the given `address`.
///
/// Observers expose their output by being converted into strings.
pub fn new(controller: C, address: SocketAddr, handle: Handle<EnvFilter, S>) -> Self {
pub fn new(
recorder_handle: PrometheusHandle,
address: SocketAddr,
handle: Handle<EnvFilter, S>,
) -> Self {
LogFilterHttpExporter {
controller,
recorder_handle,
address,
handle,
}
Expand All @@ -58,7 +60,7 @@ where
/// Starts an HTTP server on the `address` the exporter was originally configured with,
/// responding to any request with the output of the configured observer.
pub async fn async_run(self) -> hyper::Result<()> {
let controller = Arc::new(self.controller);
let controller = Arc::new(self.recorder_handle);
let handle = Arc::new(self.handle);

let make_svc = make_service_fn(move |_| {
Expand All @@ -73,21 +75,7 @@ where
async move {
let response = match (req.method(), req.uri().path()) {
(&Method::GET, "/metrics") => {
let output = match req.uri().query() {
Some("x-accept=application/json") => {
let builder = JsonBuilder::new();
let mut observer = builder.build();
controller.observe(&mut observer);
observer.drain()
}
_ => {
let builder = PrometheusBuilder::new();
let mut observer = builder.build();
controller.observe(&mut observer);
observer.drain()
}
};
Response::new(Body::from(output))
Response::new(Body::from(controller.as_ref().render()))
}
(&Method::PUT, "/filter") => {
trace!("setting filter");
Expand Down
14 changes: 6 additions & 8 deletions shotover-proxy/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::net::SocketAddr;

use anyhow::{anyhow, Result};
use clap::{crate_version, Clap};
use metrics_runtime::Receiver;
use metrics_exporter_prometheus::PrometheusBuilder;
use tokio::runtime::{self, Runtime};
use tokio::signal;
use tokio::sync::broadcast;
Expand Down Expand Up @@ -81,14 +81,12 @@ impl Runner {
}

pub fn with_observability_interface(self) -> Result<Self> {
let receiver = Receiver::builder()
.build()
.expect("failed to create receiver");
let socket: SocketAddr = self.config.observability_interface.parse()?;
let exporter =
LogFilterHttpExporter::new(receiver.controller(), socket, self.tracing.handle.clone());
let recorder = PrometheusBuilder::new().build();
let handle = recorder.handle();
metrics::set_boxed_recorder(Box::new(recorder))?;

receiver.install();
let socket: SocketAddr = self.config.observability_interface.parse()?;
let exporter = LogFilterHttpExporter::new(handle, socket, self.tracing.handle.clone());
self.runtime.spawn(exporter.async_run());

Ok(self)
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl<C: Codec + 'static> TcpCodecListener<C> {
// error here is non-recoverable.
let socket = self.accept().await?;

gauge!("shotover_available_connections", self.limit_connections.available_permits() as i64 ,"source" => self.source_name.clone());
gauge!("shotover_available_connections", self.limit_connections.available_permits() as f64,"source" => self.source_name.clone());

let peer = socket
.peer_addr()
Expand Down
5 changes: 2 additions & 3 deletions shotover-proxy/src/transforms/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::{anyhow, Result};
use futures::TryFutureExt;

use itertools::Itertools;
use metrics::{counter, timing};
use metrics::{counter, histogram};
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot::Receiver as OneReceiver;
Expand Down Expand Up @@ -191,12 +191,11 @@ impl TransformChain {
wrapper.reset(iter);

let result = wrapper.call_next_transform().await;
let end = Instant::now();
counter!("shotover_chain_total", 1, "chain" => self.name.clone());
if result.is_err() {
counter!("shotover_chain_failures", 1, "chain" => self.name.clone())
}
timing!("shotover_chain_latency", start, end, "chain" => self.name.clone(), "client_details" => client_details);
histogram!("shotover_chain_latency", start.elapsed(), "chain" => self.name.clone(), "client_details" => client_details);
result
}
}
6 changes: 2 additions & 4 deletions shotover-proxy/src/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::message::{Message, Messages};
use crate::transforms::cassandra::cassandra_codec_destination::{
CodecConfiguration, CodecDestination,
};
use metrics::{counter, timing};
use metrics::{counter, histogram};

use crate::transforms::chain::TransformChain;
use crate::transforms::coalesce::{Coalesce, CoalesceConfig};
Expand Down Expand Up @@ -294,13 +294,11 @@ impl<'a> Wrapper<'a> {
let result = CONTEXT_CHAIN_NAME
.scope(chain_name, transform.transform(self))
.await;
let end = Instant::now();

counter!("shotover_transform_total", 1, "transform" => transform_name);
if result.is_err() {
counter!("shotover_transform_failures", 1, "transform" => transform_name)
}
timing!("shotover_transform_latency", start, end, "transform" => transform_name);
histogram!("shotover_transform_latency", start.elapsed(), "transform" => transform_name);
result
}

Expand Down
1 change: 1 addition & 0 deletions shotover-proxy/tests/admin/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod observability_int_tests;
36 changes: 36 additions & 0 deletions shotover-proxy/tests/admin/observability_int_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use crate::helpers::ShotoverManager;
use serial_test::serial;
use test_helpers::docker_compose::DockerCompose;

#[test]
#[serial]
fn test_metrics() {
let _compose = DockerCompose::new("examples/redis-passthrough/docker-compose.yml");

let shotover_manager =
ShotoverManager::from_topology_file("examples/redis-passthrough/topology.yaml");

let mut connection = shotover_manager.redis_connection(6379);

redis::cmd("SET")
.arg("the_key")
.arg(42)
.execute(&mut connection);

redis::cmd("SET")
.arg("the_key")
.arg(43)
.execute(&mut connection);

let body = reqwest::blocking::get("http://localhost:9001/metrics")
.unwrap()
.text()
.unwrap();

// If the body contains these substrings, we can assume metrics are working
assert!(body.contains("# TYPE shotover_transform_total counter"));
assert!(body.contains("# TYPE shotover_chain_total counter"));
assert!(body.contains("# TYPE shotover_available_connections gauge"));
assert!(body.contains("# TYPE shotover_transform_latency summary"));
assert!(body.contains("# TYPE shotover_chain_latency summary"));
}
9 changes: 8 additions & 1 deletion shotover-proxy/tests/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ impl ShotoverManager {
config_file: "config/config.yaml".into(),
..ConfigOpts::default()
};
let spawn = Runner::new(opts).unwrap().run_spawn();
let spawn = Runner::new(opts)
.unwrap()
.with_observability_interface()
.unwrap()
.run_spawn();

// If we allow the tracing_guard to be dropped then the following tests in the same file will not get tracing so we mem::forget it.
// This is because tracing can only be initialized once in the same execution, secondary attempts to initalize tracing will silently fail.
Expand Down Expand Up @@ -56,6 +60,9 @@ impl ShotoverManager {

impl Drop for ShotoverManager {
fn drop(&mut self) {
// Must clear the recorder before skipping a shutdown on panic; if one test panics and the recorder is not cleared,
// the following tests will panic because they will try to set another recorder
metrics::clear_recorder();
if std::thread::panicking() {
// If already panicking do nothing in order to avoid a double panic.
// We only shutdown shotover to test the shutdown process not because we need to clean up any resources.
Expand Down
1 change: 1 addition & 0 deletions shotover-proxy/tests/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod admin;
pub mod codec;
mod helpers;
pub mod redis_int_tests;