Skip to content

Commit

Permalink
Merge pull request #506 from psarna/slowestquery
Browse files Browse the repository at this point in the history
libsql-server: track slowest queries too
  • Loading branch information
psarna committed Oct 27, 2023
2 parents d08eeee + 38c9860 commit 255297a
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 8 deletions.
21 changes: 17 additions & 4 deletions libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ impl<W: WalHook> Connection<W> {
builder: &mut impl QueryResultBuilder,
) -> Result<(u64, Option<i64>)> {
tracing::trace!("executing query: {}", query.stmt.stmt);

let start = Instant::now();
let config = self.config_store.get();
let blocked = match query.stmt.kind {
StmtKind::Read | StmtKind::TxnBegin | StmtKind::Other => config.block_reads,
Expand Down Expand Up @@ -606,7 +606,7 @@ impl<W: WalHook> Connection<W> {

drop(qresult);

self.update_stats(query.stmt.stmt.clone(), &stmt);
self.update_stats(query.stmt.stmt.clone(), &stmt, Instant::now() - start);

Ok((affected_row_count, last_insert_rowid))
}
Expand Down Expand Up @@ -640,7 +640,8 @@ impl<W: WalHook> Connection<W> {
Ok(())
}

fn update_stats(&self, sql: String, stmt: &rusqlite::Statement) {
fn update_stats(&self, sql: String, stmt: &rusqlite::Statement, elapsed: Duration) {
let elapsed = elapsed.as_millis() as u64;
let rows_read = stmt.get_status(StatementStatus::RowsRead);
let rows_written = stmt.get_status(StatementStatus::RowsWritten);
let rows_read = if rows_read == 0 && rows_written == 0 {
Expand All @@ -652,8 +653,20 @@ impl<W: WalHook> Connection<W> {
self.stats.inc_rows_written(rows_written as u64);
let weight = (rows_read + rows_written) as i64;
if self.stats.qualifies_as_top_query(weight) {
self.stats.add_top_query(crate::stats::TopQuery::new(
sql.clone(),
rows_read,
rows_written,
));
}
if self.stats.qualifies_as_slowest_query(elapsed) {
self.stats
.add_top_query(crate::stats::TopQuery::new(sql, rows_read, rows_written));
.add_slowest_query(crate::stats::SlowestQuery::new(
sql,
elapsed,
rows_read,
rows_written,
));
}
}

Expand Down
10 changes: 9 additions & 1 deletion libsql-server/src/http/admin/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use axum::Json;

use crate::namespace::{MakeNamespace, NamespaceName};
use crate::replication::FrameNo;
use crate::stats::{Stats, TopQuery};
use crate::stats::{SlowestQuery, Stats, TopQuery};

use super::AppState;

Expand All @@ -19,6 +19,7 @@ pub struct StatsResponse {
pub write_requests_delegated: u64,
pub replication_index: FrameNo,
pub top_queries: Vec<TopQuery>,
pub slowest_queries: Vec<SlowestQuery>,
}

impl From<&Stats> for StatsResponse {
Expand All @@ -36,6 +37,13 @@ impl From<&Stats> for StatsResponse {
.iter()
.cloned()
.collect(),
slowest_queries: stats
.slowest_queries()
.read()
.unwrap()
.iter()
.cloned()
.collect(),
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion libsql-server/src/rpc/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,9 @@ pub async fn garbage_collect(clients: &mut HashMap<Uuid, Arc<PrimaryConnection>>
let limit = std::time::Duration::from_secs(30);

clients.retain(|_, db| db.idle_time() < limit);
tracing::trace!("gc: remaining client handles count: {}", clients.len());
if !clients.is_empty() {
tracing::trace!("gc: remaining client handles count: {}", clients.len());
}
}

#[tonic::async_trait]
Expand Down
49 changes: 47 additions & 2 deletions libsql-server/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,25 @@ impl TopQuery {
}
}

#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct SlowestQuery {
pub elapsed_ms: u64,
pub query: String,
pub rows_written: i32,
pub rows_read: i32,
}

impl SlowestQuery {
pub fn new(query: String, elapsed_ms: u64, rows_read: i32, rows_written: i32) -> Self {
Self {
elapsed_ms,
query,
rows_read,
rows_written,
}
}
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct Stats {
#[serde(default)]
Expand All @@ -48,6 +67,11 @@ pub struct Stats {
top_query_threshold: AtomicI64,
#[serde(default)]
top_queries: Arc<RwLock<BTreeSet<TopQuery>>>,
// Lowest value in currently stored slowest queries
#[serde(default)]
slowest_query_threshold: AtomicU64,
#[serde(default)]
slowest_queries: Arc<RwLock<BTreeSet<SlowestQuery>>>,
}

impl Stats {
Expand Down Expand Up @@ -129,9 +153,9 @@ impl Stats {
top_queries.insert(query);
if top_queries.len() > 10 {
top_queries.pop_first();
self.top_query_threshold
.store(top_queries.first().unwrap().weight, Ordering::Relaxed);
}
self.top_query_threshold
.store(top_queries.first().unwrap().weight, Ordering::Relaxed);
}

pub(crate) fn qualifies_as_top_query(&self, weight: i64) -> bool {
Expand All @@ -141,6 +165,27 @@ impl Stats {
pub(crate) fn top_queries(&self) -> &Arc<RwLock<BTreeSet<TopQuery>>> {
&self.top_queries
}

pub(crate) fn add_slowest_query(&self, query: SlowestQuery) {
let mut slowest_queries = self.slowest_queries.write().unwrap();
tracing::debug!("slowest query: {}: {}", query.elapsed_ms, query.query);
slowest_queries.insert(query);
if slowest_queries.len() > 10 {
slowest_queries.pop_first();
self.slowest_query_threshold.store(
slowest_queries.first().unwrap().elapsed_ms,
Ordering::Relaxed,
);
}
}

pub(crate) fn qualifies_as_slowest_query(&self, elapsed_ms: u64) -> bool {
elapsed_ms >= self.slowest_query_threshold.load(Ordering::Relaxed)
}

pub(crate) fn slowest_queries(&self) -> &Arc<RwLock<BTreeSet<SlowestQuery>>> {
&self.slowest_queries
}
}

async fn spawn_stats_persist_thread(stats: Weak<Stats>, path: PathBuf) -> anyhow::Result<()> {
Expand Down

0 comments on commit 255297a

Please sign in to comment.