Skip to content

Commit

Permalink
fix schema migration hang (#1409)
Browse files (browse the repository at this point in the history)
  • Loading branch information
MarinPostma committed May 23, 2024
1 parent a439cb6 commit 9da7b2c
Showing 1 changed file with 36 additions and 18 deletions.
54 changes: 36 additions & 18 deletions libsql-server/src/schema/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tokio::task;
use tokio::task::JoinSet;

use crate::connection::program::Program;
use crate::connection::MakeConnection;
use crate::connection::{Connection, MakeConnection};
use crate::database::PrimaryConnectionMaker;
use crate::namespace::meta_store::{MetaStore, MetaStoreConnection};
use crate::namespace::{NamespaceName, NamespaceStore};
Expand Down Expand Up @@ -554,7 +554,15 @@ enum WorkResult {
},
}

async fn backup_meta_store(meta: &MetaStore) -> Result<(), Error> {
async fn backup_meta_store(
meta: &MetaStore,
migration_db: Arc<Mutex<MetaStoreConnection>>,
) -> Result<(), Error> {
with_conn_async(migration_db, |conn| {
Ok(conn.query_row("PRAGMA wal_checkpoint(truncate)", (), |_| Ok(()))?)
})
.await?;

if let Some(mut savepoint) = meta.backup_savepoint() {
if let Err(e) = savepoint.confirmed().await {
tracing::error!("failed to backup meta store: {e}");
Expand All @@ -572,11 +580,23 @@ async fn backup_meta_store(meta: &MetaStore) -> Result<(), Error> {
}

async fn backup_namespace(store: &NamespaceStore, ns: NamespaceName) -> Result<(), Error> {
let savepoint = store
.with(ns.clone(), |ns| ns.db.backup_savepoint())
let (savepoint, conn_maker) = store
.with(ns.clone(), |ns| {
let sp = ns.db.backup_savepoint();
let conn_maker = ns.db.connection_maker();
(sp, conn_maker)
})
.await
.map_err(|e| Error::NamespaceLoad(Box::new(e)))?;

conn_maker
.create()
.await
.map_err(|e| Error::NamespaceBackupFailure(ns.clone(), e.into()))?
.checkpoint()
.await
.map_err(|e| Error::NamespaceBackupFailure(ns.clone(), e.into()))?;

if let Some(mut savepoint) = savepoint {
if let Err(e) = savepoint.confirmed().await {
return Err(Error::NamespaceBackupFailure(ns, e.into()));
Expand Down Expand Up @@ -608,14 +628,14 @@ async fn step_job_failure(
namespace_store: NamespaceStore,
) -> WorkResult {
try_step_job(MigrationJobStatus::DryRunFailure, async move {
with_conn_async(migration_db, move |conn| {
with_conn_async(migration_db.clone(), move |conn| {
// TODO ensure here that this transition is valid
// the error must already be there from when we stepped to DryRunFailure
update_job_status(conn, job_id, MigrationJobStatus::RunFailure, None)
})
.await?;

backup_meta_store(namespace_store.meta_store()).await?;
backup_meta_store(namespace_store.meta_store(), migration_db).await?;

Ok(MigrationJobStatus::RunFailure)
})
Expand All @@ -629,13 +649,13 @@ async fn step_job_waiting_run(
namespace_store: NamespaceStore,
) -> WorkResult {
try_step_job(MigrationJobStatus::DryRunSuccess, async move {
with_conn_async(migration_db, move |conn| {
with_conn_async(migration_db.clone(), move |conn| {
// TODO ensure here that this transition is valid
update_job_status(conn, job_id, MigrationJobStatus::WaitingRun, None)
})
.await?;

backup_meta_store(namespace_store.meta_store()).await?;
backup_meta_store(namespace_store.meta_store(), migration_db).await?;

Ok(MigrationJobStatus::WaitingRun)
})
Expand All @@ -651,7 +671,7 @@ async fn step_job_dry_run_failure(
(task_id, error, ns): (i64, String, NamespaceName),
) -> WorkResult {
try_step_job(status, async move {
with_conn_async(migration_db, move |conn| {
with_conn_async(migration_db.clone(), move |conn| {
let error = format!("task {task_id} for namespace `{ns}` failed with error: {error}");
update_job_status(
conn,
Expand All @@ -662,7 +682,7 @@ async fn step_job_dry_run_failure(
})
.await?;

backup_meta_store(namespace_store.meta_store()).await?;
backup_meta_store(namespace_store.meta_store(), migration_db).await?;
Ok(MigrationJobStatus::DryRunFailure)
})
.await
Expand All @@ -675,12 +695,12 @@ async fn step_job_dry_run_success(
namespace_store: NamespaceStore,
) -> WorkResult {
try_step_job(MigrationJobStatus::WaitingDryRun, async move {
with_conn_async(migration_db, move |conn| {
with_conn_async(migration_db.clone(), move |conn| {
job_step_dry_run_success(conn, job_id)
})
.await?;

backup_meta_store(namespace_store.meta_store()).await?;
backup_meta_store(namespace_store.meta_store(), migration_db).await?;

Ok(MigrationJobStatus::DryRunSuccess)
})
Expand Down Expand Up @@ -744,14 +764,12 @@ async fn step_job_run_success(
// backup the schema
backup_namespace(&namespace_store, schema).await?;

tokio::task::spawn_blocking(move || {
let mut conn = migration_db.lock();
update_job_status(&mut conn, job_id, MigrationJobStatus::RunSuccess, None)
with_conn_async(migration_db.clone(), move |conn| {
update_job_status(conn, job_id, MigrationJobStatus::RunSuccess, None)
})
.await
.expect("task panicked")?;
.await?;

backup_meta_store(namespace_store.meta_store()).await?;
backup_meta_store(namespace_store.meta_store(), migration_db).await?;
Ok(MigrationJobStatus::RunSuccess)
})
.await
Expand Down

0 comments on commit 9da7b2c

Please sign in to comment.