From 9da7b2cb335c926f3fbea85f052517080a0bc3a7 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 23 May 2024 14:27:25 +0200 Subject: [PATCH] fix schema migration hang (#1409) --- libsql-server/src/schema/scheduler.rs | 54 ++++++++++++++++++--------- 1 file changed, 36 insertions(+), 18 deletions(-) diff --git a/libsql-server/src/schema/scheduler.rs b/libsql-server/src/schema/scheduler.rs index 9b176d2275..811d1f5086 100644 --- a/libsql-server/src/schema/scheduler.rs +++ b/libsql-server/src/schema/scheduler.rs @@ -9,7 +9,7 @@ use tokio::task; use tokio::task::JoinSet; use crate::connection::program::Program; -use crate::connection::MakeConnection; +use crate::connection::{Connection, MakeConnection}; use crate::database::PrimaryConnectionMaker; use crate::namespace::meta_store::{MetaStore, MetaStoreConnection}; use crate::namespace::{NamespaceName, NamespaceStore}; @@ -554,7 +554,15 @@ enum WorkResult { }, } -async fn backup_meta_store(meta: &MetaStore) -> Result<(), Error> { +async fn backup_meta_store( + meta: &MetaStore, + migration_db: Arc>, +) -> Result<(), Error> { + with_conn_async(migration_db, |conn| { + Ok(conn.query_row("PRAGMA wal_checkpoint(truncate)", (), |_| Ok(()))?) + }) + .await?; + if let Some(mut savepoint) = meta.backup_savepoint() { if let Err(e) = savepoint.confirmed().await { tracing::error!("failed to backup meta store: {e}"); @@ -572,11 +580,23 @@ async fn backup_meta_store(meta: &MetaStore) -> Result<(), Error> { } async fn backup_namespace(store: &NamespaceStore, ns: NamespaceName) -> Result<(), Error> { - let savepoint = store - .with(ns.clone(), |ns| ns.db.backup_savepoint()) + let (savepoint, conn_maker) = store + .with(ns.clone(), |ns| { + let sp = ns.db.backup_savepoint(); + let conn_maker = ns.db.connection_maker(); + (sp, conn_maker) + }) .await .map_err(|e| Error::NamespaceLoad(Box::new(e)))?; + conn_maker + .create() + .await + .map_err(|e| Error::NamespaceBackupFailure(ns.clone(), e.into()))? + .checkpoint() + .await + .map_err(|e| Error::NamespaceBackupFailure(ns.clone(), e.into()))?; + if let Some(mut savepoint) = savepoint { if let Err(e) = savepoint.confirmed().await { return Err(Error::NamespaceBackupFailure(ns, e.into())); @@ -608,14 +628,14 @@ async fn step_job_failure( namespace_store: NamespaceStore, ) -> WorkResult { try_step_job(MigrationJobStatus::DryRunFailure, async move { - with_conn_async(migration_db, move |conn| { + with_conn_async(migration_db.clone(), move |conn| { // TODO ensure here that this transition is valid // the error must already be there from when we stepped to DryRunFailure update_job_status(conn, job_id, MigrationJobStatus::RunFailure, None) }) .await?; - backup_meta_store(namespace_store.meta_store()).await?; + backup_meta_store(namespace_store.meta_store(), migration_db).await?; Ok(MigrationJobStatus::RunFailure) }) @@ -629,13 +649,13 @@ async fn step_job_waiting_run( namespace_store: NamespaceStore, ) -> WorkResult { try_step_job(MigrationJobStatus::DryRunSuccess, async move { - with_conn_async(migration_db, move |conn| { + with_conn_async(migration_db.clone(), move |conn| { // TODO ensure here that this transition is valid update_job_status(conn, job_id, MigrationJobStatus::WaitingRun, None) }) .await?; - backup_meta_store(namespace_store.meta_store()).await?; + backup_meta_store(namespace_store.meta_store(), migration_db).await?; Ok(MigrationJobStatus::WaitingRun) }) @@ -651,7 +671,7 @@ async fn step_job_dry_run_failure( (task_id, error, ns): (i64, String, NamespaceName), ) -> WorkResult { try_step_job(status, async move { - with_conn_async(migration_db, move |conn| { + with_conn_async(migration_db.clone(), move |conn| { let error = format!("task {task_id} for namespace `{ns}` failed with error: {error}"); update_job_status( conn, @@ -662,7 +682,7 @@ async fn step_job_dry_run_failure( }) .await?; - backup_meta_store(namespace_store.meta_store()).await?; + backup_meta_store(namespace_store.meta_store(), migration_db).await?; Ok(MigrationJobStatus::DryRunFailure) }) .await @@ -675,12 +695,12 @@ async fn step_job_dry_run_success( namespace_store: NamespaceStore, ) -> WorkResult { try_step_job(MigrationJobStatus::WaitingDryRun, async move { - with_conn_async(migration_db, move |conn| { + with_conn_async(migration_db.clone(), move |conn| { job_step_dry_run_success(conn, job_id) }) .await?; - backup_meta_store(namespace_store.meta_store()).await?; + backup_meta_store(namespace_store.meta_store(), migration_db).await?; Ok(MigrationJobStatus::DryRunSuccess) }) @@ -744,14 +764,12 @@ async fn step_job_run_success( // backup the schema backup_namespace(&namespace_store, schema).await?; - tokio::task::spawn_blocking(move || { - let mut conn = migration_db.lock(); - update_job_status(&mut conn, job_id, MigrationJobStatus::RunSuccess, None) + with_conn_async(migration_db.clone(), move |conn| { + update_job_status(conn, job_id, MigrationJobStatus::RunSuccess, None) }) - .await - .expect("task panicked")?; + .await?; - backup_meta_store(namespace_store.meta_store()).await?; + backup_meta_store(namespace_store.meta_store(), migration_db).await?; Ok(MigrationJobStatus::RunSuccess) }) .await