Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
queue len metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
khodzha committed Jun 1, 2020
1 parent 884d356 commit 42979fb
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ rand = "0.7"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
svc-agent = { version = "0.13", features = ["diesel"] }
svc-agent = { version = "0.13", features = ["diesel", "queue-counter"] }
svc-authn = { version = "0.5", features = ["jose", "diesel"] }
svc-authz = "0.10"
svc-error = { version = "0.1", features = ["diesel", "r2d2", "svc-agent", "svc-authn", "svc-authz", "sentry-extension"] }
Expand Down
16 changes: 15 additions & 1 deletion src/app/context.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use svc_agent::AgentId;
use svc_agent::{queue_counter::QueueCounterHandle, AgentId};
use svc_authz::ClientMap as Authz;

use crate::config::Config;
Expand All @@ -15,6 +15,7 @@ pub(crate) struct AppContext {
db: Db,
agent_id: AgentId,
janus_topics: JanusTopics,
queue_counter: Option<QueueCounterHandle>,
}

impl AppContext {
Expand All @@ -27,6 +28,14 @@ impl AppContext {
db,
agent_id,
janus_topics,
queue_counter: None,
}
}

pub(crate) fn add_queue_counter(self, qc: QueueCounterHandle) -> Self {
Self {
queue_counter: Some(qc),
..self
}
}
}
Expand All @@ -37,6 +46,7 @@ pub(crate) trait Context: Sync {
fn db(&self) -> &Db;
fn agent_id(&self) -> &AgentId;
fn janus_topics(&self) -> &JanusTopics;
fn queue_counter(&self) -> &Option<QueueCounterHandle>;
}

impl Context for AppContext {
Expand All @@ -59,6 +69,10 @@ impl Context for AppContext {
fn janus_topics(&self) -> &JanusTopics {
&self.janus_topics
}

fn queue_counter(&self) -> &Option<QueueCounterHandle> {
&self.queue_counter
}
}

///////////////////////////////////////////////////////////////////////////////
Expand Down
74 changes: 64 additions & 10 deletions src/app/endpoint/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,23 @@ use async_trait::async_trait;
use chrono::{serde::ts_seconds, DateTime, Utc};
use serde_derive::{Deserialize, Serialize};
use svc_agent::mqtt::{
IncomingEventProperties, IntoPublishableMessage, OutgoingEvent, ShortTermTimingProperties,
IncomingEventProperties, IntoPublishableMessage, OutgoingEvent, ResponseStatus,
ShortTermTimingProperties,
};

use crate::app::context::Context;
use crate::app::endpoint::prelude::*;
use crate::config::TelemetryConfig;

#[derive(Debug, Deserialize)]
pub(crate) struct PullPayload {}
pub(crate) struct PullPayload {
#[serde(default = "default_duration")]
duration: u64,
}

fn default_duration() -> u64 {
5
}

#[derive(Serialize, Debug)]
pub(crate) struct MetricValue {
Expand All @@ -23,8 +31,18 @@ pub(crate) struct MetricValue {
#[derive(Serialize, Debug)]
#[serde(tag = "metric")]
pub(crate) enum Metric {
//IncomingQueue(MetricValue),
//OutgoingQueue(MetricValue),
#[serde(rename(serialize = "apps.conference.incoming_requests_total"))]
IncomingQueueRequests(MetricValue),
#[serde(rename(serialize = "apps.conference.incoming_responses_total"))]
IncomingQueueResponses(MetricValue),
#[serde(rename(serialize = "apps.conference.incoming_events_total"))]
IncomingQueueEvents(MetricValue),
#[serde(rename(serialize = "apps.conference.outgoing_requests_total"))]
OutgoingQueueRequests(MetricValue),
#[serde(rename(serialize = "apps.conference.outgoing_responses_total"))]
OutgoingQueueResponses(MetricValue),
#[serde(rename(serialize = "apps.conference.outgoing_events_total"))]
OutgoingQueueEvents(MetricValue),
#[serde(rename(serialize = "apps.conference.db_connections_total"))]
DbConnections(MetricValue),
}
Expand All @@ -37,23 +55,59 @@ impl EventHandler for PullHandler {

async fn handle<C: Context>(
context: &C,
_payload: Self::Payload,
payload: Self::Payload,
evp: &IncomingEventProperties,
start_timestamp: DateTime<Utc>,
) -> Result {
match context.config().telemetry {
TelemetryConfig {
id: Some(ref account_id),
} => {
let outgoing_event_payload = vec![Metric::DbConnections(MetricValue {
let now = Utc::now();

let mut metrics = if let Some(qc) = context.queue_counter() {
let stats = qc
.get_stats(payload.duration)
.status(ResponseStatus::BAD_REQUEST)?;

vec![
Metric::IncomingQueueRequests(MetricValue {
value: stats.incoming_requests,
timestamp: now,
}),
Metric::IncomingQueueResponses(MetricValue {
value: stats.incoming_responses,
timestamp: now,
}),
Metric::IncomingQueueEvents(MetricValue {
value: stats.incoming_events,
timestamp: now,
}),
Metric::OutgoingQueueRequests(MetricValue {
value: stats.outgoing_requests,
timestamp: now,
}),
Metric::OutgoingQueueResponses(MetricValue {
value: stats.outgoing_responses,
timestamp: now,
}),
Metric::OutgoingQueueEvents(MetricValue {
value: stats.outgoing_events,
timestamp: now,
}),
]
} else {
vec![]
};

metrics.push(Metric::DbConnections(MetricValue {
value: context.db().state().connections as u64,
timestamp: Utc::now(),
})];
timestamp: now,
}));

let short_term_timing = ShortTermTimingProperties::until_now(start_timestamp);
let props = evp.to_event("metric.create", short_term_timing);
let outgoing_event =
OutgoingEvent::multicast(outgoing_event_payload, props, account_id);
let outgoing_event = OutgoingEvent::multicast(metrics, props, account_id);
let boxed_event =
Box::new(outgoing_event) as Box<dyn IntoPublishableMessage + Send>;
Ok(Box::new(stream::once(boxed_event)))
Expand Down
3 changes: 2 additions & 1 deletion src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ pub(crate) async fn run(db: &ConnectionPool, authz_cache: Option<AuthzCache>) ->
let janus_topics = subscribe(&mut agent, &agent_id, &config)?;

// Context
let context = AppContext::new(config.clone(), authz, db.clone(), janus_topics);
let context = AppContext::new(config.clone(), authz, db.clone(), janus_topics)
.add_queue_counter(agent.get_queue_counter());

// Message handler
let message_handler = Arc::new(MessageHandler::new(agent.clone(), context));
Expand Down
6 changes: 5 additions & 1 deletion src/test_helpers/context.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use serde_json::json;
use svc_agent::AgentId;
use svc_agent::{queue_counter::QueueCounterHandle, AgentId};
use svc_authz::ClientMap as Authz;

use crate::app::context::{Context, JanusTopics};
Expand Down Expand Up @@ -82,4 +82,8 @@ impl Context for TestContext {
fn janus_topics(&self) -> &JanusTopics {
&self.janus_topics
}

fn queue_counter(&self) -> &Option<QueueCounterHandle> {
&None
}
}

0 comments on commit 42979fb

Please sign in to comment.