Skip to content

Commit

Permalink
feat(api): instrument graphql task monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
ymgyt committed Mar 23, 2024
1 parent 6d56ae7 commit cb44f3b
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 15 deletions.
4 changes: 4 additions & 0 deletions crates/synd_api/src/dependency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use synd_feed::feed::{
use crate::{
args::{self, KvsdOptions, TlsOptions},
config,
monitor::Monitors,
repository::kvsd::KvsdClient,
serve::{auth::Authenticator, ServeOptions},
usecase::{authorize::Authorizer, MakeUsecase, Runtime},
Expand All @@ -20,13 +21,15 @@ pub struct Dependency {
pub runtime: Runtime,
pub tls_config: RustlsConfig,
pub serve_options: ServeOptions,
pub monitors: Monitors,
}

impl Dependency {
pub async fn new(
kvsd: KvsdOptions,
tls: TlsOptions,
serve_options: args::ServeOptions,
monitors: Monitors,
) -> anyhow::Result<Self> {
let KvsdOptions {
kvsd_host,
Expand Down Expand Up @@ -71,6 +74,7 @@ impl Dependency {
runtime,
tls_config,
serve_options: serve_options.into(),
monitors,
})
}
}
14 changes: 9 additions & 5 deletions crates/synd_api/src/gql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,28 @@ pub mod handler {
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
use axum::{response::IntoResponse, Extension};
use synd_o11y::audit_span;
use tokio_metrics::TaskMonitor;
use tracing::Instrument;

use crate::principal::Principal;

use super::SyndSchema;
use crate::{principal::Principal, serve::Context};

pub async fn graphiql() -> impl IntoResponse {
axum::response::Html(GraphiQLSource::build().endpoint("/graphql").finish())
}

pub async fn graphql(
Extension(schema): Extension<SyndSchema>,
Extension(Context {
schema,
gql_monitor,
}): Extension<Context>,
Extension(principal): Extension<Principal>,
req: GraphQLRequest,
) -> GraphQLResponse {
// Inject authentication
let req = req.into_inner().data(principal);
schema.execute(req).instrument(audit_span!()).await.into()
TaskMonitor::instrument(&gql_monitor, schema.execute(req).instrument(audit_span!()))
.await
.into()
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/synd_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod client;
pub mod config;
pub mod dependency;
pub mod gql;
pub mod monitor;
pub mod principal;
pub mod repository;
pub mod serve;
Expand Down
27 changes: 20 additions & 7 deletions crates/synd_api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use synd_api::{
args::{self, Args, ObservabilityOptions},
config,
dependency::Dependency,
monitor::Monitors,
repository::kvsd::ConnectKvsdFailed,
serve::listen_and_serve,
shutdown::Shutdown,
Expand Down Expand Up @@ -89,8 +90,9 @@ async fn run(
o11y,
}: Args,
shutdown: Shutdown,
monitors: Monitors,
) -> anyhow::Result<()> {
let dep = Dependency::new(kvsd, tls, serve).await?;
let dep = Dependency::new(kvsd, tls, serve, monitors).await?;

info!(
version = config::VERSION,
Expand Down Expand Up @@ -118,29 +120,40 @@ fn init_file_descriptor_limit() {
.ok();
}

fn init_runtime_monitor() {
fn init_runtime_monitor() -> Monitors {
let handle = tokio::runtime::Handle::current();
let runtime_monitor = RuntimeMonitor::new(&handle);
let task_monitors = Monitors::new();
let intervals = runtime_monitor
.intervals()
.zip(task_monitors.gql.intervals());
tokio::spawn(async move {
for interval in runtime_monitor.intervals() {
metric!(counter.runtime.poll = interval.total_polls_count);
metric!(counter.runtime.busy_duration = interval.total_busy_duration.as_secs_f64());
for (runtime_metrics, gql_metrics) in intervals {
metric!(counter.runtime.poll = runtime_metrics.total_polls_count);
metric!(
counter.runtime.busy_duration = runtime_metrics.total_busy_duration.as_secs_f64()
);
metric!(
counter.task.graphql.idle_duration = gql_metrics.total_idle_duration.as_secs_f64()
);

tokio::time::sleep(Duration::from_secs(60)).await;
}
});

task_monitors
}

#[tokio::main]
async fn main() {
let args = args::parse();
let _guard = init_tracing(&args.o11y);
let shutdown = Shutdown::watch_signal();
let monitors = init_runtime_monitor();

init_file_descriptor_limit();
init_runtime_monitor();

if let Err(err) = run(args, shutdown).await {
if let Err(err) = run(args, shutdown, monitors).await {
if let Some(err) = err.downcast_ref::<ConnectKvsdFailed>() {
error!("{err}: make sure kvsd is running");
} else {
Expand Down
13 changes: 13 additions & 0 deletions crates/synd_api/src/monitor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use tokio_metrics::TaskMonitor;

pub struct Monitors {
pub gql: TaskMonitor,
}

impl Monitors {
pub fn new() -> Self {
Self {
gql: TaskMonitor::new(),
}
}
}
17 changes: 14 additions & 3 deletions crates/synd_api/src/serve/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use axum::{
BoxError, Extension, Router,
};
use tokio::net::TcpListener;
use tokio_metrics::TaskMonitor;
use tower::{limit::ConcurrencyLimitLayer, timeout::TimeoutLayer, ServiceBuilder};
use tower_http::{
cors::CorsLayer, limit::RequestBodyLimitLayer, sensitive_headers::SetSensitiveHeadersLayer,
Expand All @@ -16,7 +17,7 @@ use tracing::info;

use crate::{
dependency::Dependency,
gql,
gql::{self, SyndSchema},
serve::layer::{authenticate, request_metrics::RequestMetricsLayer, trace},
shutdown::Shutdown,
};
Expand All @@ -37,6 +38,12 @@ pub struct ServeOptions {
pub concurrency_limit: usize,
}

#[derive(Clone)]
pub struct Context {
pub gql_monitor: TaskMonitor,
pub schema: SyndSchema,
}

/// Bind tcp listener and serve.
pub async fn listen_and_serve(
dep: Dependency,
Expand Down Expand Up @@ -65,13 +72,17 @@ pub async fn serve(
body_limit_bytes: request_body_limit_bytes,
concurrency_limit,
},
monitors,
} = dep;

let schema = gql::schema_builder().data(runtime).finish();
let cx = Context {
gql_monitor: monitors.gql,
schema: gql::schema_builder().data(runtime).finish(),
};

let service = Router::new()
.route("/graphql", post(gql::handler::graphql))
.layer(Extension(schema))
.layer(Extension(cx))
.layer(authenticate::AuthenticateLayer::new(authenticator))
.route("/graphql", get(gql::handler::graphiql))
.layer(
Expand Down
2 changes: 2 additions & 0 deletions crates/synd_term/tests/test/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use ratatui::backend::TestBackend;
use synd_api::{
client::github::GithubClient,
dependency::Dependency,
monitor::Monitors,
repository::kvsd::KvsdClient,
serve::{auth::Authenticator, ServeOptions},
shutdown::Shutdown,
Expand Down Expand Up @@ -61,6 +62,7 @@ pub async fn serve_api(mock_port: u16, api_port: u16) -> anyhow::Result<()> {
runtime,
tls_config,
serve_options,
monitors: Monitors::new(),
};
let listener = TcpListener::bind(("localhost", api_port)).await?;

Expand Down

0 comments on commit cb44f3b

Please sign in to comment.