Skip to content

Commit

Permalink
feat: add migration for latest table
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Oct 9, 2023
1 parent a06113e commit ebd6cc9
Showing 1 changed file with 100 additions and 14 deletions.
114 changes: 100 additions & 14 deletions iroh-sync/src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use ouroboros::self_referencing;
use parking_lot::RwLock;
use redb::{
Database, MultimapTableDefinition, Range as TableRange, ReadOnlyTable, ReadTransaction,
ReadableMultimapTable, ReadableTable, StorageError, TableDefinition,
ReadableMultimapTable, ReadableTable, StorageError, Table, TableDefinition,
};

use crate::{
Expand Down Expand Up @@ -54,33 +54,64 @@ const NAMESPACES_TABLE: TableDefinition<&[u8; 32], &[u8; 32]> =
// (u64, [u8; 32], [u8; 32], u64, [u8; 32])
// # (timestamp, signature_namespace, signature_author, len, hash)
const RECORDS_TABLE: TableDefinition<RecordsId, RecordsValue> = TableDefinition::new("records-1");
type RecordsId<'a> = (&'a [u8; 32], &'a [u8; 32], &'a [u8]);
type RecordsValue<'a> = (u64, &'a [u8; 64], &'a [u8; 64], u64, &'a [u8; 32]);
type RecordsTable<'a> = ReadOnlyTable<'a, RecordsId<'static>, RecordsValue<'static>>;
type RecordsRange<'a> = TableRange<'a, RecordsId<'static>, RecordsValue<'static>>;

// Latest
// Latest by author
// Table
// Key: ([u8; 32], [u8; 32]) # (NamespaceId, AuthorId)
// Value: (u64, Vec<u8>) # (Timestamp, Key)
const LATEST_TABLE: TableDefinition<LatestKey, LatestValue> = TableDefinition::new("latest-1");
const LATEST_TABLE: TableDefinition<LatestKey, LatestValue> = TableDefinition::new("latest-by-author-1");
type LatestKey<'a> = (&'a [u8; 32], &'a [u8; 32]);
type LatestValue<'a> = (u64, &'a [u8]);
type LatestTable<'a> = ReadOnlyTable<'a, LatestKey<'static>, LatestValue<'static>>;
type LatestRange<'a> = TableRange<'a, LatestKey<'static>, LatestValue<'static>>;

type RecordsId<'a> = (&'a [u8; 32], &'a [u8; 32], &'a [u8]);
type RecordsValue<'a> = (u64, &'a [u8; 64], &'a [u8; 64], u64, &'a [u8; 32]);
type RecordsRange<'a> = TableRange<'a, RecordsId<'static>, RecordsValue<'static>>;
type RecordsTable<'a> = ReadOnlyTable<'a, RecordsId<'static>, RecordsValue<'static>>;
type DbResult<T> = Result<T, StorageError>;

/// Number of seconds elapsed since [`std::time::SystemTime::UNIX_EPOCH`]. Used to register the
/// last time a peer was useful in a document.
// NOTE: resolution is nanoseconds, stored as a u64 since this covers ~500years from unix epoch,
// which should be more than enough
type Nanos = u64;
/// Peers stored per document.
/// - Key: [`NamespaceId::as_bytes`]
/// - Value: ([`Nanos`], &[`PeerIdBytes`]) representing the last time a peer was used.
const NAMESPACE_PEERS_TABLE: MultimapTableDefinition<&[u8; 32], (Nanos, &PeerIdBytes)> =
MultimapTableDefinition::new("sync-peers-1");
/// Number of seconds elapsed since [`std::time::SystemTime::UNIX_EPOCH`]. Used to register the
/// last time a peer was useful in a document.
// NOTE: resolution is nanoseconds, stored as a u64 since this covers ~500years from unix epoch,
// which should be more than enough
type Nanos = u64;

/// migration 001: populate the latest table (which did not exist before)
fn migration_001_populate_latest_table(
records_table: &Table<RecordsId<'static>, RecordsValue<'static>>,
latest_table: &mut Table<LatestKey<'static>, LatestValue<'static>>,
) -> Result<()> {
tracing::info!("Starting migration: 001_populate_latest_table");
#[allow(clippy::type_complexity)]
let mut heads: HashMap<([u8; 32], [u8; 32]), (u64, Vec<u8>)> = HashMap::new();
let iter = records_table.iter()?;

for next in iter {
let next = next?;
let (namespace, author, key) = next.0.value();
let (timestamp, _namespace_sig, _author_sig, _len, _hash) = next.1.value();
heads
.entry((*namespace, *author))
.and_modify(|e| {
if timestamp >= e.0 {
*e = (timestamp, key.to_vec());
}
})
.or_insert_with(|| (timestamp, key.to_vec()));
}
let len = heads.len();
for ((namespace, author), (timestamp, key)) in heads {
latest_table.insert((&namespace, &author), (timestamp, key.as_slice()))?;
}
tracing::info!("Migration finished (inserted {} entries)", len);
Ok(())
}

impl Store {
/// Create or open a store from a `path` to a database file.
Expand All @@ -92,11 +123,16 @@ impl Store {
// Setup all tables
let write_tx = db.begin_write()?;
{
let _table = write_tx.open_table(RECORDS_TABLE)?;
let records_table = write_tx.open_table(RECORDS_TABLE)?;
let _table = write_tx.open_table(NAMESPACES_TABLE)?;
let _table = write_tx.open_table(AUTHORS_TABLE)?;
let _table = write_tx.open_table(LATEST_TABLE)?;
let mut latest_table = write_tx.open_table(LATEST_TABLE)?;
let _table = write_tx.open_multimap_table(NAMESPACE_PEERS_TABLE)?;

// migration 001: populate latest table if it was empty before
if latest_table.is_empty()? && !records_table.is_empty()? {
migration_001_populate_latest_table(&records_table, &mut latest_table)?;
}
}
write_tx.commit()?;

Expand Down Expand Up @@ -931,4 +967,54 @@ mod tests {

Ok(())
}

fn copy_and_modify(
source: &Path,
modify: impl Fn(&redb::WriteTransaction) -> Result<()>,
) -> Result<tempfile::NamedTempFile> {
let dbfile = tempfile::NamedTempFile::new()?;
std::fs::copy(source, dbfile.path())?;
// drop the latest table to test the migration.
let db = Database::create(dbfile.path())?;
let write_tx = db.begin_write()?;
modify(&write_tx)?;
write_tx.commit()?;
Ok(dbfile)
}

#[test]
fn test_migration_001_populate_latest_table() -> Result<()> {
let dbfile = tempfile::NamedTempFile::new()?;
let namespace = Namespace::new(&mut rand::thread_rng());

// create a store and add some data
let store = Store::new(dbfile.path())?;
let author1 = store.new_author(&mut rand::thread_rng())?;
let author2 = store.new_author(&mut rand::thread_rng())?;
let replica = store.new_replica(namespace.clone())?;
replica.hash_and_insert(b"k1", &author1, b"v1")?;
replica.hash_and_insert(b"k2", &author2, b"v1")?;
replica.hash_and_insert(b"k3", &author1, b"v1")?;

let expected = store
.get_latest(namespace.id())?
.collect::<Result<Vec<_>>>()?;
assert_eq!(expected.len(), 2);

// create a copy of our db file with the latest table deleted.
let dbfile_before_migration = copy_and_modify(dbfile.path(), |tx| {
tx.delete_table(LATEST_TABLE)?;
Ok(())
})?;

// open the copied db file, which will run the migration.
let store = Store::new(dbfile_before_migration.path())?;
let actual = store
.get_latest(namespace.id())?
.collect::<Result<Vec<_>>>()?;

assert_eq!(expected, actual);

Ok(())
}
}

0 comments on commit ebd6cc9

Please sign in to comment.