Skip to content
This repository has been archived by the owner on Aug 16, 2023. It is now read-only.

Commit

Permalink
Revert "updated svc-agent to 0.14.11"
Browse files Browse the repository at this point in the history
This reverts commit 3920516.
  • Loading branch information
khodzha committed Nov 5, 2020
1 parent ea0dd91 commit 6a08c34
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 41 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions src/app/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub(crate) trait GlobalContext: Sync {
fn queue_counter(&self) -> &Option<QueueCounterHandle>;
fn redis_pool(&self) -> &Option<RedisConnectionPool>;
fn dynamic_stats(&self) -> Option<&DynamicStatsCollector>;
fn get_metrics(&self) -> anyhow::Result<Vec<Metric>>;
fn get_metrics(&self, duration: u64) -> anyhow::Result<Vec<Metric>>;

fn get_conn(&self) -> Result<PooledConnection<ConnectionManager<PgConnection>>, AppError> {
self.db()
Expand Down Expand Up @@ -137,8 +137,8 @@ impl GlobalContext for AppContext {
self.dynamic_stats.as_deref()
}

fn get_metrics(&self) -> anyhow::Result<Vec<Metric>> {
crate::app::metrics::Collector::new(self).get()
fn get_metrics(&self, duration: u64) -> anyhow::Result<Vec<Metric>> {
crate::app::metrics::Collector::new(self, duration).get()
}
}

Expand Down Expand Up @@ -197,8 +197,8 @@ impl<'a, C: GlobalContext> GlobalContext for AppMessageContext<'a, C> {
self.global_context.dynamic_stats()
}

fn get_metrics(&self) -> anyhow::Result<Vec<Metric>> {
self.global_context.get_metrics()
fn get_metrics(&self, duration: u64) -> anyhow::Result<Vec<Metric>> {
self.global_context.get_metrics(duration)
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/app/endpoint/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ impl EventHandler for PullHandler {

async fn handle<C: Context>(
context: &mut C,
_payload: Self::Payload,
payload: Self::Payload,
evp: &IncomingEventProperties,
) -> Result {
match context.config().telemetry {
TelemetryConfig {
id: Some(ref account_id),
} => {
let metrics = context
.get_metrics()
.get_metrics(payload.duration)
.error(AppErrorKind::StatsCollectionFailed)?;

let metrics2 = metrics
Expand Down
50 changes: 22 additions & 28 deletions src/app/metrics/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@ use crate::app::metrics::{Metric, MetricKey, Tags};

pub(crate) struct Collector<'a, C: GlobalContext> {
context: &'a C,
duration: u64,
}

impl<'a, C: GlobalContext> Collector<'a, C> {
pub(crate) fn new(context: &'a C) -> Self {
Self { context }
pub(crate) fn new(context: &'a C, duration: u64) -> Self {
Self { context, duration }
}

pub(crate) fn get(&self) -> anyhow::Result<Vec<crate::app::metrics::Metric>> {
let now = Utc::now();
let mut metrics = vec![];

append_mqtt_stats(&mut metrics, self.context, now)?;
append_mqtt_stats(&mut metrics, self.context, now, self.duration)?;
append_internal_stats(&mut metrics, self.context, now);
append_redis_pool_metrics(&mut metrics, self.context, now);
append_dynamic_stats(&mut metrics, self.context, now)?;
Expand All @@ -31,63 +32,56 @@ fn append_mqtt_stats(
metrics: &mut Vec<Metric>,
context: &impl GlobalContext,
now: DateTime<Utc>,
duration: u64,
) -> anyhow::Result<()> {
if let Some(qc) = context.queue_counter() {
let stats = qc
.get_stats()
.get_stats(duration)
.map_err(|err| anyhow!(err).context("Failed to get stats"))?;

stats.into_iter().for_each(|(tags, value)| {
let tags = Tags::build_queues_tags(crate::APP_VERSION, context.agent_id(), tags);

if value.incoming_requests > 0 {
metrics.push(Metric::new(
let m = [
Metric::new(
MetricKey::IncomingQueueRequests,
value.incoming_requests,
now,
tags.clone(),
));
}
if value.incoming_responses > 0 {
metrics.push(Metric::new(
),
Metric::new(
MetricKey::IncomingQueueResponses,
value.incoming_responses,
now,
tags.clone(),
));
}
if value.incoming_events > 0 {
metrics.push(Metric::new(
),
Metric::new(
MetricKey::IncomingQueueEvents,
value.incoming_events,
now,
tags.clone(),
));
}
if value.outgoing_requests > 0 {
metrics.push(Metric::new(
),
Metric::new(
MetricKey::OutgoingQueueRequests,
value.outgoing_requests,
now,
tags.clone(),
));
}
if value.outgoing_responses > 0 {
metrics.push(Metric::new(
),
Metric::new(
MetricKey::OutgoingQueueResponses,
value.outgoing_responses,
now,
tags.clone(),
));
}
if value.outgoing_events > 0 {
metrics.push(Metric::new(
),
Metric::new(
MetricKey::OutgoingQueueEvents,
value.outgoing_events,
now,
tags,
));
}
),
];

metrics.extend_from_slice(&m);
});
}

Expand Down
2 changes: 1 addition & 1 deletion src/app/metrics/stats_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl<C: GlobalContext + Send + 'static> StatsRoute<C> {
let metrics = self
.message_handler
.global_context()
.get_metrics()
.get_metrics(5)
.context("Failed to get metrics")?;

for metric in metrics {
Expand Down
2 changes: 1 addition & 1 deletion src/test_helpers/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl GlobalContext for TestContext {
None
}

fn get_metrics(&self) -> anyhow::Result<Vec<crate::app::metrics::Metric>> {
fn get_metrics(&self, _duration: u64) -> anyhow::Result<Vec<crate::app::metrics::Metric>> {
Ok(vec![])
}
}
Expand Down

0 comments on commit 6a08c34

Please sign in to comment.