Skip to content

Commit

Permalink
Merge pull request #486 from psarna/topk
Browse files Browse the repository at this point in the history
libsql-server,admin: add top-k queries to stats
  • Loading branch information
psarna committed Oct 24, 2023
2 parents 409548b + b375291 commit e15da4e
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 5 deletions.
9 changes: 7 additions & 2 deletions libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ impl<W: WalHook> Connection<W> {

drop(qresult);

self.update_stats(&stmt);
self.update_stats(query.stmt.stmt.clone(), &stmt);

Ok((affected_row_count, last_insert_rowid))
}
Expand Down Expand Up @@ -640,7 +640,7 @@ impl<W: WalHook> Connection<W> {
Ok(())
}

fn update_stats(&self, stmt: &rusqlite::Statement) {
fn update_stats(&self, sql: String, stmt: &rusqlite::Statement) {
let rows_read = stmt.get_status(StatementStatus::RowsRead);
let rows_written = stmt.get_status(StatementStatus::RowsWritten);
let rows_read = if rows_read == 0 && rows_written == 0 {
Expand All @@ -650,6 +650,11 @@ impl<W: WalHook> Connection<W> {
};
self.stats.inc_rows_read(rows_read as u64);
self.stats.inc_rows_written(rows_written as u64);
let weight = (rows_read + rows_written) as i64;
if self.stats.qualifies_as_top_query(weight) {
self.stats
.add_top_query(crate::stats::TopQuery::new(sql, rows_read, rows_written));
}
}

fn describe(&self, sql: &str) -> DescribeResult {
Expand Down
10 changes: 9 additions & 1 deletion libsql-server/src/http/admin/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use axum::Json;

use crate::namespace::{MakeNamespace, NamespaceName};
use crate::replication::FrameNo;
use crate::stats::Stats;
use crate::stats::{Stats, TopQuery};

use super::AppState;

Expand All @@ -18,6 +18,7 @@ pub struct StatsResponse {
pub storage_bytes_used: u64,
pub write_requests_delegated: u64,
pub replication_index: FrameNo,
pub top_queries: Vec<TopQuery>,
}

impl From<&Stats> for StatsResponse {
Expand All @@ -28,6 +29,13 @@ impl From<&Stats> for StatsResponse {
storage_bytes_used: stats.storage_bytes_used(),
write_requests_delegated: stats.write_requests_delegated(),
replication_index: stats.get_current_frame_no(),
top_queries: stats
.top_queries()
.read()
.unwrap()
.iter()
.cloned()
.collect(),
}
}
}
Expand Down
54 changes: 52 additions & 2 deletions libsql-server/src/stats.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,35 @@
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, RwLock, Weak};

use serde::{Deserialize, Serialize};
use std::collections::BTreeSet;
use tokio::io::AsyncWriteExt;
use tokio::task::JoinSet;
use tokio::time::Duration;

use crate::replication::FrameNo;

#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct TopQuery {
#[serde(skip)]
pub weight: i64,
pub rows_written: i32,
pub rows_read: i32,
pub query: String,
}

impl TopQuery {
pub fn new(query: String, rows_read: i32, rows_written: i32) -> Self {
Self {
weight: rows_read as i64 + rows_written as i64,
rows_read,
rows_written,
query,
}
}
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Stats {
#[serde(default)]
Expand All @@ -22,6 +43,11 @@ pub struct Stats {
write_requests_delegated: AtomicU64,
#[serde(default)]
current_frame_no: AtomicU64,
// Lowest value in currently stored top queries
#[serde(default)]
top_query_threshold: AtomicI64,
#[serde(default)]
top_queries: Arc<RwLock<BTreeSet<TopQuery>>>,
}

impl Stats {
Expand Down Expand Up @@ -91,6 +117,30 @@ impl Stats {
pub(crate) fn get_current_frame_no(&self) -> FrameNo {
self.current_frame_no.load(Ordering::Relaxed)
}

pub(crate) fn add_top_query(&self, query: TopQuery) {
let mut top_queries = self.top_queries.write().unwrap();
tracing::debug!(
"top query: {},{}:{}",
query.rows_read,
query.rows_written,
query.query
);
top_queries.insert(query);
if top_queries.len() > 10 {
top_queries.pop_first();
}
self.top_query_threshold
.store(top_queries.first().unwrap().weight, Ordering::Relaxed);
}

pub(crate) fn qualifies_as_top_query(&self, weight: i64) -> bool {
weight >= self.top_query_threshold.load(Ordering::Relaxed)
}

pub(crate) fn top_queries(&self) -> &Arc<RwLock<BTreeSet<TopQuery>>> {
&self.top_queries
}
}

async fn spawn_stats_persist_thread(stats: Weak<Stats>, path: PathBuf) -> anyhow::Result<()> {
Expand Down

0 comments on commit e15da4e

Please sign in to comment.