
Commit: durable objects metrics

j-white committed May 8, 2024
1 parent 9b7dfbd commit 2a569e9

Showing 11 changed files with 147 additions and 51 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -11,8 +11,8 @@ By running it as a worker and pushing metrics, we avoid the need to deploy a ded

 - [x] Workers
 - [x] D1
+- [x] Durable Objects
 - [ ] Queues
-- [ ] Durable Objects
 - [ ] Zones

 ## Usage
features/step_definitions/d1_query_response.json → features/data/d1_query_response.json
File renamed without changes.
1 change: 1 addition & 0 deletions features/data/durableobjects_query_response.json
@@ -0,0 +1 @@
{"data":{"viewer":{"accounts":[{"durableObjectsInvocationsAdaptiveGroups":[]}]}},"errors":null}
features/step_definitions/worker_query_response.json → features/data/worker_query_response.json
File renamed without changes.
7 changes: 5 additions & 2 deletions features/step_definitions/cf_mock_server.ts
@@ -7,8 +7,9 @@ export class CloudflareMockServer

   start() {
     let self = this;
-    const workerQuery = fs.readFileSync('./features/step_definitions/worker_query_response.json').toString();
-    const d1Query = fs.readFileSync('./features/step_definitions/d1_query_response.json').toString();
+    const workerQuery = fs.readFileSync('./features/data/worker_query_response.json').toString();
+    const d1Query = fs.readFileSync('./features/data/d1_query_response.json').toString();
+    const durableObjectsQuery = fs.readFileSync('./features/data/durableobjects_query_response.json').toString();
     this.server = http.createServer((req, res) => {
       var body = "";
       req.on('readable', function() {
@@ -23,6 +24,8 @@
         res.setHeader('Content-Type', 'application/json');
         if (body.indexOf('d1AnalyticsAdaptiveGroups') > -1) {
           res.end(d1Query);
+        } else if (body.indexOf('durableObjectsInvocationsAdaptiveGroups') > -1) {
+          res.end(durableObjectsQuery);
         } else {
           res.end(workerQuery);
         }
gql/d1.graphql → gql/d1_query.graphql
File renamed without changes.
35 changes: 35 additions & 0 deletions gql/durableobjects_query.graphql
@@ -0,0 +1,35 @@
query GetDurableObjectsAnalyticsQuery($accountTag: string!, $datetimeStart: Time, $datetimeEnd: Time, $limit: Int!) {
  viewer {
    accounts(filter: {accountTag: $accountTag}) {
      durableObjectsInvocationsAdaptiveGroups(limit: $limit, filter: {
        datetimeMinute_geq: $datetimeStart,
        datetimeMinute_lt: $datetimeEnd
      }) {
        dimensions {
          scriptName
          datetimeMinute
        }

        sum {
          errors
          requests
        }

        quantiles {
          responseBodySizeP25
          responseBodySizeP50
          responseBodySizeP75
          responseBodySizeP90
          responseBodySizeP99
          responseBodySizeP999
          wallTimeP25
          wallTimeP50
          wallTimeP75
          wallTimeP90
          wallTimeP99
          wallTimeP999
        }
      }
    }
  }
}
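Given the `GraphQLQuery` derive in `src/gql.rs` below, `graphql_client` generates a `get_durable_objects_analytics_query` module whose `Variables` struct mirrors these declarations. A minimal sketch of building the request body (the account tag and time window are placeholder values):

use graphql_client::GraphQLQuery;

// Placeholder values; in the worker these come from the environment
// and the scheduled trigger's time window.
let variables = get_durable_objects_analytics_query::Variables {
    account_tag: "your-account-tag".to_string(),
    datetime_start: Some("2024-05-08T00:00:00Z".to_string()),
    datetime_end: Some("2024-05-08T00:05:00Z".to_string()),
    limit: 9999,
};
let request_body = GetDurableObjectsAnalyticsQuery::build_query(variables);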
36 changes: 0 additions & 36 deletions gql/queries.graphql
@@ -1,42 +1,6 @@


query GetDurableObjectsAnalyticsQuery($accountTag: string!, $datetimeStart: Time, $datetimeEnd: Time, $limit: Int!) {
  viewer {
    accounts(filter: {accountTag: $accountTag}) {
      durableObjectsInvocationsAdaptiveGroups(limit: $limit, filter: {
        datetimeMinute_geq: $datetimeStart,
        datetimeMinute_lt: $datetimeEnd
      }) {
        dimensions {
          scriptName
          datetimeMinute
        }

        sum {
          errors
          requests
          responseBodySize
          wallTime
        }

        quantiles {
          responseBodySizeP25
          responseBodySizeP50
          responseBodySizeP75
          responseBodySizeP90
          responseBodySizeP99
          responseBodySizeP999
          wallTimeP25
          wallTimeP50
          wallTimeP75
          wallTimeP90
          wallTimeP99
          wallTimeP999
        }
      }
    }
  }
}

query GetQueueAnalyticsQuery($accountTag: string!, $datetimeStart: Time, $datetimeEnd: Time, $limit: Int!) {
  viewer {
gql/workers.graphql → gql/workers_query.graphql
File renamed without changes.
97 changes: 86 additions & 11 deletions src/gql.rs
@@ -12,7 +12,7 @@ use worker::console_log;
 #[derive(GraphQLQuery)]
 #[graphql(
     schema_path = "gql/schema.graphql",
-    query_path = "gql/workers.graphql",
+    query_path = "gql/workers_query.graphql",
     variables_derives = "Debug",
     response_derives = "Debug,Clone"
 )]
@@ -21,21 +21,21 @@ pub struct GetWorkersAnalyticsQuery;
 #[derive(GraphQLQuery)]
 #[graphql(
     schema_path = "gql/schema.graphql",
-    query_path = "gql/d1.graphql",
+    query_path = "gql/d1_query.graphql",
     variables_derives = "Debug",
     response_derives = "Debug,Clone"
 )]
 pub struct GetD1AnalyticsQuery;

-// #[derive(GraphQLQuery)]
-// #[graphql(
-//     schema_path = "gql/schema.graphql",
-//     query_path = "gql/queries.graphql",
-//     variables_derives = "Debug",
-//     response_derives = "Debug,Clone"
-// )]
-// pub struct GetDurableObjectsAnalyticsQuery;
-//
+#[derive(GraphQLQuery)]
+#[graphql(
+    schema_path = "gql/schema.graphql",
+    query_path = "gql/durableobjects_query.graphql",
+    variables_derives = "Debug",
+    response_derives = "Debug,Clone"
+)]
+pub struct GetDurableObjectsAnalyticsQuery;

// #[derive(GraphQLQuery)]
// #[graphql(
//     schema_path = "gql/schema.graphql",
@@ -208,6 +208,81 @@ pub async fn do_get_d1_analytics_query(cloudflare_api_url: &String, cloudflare_a
    Ok(prometheus_registry_to_opentelemetry_metrics(registry, timestamp))
}

pub async fn do_get_durableobjects_analytics_query(cloudflare_api_url: &String, cloudflare_api_key: &String, variables: get_durable_objects_analytics_query::Variables) -> Result<Vec<Metric>, Box<dyn Error>> {
    let request_body = GetDurableObjectsAnalyticsQuery::build_query(variables);
    //console_log!("request_body: {:?}", request_body);
    let client = reqwest::Client::new();
    let res = client.post(cloudflare_api_url)
        .bearer_auth(cloudflare_api_key)
        .json(&request_body).send().await?;

    if !res.status().is_success() {
        console_log!("GraphQL query failed: {:?}", res.status());
        return Err(Box::new(res.error_for_status().unwrap_err()));
    }

    let response_body: Response<get_durable_objects_analytics_query::ResponseData> = res.json().await?;
    if response_body.errors.is_some() {
        console_log!("GraphQL query failed: {:?}", response_body.errors);
        return Err(Box::new(worker::Error::JsError("graphql".parse().unwrap())));
    }
    let response_data: get_durable_objects_analytics_query::ResponseData = response_body.data.expect("missing response data");

    let registry = Registry::new();
    let do_errors_opts = Opts::new("cloudflare_durable_objects_errors", "Sum of errors");
    let do_errors = CounterVec::new(do_errors_opts, &["script_name"]).unwrap();
    registry.register(Box::new(do_errors.clone())).unwrap();

    let do_requests_opts = Opts::new("cloudflare_durable_objects_requests", "Sum of requests");
    let do_requests = CounterVec::new(do_requests_opts, &["script_name"]).unwrap();
    registry.register(Box::new(do_requests.clone())).unwrap();

    let do_response_body_size_bytes_opts = Opts::new("cloudflare_durable_objects_response_body_size_bytes", "Response body size - bytes");
    let do_response_body_size_bytes = GaugeVec::new(do_response_body_size_bytes_opts, &["script_name", "quantile"]).unwrap();
    registry.register(Box::new(do_response_body_size_bytes.clone())).unwrap();

    let do_wall_time_microseconds_opts = Opts::new("cloudflare_durable_objects_wall_time_microseconds", "Wall time - microseconds");
    let do_wall_time_microseconds = GaugeVec::new(do_wall_time_microseconds_opts, &["script_name", "quantile"]).unwrap();
    registry.register(Box::new(do_wall_time_microseconds.clone())).unwrap();

    let mut last_datetime: Option<Time> = None;
    for account in response_data.clone().viewer.unwrap().accounts.iter() {
        for group in account.durable_objects_invocations_adaptive_groups.iter() {
            let dimensions = group.dimensions.as_ref().unwrap();
            last_datetime = Some(dimensions.datetime_minute.clone());
            let script_name = dimensions.script_name.clone();
            let sum = group.sum.as_ref().unwrap();
            let quantiles = group.quantiles.as_ref().unwrap();

            do_errors.with_label_values(&[script_name.as_str()]).inc_by(sum.errors as f64);
            do_requests.with_label_values(&[script_name.as_str()]).inc_by(sum.requests as f64);

            do_response_body_size_bytes.with_label_values(&[script_name.as_str(), "P25"]).set(quantiles.response_body_size_p25 as f64);
            do_response_body_size_bytes.with_label_values(&[script_name.as_str(), "P50"]).set(quantiles.response_body_size_p50 as f64);
            do_response_body_size_bytes.with_label_values(&[script_name.as_str(), "P75"]).set(quantiles.response_body_size_p75 as f64);
            do_response_body_size_bytes.with_label_values(&[script_name.as_str(), "P90"]).set(quantiles.response_body_size_p90 as f64);
            do_response_body_size_bytes.with_label_values(&[script_name.as_str(), "P99"]).set(quantiles.response_body_size_p99 as f64);
            do_response_body_size_bytes.with_label_values(&[script_name.as_str(), "P999"]).set(quantiles.response_body_size_p999 as f64);

            do_wall_time_microseconds.with_label_values(&[script_name.as_str(), "P25"]).set(quantiles.wall_time_p25 as f64);
            do_wall_time_microseconds.with_label_values(&[script_name.as_str(), "P50"]).set(quantiles.wall_time_p50 as f64);
            do_wall_time_microseconds.with_label_values(&[script_name.as_str(), "P75"]).set(quantiles.wall_time_p75 as f64);
            do_wall_time_microseconds.with_label_values(&[script_name.as_str(), "P90"]).set(quantiles.wall_time_p90 as f64);
            do_wall_time_microseconds.with_label_values(&[script_name.as_str(), "P99"]).set(quantiles.wall_time_p99 as f64);
            do_wall_time_microseconds.with_label_values(&[script_name.as_str(), "P999"]).set(quantiles.wall_time_p999 as f64);
        }
    }

    let timestamp: std::time::SystemTime = last_datetime.map(|datetime| {
        let datetime: NaiveDateTime = NaiveDateTime::parse_from_str(&*datetime, "%+").unwrap();
        datetime.and_utc().into()
    }).unwrap_or_else(|| {
        to_std_systemtime(SystemTime::now())
    });

    Ok(prometheus_registry_to_opentelemetry_metrics(registry, timestamp))
}

fn to_std_systemtime(time: web_time::SystemTime) -> std::time::SystemTime {
    let duration = time.duration_since(web_time::SystemTime::UNIX_EPOCH).unwrap();
    std::time::SystemTime::UNIX_EPOCH + duration
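For orientation, the counters and gauges registered in the new function surface in Prometheus exposition format roughly as follows (a sketch: `example-worker` and all values are made up, and quantiles are exported as labeled gauges rather than native summaries):

cloudflare_durable_objects_requests{script_name="example-worker"} 42
cloudflare_durable_objects_errors{script_name="example-worker"} 1
cloudflare_durable_objects_response_body_size_bytes{script_name="example-worker",quantile="P50"} 512
cloudflare_durable_objects_wall_time_microseconds{script_name="example-worker",quantile="P99"} 2400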
20 changes: 19 additions & 1 deletion src/lib.rs
@@ -8,7 +8,7 @@ use prost::Message;
use worker::*;
use worker::js_sys::Uint8Array;
use worker::wasm_bindgen::JsValue;
-use crate::gql::{get_workers_analytics_query, do_get_workers_analytics_query, do_get_d1_analytics_query, get_d1_analytics_query};
+use crate::gql::{get_workers_analytics_query, do_get_workers_analytics_query, do_get_d1_analytics_query, get_d1_analytics_query, do_get_durableobjects_analytics_query, get_durable_objects_analytics_query};

mod gql;
mod metrics;
@@ -103,6 +103,24 @@ async fn do_trigger(env: Env) -> Result<()> {
            return Err(Error::JsError(e.to_string()));
        }
    };

    let result = do_get_durableobjects_analytics_query(&cloudflare_api_url, &cloudflare_api_key, get_durable_objects_analytics_query::Variables {
        account_tag: cloudflare_account_id.clone(),
        datetime_start: Some(start.to_rfc3339()),
        datetime_end: Some(end.to_rfc3339()),
        limit: 9999,
    }).await;
    match result {
        Ok(metrics) => {
            for metric in metrics {
                all_metrics.push(metric);
            }
        },
        Err(e) => {
            console_log!("Querying Cloudflare API failed: {:?}", e);
            return Err(Error::JsError(e.to_string()));
        }
    };
    console_log!("Done fetching!");

    do_push_metrics(env, all_metrics).await
