Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions crates/core/src/fix_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use alloc::string::String;

use crate::create_sqlite_optional_text_fn;
use crate::error::{PSResult, PowerSyncError};
use crate::schema::inspection::ExistingTable;
use powersync_sqlite_nostd::{self as sqlite, ColumnType, Value};
use powersync_sqlite_nostd::{Connection, Context, ResultCode};

Expand All @@ -23,12 +24,15 @@ use crate::util::quote_identifier;
pub fn apply_v035_fix(db: *mut sqlite::sqlite3) -> Result<i64, PowerSyncError> {
// language=SQLite
let statement = db
.prepare_v2("SELECT name, powersync_external_table_name(name) FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data__*'")
.into_db_result(db)?;
.prepare_v2("SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data__*'")
.into_db_result(db)?;

while statement.step()? == ResultCode::ROW {
let full_name = statement.column_text(0)?;
let short_name = statement.column_text(1)?;
let Some((short_name, _)) = ExistingTable::external_name(full_name) else {
continue;
};

let quoted = quote_identifier(full_name);

// language=SQLite
Expand Down
50 changes: 50 additions & 0 deletions crates/core/src/schema/inspection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,53 @@ SELECT
Ok(())
}
}

pub struct ExistingTable {
pub name: String,
pub internal_name: String,
pub local_only: bool,
}

impl ExistingTable {
pub fn list(db: *mut sqlite::sqlite3) -> Result<Vec<Self>, PowerSyncError> {
let mut results = vec![];
let stmt = db
.prepare_v2(
"
SELECT name FROM sqlite_master WHERE type = 'table' AND name GLOB 'ps_data_*';
",
)
.into_db_result(db)?;

while stmt.step()? == ResultCode::ROW {
let internal_name = stmt.column_text(0)?;
let Some((name, local_only)) = Self::external_name(internal_name) else {
continue;
};

results.push(ExistingTable {
internal_name: internal_name.to_owned(),
name: name.to_owned(),
local_only: local_only,
});
}

Ok(results)
}

/// Extracts the public name from a `ps_data__` or a `ps_data_local__` table.
///
/// Also returns whether the name is from a local table.
pub fn external_name(name: &str) -> Option<(&str, bool)> {
const LOCAL_PREFIX: &str = "ps_data_local__";
const NORMAL_PREFIX: &str = "ps_data__";

if name.starts_with(LOCAL_PREFIX) {
Some((&name[LOCAL_PREFIX.len()..], true))
} else if name.starts_with(NORMAL_PREFIX) {
Some((&name[NORMAL_PREFIX.len()..], false))
} else {
None
}
}
}
126 changes: 43 additions & 83 deletions crates/core/src/schema/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use sqlite::{Connection, ResultCode, Value};

use crate::error::{PSResult, PowerSyncError};
use crate::ext::ExtendedDatabase;
use crate::schema::inspection::ExistingView;
use crate::schema::inspection::{ExistingTable, ExistingView};
use crate::state::DatabaseState;
use crate::util::{quote_identifier, quote_json_path};
use crate::views::{
Expand All @@ -25,112 +25,72 @@ use crate::{create_auto_tx_function, create_sqlite_text_fn};

use super::Schema;

fn update_tables(db: *mut sqlite::sqlite3, schema: &str) -> Result<(), PowerSyncError> {
fn update_tables(db: *mut sqlite::sqlite3, schema: &Schema) -> Result<(), PowerSyncError> {
let existing_tables = ExistingTable::list(db)?;
let mut existing_tables = {
let mut map = BTreeMap::new();
for table in &existing_tables {
map.insert(&*table.name, table);
}
map
};

{
// In a block so that the statement is finalized before dropping tables
// language=SQLite
let statement = db
.prepare_v2(
"\
SELECT
json_extract(json_each.value, '$.name') as name,
powersync_internal_table_name(json_each.value) as internal_name,
ifnull(json_extract(json_each.value, '$.local_only'), 0) as local_only
FROM json_each(json_extract(?, '$.tables'))
WHERE name NOT IN (SELECT name FROM powersync_tables)",
)
.into_db_result(db)?;
statement.bind_text(1, schema, sqlite::Destructor::STATIC)?;
// In a block so that all statements are finalized before dropping tables.
for table in &schema.tables {
if let Some(existing) = existing_tables.remove(&*table.name) {
if existing.local_only != table.local_only() {
// Migrating between local-only and synced tables. This works by deleting
// existing and re-creating the table from scratch. We can re-create first and
// delete the old table afterwards because they have a different name
// (local-only tables have a ps_data_local prefix).

// To delete the old existing table in the end.
existing_tables.insert(&existing.name, existing);
} else {
// Compatible table exists already, nothing to do.
continue;
}
}

while statement.step().into_db_result(db)? == ResultCode::ROW {
let name = statement.column_text(0)?;
let internal_name = statement.column_text(1)?;
let local_only = statement.column_int(2) != 0;
// New table.
let quoted_internal_name = quote_identifier(&table.internal_name());

db.exec_safe(&format!(
"CREATE TABLE {:}(id TEXT PRIMARY KEY NOT NULL, data TEXT)",
quote_identifier(internal_name)
quoted_internal_name
))
.into_db_result(db)?;

if !local_only {
// MOVE data if any
db.exec_text(
&format!(
"INSERT INTO {:}(id, data)
SELECT id, data
FROM ps_untyped
WHERE type = ?",
quote_identifier(internal_name)
),
name,
)
.into_db_result(db)?;

// language=SQLite
db.exec_text(
"DELETE
FROM ps_untyped
WHERE type = ?",
name,
)?;
}

if !local_only {
if !table.local_only() {
// MOVE data if any
db.exec_text(
&format!(
"INSERT INTO {:}(id, data)
SELECT id, data
FROM ps_untyped
WHERE type = ?",
quote_identifier(internal_name)
quoted_internal_name
),
name,
&table.name,
)
.into_db_result(db)?;

// language=SQLite
db.exec_text(
"DELETE
FROM ps_untyped
WHERE type = ?",
name,
)?;
db.exec_text("DELETE FROM ps_untyped WHERE type = ?", &table.name)?;
}
}
}

let mut tables_to_drop: Vec<String> = alloc::vec![];

{
// In a block so that the statement is finalized before dropping tables
// language=SQLite
let statement = db
.prepare_v2(
"\
SELECT name, internal_name, local_only FROM powersync_tables WHERE name NOT IN (
SELECT json_extract(json_each.value, '$.name')
FROM json_each(json_extract(?, '$.tables'))
)",
)
.into_db_result(db)?;
statement.bind_text(1, schema, sqlite::Destructor::STATIC)?;

while statement.step()? == ResultCode::ROW {
let name = statement.column_text(0)?;
let internal_name = statement.column_text(1)?;
let local_only = statement.column_int(2) != 0;

tables_to_drop.push(String::from(internal_name));

if !local_only {
// Remaining tables need to be dropped. But first, we want to move their contents to
// ps_untyped.
for remaining in existing_tables.values() {
if !remaining.local_only {
db.exec_text(
&format!(
"INSERT INTO ps_untyped(type, id, data) SELECT ?, id, data FROM {:}",
quote_identifier(internal_name)
quote_identifier(&remaining.internal_name)
),
name,
&remaining.name,
)
.into_db_result(db)?;
}
Expand All @@ -139,8 +99,8 @@ SELECT name, internal_name, local_only FROM powersync_tables WHERE name NOT IN (

// We cannot have any open queries on sqlite_master at the point that we drop tables, otherwise
// we get "table is locked" errors.
for internal_name in tables_to_drop {
let q = format!("DROP TABLE {:}", quote_identifier(&internal_name));
for remaining in existing_tables.values() {
let q = format!("DROP TABLE {:}", quote_identifier(&remaining.internal_name));
db.exec_safe(&q).into_db_result(db)?;
}

Expand Down Expand Up @@ -305,7 +265,7 @@ fn powersync_replace_schema_impl(
// language=SQLite
db.exec_safe("SELECT powersync_init()").into_db_result(db)?;

update_tables(db, schema)?;
update_tables(db, &parsed_schema)?;
update_indexes(db, &parsed_schema)?;
update_views(db, &parsed_schema)?;

Expand Down
6 changes: 5 additions & 1 deletion crates/core/src/schema/table_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@ impl Table {
.unwrap_or(self.name.as_str())
}

pub fn local_only(&self) -> bool {
self.flags.local_only()
}

pub fn internal_name(&self) -> String {
if self.flags.local_only() {
if self.local_only() {
format!("ps_data_local__{:}", self.name)
} else {
format!("ps_data__{:}", self.name)
Expand Down
25 changes: 10 additions & 15 deletions crates/core/src/sync_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use alloc::vec::Vec;
use serde::Deserialize;

use crate::error::{PSResult, PowerSyncError};
use crate::schema::inspection::ExistingTable;
use crate::schema::{PendingStatement, PendingStatementValue, RawTable, Schema};
use crate::state::DatabaseState;
use crate::sync::BucketPriority;
Expand Down Expand Up @@ -434,22 +435,16 @@ impl<'a> ParsedDatabaseSchema<'a> {
}

fn add_from_db(&mut self, db: *mut sqlite::sqlite3) -> Result<(), PowerSyncError> {
// language=SQLite
let statement = db
.prepare_v2(
"SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'",
)
.into_db_result(db)?;

while statement.step()? == ResultCode::ROW {
let name = statement.column_text(0)?;
// Strip the ps_data__ prefix so that we can lookup tables by their sync protocol name.
let visible_name = name.get(9..).unwrap_or(name);

// Tables which haven't been passed explicitly are assumed to not be raw tables.
self.tables
.insert(String::from(visible_name), ParsedSchemaTable::json_table());
let tables = ExistingTable::list(db)?;
for table in tables {
if !table.local_only {
let visible_name = table.name;

self.tables
.insert(visible_name, ParsedSchemaTable::json_table());
}
}

Ok(())
}
}
Expand Down
Loading