From ee2121d6e91f52a747c05d5e567b680a0e938eae Mon Sep 17 00:00:00 2001
From: "Franz Heinzmann (Frando)"
Date: Sat, 23 Mar 2024 16:23:27 +0100
Subject: [PATCH 01/15] deps: update redb to v2 in iroh-bytes and iroh-base

---
 Cargo.lock                        | 15 +++++++++++---
 iroh-base/Cargo.toml              |  2 +-
 iroh-base/src/hash.rs             |  6 +++---
 iroh-bytes/Cargo.toml             |  2 +-
 iroh-bytes/src/store/fs.rs        |  2 +-
 iroh-bytes/src/store/fs/tables.rs | 34 +++++++++++++++----------------
 iroh-bytes/src/util.rs            |  2 +-
 7 files changed, 36 insertions(+), 27 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index b1fdb4ffe4..926af72c5b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2349,7 +2349,7 @@ dependencies = [
  "proptest",
  "rand",
  "rand_core",
- "redb",
+ "redb 2.0.0",
  "serde",
  "serde-error",
  "serde_json",
@@ -2404,7 +2404,7 @@ dependencies = [
  "rand",
  "range-collections",
  "rcgen 0.12.1",
- "redb",
+ "redb 2.0.0",
  "reflink-copy",
  "rustls",
  "self_cell",
@@ -2666,7 +2666,7 @@ dependencies = [
  "rand",
  "rand_chacha",
  "rand_core",
- "redb",
+ "redb 1.5.1",
  "serde",
  "strum 0.25.0",
  "tempfile",
@@ -4059,6 +4059,15 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "redb"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a1100a056c5dcdd4e5513d5333385223b26ef1bf92f31eb38f407e8c20549256"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "redox_syscall"
 version = "0.4.1"
diff --git a/iroh-base/Cargo.toml b/iroh-base/Cargo.toml
index 54a598fbe8..261712b725 100644
--- a/iroh-base/Cargo.toml
+++ b/iroh-base/Cargo.toml
@@ -21,7 +21,7 @@ data-encoding = { version = "2.3.3", optional = true }
 hex = "0.4.3"
 multibase = { version = "0.9.1", optional = true }
 postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"], optional = true }
-redb = { version = "1.5.1", optional = true }
+redb = { version = "2.0.0", optional = true }
 serde = { version = "1", features = ["derive"] }
 serde-error = "0.1.2"
 thiserror = "1"
diff --git a/iroh-base/src/hash.rs b/iroh-base/src/hash.rs
index 1a8ee9c5f6..a6e4e82eea 100644
--- a/iroh-base/src/hash.rs
+++ b/iroh-base/src/hash.rs
@@ -217,7 +217,7 @@ pub struct HashAndFormat {
 mod redb_support {
     use super::{Hash, HashAndFormat};
     use postcard::experimental::max_size::MaxSize;
-    use redb::{RedbKey, RedbValue};
+    use redb::{Key as RedbKey, Value as RedbValue};
 
     impl RedbValue for Hash {
         type SelfType<'a> = Self;
@@ -421,7 +421,7 @@ mod tests {
     #[cfg(feature = "redb")]
     #[test]
     fn hash_redb() {
-        use redb::RedbValue;
+        use redb::Value as RedbValue;
         let bytes: [u8; 32] = (0..32).collect::<Vec<u8>>().as_slice().try_into().unwrap();
         let hash = Hash::from(bytes);
         assert_eq!(<Hash as RedbValue>::fixed_width(), Some(32));
@@ -446,7 +446,7 @@ mod tests {
     #[cfg(feature = "redb")]
     #[test]
     fn hash_and_format_redb() {
-        use redb::RedbValue;
+        use redb::Value as RedbValue;
         let hash_bytes: [u8; 32] = (0..32).collect::<Vec<u8>>().as_slice().try_into().unwrap();
         let hash = Hash::from(hash_bytes);
         let haf = HashAndFormat::raw(hash);
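[Annotation] redb v2 renames the `RedbKey`/`RedbValue` traits to `Key`/`Value`; the hunks above alias them at the import site so the impl blocks stay untouched. For orientation, here is a minimal sketch (not part of the patch; `MyId` is a hypothetical type) of what a fixed-width `Value`/`Key` impl looks like against the redb v2 trait shape that the `Hash` impl above targets:

    use std::cmp::Ordering;
    use redb::{Key, TypeName, Value};

    #[derive(Debug)]
    struct MyId([u8; 32]);

    impl Value for MyId {
        type SelfType<'a> = Self;
        type AsBytes<'a> = &'a [u8; 32];

        fn fixed_width() -> Option<usize> {
            Some(32)
        }

        fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
        where
            Self: 'a,
        {
            // Assumes callers only hand us 32-byte slices, as for Hash above.
            let mut bytes = [0u8; 32];
            bytes.copy_from_slice(&data[..32]);
            MyId(bytes)
        }

        fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
        where
            Self: 'a,
            Self: 'b,
        {
            &value.0
        }

        fn type_name() -> TypeName {
            TypeName::new("MyId")
        }
    }

    impl Key for MyId {
        fn compare(data1: &[u8], data2: &[u8]) -> Ordering {
            // Plain byte-wise ordering; a real key may need domain-specific order.
            data1.cmp(data2)
        }
    }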
diff --git a/iroh-bytes/Cargo.toml b/iroh-bytes/Cargo.toml
index f045ab9741..c7590e2017 100644
--- a/iroh-bytes/Cargo.toml
+++ b/iroh-bytes/Cargo.toml
@@ -38,7 +38,7 @@ postcard = { version = "1", default-features = false, features = ["alloc", "use-
 quinn = "0.10"
 rand = "0.8"
 range-collections = "0.4.0"
-redb = { version = "1.5.1", optional = true }
+redb = { version = "2.0.0", optional = true }
 reflink-copy = { version = "0.1.8", optional = true }
 self_cell = "1.0.1"
 serde = { version = "1", features = ["derive"] }
diff --git a/iroh-bytes/src/store/fs.rs b/iroh-bytes/src/store/fs.rs
index 0b4927aba1..0780054ab5 100644
--- a/iroh-bytes/src/store/fs.rs
+++ b/iroh-bytes/src/store/fs.rs
@@ -307,7 +307,7 @@ impl EntryState {
     }
 }
 
-impl redb::RedbValue for EntryState {
+impl redb::Value for EntryState {
     type SelfType<'a> = EntryState;
 
     type AsBytes<'a> = SmallVec<[u8; 128]>;
diff --git a/iroh-bytes/src/store/fs/tables.rs b/iroh-bytes/src/store/fs/tables.rs
index 05ba9c3dd3..ef6aee951a 100644
--- a/iroh-bytes/src/store/fs/tables.rs
+++ b/iroh-bytes/src/store/fs/tables.rs
@@ -30,12 +30,12 @@ pub(super) trait ReadableTables {
 
 /// A struct similar to [`redb::Table`] but for all tables that make up the
 /// blob store.
-pub(super) struct Tables<'a, 'b> {
-    pub blobs: redb::Table<'a, 'b, Hash, EntryState>,
-    pub tags: redb::Table<'a, 'b, Tag, HashAndFormat>,
-    pub inline_data: redb::Table<'a, 'b, Hash, &'static [u8]>,
-    pub inline_outboard: redb::Table<'a, 'b, Hash, &'static [u8]>,
-    pub delete_after_commit: &'b mut DeleteSet,
+pub(super) struct Tables<'a> {
+    pub blobs: redb::Table<'a, Hash, EntryState>,
+    pub tags: redb::Table<'a, Tag, HashAndFormat>,
+    pub inline_data: redb::Table<'a, Hash, &'static [u8]>,
+    pub inline_outboard: redb::Table<'a, Hash, &'static [u8]>,
+    pub delete_after_commit: &'a mut DeleteSet,
 }
 
 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
@@ -45,9 +45,9 @@ pub(super) enum BaoFilePart {
     Sizes,
 }
 
-impl<'db, 'txn> Tables<'db, 'txn> {
+impl<'txn> Tables<'txn> {
     pub fn new(
-        tx: &'txn redb::WriteTransaction<'db>,
+        tx: &'txn redb::WriteTransaction,
         delete_after_commit: &'txn mut DeleteSet,
     ) -> std::result::Result<Self, redb::TableError> {
         Ok(Self {
@@ -60,7 +60,7 @@ impl<'db, 'txn> Tables<'db, 'txn> {
     }
 }
 
-impl ReadableTables for Tables<'_, '_> {
+impl ReadableTables for Tables<'_> {
     fn blobs(&self) -> &impl ReadableTable<Hash, EntryState> {
         &self.blobs
     }
@@ -77,15 +77,15 @@ impl ReadableTables for Tables<'_, '_> {
 
 /// A struct similar to [`redb::ReadOnlyTable`] but for all tables that make up
 /// the blob store.
-pub(super) struct ReadOnlyTables<'txn> {
-    pub blobs: redb::ReadOnlyTable<'txn, Hash, EntryState>,
-    pub tags: redb::ReadOnlyTable<'txn, Tag, HashAndFormat>,
-    pub inline_data: redb::ReadOnlyTable<'txn, Hash, &'static [u8]>,
-    pub inline_outboard: redb::ReadOnlyTable<'txn, Hash, &'static [u8]>,
+pub(super) struct ReadOnlyTables {
+    pub blobs: redb::ReadOnlyTable<Hash, EntryState>,
+    pub tags: redb::ReadOnlyTable<Tag, HashAndFormat>,
+    pub inline_data: redb::ReadOnlyTable<Hash, &'static [u8]>,
+    pub inline_outboard: redb::ReadOnlyTable<Hash, &'static [u8]>,
 }
 
-impl<'txn> ReadOnlyTables<'txn> {
-    pub fn new(tx: &'txn redb::ReadTransaction<'txn>) -> std::result::Result<Self, redb::TableError> {
+impl<'txn> ReadOnlyTables {
+    pub fn new(tx: &'txn redb::ReadTransaction) -> std::result::Result<Self, redb::TableError> {
         Ok(Self {
             blobs: tx.open_table(BLOBS_TABLE)?,
             tags: tx.open_table(TAGS_TABLE)?,
@@ -95,7 +95,7 @@ impl<'txn> ReadOnlyTables<'txn> {
     }
 }
 
-impl ReadableTables for ReadOnlyTables<'_> {
+impl ReadableTables for ReadOnlyTables {
     fn blobs(&self) -> &impl ReadableTable<Hash, EntryState> {
         &self.blobs
     }
diff --git a/iroh-bytes/src/util.rs b/iroh-bytes/src/util.rs
index 9654d3ce80..7089124740 100644
--- a/iroh-bytes/src/util.rs
+++ b/iroh-bytes/src/util.rs
@@ -23,7 +23,7 @@ pub struct Tag(pub Bytes);
 mod redb_support {
     use super::Tag;
     use bytes::Bytes;
-    use redb::{RedbKey, RedbValue};
+    use redb::{Key as RedbKey, Value as RedbValue};
 
     impl RedbValue for Tag {
         type SelfType<'a> = Self;
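[Annotation] The point of the `Tables`/`ReadOnlyTables` changes above: in redb v2 a `ReadTransaction` is an owned value and a `ReadOnlyTable` is `'static`, so both can live in a plain struct without tying it to a database lifetime. A minimal sketch of that pattern under assumed names (`Snapshot` and the `example` table are illustrative, not from the patch); the patch itself relies on the fact that the table keeps its snapshot alive even after the transaction handle is dropped:

    use redb::{Database, ReadOnlyTable, ReadableTable, TableDefinition};

    const TABLE: TableDefinition<&str, u64> = TableDefinition::new("example");

    struct Snapshot {
        table: ReadOnlyTable<&'static str, u64>,
    }

    impl Snapshot {
        fn new(db: &Database) -> anyhow::Result<Self> {
            let tx = db.begin_read()?; // owned in v2, no lifetime parameter
            let table = tx.open_table(TABLE)?; // 'static in v2
            // `tx` can be dropped here; the table pins its read snapshot.
            Ok(Self { table })
        }

        fn get(&self, key: &str) -> anyhow::Result<Option<u64>> {
            Ok(self.table.get(key)?.map(|v| v.value()))
        }
    }

In redb v1 both types borrowed from the database, which is why iroh-sync (next patch) needed ouroboros-style self-referencing structs to bundle them.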
-source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b645dcde5f119c2c454a92d0dfa271a2a3b205da92e4292a68ead4bdbfde1f33" -dependencies = [ - "heck 0.4.1", - "itertools 0.12.1", - "proc-macro2", - "proc-macro2-diagnostics", - "quote", - "syn 2.0.53", -] - [[package]] name = "overload" version = "0.1.1" @@ -3679,7 +3638,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af7cee1a6c8a5b9208b3cb1061f10c0cb689087b3d8ce85fb9d2dd7a29b6ba66" dependencies = [ "diff", - "yansi 0.5.1", + "yansi", ] [[package]] @@ -3741,19 +3700,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "proc-macro2-diagnostics" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.53", - "version_check", - "yansi 1.0.1", -] - [[package]] name = "prometheus-client" version = "0.22.2" @@ -4050,15 +3996,6 @@ dependencies = [ "yasna", ] -[[package]] -name = "redb" -version = "1.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd7f82ecd6ba647a39dd1a7172b8a1cd9453c0adee6da20cb553d83a9a460fa5" -dependencies = [ - "libc", -] - [[package]] name = "redb" version = "2.0.0" @@ -4844,12 +4781,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" -[[package]] -name = "static_assertions" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" - [[package]] name = "str-buf" version = "1.0.6" @@ -6163,12 +6094,6 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" -[[package]] -name = "yansi" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" - [[package]] name = "yasna" version = "0.5.2" diff --git a/iroh-sync/Cargo.toml b/iroh-sync/Cargo.toml index 9dff404233..a3958e7466 100644 --- a/iroh-sync/Cargo.toml +++ b/iroh-sync/Cargo.toml @@ -39,8 +39,7 @@ tracing = "0.1" tokio = { version = "1", features = ["sync"] } # fs-store -redb = { version = "1.5.1", optional = true } -ouroboros = { version = "0.18", optional = true } +redb = { version = "2.0.0", optional = true } # net iroh-net = { version = "0.12.0", optional = true, path = "../iroh-net" } @@ -61,5 +60,5 @@ test-strategy = "0.3.1" [features] default = ["net", "fs-store", "metrics"] net = ["iroh-net", "tokio/io-util", "tokio-stream", "tokio-util", "quinn", "futures"] -fs-store = ["redb", "ouroboros"] +fs-store = ["redb"] metrics = ["iroh-metrics"] diff --git a/iroh-sync/src/store/fs.rs b/iroh-sync/src/store/fs.rs index 57c15f1963..3dd75f03d7 100644 --- a/iroh-sync/src/store/fs.rs +++ b/iroh-sync/src/store/fs.rs @@ -17,7 +17,7 @@ use iroh_base::hash::Hash; use parking_lot::RwLock; use redb::{ Database, MultimapTableDefinition, ReadOnlyTable, ReadableMultimapTable, ReadableTable, - TableDefinition, + ReadableTableMetadata, TableDefinition, }; use crate::{ @@ -72,7 +72,7 @@ const RECORDS_TABLE: TableDefinition = TableDefinition: type RecordsId<'a> = (&'a [u8; 32], &'a [u8; 32], &'a [u8]); type RecordsIdOwned = ([u8; 32], [u8; 32], Bytes); type RecordsValue<'a> = (u64, &'a [u8; 64], &'a [u8; 
64], u64, &'a [u8; 32]); -type RecordsTable<'a> = ReadOnlyTable<'a, RecordsId<'static>, RecordsValue<'static>>; +type RecordsTable<'a> = ReadOnlyTable, RecordsValue<'static>>; /// Table: Latest per author /// Key: `([u8; 32], [u8; 32])` # (NamespaceId, AuthorId) @@ -147,9 +147,9 @@ impl Store { impl super::Store for Store { type Instance = StoreInstance; - type GetIter<'a> = QueryIterator<'a>; - type ContentHashesIter<'a> = ContentHashesIterator<'a>; - type LatestIter<'a> = LatestIterator<'a>; + type GetIter<'a> = QueryIterator; + type ContentHashesIter<'a> = ContentHashesIterator; + type LatestIter<'a> = LatestIterator; type AuthorsIter<'a> = std::vec::IntoIter>; type NamespaceIter<'a> = std::vec::IntoIter>; type PeersIter<'a> = std::vec::IntoIter; @@ -269,12 +269,12 @@ impl super::Store for Store { { let mut record_table = write_tx.open_table(RECORDS_TABLE)?; let bounds = RecordsBounds::namespace(*namespace); - record_table.drain(bounds.as_ref())?; + record_table.retain_in(bounds.as_ref(), |_k, _v| false)?; } { let mut table = write_tx.open_table(RECORDS_BY_KEY_TABLE)?; let bounds = ByKeyBounds::namespace(*namespace); - let _ = table.drain(bounds.as_ref()); + let _ = table.retain_in(bounds.as_ref(), |_k, _v| false); } { let mut namespace_table = write_tx.open_table(NAMESPACES_TABLE)?; @@ -487,9 +487,8 @@ impl super::DownloadPolicyStore for StoreInstance { impl crate::ranger::Store for StoreInstance { type Error = anyhow::Error; - type RangeIterator<'a> = - Chain, Flatten>>>; - type ParentIterator<'a> = ParentIterator<'a>; + type RangeIterator<'a> = Chain>>; + type ParentIterator<'a> = ParentIterator; /// Get a the first key (or the default if none is available). fn get_first(&self) -> Result { @@ -672,7 +671,7 @@ impl crate::ranger::Store for StoreInstance { predicate(&record) }; - let iter = table.drain_filter(bounds.as_ref(), cb)?; + let iter = table.extract_from_if(bounds.as_ref(), cb)?; iter.count() }; write_tx.commit()?; @@ -689,16 +688,16 @@ fn chain_none<'a, I: Iterator + 'a, T>( /// Iterator over parent entries, i.e. entries with the same namespace and author, and a key which /// is a prefix of the key passed to the iterator. #[derive(Debug)] -pub struct ParentIterator<'a> { - reader: TableReader<'a, RecordsId<'static>, RecordsValue<'static>>, +pub struct ParentIterator { + reader: TableReader, RecordsValue<'static>>, namespace: NamespaceId, author: AuthorId, key: Vec, } -impl<'a> ParentIterator<'a> { +impl ParentIterator { fn new( - db: &'a Arc, + db: &Arc, namespace: NamespaceId, author: AuthorId, key: Vec, @@ -713,7 +712,7 @@ impl<'a> ParentIterator<'a> { } } -impl Iterator for ParentIterator<'_> { +impl Iterator for ParentIterator { type Item = Result; fn next(&mut self) -> Option { @@ -733,16 +732,16 @@ impl Iterator for ParentIterator<'_> { /// Iterator over all content hashes for the fs store. #[derive(Debug)] -pub struct ContentHashesIterator<'a>(RecordsRange<'a>); +pub struct ContentHashesIterator(RecordsRange); -impl<'a> ContentHashesIterator<'a> { - fn new(db: &'a Arc) -> anyhow::Result { - let range = RecordsRange::new(db, |table| table.iter())?; +impl ContentHashesIterator { + fn new(db: &Arc) -> anyhow::Result { + let range = RecordsRange::new(db, |table| table.range::>(..))?; Ok(Self(range)) } } -impl Iterator for ContentHashesIterator<'_> { +impl Iterator for ContentHashesIterator { type Item = Result; fn next(&mut self) -> Option { @@ -755,12 +754,10 @@ impl Iterator for ContentHashesIterator<'_> { /// Iterator over the latest entry per author. 
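[Annotation] Two v1 -> v2 renames carry most of the weight in the hunks above: `Table::drain(range)` becomes `retain_in(range, predicate)` (keep nothing to emulate a drain), and `drain_filter` becomes `extract_from_if`, which yields the removed entries. A small standalone sketch of both calls, under an assumed table `T` (illustrative, not from the patch):

    use redb::{Database, TableDefinition};

    const T: TableDefinition<&str, u64> = TableDefinition::new("t");

    fn purge(db: &Database) -> anyhow::Result<usize> {
        let tx = db.begin_write()?;
        let removed = {
            let mut table = tx.open_table(T)?;
            // v1 `drain(range)` -> v2 `retain_in(range, |..| false)`:
            table.retain_in("a".."m", |_k, _v| false)?;
            // v1 `drain_filter(range, pred)` -> v2 `extract_from_if(range, pred)`,
            // which hands back the removed entries as an iterator:
            let drained = table.extract_from_if("m"..="z", |_k, v| v == 0)?;
            drained.count()
        };
        tx.commit()?;
        Ok(removed)
    }

Counting the `extract_from_if` iterator to force the removal mirrors what the patch does with `iter.count()` above.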
 /// Iterator over the latest entry per author.
 #[derive(Debug)]
-pub struct LatestIterator<'a>(
-    TableRange<'a, LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>,
-);
+pub struct LatestIterator(TableRange<LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>);
 
-impl<'a> LatestIterator<'a> {
-    fn new(db: &'a Arc<Database>, namespace: NamespaceId) -> anyhow::Result<Self> {
+impl LatestIterator {
+    fn new(db: &Arc<Database>, namespace: NamespaceId) -> anyhow::Result<Self> {
         Ok(Self(TableRange::new(
             db,
             |tx| tx.open_table(LATEST_PER_AUTHOR_TABLE),
@@ -773,7 +770,7 @@ impl<'a> LatestIterator<'a> {
     }
 }
 
-impl Iterator for LatestIterator<'_> {
+impl Iterator for LatestIterator {
     type Item = Result<(AuthorId, u64, Vec<u8>)>;
 
     fn next(&mut self) -> Option<Self::Item> {
diff --git a/iroh-sync/src/store/fs/migrations.rs b/iroh-sync/src/store/fs/migrations.rs
index d6178627f9..71341b9f6c 100644
--- a/iroh-sync/src/store/fs/migrations.rs
+++ b/iroh-sync/src/store/fs/migrations.rs
@@ -1,7 +1,7 @@
 use std::collections::HashMap;
 
 use anyhow::Result;
-use redb::{Database, ReadableTable, TableHandle, WriteTransaction};
+use redb::{Database, ReadableTable, ReadableTableMetadata, TableHandle, WriteTransaction};
 use tracing::{debug, info};
 
 use crate::{Capability, NamespaceSecret};
diff --git a/iroh-sync/src/store/fs/query.rs b/iroh-sync/src/store/fs/query.rs
index 521c219c6d..7cd37ffdd6 100644
--- a/iroh-sync/src/store/fs/query.rs
+++ b/iroh-sync/src/store/fs/query.rs
@@ -20,28 +20,28 @@ use super::{
 
 /// A query iterator for entry queries.
 #[derive(Debug)]
-pub struct QueryIterator<'a> {
-    range: QueryRange<'a>,
+pub struct QueryIterator {
+    range: QueryRange,
     query: Query,
     offset: u64,
     count: u64,
 }
 
 #[derive(Debug)]
-enum QueryRange<'a> {
+enum QueryRange {
     AuthorKey {
-        range: RecordsRange<'a>,
+        range: RecordsRange,
         key_filter: KeyFilter,
     },
     KeyAuthor {
-        range: RecordsByKeyRange<'a>,
+        range: RecordsByKeyRange,
         author_filter: AuthorFilter,
         selector: Option<LatestPerKeySelector>,
     },
 }
 
-impl<'a> QueryIterator<'a> {
-    pub fn new(db: &'a Arc<Database>, namespace: NamespaceId, query: Query) -> Result<Self> {
+impl QueryIterator {
+    pub fn new(db: &Arc<Database>, namespace: NamespaceId, query: Query) -> Result<Self> {
         let index_kind = IndexKind::from(&query);
         let range = match index_kind {
             IndexKind::AuthorKey { range, key_filter } => {
@@ -86,7 +86,7 @@ impl<'a> QueryIterator<'a> {
     }
 }
 
-impl<'a> Iterator for QueryIterator<'a> {
+impl Iterator for QueryIterator {
     type Item = Result<SignedEntry>;
 
     fn next(&mut self) -> Option<Result<SignedEntry>> {
diff --git a/iroh-sync/src/store/fs/ranges.rs b/iroh-sync/src/store/fs/ranges.rs
index 48eeefe49e..b0be1c1e6b 100644
--- a/iroh-sync/src/store/fs/ranges.rs
+++ b/iroh-sync/src/store/fs/ranges.rs
@@ -7,10 +7,9 @@
 
 use std::{fmt, sync::Arc};
 
-use ouroboros::self_referencing;
 use redb::{
-    Database, Range, ReadOnlyTable, ReadTransaction, ReadableTable, RedbKey, RedbValue,
-    StorageError, TableError,
+    Database, Key as RedbKey, Range, ReadOnlyTable, ReadTransaction, ReadableTable, StorageError,
+    TableError, Value as RedbValue,
 };
 
 use crate::{store::SortDirection, SignedEntry};
@@ -24,40 +23,29 @@ use super::{
 ///
 /// This uses [`ouroboros::self_referencing`] to store a [`ReadTransaction`] and a [`ReadOnlyTable`]
 /// with self-referencing.
-pub struct TableReader<'a, K: RedbKey + 'static, V: redb::RedbValue + 'static>(
-    TableReaderInner<'a, K, V>,
-);
-
-#[self_referencing]
-struct TableReaderInner<'a, K: RedbKey + 'static, V: redb::RedbValue + 'static> {
-    #[debug("ReadTransaction")]
-    read_tx: ReadTransaction<'a>,
-    #[borrows(read_tx)]
-    #[covariant]
-    table: ReadOnlyTable<'this, K, V>,
+pub struct TableReader<K: RedbKey + 'static, V: RedbValue + 'static> {
+    read_tx: ReadTransaction,
+    table: ReadOnlyTable<K, V>,
 }
 
-impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableReader<'a, K, V> {
+impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableReader<K, V> {
     /// Create a new [`TableReader`]
     pub fn new(
         db: &'a Arc<Database>,
-        table_fn: impl for<'this> FnOnce(
-            &'this ReadTransaction<'this>,
-        ) -> Result<ReadOnlyTable<'this, K, V>, TableError>,
+        table_fn: impl FnOnce(&ReadTransaction) -> Result<ReadOnlyTable<K, V>, TableError>,
     ) -> anyhow::Result<Self> {
-        let reader = TableReaderInner::try_new(db.begin_read()?, |read_tx| {
-            table_fn(read_tx).map_err(anyhow::Error::from)
-        })?;
-        Ok(Self(reader))
+        let read_tx = db.begin_read()?;
+        let table = table_fn(&read_tx).map_err(anyhow::Error::from)?;
+        Ok(Self { read_tx, table })
     }
 
     /// Get a reference to the [`ReadOnlyTable`];
     pub fn table(&self) -> &ReadOnlyTable<K, V> {
-        self.0.borrow_table()
+        &self.table
     }
 }
 
-impl<'a, K: RedbKey + 'static, V: redb::RedbValue + 'static> fmt::Debug for TableReader<'a, K, V> {
+impl<K: RedbKey + 'static, V: RedbValue + 'static> fmt::Debug for TableReader<K, V> {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(f, "TableReader({:?})", self.table())
     }
@@ -67,51 +55,41 @@ impl<'a, K: RedbKey + 'static, V: redb::RedbValue + 'static> fmt::Debug for Tabl
 ///
 /// This uses [`ouroboros::self_referencing`] to store a [`ReadTransaction`], a [`ReadOnlyTable`]
 /// and a [`Range`] together. Useful to build iterators with.
-pub struct TableRange<'a, K: RedbKey + 'static, V: redb::RedbValue + 'static>(
-    TableRangeReaderInner<'a, K, V>,
-);
-
-#[self_referencing]
-struct TableRangeReaderInner<'a, K: RedbKey + 'static, V: redb::RedbValue + 'static> {
-    #[debug("ReadTransaction")]
-    read_tx: ReadTransaction<'a>,
-    #[borrows(read_tx)]
-    #[covariant]
-    table: ReadOnlyTable<'this, K, V>,
-    #[covariant]
-    #[borrows(table)]
-    range: Range<'this, K, V>,
+pub struct TableRange<K: RedbKey + 'static, V: RedbValue + 'static> {
+    read_tx: ReadTransaction,
+    table: ReadOnlyTable<K, V>,
+    range: Range<'static, K, V>,
 }
 
-impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableRange<'a, K, V> {
+impl<K: RedbKey + 'static, V: RedbValue + 'static> TableRange<K, V> {
     /// Create a new [`TableReader`]
-    pub fn new<TF, RF>(db: &'a Arc<Database>, table_fn: TF, range_fn: RF) -> anyhow::Result<Self>
+    pub fn new<TF, RF>(db: &Arc<Database>, table_fn: TF, range_fn: RF) -> anyhow::Result<Self>
     where
-        TF: for<'s> FnOnce(&'s ReadTransaction<'s>) -> Result<ReadOnlyTable<'s, K, V>, TableError>,
-        RF: for<'s> FnOnce(&'s ReadOnlyTable<'s, K, V>) -> Result<Range<'s, K, V>, StorageError>,
+        TF: FnOnce(&ReadTransaction) -> Result<ReadOnlyTable<K, V>, TableError>,
+        RF: FnOnce(&ReadOnlyTable<K, V>) -> Result<Range<'static, K, V>, StorageError>,
     {
-        let reader = TableRangeReaderInner::try_new(
-            db.begin_read()?,
-            |tx| table_fn(tx).map_err(anyhow_err),
-            |table| range_fn(table).map_err(anyhow_err),
-        )?;
-        Ok(Self(reader))
+        let read_tx = db.begin_read()?;
+        let table = table_fn(&read_tx).map_err(anyhow_err)?;
+        let range = range_fn(&table).map_err(anyhow_err)?;
+        Ok(Self {
+            read_tx,
+            table,
+            range,
+        })
     }
 
     /// Get a reference to the [`ReadOnlyTable`];
     pub fn table(&self) -> &ReadOnlyTable<K, V> {
-        self.0.borrow_table()
+        &self.table
     }
 
     pub fn next_mapped<T>(
         &mut self,
         map: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> T,
     ) -> Option<anyhow::Result<T>> {
-        self.0.with_range_mut(|records| {
-            records
-                .next()
-                .map(|r| r.map_err(Into::into).map(|r| map(r.0.value(), r.1.value())))
-        })
+        self.range
+            .next()
+            .map(|r| r.map_err(Into::into).map(|r| map(r.0.value(), r.1.value())))
     }
 
     pub fn next_filtered<T>(
@@ -120,10 +98,10 @@ impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableRange<'a, K, V> {
         filter: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> bool,
         map: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> T,
     ) -> Option<anyhow::Result<T>> {
-        self.0.with_range_mut(|records| loop {
+        loop {
             let next = match direction {
-                SortDirection::Asc => records.next(),
-                SortDirection::Desc => records.next_back(),
+                SortDirection::Asc => self.range.next(),
+                SortDirection::Desc => self.range.next_back(),
             };
             match next {
                 None => break None,
@@ -133,11 +111,11 @@ impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableRange<'a, K, V> {
                     true => break Some(Ok(map(res.0.value(), res.1.value()))),
                 },
             }
-        })
+        }
     }
 }
 
-impl<'a, K: RedbKey + 'static, V: redb::RedbValue + 'static> fmt::Debug for TableRange<'a, K, V> {
+impl<K: RedbKey + 'static, V: RedbValue + 'static> fmt::Debug for TableRange<K, V> {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(f, "TableRangeReader({:?})", self.table())
     }
@@ -145,14 +123,14 @@ impl<'a, K: RedbKey + 'static, V: redb::RedbValue + 'static> fmt::Debug for Tabl
 
 /// An iterator over a range of entries from the records table.
 #[derive(Debug)]
-pub struct RecordsRange<'a>(TableRange<'a, RecordsId<'static>, RecordsValue<'static>>);
-impl<'a> RecordsRange<'a> {
-    pub(super) fn new<RF>(db: &'a Arc<Database>, range_fn: RF) -> anyhow::Result<Self>
+pub struct RecordsRange(TableRange<RecordsId<'static>, RecordsValue<'static>>);
+impl RecordsRange {
+    pub(super) fn new<RF>(db: &Arc<Database>, range_fn: RF) -> anyhow::Result<Self>
     where
-        RF: for<'s> FnOnce(
-            &'s ReadOnlyTable<'s, RecordsId<'static>, RecordsValue<'static>>,
+        RF: FnOnce(
+            &ReadOnlyTable<RecordsId<'static>, RecordsValue<'static>>,
         ) -> Result<
-            Range<'s, RecordsId<'static>, RecordsValue<'static>>,
+            Range<'static, RecordsId<'static>, RecordsValue<'static>>,
             StorageError,
         >,
     {
@@ -163,10 +141,7 @@ impl<'a> RecordsRange<'a> {
         )?))
     }
 
-    pub(super) fn with_bounds(
-        db: &'a Arc<Database>,
-        bounds: RecordsBounds,
-    ) -> anyhow::Result<Self> {
+    pub(super) fn with_bounds(db: &Arc<Database>, bounds: RecordsBounds) -> anyhow::Result<Self> {
         Self::new(db, |table| table.range(bounds.as_ref()))
     }
 
@@ -189,7 +164,7 @@ impl<'a> RecordsRange<'a> {
     }
 }
 
-impl<'a> Iterator for RecordsRange<'a> {
+impl Iterator for RecordsRange {
     type Item = anyhow::Result<SignedEntry>;
     fn next(&mut self) -> Option<Self::Item> {
         self.0.next_mapped(into_entry)
@@ -198,42 +173,35 @@ impl<'a> Iterator for RecordsRange<'a> {
 
 #[derive(derive_more::Debug)]
 #[debug("RecordsByKeyRange")]
-pub struct RecordsByKeyRange<'a>(RecordsByKeyRangeInner<'a>);
-
-#[self_referencing]
-struct RecordsByKeyRangeInner<'a> {
-    read_tx: ReadTransaction<'a>,
-
-    #[covariant]
-    #[borrows(read_tx)]
-    records_table: ReadOnlyTable<'this, RecordsId<'static>, RecordsValue<'static>>,
-
-    #[covariant]
-    #[borrows(read_tx)]
-    by_key_table: ReadOnlyTable<'this, RecordsByKeyId<'static>, ()>,
-
-    #[borrows(by_key_table)]
-    #[covariant]
-    by_key_range: Range<'this, RecordsByKeyId<'static>, ()>,
+pub struct RecordsByKeyRange {
+    read_tx: ReadTransaction,
+    records_table: ReadOnlyTable<RecordsId<'static>, RecordsValue<'static>>,
+    by_key_table: ReadOnlyTable<RecordsByKeyId<'static>, ()>,
+    by_key_range: Range<'static, RecordsByKeyId<'static>, ()>,
 }
 
-impl<'a> RecordsByKeyRange<'a> {
-    pub fn new<RF>(db: &'a Arc<Database>, range_fn: RF) -> anyhow::Result<Self>
+impl RecordsByKeyRange {
+    pub fn new<RF>(db: &Arc<Database>, range_fn: RF) -> anyhow::Result<Self>
     where
-        RF: for<'s> FnOnce(
-            &'s ReadOnlyTable<'s, RecordsByKeyId<'static>, ()>,
-        ) -> Result<Range<'s, RecordsByKeyId<'static>, ()>, StorageError>,
+        RF: FnOnce(
+            &ReadOnlyTable<RecordsByKeyId<'static>, ()>,
+        ) -> Result<Range<'static, RecordsByKeyId<'static>, ()>, StorageError>,
     {
-        let inner = RecordsByKeyRangeInner::try_new(
-            db.begin_read()?,
-            |tx| tx.open_table(RECORDS_TABLE).map_err(anyhow_err),
-            |tx| tx.open_table(RECORDS_BY_KEY_TABLE).map_err(anyhow_err),
-            |table| range_fn(table).map_err(Into::into),
-        )?;
-        Ok(Self(inner))
+        let read_tx = db.begin_read()?;
+        let records_table = read_tx.open_table(RECORDS_TABLE).map_err(anyhow_err)?;
+        let by_key_table = read_tx
+            .open_table(RECORDS_BY_KEY_TABLE)
+            .map_err(anyhow_err)?;
+        let by_key_range = range_fn(&by_key_table).map_err(anyhow_err)?;
+        Ok(Self {
+            read_tx,
+            records_table,
+            by_key_table,
+            by_key_range,
+        })
     }
 
-    pub fn with_bounds(db: &'a Arc<Database>, bounds: ByKeyBounds) -> anyhow::Result<Self> {
+    pub fn with_bounds(db: &Arc<Database>, bounds: ByKeyBounds) -> anyhow::Result<Self> {
         Self::new(db, |table| table.range(bounds.as_ref()))
     }
 
@@ -245,31 +213,29 @@ impl<'a> RecordsByKeyRange<'a> {
         direction: &SortDirection,
         filter: impl for<'x> Fn(RecordsByKeyId<'x>) -> bool,
    ) -> Option<anyhow::Result<SignedEntry>> {
-        self.0.with_mut(|fields| {
-            let by_key_id = loop {
-                let next = match direction {
-                    SortDirection::Asc => fields.by_key_range.next(),
-                    SortDirection::Desc => fields.by_key_range.next_back(),
-                };
-                match next {
-                    Some(Ok(res)) => match filter(res.0.value()) {
-                        false => continue,
-                        true => break res.0,
-                    },
-                    Some(Err(err)) => return Some(Err(err.into())),
-                    None => return None,
-                }
+        let by_key_id = loop {
+            let next = match direction {
+                SortDirection::Asc => self.by_key_range.next(),
+                SortDirection::Desc => self.by_key_range.next_back(),
             };
-
-            let (namespace, key, author) = by_key_id.value();
-            let records_id = (namespace, author, key);
-            let entry = fields.records_table.get(&records_id);
-            match entry {
-                Ok(Some(entry)) => Some(Ok(into_entry(records_id, entry.value()))),
-                Ok(None) => None,
-                Err(err) => Some(Err(err.into())),
+            match next {
+                Some(Ok(res)) => match filter(res.0.value()) {
+                    false => continue,
+                    true => break res.0,
+                },
+                Some(Err(err)) => return Some(Err(err.into())),
+                None => return None,
             }
-        })
+        };
+
+        let (namespace, key, author) = by_key_id.value();
+        let records_id = (namespace, author, key);
+        let entry = self.records_table.get(&records_id);
+        match entry {
+            Ok(Some(entry)) => Some(Ok(into_entry(records_id, entry.value()))),
+            Ok(None) => None,
+            Err(err) => Some(Err(err.into())),
+        }
    }
 }
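[Annotation] After this patch every ouroboros self-referencing struct is gone: a redb v2 `Range` is `'static` and keeps its read snapshot alive on its own, so an iterator type can simply own it. A small sketch of the resulting shape with illustrative names (`Entries` and table `T` are not from the patch):

    use redb::{Database, Range, ReadableTable, TableDefinition};

    const T: TableDefinition<&str, u64> = TableDefinition::new("t");

    struct Entries(Range<'static, &'static str, u64>);

    impl Entries {
        fn all(db: &Database) -> anyhow::Result<Self> {
            let tx = db.begin_read()?;
            let table = tx.open_table(T)?;
            // Both `tx` and `table` can be dropped; the range pins the snapshot.
            Ok(Self(table.range::<&str>(..)?))
        }
    }

    impl Iterator for Entries {
        type Item = anyhow::Result<(String, u64)>;
        fn next(&mut self) -> Option<Self::Item> {
            self.0.next().map(|r| {
                r.map(|(k, v)| (k.value().to_owned(), v.value()))
                    .map_err(Into::into)
            })
        }
    }

The next patch pushes this further and deletes the remaining wrapper plumbing.
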
From 5e48fcfc68ecff5b42a7c027165f1f954ca0a437 Mon Sep 17 00:00:00 2001
From: "Franz Heinzmann (Frando)"
Date: Mon, 25 Mar 2024 12:31:55 +0100
Subject: [PATCH 03/15] simplify code thanks to redb v2

---
 iroh-sync/src/store/fs.rs        |  75 ++++++-----
 iroh-sync/src/store/fs/query.rs  |  10 +-
 iroh-sync/src/store/fs/ranges.rs | 221 ++++++++++---------------------
 3 files changed, 116 insertions(+), 190 deletions(-)

diff --git a/iroh-sync/src/store/fs.rs b/iroh-sync/src/store/fs.rs
index 3dd75f03d7..b72c1a63c4 100644
--- a/iroh-sync/src/store/fs.rs
+++ b/iroh-sync/src/store/fs.rs
@@ -16,8 +16,8 @@ use ed25519_dalek::{SignatureError, VerifyingKey};
 use iroh_base::hash::Hash;
 use parking_lot::RwLock;
 use redb::{
-    Database, MultimapTableDefinition, ReadOnlyTable, ReadableMultimapTable, ReadableTable,
-    ReadableTableMetadata, TableDefinition,
+    Database, MultimapTableDefinition, ReadOnlyTable, ReadTransaction, ReadableMultimapTable,
+    ReadableTable, ReadableTableMetadata, TableDefinition,
 };
 
 use crate::{
@@ -38,9 +38,11 @@ mod bounds;
 mod migrations;
 mod query;
 mod ranges;
 
-use self::bounds::{ByKeyBounds, RecordsBounds};
 use self::query::QueryIterator;
-use self::ranges::{TableRange, TableReader};
+use self::{
+    bounds::{ByKeyBounds, RecordsBounds},
+    ranges::RangeExt,
+};
 
 pub use self::ranges::RecordsRange;
@@ -295,7 +297,8 @@ impl super::Store for Store {
         namespace: NamespaceId,
         query: impl Into<Query>,
     ) -> Result<Self::GetIter<'_>> {
-        QueryIterator::new(&self.db, namespace, query.into())
+        let read_tx = self.db.begin_read()?;
+        QueryIterator::new(&read_tx, namespace, query.into())
     }
 
     fn get_exact(
@@ -311,11 +314,13 @@ impl super::Store for Store {
     }
 
     fn content_hashes(&self) -> Result<Self::ContentHashesIter<'_>> {
-        ContentHashesIterator::new(&self.db)
+        let tx = self.db.begin_read()?;
+        ContentHashesIterator::new(&tx)
     }
 
     fn get_latest_for_each_author(&self, namespace: NamespaceId) -> Result<Self::LatestIter<'_>> {
-        LatestIterator::new(&self.db, namespace)
+        let tx = self.db.begin_read()?;
+        LatestIterator::new(&tx, namespace)
     }
 
     fn register_useful_peer(&self, namespace: NamespaceId, peer: crate::PeerIdBytes) -> Result<()> {
@@ -582,12 +587,13 @@ impl crate::ranger::Store<SignedEntry> for StoreInstance {
     }
 
     fn get_range(&self, range: Range<RecordIdentifier>) -> Result<Self::RangeIterator<'_>> {
+        let read_tx = self.store.db.begin_read()?;
         let iter = match range.x().cmp(range.y()) {
             // identity range: iter1 = all, iter2 = none
             Ordering::Equal => {
                 // iterator for all entries in replica
                 let bounds = RecordsBounds::namespace(self.namespace);
-                let iter = RecordsRange::with_bounds(&self.store.db, bounds)?;
+                let iter = RecordsRange::with_bounds(&read_tx, bounds)?;
                 chain_none(iter)
             }
             // regular range: iter1 = x <= t < y, iter2 = none
@@ -596,7 +602,7 @@ impl crate::ranger::Store<SignedEntry> for StoreInstance {
                 let start = Bound::Included(range.x().to_byte_tuple());
                 let end = Bound::Excluded(range.y().to_byte_tuple());
                 let bounds = RecordsBounds::new(start, end);
-                let iter = RecordsRange::with_bounds(&self.store.db, bounds)?;
+                let iter = RecordsRange::with_bounds(&read_tx, bounds)?;
                 chain_none(iter)
             }
             // split range: iter1 = start <= t < y, iter2 = x <= t <= end
@@ -604,12 +610,12 @@ impl crate::ranger::Store<SignedEntry> for StoreInstance {
                 // iterator for entries from start to range.y
                 let end = Bound::Excluded(range.y().to_byte_tuple());
                 let bounds = RecordsBounds::from_start(&self.namespace, end);
-                let iter = RecordsRange::with_bounds(&self.store.db, bounds)?;
+                let iter = RecordsRange::with_bounds(&read_tx, bounds)?;
 
                 // iterator for entries from range.x to end
                 let start = Bound::Included(range.x().to_byte_tuple());
                 let bounds = RecordsBounds::to_end(&self.namespace, start);
-                let iter2 = RecordsRange::with_bounds(&self.store.db, bounds)?;
+                let iter2 = RecordsRange::with_bounds(&read_tx, bounds)?;
 
                 iter.chain(Some(iter2).into_iter().flatten())
             }
@@ -636,8 +642,9 @@ impl crate::ranger::Store<SignedEntry> for StoreInstance {
     }
 
     fn all(&self) -> Result<Self::RangeIterator<'_>> {
+        let read_tx = self.store.db.begin_read()?;
         let bounds = RecordsBounds::namespace(self.namespace);
-        let iter = RecordsRange::with_bounds(&self.store.db, bounds)?;
+        let iter = RecordsRange::with_bounds(&read_tx, bounds)?;
         Ok(chain_none(iter))
     }
 
@@ -651,8 +658,9 @@ impl crate::ranger::Store<SignedEntry> for StoreInstance {
     }
 
     fn prefixed_by(&self, id: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {
+        let read_tx = self.store.db.begin_read()?;
         let bounds = RecordsBounds::author_prefix(id.namespace(), id.author(), id.key_bytes());
-        let iter = RecordsRange::with_bounds(&self.store.db, bounds)?;
+        let iter = RecordsRange::with_bounds(&read_tx, bounds)?;
         Ok(chain_none(iter))
     }
 
@@ -689,7 +697,7 @@ fn chain_none<'a, I: Iterator<Item = Result<T>> + 'a, T>(
 /// is a prefix of the key passed to the iterator.
 #[derive(Debug)]
 pub struct ParentIterator {
-    reader: TableReader<RecordsId<'static>, RecordsValue<'static>>,
+    table: ReadOnlyTable<RecordsId<'static>, RecordsValue<'static>>,
     namespace: NamespaceId,
     author: AuthorId,
     key: Vec<u8>,
}
 
 impl ParentIterator {
     fn new(
         db: &Arc<Database>,
         namespace: NamespaceId,
         author: AuthorId,
         key: Vec<u8>,
     ) -> anyhow::Result<Self> {
-        let reader = TableReader::new(db, |tx| tx.open_table(RECORDS_TABLE))?;
+        let tx = db.begin_read()?;
+        let table = tx.open_table(RECORDS_TABLE)?;
         Ok(Self {
-            reader,
+            table,
             namespace,
             author,
             key,
@@ -716,9 +725,8 @@ impl Iterator for ParentIterator {
     type Item = Result<SignedEntry>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        let records_table = self.reader.table();
         while !self.key.is_empty() {
-            let entry = get_exact(records_table, self.namespace, self.author, &self.key, false);
+            let entry = get_exact(&self.table, self.namespace, self.author, &self.key, false);
             self.key.pop();
             match entry {
                 Err(err) => return Some(Err(err)),
@@ -735,8 +743,8 @@ impl Iterator for ParentIterator {
 pub struct ContentHashesIterator(RecordsRange);
 
 impl ContentHashesIterator {
-    fn new(db: &Arc<Database>) -> anyhow::Result<Self> {
-        let range = RecordsRange::new(db, |table| table.range::<RecordsId<'static>>(..))?;
+    fn new(tx: &ReadTransaction) -> anyhow::Result<Self> {
+        let range = RecordsRange::all(&tx)?;
         Ok(Self(range))
     }
 }
@@ -745,7 +753,7 @@ impl Iterator for ContentHashesIterator {
     type Item = Result<Hash>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        self.0.next_mapped(|_key, value| {
+        self.0.next_map(|_key, value| {
             let (_timestamp, _namespace_sig, _author_sig, _len, hash) = value;
             Hash::from(hash)
         })
     }
 }
 
 /// Iterator over the latest entry per author.
-#[derive(Debug)]
-pub struct LatestIterator(TableRange<LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>);
+#[derive(derive_more::Debug)]
+#[debug("LatestIterator")]
+pub struct LatestIterator(
+    redb::Range<'static, LatestPerAuthorKey<'static>, LatestPerAuthorValue<'static>>,
+);
 
 impl LatestIterator {
-    fn new(db: &Arc<Database>, namespace: NamespaceId) -> anyhow::Result<Self> {
-        Ok(Self(TableRange::new(
-            db,
-            |tx| tx.open_table(LATEST_PER_AUTHOR_TABLE),
-            |table| {
-                let start = (namespace.as_bytes(), &[u8::MIN; 32]);
-                let end = (namespace.as_bytes(), &[u8::MAX; 32]);
-                table.range(start..=end)
-            },
-        )?))
+    fn new(read_tx: &ReadTransaction, namespace: NamespaceId) -> anyhow::Result<Self> {
+        let start = (namespace.as_bytes(), &[u8::MIN; 32]);
+        let end = (namespace.as_bytes(), &[u8::MAX; 32]);
+        let table = read_tx.open_table(LATEST_PER_AUTHOR_TABLE)?;
+        let range = table.range(start..=end)?;
+        Ok(Self(range))
     }
 }
 
@@ -774,7 +781,7 @@ impl Iterator for LatestIterator {
     type Item = Result<(AuthorId, u64, Vec<u8>)>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        self.0.next_mapped(|key, value| {
+        self.0.next_map(|key, value| {
             let (_namespace, author) = key;
             let (timestamp, key) = value;
             (author.into(), timestamp, key.to_vec())
diff --git a/iroh-sync/src/store/fs/query.rs b/iroh-sync/src/store/fs/query.rs
index 7cd37ffdd6..8210fcbb3e 100644
--- a/iroh-sync/src/store/fs/query.rs
+++ b/iroh-sync/src/store/fs/query.rs
@@ -1,8 +1,6 @@
-use std::sync::Arc;
-
 use anyhow::Result;
 use iroh_base::hash::Hash;
-use redb::Database;
+use redb::{ReadTransaction};
 
 use crate::{
     store::{
@@ -41,7 +39,7 @@ enum QueryRange {
 }
 
 impl QueryIterator {
-    pub fn new(db: &Arc<Database>, namespace: NamespaceId, query: Query) -> Result<Self> {
+    pub fn new(read_tx: &ReadTransaction, namespace: NamespaceId, query: Query) -> Result<Self> {
         let index_kind = IndexKind::from(&query);
         let range = match index_kind {
             IndexKind::AuthorKey { range, key_filter } => {
@@ -55,7 +53,7 @@ impl QueryIterator {
                     // no author set => full table scan with the provided key filter
                     AuthorFilter::Any => (RecordsBounds::namespace(namespace), key_filter),
                 };
-                let range = RecordsRange::with_bounds(db, bounds)?;
+                let range = RecordsRange::with_bounds(read_tx, bounds)?;
                 QueryRange::AuthorKey {
                     range,
                     key_filter: filter,
@@ -67,7 +65,7 @@ impl QueryIterator {
                 latest_per_key,
             } => {
                 let bounds = ByKeyBounds::new(namespace, &range);
-                let range = RecordsByKeyRange::with_bounds(db, bounds)?;
+                let range = RecordsByKeyRange::with_bounds(read_tx, bounds)?;
                 let selector = latest_per_key.then(LatestPerKeySelector::default);
                 QueryRange::KeyAuthor {
                     author_filter,
diff --git a/iroh-sync/src/store/fs/ranges.rs b/iroh-sync/src/store/fs/ranges.rs
index b0be1c1e6b..af295bb28c 100644
--- a/iroh-sync/src/store/fs/ranges.rs
+++ b/iroh-sync/src/store/fs/ranges.rs
@@ -1,16 +1,6 @@
-//! Ranges on [`redb`] tables
-//!
-//! Because the [`redb`] types all contain references, this uses [`ouroboros`] to create
-//! self-referential structs so that we can embed the [`Range`] iterator together with the
-//! [`ReadableTable`] and the [`ReadTransaction`] in a struct for our iterators returned from the
-//! store.
-
-use std::{fmt, sync::Arc};
-
-use redb::{
-    Database, Key as RedbKey, Range, ReadOnlyTable, ReadTransaction, ReadableTable, StorageError,
-    TableError, Value as RedbValue,
-};
+//! Ranges and helpers for working with [`redb`] tables
+
+use redb::{Key, Range, ReadOnlyTable, ReadTransaction, Value};
 
 use crate::{store::SortDirection, SignedEntry};
 
@@ -19,130 +9,84 @@ use super::{
     into_entry, RecordsByKeyId, RecordsId, RecordsValue, RECORDS_BY_KEY_TABLE, RECORDS_TABLE,
 };
 
-/// A [`ReadTransaction`] with a [`ReadOnlyTable`] that can be stored in a struct.
-/// 
-/// This uses [`ouroboros::self_referencing`] to store a [`ReadTransaction`] and a [`ReadOnlyTable`]
-/// with self-referencing.
-pub struct TableReader<K: RedbKey + 'static, V: RedbValue + 'static> {
-    read_tx: ReadTransaction,
-    table: ReadOnlyTable<K, V>,
-}
-
-impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableReader<K, V> {
-    /// Create a new [`TableReader`]
-    pub fn new(
-        db: &'a Arc<Database>,
-        table_fn: impl FnOnce(&ReadTransaction) -> Result<ReadOnlyTable<K, V>, TableError>,
-    ) -> anyhow::Result<Self> {
-        let read_tx = db.begin_read()?;
-        let table = table_fn(&read_tx).map_err(anyhow::Error::from)?;
-        Ok(Self { read_tx, table })
-    }
-
-    /// Get a reference to the [`ReadOnlyTable`];
-    pub fn table(&self) -> &ReadOnlyTable<K, V> {
-        &self.table
-    }
-}
-
-impl<K: RedbKey + 'static, V: RedbValue + 'static> fmt::Debug for TableReader<K, V> {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "TableReader({:?})", self.table())
-    }
-}
+/// An extension trait for [`Range`] that provides methods for mapped retrieval.
+pub trait RangeExt<K: Key + 'static, V: Value + 'static> {
+    /// Get the next entry and map the item with a callback function.
+    fn next_map<T>(
+        &mut self,
+        map: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> T,
+    ) -> Option<anyhow::Result<T>>;
+
+    /// Get the next entry, but only if the callback function returns Some, otherwise continue.
+    ///
+    /// With `direction` the range can be processed in either forward or backward direction.
+    fn next_filter_map<T>(
+        &mut self,
+        direction: &SortDirection,
+        filter_map: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> Option<T>,
+    ) -> Option<anyhow::Result<T>>;
+
+    /// Like [`Self::next_filter_map`], but the callback returns a `Result`, and the result is
+    /// flattened with the result from the range operation.
+    fn next_try_filter_map<T>(
+        &mut self,
+        direction: &SortDirection,
+        filter_map: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> Option<anyhow::Result<T>>,
+    ) -> Option<anyhow::Result<T>> {
+        Some(self.next_filter_map(direction, filter_map)?.and_then(|r| r))
+    }
+}
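[Annotation] A usage sketch for the extension trait above, written as if from inside the store module (it uses the `pub(super)` API of `RecordsRange` and the `RecordsValue` tuple layout defined earlier; the function name is illustrative). It walks the records table backwards and returns the first entry whose length field is non-zero:

    use redb::ReadTransaction;

    fn last_nonempty(tx: &ReadTransaction) -> anyhow::Result<Option<SignedEntry>> {
        let mut range = RecordsRange::all(tx)?;
        // next_filtered drives next_filter_map with a direction, skipping
        // entries until the filter accepts one.
        Ok(range
            .next_filtered(&SortDirection::Desc, |_id, value| {
                let (_timestamp, _namespace_sig, _author_sig, len, _hash) = value;
                len > 0
            })
            .transpose()?)
    }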
+impl<K: Key + 'static, V: Value + 'static> RangeExt<K, V> for Range<'static, K, V> {
+    fn next_map<T>(
+        &mut self,
+        map: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> T,
+    ) -> Option<anyhow::Result<T>> {
+        self.next()
+            .map(|r| r.map_err(Into::into).map(|r| map(r.0.value(), r.1.value())))
+    }
+
+    fn next_filter_map<T>(
+        &mut self,
+        direction: &SortDirection,
+        filter_map: impl for<'x> Fn(K::SelfType<'x>, V::SelfType<'x>) -> Option<T>,
+    ) -> Option<anyhow::Result<T>> {
+        loop {
+            let next = match direction {
+                SortDirection::Asc => self.next(),
+                SortDirection::Desc => self.next_back(),
+            };
+            match next {
+                None => break None,
+                Some(Err(err)) => break Some(Err(err.into())),
+                Some(Ok(res)) => match filter_map(res.0.value(), res.1.value()) {
+                    None => continue,
+                    Some(item) => break Some(Ok(item)),
+                },
+            }
+        }
+    }
+}
 
 /// An iterator over a range of entries from the records table.
-#[derive(Debug)]
-pub struct RecordsRange(TableRange<RecordsId<'static>, RecordsValue<'static>>);
+#[derive(derive_more::Debug)]
+#[debug("RecordsRange")]
+pub struct RecordsRange(Range<'static, RecordsId<'static>, RecordsValue<'static>>);
 
 impl RecordsRange {
-    pub(super) fn new<RF>(db: &Arc<Database>, range_fn: RF) -> anyhow::Result<Self>
-    where
-        RF: FnOnce(
-            &ReadOnlyTable<RecordsId<'static>, RecordsValue<'static>>,
-        ) -> Result<
-            Range<'static, RecordsId<'static>, RecordsValue<'static>>,
-            StorageError,
-        >,
-    {
-        Ok(Self(TableRange::new(
-            db,
-            |tx| tx.open_table(RECORDS_TABLE),
-            range_fn,
-        )?))
+    pub(super) fn all(read_tx: &ReadTransaction) -> anyhow::Result<Self> {
+        let table = read_tx.open_table(RECORDS_TABLE)?;
+        let range = table.range::<RecordsId<'static>>(..)?;
+        Ok(Self(range))
     }
 
-    pub(super) fn with_bounds(db: &Arc<Database>, bounds: RecordsBounds) -> anyhow::Result<Self> {
-        Self::new(db, |table| table.range(bounds.as_ref()))
+    pub(super) fn with_bounds(
+        read_tx: &ReadTransaction,
+        bounds: RecordsBounds,
+    ) -> anyhow::Result<Self> {
+        let table = read_tx.open_table(RECORDS_TABLE)?;
+        let range = table.range(bounds.as_ref())?;
+        Ok(Self(range))
     }
 
     /// Get the next item in the range.
@@ -153,89 +97,66 @@ impl RecordsRange {
         direction: &SortDirection,
         filter: impl for<'x> Fn(RecordsId<'x>, RecordsValue<'x>) -> bool,
     ) -> Option<anyhow::Result<SignedEntry>> {
-        self.0.next_filtered(direction, filter, into_entry)
+        self.0
+            .next_filter_map(direction, |k, v| filter(k, v).then(|| into_entry(k, v)))
     }
 
-    pub(super) fn next_mapped<T>(
+    pub(super) fn next_map<T>(
         &mut self,
         map: impl for<'x> Fn(RecordsId<'x>, RecordsValue<'x>) -> T,
     ) -> Option<anyhow::Result<T>> {
-        self.0.next_mapped(map)
+        self.0.next_map(map)
     }
 }
 
 impl Iterator for RecordsRange {
     type Item = anyhow::Result<SignedEntry>;
     fn next(&mut self) -> Option<Self::Item> {
-        self.0.next_mapped(into_entry)
+        self.0.next_map(into_entry)
     }
 }
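[Annotation] The `RecordsByKeyRange` that follows joins two tables: it scans the by-key secondary index, then fetches the full row from the records table. A simplified standalone sketch of that join under hypothetical tables (`primary`/`by-name` are illustrative, not the patch's tables):

    use redb::{Database, ReadableTable, TableDefinition};

    const PRIMARY: TableDefinition<u64, &str> = TableDefinition::new("primary");
    const BY_NAME: TableDefinition<(&str, u64), ()> = TableDefinition::new("by-name");

    fn lookup_by_name(db: &Database, name: &str) -> anyhow::Result<Vec<String>> {
        let tx = db.begin_read()?;
        let primary = tx.open_table(PRIMARY)?;
        let index = tx.open_table(BY_NAME)?;
        let mut out = Vec::new();
        // Scan all index entries for `name`, then join against the primary table.
        for item in index.range((name, 0)..=(name, u64::MAX))? {
            let (key, _) = item?;
            let (_name, id) = key.value();
            if let Some(row) = primary.get(id)? {
                out.push(row.value().to_owned());
            }
        }
        Ok(out)
    }

The composite tuple key keeps index entries for one name contiguous, so a bounded range scan finds them all; the patch's `ByKeyBounds` plays the same role for (namespace, key, author) tuples.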
#[derive(derive_more::Debug)]
 #[debug("RecordsByKeyRange")]
-pub struct RecordsByKeyRange {
-    read_tx: ReadTransaction,
-    records_table: ReadOnlyTable<RecordsId<'static>, RecordsValue<'static>>,
-    by_key_table: ReadOnlyTable<RecordsByKeyId<'static>, ()>,
-    by_key_range: Range<'static, RecordsByKeyId<'static>, ()>,
-}
+pub struct RecordsByKeyRange {
+    records_table: ReadOnlyTable<RecordsId<'static>, RecordsValue<'static>>,
+    by_key_range: Range<'static, RecordsByKeyId<'static>, ()>,
+}
 
 impl RecordsByKeyRange {
-    pub fn new<RF>(db: &Arc<Database>, range_fn: RF) -> anyhow::Result<Self>
-    where
-        RF: FnOnce(
-            &ReadOnlyTable<RecordsByKeyId<'static>, ()>,
-        ) -> Result<Range<'static, RecordsByKeyId<'static>, ()>, StorageError>,
-    {
-        let read_tx = db.begin_read()?;
+    pub fn with_bounds(read_tx: &ReadTransaction, bounds: ByKeyBounds) -> anyhow::Result<Self> {
         let records_table = read_tx.open_table(RECORDS_TABLE).map_err(anyhow_err)?;
         let by_key_table = read_tx
             .open_table(RECORDS_BY_KEY_TABLE)
             .map_err(anyhow_err)?;
-        let by_key_range = range_fn(&by_key_table).map_err(anyhow_err)?;
+        let by_key_range = by_key_table.range(bounds.as_ref())?;
         Ok(Self {
-            read_tx,
             records_table,
-            by_key_table,
             by_key_range,
         })
     }
 
-    pub fn with_bounds(db: &Arc<Database>, bounds: ByKeyBounds) -> anyhow::Result<Self> {
-        Self::new(db, |table| table.range(bounds.as_ref()))
-    }
-
     /// Get the next item in the range.
     ///
-    /// Omit items for which the `matcher` function returns false.
+    /// Omit items for which the `filter` function returns false.
     pub fn next_filtered(
         &mut self,
         direction: &SortDirection,
         filter: impl for<'x> Fn(RecordsByKeyId<'x>) -> bool,
     ) -> Option<anyhow::Result<SignedEntry>> {
-        let by_key_id = loop {
-            let next = match direction {
-                SortDirection::Asc => self.by_key_range.next(),
-                SortDirection::Desc => self.by_key_range.next_back(),
+        let entry = self.by_key_range.next_try_filter_map(direction, |k, _v| {
+            if !filter(k) {
+                return None;
             };
-            match next {
-                Some(Ok(res)) => match filter(res.0.value()) {
-                    false => continue,
-                    true => break res.0,
-                },
-                Some(Err(err)) => return Some(Err(err.into())),
-                None => return None,
-            }
-        };
-
-        let (namespace, key, author) = by_key_id.value();
-        let records_id = (namespace, author, key);
-        let entry = self.records_table.get(&records_id);
-        match entry {
-            Ok(Some(entry)) => Some(Ok(into_entry(records_id, entry.value()))),
-            Ok(None) => None,
-            Err(err) => Some(Err(err.into())),
-        }
+            let (namespace, key, author) = k;
+            let records_id = (namespace, author, key);
+            let entry = self.records_table.get(&records_id).transpose()?;
+            let entry = entry
+                .map(|value| into_entry(records_id, value.value()))
+                .map_err(anyhow::Error::from);
+            Some(entry)
+        });
+        entry
     }
 }

From 9667f533a3a0d383c07632841a6eb4ca5b63ad5f Mon Sep 17 00:00:00 2001
From: "Franz Heinzmann (Frando)"
Date: Mon, 25 Mar 2024 12:34:32 +0100
Subject: [PATCH 04/15] chore: fmt & clippy

---
 iroh-sync/src/store/fs.rs       | 2 +-
 iroh-sync/src/store/fs/query.rs | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/iroh-sync/src/store/fs.rs b/iroh-sync/src/store/fs.rs
index b72c1a63c4..35e22c55cd 100644
--- a/iroh-sync/src/store/fs.rs
+++ b/iroh-sync/src/store/fs.rs
@@ -744,7 +744,7 @@ pub struct ContentHashesIterator(RecordsRange);
 
 impl ContentHashesIterator {
     fn new(tx: &ReadTransaction) -> anyhow::Result<Self> {
-        let range = RecordsRange::all(&tx)?;
+        let range = RecordsRange::all(tx)?;
         Ok(Self(range))
     }
 }
diff --git a/iroh-sync/src/store/fs/query.rs b/iroh-sync/src/store/fs/query.rs
index 8210fcbb3e..85e454b233 100644
--- a/iroh-sync/src/store/fs/query.rs
+++ b/iroh-sync/src/store/fs/query.rs
@@ -1,6 +1,6 @@
 use anyhow::Result;
 use iroh_base::hash::Hash;
-use redb::{ReadTransaction};
+use redb::ReadTransaction;
 
 use crate::{
     store::{

From 2fbc79ba2b9bf6457a9fe79cd4615de537c1bf41 Mon Sep 17 00:00:00 2001
From: "Franz Heinzmann (Frando)"
Date: Mon, 25 Mar 2024 12:38:14 +0100
Subject: [PATCH 05/15] cleanup

---
 iroh-sync/src/store/fs.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/iroh-sync/src/store/fs.rs b/iroh-sync/src/store/fs.rs
index 35e22c55cd..2a61ed29cb 100644
--- a/iroh-sync/src/store/fs.rs
+++ b/iroh-sync/src/store/fs.rs
@@ -314,8 +314,8 @@ impl super::Store for Store {
     }
 
     fn content_hashes(&self) -> Result<Self::ContentHashesIter<'_>> {
-        let tx = self.db.begin_read()?;
-        ContentHashesIterator::new(&tx)
+        let read_tx = self.db.begin_read()?;
+        ContentHashesIterator::new(&read_tx)
     }
 
     fn get_latest_for_each_author(&self, namespace: NamespaceId) -> Result<Self::LatestIter<'_>> {
@@ -649,6 +649,7 @@ impl crate::ranger::Store<SignedEntry> for StoreInstance {
     }
 
     fn prefixes_of(&self, id: &RecordIdentifier) -> Result<Self::ParentIterator<'_>, Self::Error> {
+        let read_tx = self.store.db.begin_read()?;
         ParentIterator::new(
             &self.store.db,
             id.namespace(),
@@ -705,12 +706,11 @@ pub struct ParentIterator {
 
 impl ParentIterator {
     fn new(
-        db: &Arc<Database>,
+        tx: &ReadTransaction,
         namespace: NamespaceId,
         author: AuthorId,
         key: Vec<u8>,
     ) -> anyhow::Result<Self> {
-        let tx = db.begin_read()?;
         let table = tx.open_table(RECORDS_TABLE)?;
         Ok(Self {
             table,

From 04400dfd1845f7dfa22bcf2dbc23878a9ea29bd9 Mon Sep 17 00:00:00 2001
From: "Franz Heinzmann (Frando)"
Date: Mon, 25 Mar 2024 12:55:38 +0100
Subject: [PATCH 06/15] fixup

---
 iroh-sync/src/store/fs.rs | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/iroh-sync/src/store/fs.rs b/iroh-sync/src/store/fs.rs
index 2a61ed29cb..a0e447cb85 100644
--- a/iroh-sync/src/store/fs.rs
+++ b/iroh-sync/src/store/fs.rs
@@ -650,12 +650,7 @@ impl crate::ranger::Store<SignedEntry> for StoreInstance {
 
     fn prefixes_of(&self, id: &RecordIdentifier) -> Result<Self::ParentIterator<'_>, Self::Error> {
         let read_tx = self.store.db.begin_read()?;
-        ParentIterator::new(
-            &self.store.db,
-            id.namespace(),
-            id.author(),
-            id.key().to_vec(),
-        )
+        ParentIterator::new(&read_tx, id.namespace(), id.author(), id.key().to_vec())
     }
 
     fn prefixed_by(&self, id: &RecordIdentifier) -> Result<Self::RangeIterator<'_>> {

From e78886a92aa763182f702590529b098d7af93436 Mon Sep 17 00:00:00 2001
From: "Franz Heinzmann (Frando)"
Date: Tue, 26 Mar 2024 00:32:58 +0100
Subject: [PATCH 07/15] feat: iroh-sync migration for redb v1 to v2

---
 Cargo.lock                              |  16 ++-
 iroh-bytes/src/store/fs/tables.rs       |   2 +-
 iroh-sync/Cargo.toml                    |   6 +-
 iroh-sync/src/store/fs.rs               |  11 +-
 iroh-sync/src/store/fs/migrate_v1_v2.rs | 141 ++++++++++++++++++++++++
 5 files changed, 167 insertions(+), 9 deletions(-)
 create mode 100644 iroh-sync/src/store/fs/migrate_v1_v2.rs

diff --git a/Cargo.lock b/Cargo.lock
index 8e21e7cec3..58e351627e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2343,7 +2343,7 @@ dependencies = [
  "proptest",
  "rand",
  "rand_core",
- "redb",
+ "redb 2.0.0",
  "serde",
  "serde-error",
  "serde_json",
@@ -2398,7 +2398,7 @@ dependencies = [
  "rand",
  "range-collections",
  "rcgen 0.12.1",
- "redb",
+ "redb 2.0.0",
  "reflink-copy",
  "rustls",
  "self_cell",
@@ -2659,7 +2659,8 @@ dependencies = [
  "rand",
  "rand_chacha",
  "rand_core",
- "redb",
+ "redb 1.5.1",
+ "redb 2.0.0",
  "serde",
  "strum 0.25.0",
  "tempfile",
@@ -3996,6 +3997,15 @@ dependencies = [
  "yasna",
 ]
 
+[[package]]
+name = "redb"
+version = "1.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fd7f82ecd6ba647a39dd1a7172b8a1cd9453c0adee6da20cb553d83a9a460fa5"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "redb"
 version = "2.0.0"
diff --git a/iroh-bytes/src/store/fs/tables.rs b/iroh-bytes/src/store/fs/tables.rs
index ef6aee951a..d458f2918b 100644
--- a/iroh-bytes/src/store/fs/tables.rs
+++ b/iroh-bytes/src/store/fs/tables.rs
@@ -142,7 +142,7 @@ impl DeleteSet {
     /// Errors will just be logged.
     pub fn apply_and_clear(&mut self, options: &PathOptions) {
         for (hash, to_delete) in &self.0 {
-            tracing::info!("deleting {:?}", to_delete);
+            tracing::info!("deleting {:?} for {hash}", to_delete);
             let path = match to_delete {
                 BaoFilePart::Data => options.owned_data_path(hash),
                 BaoFilePart::Outboard => options.owned_outboard_path(hash),
diff --git a/iroh-sync/Cargo.toml b/iroh-sync/Cargo.toml
index a3958e7466..aa82cb178d 100644
--- a/iroh-sync/Cargo.toml
+++ b/iroh-sync/Cargo.toml
@@ -40,6 +40,8 @@ tokio = { version = "1", features = ["sync"] }
 
 # fs-store
 redb = { version = "2.0.0", optional = true }
+redb_v1 = { package = "redb", version = "1.5.1", optional = true }
+tempfile = { version = "3.4", optional = true }
 
 # net
 iroh-net = { version = "0.12.0", optional = true, path = "../iroh-net" }
@@ -53,12 +55,12 @@ lru = "0.12"
 iroh-test = { path = "../iroh-test" }
 rand_chacha = "0.3.1"
 tokio = { version = "1", features = ["sync", "macros"] }
-tempfile = "3.4"
 proptest = "1.2.0"
+tempfile = "3.4"
 test-strategy = "0.3.1"
 
 [features]
 default = ["net", "fs-store", "metrics"]
 net = ["iroh-net", "tokio/io-util", "tokio-stream", "tokio-util", "quinn", "futures"]
-fs-store = ["redb"]
+fs-store = ["redb", "redb_v1", "tempfile"]
 metrics = ["iroh-metrics"]
diff --git a/iroh-sync/src/store/fs.rs b/iroh-sync/src/store/fs.rs
index a0e447cb85..8847a8a86c 100644
--- a/iroh-sync/src/store/fs.rs
+++ b/iroh-sync/src/store/fs.rs
@@ -16,8 +16,8 @@ use ed25519_dalek::{SignatureError, VerifyingKey};
 use iroh_base::hash::Hash;
 use parking_lot::RwLock;
 use redb::{
-    Database, MultimapTableDefinition, ReadOnlyTable, ReadTransaction,
-    ReadableMultimapTable, ReadableTable, ReadableTableMetadata, TableDefinition,
+    Database, DatabaseError, MultimapTableDefinition, ReadOnlyTable, ReadTransaction,
+    ReadableMultimapTable, ReadableTable, ReadableTableMetadata, TableDefinition,
 };
 
 use crate::{
@@ -34,6 +34,7 @@ use super::{
 };
 
 mod bounds;
+mod migrate_v1_v2;
 mod migrations;
 mod query;
 mod ranges;
@@ -122,7 +123,11 @@ impl Store {
     ///
     /// The file will be created if it does not exist, otherwise it will be opened.
     pub fn new(path: impl AsRef<Path>) -> Result<Self> {
-        let db = Database::create(path)?;
+        let db = match Database::create(&path) {
+            Ok(db) => db,
+            Err(DatabaseError::UpgradeRequired(1)) => migrate_v1_v2::run(&path)?,
+            Err(err) => return Err(err.into()),
+        };
 
         // Setup all tables
         let write_tx = db.begin_write()?;
diff --git a/iroh-sync/src/store/fs/migrate_v1_v2.rs b/iroh-sync/src/store/fs/migrate_v1_v2.rs
new file mode 100644
index 0000000000..c0b3914172
--- /dev/null
+++ b/iroh-sync/src/store/fs/migrate_v1_v2.rs
@@ -0,0 +1,141 @@
+use std::path::Path;
+
+use anyhow::{Context, Result};
+use redb::{MultimapTableHandle, TableHandle};
+use redb_v1::{ReadableMultimapTable, ReadableTable};
+use tempfile::NamedTempFile;
+use tracing::info;
+
+macro_rules! migrate_table {
+    ($rtx:expr, $wtx:expr, $old:expr, $new:expr) => {{
+        let old_table = $rtx.open_table($old)?;
+        let mut new_table = $wtx.open_table($new)?;
+        let name = $new.name();
+        let len = old_table.len()?;
+        info!("migrate {name} ({len} rows)..");
+        let ind = (len as usize / 1000) + 1;
+        for (i, entry) in old_table.iter()?.enumerate() {
+            let (key, value) = entry?;
+            let key = key.value();
+            let value = value.value();
+            if i > 0 && i % 100 == 0 {
+                info!("  {name} {i:>ind$}/{len}");
+            }
+            new_table.insert(key, value)?;
+        }
+        info!("migrate {name} done");
+    }};
+}
+
+macro_rules! migrate_multimap_table {
+    ($rtx:expr, $wtx:expr, $old:expr, $new:expr) => {{
+        let old_table = $rtx.open_multimap_table($old)?;
+        let mut new_table = $wtx.open_multimap_table($new)?;
+        let name = $new.name();
+        let len = old_table.len()?;
+        info!("migrate {name} ({len} rows)");
+        let ind = (len as usize / 1000) + 1;
+        for (i, entry) in old_table.iter()?.enumerate() {
+            let (key, values) = entry?;
+            let key = key.value();
+            if i > 0 && i % 100 == 0 {
+                info!("  {name} {i:>ind$}/{len}");
+            }
+            for value in values {
+                let value = value?;
+                new_table.insert(key, value.value())?;
+            }
+        }
+        info!("migrate {name} done");
+    }};
+}
+
+pub fn run(source: impl AsRef<Path>) -> Result<redb::Database> {
+    let source = source.as_ref();
+    // create the new database in a tempfile
+    let target = NamedTempFile::new()?;
+    let target = target.into_temp_path();
+    info!("migrate {} to {}", source.display(), target.display());
+    let old_db = redb_v1::Database::open(source)?;
+    let new_db = redb::Database::create(&target)?;
+
+    let rtx = old_db.begin_read()?;
+    let wtx = new_db.begin_write()?;
+
+    migrate_table!(rtx, wtx, old::AUTHORS_TABLE, new::AUTHORS_TABLE);
+    migrate_table!(rtx, wtx, old::NAMESPACES_TABLE, new::NAMESPACES_TABLE);
+    migrate_table!(rtx, wtx, old::RECORDS_TABLE, new::RECORDS_TABLE);
+    migrate_table!(
+        rtx,
+        wtx,
+        old::LATEST_PER_AUTHOR_TABLE,
+        new::LATEST_PER_AUTHOR_TABLE
+    );
+    migrate_table!(
+        rtx,
+        wtx,
+        old::RECORDS_BY_KEY_TABLE,
+        new::RECORDS_BY_KEY_TABLE
+    );
+    migrate_multimap_table!(
+        rtx,
+        wtx,
+        old::NAMESPACE_PEERS_TABLE,
+        new::NAMESPACE_PEERS_TABLE
+    );
+    migrate_table!(
+        rtx,
+        wtx,
+        old::DOWNLOAD_POLICY_TABLE,
+        new::DOWNLOAD_POLICY_TABLE
+    );
+
+    wtx.commit()?;
+    drop(rtx);
+    drop(old_db);
+    drop(new_db);
+
+    let backup_path = {
+        let mut file_name = source.file_name().context("must be a file")?.to_owned();
+        file_name.push(".backup-redb-v1");
+        let mut path = source.to_owned();
+        path.set_file_name(file_name);
+        path
+    };
+    info!("rename {} to {}", source.display(), backup_path.display());
+    std::fs::rename(source, &backup_path)?;
+    info!("rename {} to {}", target.display(), source.display());
+    target.persist_noclobber(source)?;
+    info!("opening migrated database from {}", source.display());
+    let db = redb::Database::open(source)?;
+    Ok(db)
+}
+
+mod new {
+    pub use super::super::*;
+}
+
+mod old {
+    use redb_v1::{MultimapTableDefinition, TableDefinition};
+
+    use crate::PeerIdBytes;
+
+    use super::new::{
+        LatestPerAuthorKey, LatestPerAuthorValue, Nanos, RecordsByKeyId, RecordsId, RecordsValue,
+    };
+
+    pub const AUTHORS_TABLE: TableDefinition<&[u8; 32], &[u8; 32]> =
+        TableDefinition::new("authors-1");
+    pub const NAMESPACES_TABLE: TableDefinition<&[u8; 32], (u8, &[u8; 32])> =
+        TableDefinition::new("namespaces-2");
+    pub const RECORDS_TABLE: TableDefinition<RecordsId<'static>, RecordsValue<'static>> =
+        TableDefinition::new("records-1");
+    pub const LATEST_PER_AUTHOR_TABLE: TableDefinition<
+        LatestPerAuthorKey<'static>,
+        LatestPerAuthorValue<'static>,
+    > = TableDefinition::new("latest-by-author-1");
+    pub const RECORDS_BY_KEY_TABLE: TableDefinition<RecordsByKeyId<'static>, ()> =
+        TableDefinition::new("records-by-key-1");
+    pub const NAMESPACE_PEERS_TABLE: MultimapTableDefinition<&[u8; 32], (Nanos, &PeerIdBytes)> =
+        MultimapTableDefinition::new("sync-peers-1");
+    pub const DOWNLOAD_POLICY_TABLE: TableDefinition<&[u8; 32], &[u8]> =
+        TableDefinition::new("download-policy-1");
+}
++++++++++++++---------------------- 1 file changed, 14 insertions(+), 22 deletions(-) diff --git a/iroh-sync/src/store/fs.rs b/iroh-sync/src/store/fs.rs index 0e5766120f..bd410e65fb 100644 --- a/iroh-sync/src/store/fs.rs +++ b/iroh-sync/src/store/fs.rs @@ -167,12 +167,12 @@ impl Store { } type Instance = StoreInstance; -type GetIter<'a> = QueryIterator<'a>; -type ContentHashesIter<'a> = ContentHashesIterator<'a>; -type LatestIter<'a> = LatestIterator<'a>; -type AuthorsIter<'a> = std::vec::IntoIter>; -type NamespaceIter<'a> = std::vec::IntoIter>; -type PeersIter<'a> = std::vec::IntoIter; +type GetIter = QueryIterator; +type ContentHashesIter = ContentHashesIterator; +type LatestIter = LatestIterator; +type AuthorsIter = std::vec::IntoIter>; +type NamespaceIter = std::vec::IntoIter>; +type PeersIter = std::vec::IntoIter; impl Store { /// Create a new replica for `namespace` and persist in this store. @@ -246,7 +246,7 @@ impl Store { } /// List all replica namespaces in this store. - pub fn list_namespaces(&self) -> Result> { + pub fn list_namespaces(&self) -> Result { // TODO: avoid collect let read_tx = self.db.begin_read()?; let namespace_table = read_tx.open_table(NAMESPACES_TABLE)?; @@ -284,7 +284,7 @@ impl Store { } /// List all author keys in this store. - pub fn list_authors(&self) -> Result> { + pub fn list_authors(&self) -> Result { // TODO: avoid collect let read_tx = self.db.begin_read()?; let authors_table = read_tx.open_table(AUTHORS_TABLE)?; @@ -364,7 +364,7 @@ impl Store { } /// Get an iterator over entries of a replica. - pub fn get_many(&self, namespace: NamespaceId, query: impl Into) -> Result> { + pub fn get_many(&self, namespace: NamespaceId, query: impl Into) -> Result { let read_tx = self.db.begin_read()?; QueryIterator::new(&read_tx, namespace, query.into()) } @@ -382,26 +382,18 @@ impl Store { get_exact(&record_table, namespace, author, key, include_empty) } - fn content_hashes(&self) -> Result> { + /// Get all content hashes of all replicas in the store. + pub fn content_hashes(&self) -> Result { let read_tx = self.db.begin_read()?; ContentHashesIterator::new(&read_tx) } - fn get_latest_for_each_author(&self, namespace: NamespaceId) -> Result> { + /// Get the latest entry for each author in a namespace. + pub fn get_latest_for_each_author(&self, namespace: NamespaceId) -> Result { let tx = self.db.begin_read()?; LatestIterator::new(&tx, namespace) } - /// Get all content hashes of all replicas in the store. - pub fn content_hashes(&self) -> Result> { - ContentHashesIterator::new(&self.db) - } - - /// Get the latest entry for each author in a namespace. - pub fn get_latest_for_each_author(&self, namespace: NamespaceId) -> Result> { - LatestIterator::new(&self.db, namespace) - } - /// Register a peer that has been useful to sync a document. pub fn register_useful_peer( &self, @@ -486,7 +478,7 @@ impl Store { } /// Get the peers that have been useful for a document. 
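 // Note on the hunks above: redb v2 makes `ReadTransaction` and
 // `ReadOnlyTable` owned, `'static` values instead of borrows of the
 // `Database`, so the iterators built on them (`QueryIterator`,
 // `ContentHashesIterator`, `LatestIterator`) no longer need a lifetime
 // parameter, which is why the type aliases drop their `<'a>`.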
- pub fn get_sync_peers(&self, namespace: &NamespaceId) -> Result>> { + pub fn get_sync_peers(&self, namespace: &NamespaceId) -> Result> { let read_tx = self.db.begin_read()?; let peers_table = read_tx.open_multimap_table(NAMESPACE_PEERS_TABLE)?; let mut peers = Vec::with_capacity(super::PEERS_PER_DOC_CACHE_SIZE.get()); From 69a2f0cae54ad20c4f3f65d3b4c718a2599e0441 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Tue, 26 Mar 2024 00:43:01 +0100 Subject: [PATCH 09/15] cleanup --- iroh-sync/src/store/fs.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/iroh-sync/src/store/fs.rs b/iroh-sync/src/store/fs.rs index bd410e65fb..cd3c94abd2 100644 --- a/iroh-sync/src/store/fs.rs +++ b/iroh-sync/src/store/fs.rs @@ -166,17 +166,13 @@ impl Store { } } -type Instance = StoreInstance; -type GetIter = QueryIterator; -type ContentHashesIter = ContentHashesIterator; -type LatestIter = LatestIterator; type AuthorsIter = std::vec::IntoIter>; type NamespaceIter = std::vec::IntoIter>; type PeersIter = std::vec::IntoIter; impl Store { /// Create a new replica for `namespace` and persist in this store. - pub fn new_replica(&self, namespace: NamespaceSecret) -> Result> { + pub fn new_replica(&self, namespace: NamespaceSecret) -> Result> { let id = namespace.id(); self.import_namespace(namespace.into())?; self.open_replica(&id).map_err(Into::into) @@ -364,7 +360,7 @@ impl Store { } /// Get an iterator over entries of a replica. - pub fn get_many(&self, namespace: NamespaceId, query: impl Into) -> Result { + pub fn get_many(&self, namespace: NamespaceId, query: impl Into) -> Result { let read_tx = self.db.begin_read()?; QueryIterator::new(&read_tx, namespace, query.into()) } @@ -383,13 +379,13 @@ impl Store { } /// Get all content hashes of all replicas in the store. - pub fn content_hashes(&self) -> Result { + pub fn content_hashes(&self) -> Result { let read_tx = self.db.begin_read()?; ContentHashesIterator::new(&read_tx) } /// Get the latest entry for each author in a namespace. - pub fn get_latest_for_each_author(&self, namespace: NamespaceId) -> Result { + pub fn get_latest_for_each_author(&self, namespace: NamespaceId) -> Result { let tx = self.db.begin_read()?; LatestIterator::new(&tx, namespace) } From 83c04a1728d1ffd43cb418b543c3609c9145ef0c Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Tue, 26 Mar 2024 10:04:26 +0100 Subject: [PATCH 10/15] chore: fmt --- iroh-sync/src/store/fs.rs | 6 +++++- iroh-sync/src/store/fs/migrate_v1_v2.rs | 12 +++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/iroh-sync/src/store/fs.rs b/iroh-sync/src/store/fs.rs index cd3c94abd2..f9ad4a53b6 100644 --- a/iroh-sync/src/store/fs.rs +++ b/iroh-sync/src/store/fs.rs @@ -360,7 +360,11 @@ impl Store { } /// Get an iterator over entries of a replica. 
- pub fn get_many(&self, namespace: NamespaceId, query: impl Into) -> Result { + pub fn get_many( + &self, + namespace: NamespaceId, + query: impl Into, + ) -> Result { let read_tx = self.db.begin_read()?; QueryIterator::new(&read_tx, namespace, query.into()) } diff --git a/iroh-sync/src/store/fs/migrate_v1_v2.rs b/iroh-sync/src/store/fs/migrate_v1_v2.rs index c0b3914172..e06797df1c 100644 --- a/iroh-sync/src/store/fs/migrate_v1_v2.rs +++ b/iroh-sync/src/store/fs/migrate_v1_v2.rs @@ -1,4 +1,4 @@ -use std::path::Path; +use std::path::{Path, PathBuf}; use anyhow::{Context, Result}; use redb::{MultimapTableHandle, TableHandle}; @@ -95,12 +95,10 @@ pub fn run(source: impl AsRef) -> Result { drop(old_db); drop(new_db); - let backup_path = { - let mut file_name = source.file_name().context("must be a file")?.to_owned(); - file_name.push(".backup-redb-v1"); - let mut path = source.to_owned(); - path.set_file_name(file_name); - path + let backup_path: PathBuf = { + let mut p = source.to_owned().into_os_string(); + p.push(".backup-redb-v1"); + p.into() }; info!("rename {} to {}", source.display(), backup_path.display()); std::fs::rename(source, &backup_path)?; From 0eb43b6ea9ea39ca9f7ef448b9518242c86ab7bb Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Tue, 26 Mar 2024 10:05:48 +0100 Subject: [PATCH 11/15] fixup --- iroh-sync/src/store/fs/migrate_v1_v2.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-sync/src/store/fs/migrate_v1_v2.rs b/iroh-sync/src/store/fs/migrate_v1_v2.rs index e06797df1c..92514ed890 100644 --- a/iroh-sync/src/store/fs/migrate_v1_v2.rs +++ b/iroh-sync/src/store/fs/migrate_v1_v2.rs @@ -1,6 +1,6 @@ use std::path::{Path, PathBuf}; -use anyhow::{Context, Result}; +use anyhow::{Result}; use redb::{MultimapTableHandle, TableHandle}; use redb_v1::{ReadableMultimapTable, ReadableTable}; use tempfile::NamedTempFile; From 37382938e613fa4674eb5d411e97ed236b491f8e Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Tue, 26 Mar 2024 10:29:17 +0100 Subject: [PATCH 12/15] chore: fmt --- iroh-sync/src/store/fs/migrate_v1_v2.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh-sync/src/store/fs/migrate_v1_v2.rs b/iroh-sync/src/store/fs/migrate_v1_v2.rs index 92514ed890..6fd07e44f4 100644 --- a/iroh-sync/src/store/fs/migrate_v1_v2.rs +++ b/iroh-sync/src/store/fs/migrate_v1_v2.rs @@ -1,6 +1,6 @@ use std::path::{Path, PathBuf}; -use anyhow::{Result}; +use anyhow::Result; use redb::{MultimapTableHandle, TableHandle}; use redb_v1::{ReadableMultimapTable, ReadableTable}; use tempfile::NamedTempFile; From b5607713b5d9c9666abe6f865bdb3efe953058f5 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Tue, 26 Mar 2024 11:39:01 +0100 Subject: [PATCH 13/15] feat: migrate iroh-bytes from redb v1 to v2 --- Cargo.lock | 1 + iroh-bytes/Cargo.toml | 4 +- iroh-bytes/src/store/fs.rs | 16 +- iroh-bytes/src/store/fs/migrate_redb_v1_v2.rs | 321 ++++++++++++++++++ 4 files changed, 338 insertions(+), 4 deletions(-) create mode 100644 iroh-bytes/src/store/fs/migrate_redb_v1_v2.rs diff --git a/Cargo.lock b/Cargo.lock index a1ecdfea13..f88eba15a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2398,6 +2398,7 @@ dependencies = [ "rand", "range-collections", "rcgen 0.12.1", + "redb 1.5.1", "redb 2.0.0", "reflink-copy", "rustls", diff --git a/iroh-bytes/Cargo.toml b/iroh-bytes/Cargo.toml index 2439d32ae8..34d4928a47 100644 --- a/iroh-bytes/Cargo.toml +++ b/iroh-bytes/Cargo.toml @@ -39,11 +39,13 @@ quinn = "0.10" rand = "0.8" range-collections = 
"0.4.0" redb = { version = "2.0.0", optional = true } +redb_v1 = { package = "redb", version = "1.5.1", optional = true } reflink-copy = { version = "0.1.8", optional = true } self_cell = "1.0.1" serde = { version = "1", features = ["derive"] } serde-error = "0.1.2" smallvec = { version = "1.10.0", features = ["serde", "const_new"] } +tempfile = { version = "3.10.0", optional = true } thiserror = "1" tokio = { version = "1", features = ["fs"] } tokio-util = { version = "0.7", features = ["io-util", "io", "rt"] } @@ -64,7 +66,7 @@ tempfile = "3.10.0" [features] default = ["fs-store"] -fs-store = ["reflink-copy", "redb"] +fs-store = ["reflink-copy", "redb", "redb_v1", "tempfile"] downloader = ["iroh-net", "parking_lot", "tokio-util/time"] metrics = ["iroh-metrics"] diff --git a/iroh-bytes/src/store/fs.rs b/iroh-bytes/src/store/fs.rs index 0780054ab5..6d35a6aeb9 100644 --- a/iroh-bytes/src/store/fs.rs +++ b/iroh-bytes/src/store/fs.rs @@ -82,14 +82,15 @@ use futures::{channel::oneshot, Stream, StreamExt}; use iroh_base::hash::{BlobFormat, Hash, HashAndFormat}; use iroh_io::AsyncSliceReader; -use redb::{AccessGuard, ReadableTable, StorageError}; +use redb::{AccessGuard, DatabaseError, ReadableTable, StorageError}; use serde::{Deserialize, Serialize}; use smallvec::SmallVec; use tokio::io::AsyncWriteExt; use tracing::trace_span; mod import_flat_store; -mod tables; +mod migrate_redb_v1_v2; +pub(self) mod tables; #[doc(hidden)] pub mod test_support; #[cfg(test)] @@ -1218,6 +1219,8 @@ pub(crate) enum ActorError { Io(#[from] io::Error), #[error("inconsistent database state: {0}")] Inconsistent(String), + #[error("error during database migration: {0}")] + Migration(#[source] anyhow::Error), } impl From for io::Error { @@ -1435,7 +1438,14 @@ impl Actor { temp: Arc>, rt: tokio::runtime::Handle, ) -> ActorResult<(Self, flume::Sender)> { - let db = redb::Database::create(path)?; + let db = match redb::Database::create(&path) { + Ok(db) => db, + Err(DatabaseError::UpgradeRequired(1)) => { + migrate_redb_v1_v2::run(&path).map_err(ActorError::Migration)? + } + Err(err) => return Err(err.into()), + }; + let txn = db.begin_write()?; // create tables and drop them just to create them. 
let mut t = Default::default(); diff --git a/iroh-bytes/src/store/fs/migrate_redb_v1_v2.rs b/iroh-bytes/src/store/fs/migrate_redb_v1_v2.rs new file mode 100644 index 0000000000..f417f40516 --- /dev/null +++ b/iroh-bytes/src/store/fs/migrate_redb_v1_v2.rs @@ -0,0 +1,321 @@ +use std::path::{Path, PathBuf}; + +use anyhow::Result; +use redb_v1::ReadableTable; +use tempfile::NamedTempFile; +use tracing::info; + +pub fn run(source: impl AsRef) -> Result { + let source = source.as_ref(); + // create the database to a tempfile + let target = NamedTempFile::new()?; + let target = target.into_temp_path(); + info!("migrate {} to {}", source.display(), target.display()); + let old_db = redb_v1::Database::open(source)?; + let new_db = redb::Database::create(&target)?; + + let rtx = old_db.begin_read()?; + let wtx = new_db.begin_write()?; + + { + let old_blobs = rtx.open_table(old::BLOBS_TABLE)?; + let mut new_blobs = wtx.open_table(new::BLOBS_TABLE)?; + let len = old_blobs.len()?; + info!("migrate blobs table ({len} rows)"); + for (i, entry) in old_blobs.iter()?.enumerate() { + let (key, value) = entry?; + let key: crate::Hash = key.value().into(); + let value = value.value(); + if i > 0 && i % 100 == 0 { + info!(" row {i:>6} of {len}"); + } + new_blobs.insert(key, value)?; + } + info!("migrate blobs table done"); + let old_tags = rtx.open_table(old::TAGS_TABLE)?; + let mut new_tags = wtx.open_table(new::TAGS_TABLE)?; + let len = old_tags.len()?; + info!("migrate tags table ({len} rows)"); + for (i, entry) in old_tags.iter()?.enumerate() { + let (key, value) = entry?; + let key = key.value(); + let value: crate::HashAndFormat = value.value().into(); + if i > 0 && i % 100 == 0 { + info!(" row {i:>6} of {len}"); + } + new_tags.insert(key, value)?; + } + info!("migrate tags table done"); + let old_inline_data = rtx.open_table(old::INLINE_DATA_TABLE)?; + let mut new_inline_data = wtx.open_table(new::INLINE_DATA_TABLE)?; + let len = old_inline_data.len()?; + info!("migrate inline data table ({len} rows)"); + for (i, entry) in old_inline_data.iter()?.enumerate() { + let (key, value) = entry?; + let key: crate::Hash = key.value().into(); + let value = value.value(); + if i > 0 && i % 100 == 0 { + info!(" row {i:>6} of {len}"); + } + new_inline_data.insert(key, value)?; + } + info!("migrate inline data table done"); + let old_inline_outboard = rtx.open_table(old::INLINE_OUTBOARD_TABLE)?; + let mut new_inline_outboard = wtx.open_table(new::INLINE_OUTBOARD_TABLE)?; + let len = old_inline_outboard.len()?; + info!("migrate inline outboard table ({len} rows)"); + for (i, entry) in old_inline_outboard.iter()?.enumerate() { + let (key, value) = entry?; + let key: crate::Hash = key.value().into(); + let value = value.value(); + if i > 0 && i % 100 == 0 { + info!(" row {i:>6} of {len}"); + } + new_inline_outboard.insert(key, value)?; + } + info!("migrate inline outboard table done"); + } + + wtx.commit()?; + drop(rtx); + drop(old_db); + drop(new_db); + + let backup_path: PathBuf = { + let mut p = source.to_owned().into_os_string(); + p.push(".backup-redb-v1"); + p.into() + }; + info!("rename {} to {}", source.display(), backup_path.display()); + std::fs::rename(source, &backup_path)?; + info!("rename {} to {}", target.display(), source.display()); + target.persist_noclobber(source)?; + info!("opening migrated database from {}", source.display()); + let db = redb::Database::open(source)?; + Ok(db) +} + +mod new { + pub(super) use super::super::tables::*; +} + +mod old { + use super::super::EntryState; + use 
crate::util::Tag; + use bytes::Bytes; + use iroh_base::hash::BlobFormat; + use postcard::experimental::max_size::MaxSize; + use redb_v1::{RedbKey, RedbValue, TableDefinition, TypeName}; + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + use smallvec::SmallVec; + + pub const BLOBS_TABLE: TableDefinition = TableDefinition::new("blobs-0"); + + pub const TAGS_TABLE: TableDefinition = TableDefinition::new("tags-0"); + + pub const INLINE_DATA_TABLE: TableDefinition = + TableDefinition::new("inline-data-0"); + + pub const INLINE_OUTBOARD_TABLE: TableDefinition = + TableDefinition::new("inline-outboard-0"); + + impl redb_v1::RedbValue for EntryState { + type SelfType<'a> = EntryState; + + type AsBytes<'a> = SmallVec<[u8; 128]>; + + fn fixed_width() -> Option { + None + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + postcard::from_bytes(data).unwrap() + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + postcard::to_extend(value, SmallVec::new()).unwrap() + } + + fn type_name() -> TypeName { + TypeName::new("EntryState") + } + } + + impl RedbValue for HashAndFormat { + type SelfType<'a> = Self; + + type AsBytes<'a> = [u8; Self::POSTCARD_MAX_SIZE]; + + fn fixed_width() -> Option { + Some(Self::POSTCARD_MAX_SIZE) + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + let t: &'a [u8; Self::POSTCARD_MAX_SIZE] = data.try_into().unwrap(); + postcard::from_bytes(t.as_slice()).unwrap() + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + let mut res = [0u8; 33]; + postcard::to_slice(&value, &mut res).unwrap(); + res + } + + fn type_name() -> TypeName { + TypeName::new("iroh_base::HashAndFormat") + } + } + + impl RedbValue for Tag { + type SelfType<'a> = Self; + + type AsBytes<'a> = bytes::Bytes; + + fn fixed_width() -> Option { + None + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + Self(Bytes::copy_from_slice(data)) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + value.0.clone() + } + + fn type_name() -> TypeName { + TypeName::new("Tag") + } + } + + impl RedbKey for Tag { + fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering { + data1.cmp(data2) + } + } + + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] + pub struct Hash([u8; 32]); + + impl Serialize for Hash { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + self.0.serialize(serializer) + } + } + + impl<'de> Deserialize<'de> for Hash { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let data: [u8; 32] = Deserialize::deserialize(deserializer)?; + Ok(Self(data)) + } + } + + impl MaxSize for Hash { + const POSTCARD_MAX_SIZE: usize = 32; + } + + impl From for crate::Hash { + fn from(value: Hash) -> Self { + value.0.into() + } + } + + impl RedbValue for Hash { + type SelfType<'a> = Self; + + type AsBytes<'a> = &'a [u8; 32]; + + fn fixed_width() -> Option { + Some(32) + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + let contents: &'a [u8; 32] = data.try_into().unwrap(); + Hash(*contents) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + &value.0 + } + + fn type_name() -> TypeName { + TypeName::new("iroh_base::Hash") + } + } + 
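+ // These impls mirror the encoding the redb v1 `RedbKey`/`RedbValue`
+ // traits used on disk, so the migration can decode keys and values
+ // written by the old format without adding v1 trait impls to the
+ // current `Hash` and `HashAndFormat` types.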
+ impl RedbKey for Hash { + fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering { + data1.cmp(data2) + } + } + + /// A hash and format pair + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, MaxSize)] + pub struct HashAndFormat { + /// The hash + pub hash: Hash, + /// The format + pub format: BlobFormat, + } + + impl From for crate::HashAndFormat { + fn from(value: HashAndFormat) -> Self { + crate::HashAndFormat { + hash: value.hash.into(), + format: value.format, + } + } + } + impl Serialize for HashAndFormat { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: Serializer, + { + (self.hash, self.format).serialize(serializer) + } + } + + impl<'de> Deserialize<'de> for HashAndFormat { + fn deserialize(deserializer: D) -> std::result::Result + where + D: Deserializer<'de>, + { + let (hash, format) = <(Hash, BlobFormat)>::deserialize(deserializer)?; + Ok(Self { hash, format }) + } + } +} From 8764f6e7693117a515c216fe00a33ea6df4e1491 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Tue, 26 Mar 2024 11:43:09 +0100 Subject: [PATCH 14/15] fixup --- iroh-bytes/src/store/fs.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/iroh-bytes/src/store/fs.rs b/iroh-bytes/src/store/fs.rs index 6d35a6aeb9..c649940abc 100644 --- a/iroh-bytes/src/store/fs.rs +++ b/iroh-bytes/src/store/fs.rs @@ -90,7 +90,7 @@ use tracing::trace_span; mod import_flat_store; mod migrate_redb_v1_v2; -pub(self) mod tables; +mod tables; #[doc(hidden)] pub mod test_support; #[cfg(test)] @@ -1438,10 +1438,10 @@ impl Actor { temp: Arc>, rt: tokio::runtime::Handle, ) -> ActorResult<(Self, flume::Sender)> { - let db = match redb::Database::create(&path) { + let db = match redb::Database::create(path) { Ok(db) => db, Err(DatabaseError::UpgradeRequired(1)) => { - migrate_redb_v1_v2::run(&path).map_err(ActorError::Migration)? + migrate_redb_v1_v2::run(path).map_err(ActorError::Migration)? 
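+ // (`Database::create` and `run` both take `impl AsRef<Path>`, so the
+ // extra borrows removed here were never needed)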
} Err(err) => return Err(err.into()), }; From 8a677ee47afc09b98397d3a2d92224d78e1df0a9 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Wed, 27 Mar 2024 22:17:28 +0100 Subject: [PATCH 15/15] fix: review comments --- iroh-bytes/src/store/fs/migrate_redb_v1_v2.rs | 13 +++++++------ iroh-sync/src/store/fs/migrate_v1_v2.rs | 9 +++++---- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/iroh-bytes/src/store/fs/migrate_redb_v1_v2.rs b/iroh-bytes/src/store/fs/migrate_redb_v1_v2.rs index f417f40516..5cdf17d91b 100644 --- a/iroh-bytes/src/store/fs/migrate_redb_v1_v2.rs +++ b/iroh-bytes/src/store/fs/migrate_redb_v1_v2.rs @@ -7,8 +7,9 @@ use tracing::info; pub fn run(source: impl AsRef) -> Result { let source = source.as_ref(); - // create the database to a tempfile - let target = NamedTempFile::new()?; + let dir = source.parent().expect("database is not in root"); + // create the new database in a tempfile in the same directory as the old db + let target = NamedTempFile::with_prefix_in("blobs.db.migrate", dir)?; let target = target.into_temp_path(); info!("migrate {} to {}", source.display(), target.display()); let old_db = redb_v1::Database::open(source)?; @@ -26,7 +27,7 @@ pub fn run(source: impl AsRef) -> Result { let (key, value) = entry?; let key: crate::Hash = key.value().into(); let value = value.value(); - if i > 0 && i % 100 == 0 { + if i > 0 && i % 1000 == 0 { info!(" row {i:>6} of {len}"); } new_blobs.insert(key, value)?; @@ -40,7 +41,7 @@ pub fn run(source: impl AsRef) -> Result { let (key, value) = entry?; let key = key.value(); let value: crate::HashAndFormat = value.value().into(); - if i > 0 && i % 100 == 0 { + if i > 0 && i % 1000 == 0 { info!(" row {i:>6} of {len}"); } new_tags.insert(key, value)?; @@ -54,7 +55,7 @@ pub fn run(source: impl AsRef) -> Result { let (key, value) = entry?; let key: crate::Hash = key.value().into(); let value = value.value(); - if i > 0 && i % 100 == 0 { + if i > 0 && i % 1000 == 0 { info!(" row {i:>6} of {len}"); } new_inline_data.insert(key, value)?; @@ -68,7 +69,7 @@ pub fn run(source: impl AsRef) -> Result { let (key, value) = entry?; let key: crate::Hash = key.value().into(); let value = value.value(); - if i > 0 && i % 100 == 0 { + if i > 0 && i % 1000 == 0 { info!(" row {i:>6} of {len}"); } new_inline_outboard.insert(key, value)?; diff --git a/iroh-sync/src/store/fs/migrate_v1_v2.rs b/iroh-sync/src/store/fs/migrate_v1_v2.rs index 6fd07e44f4..23cfebeb95 100644 --- a/iroh-sync/src/store/fs/migrate_v1_v2.rs +++ b/iroh-sync/src/store/fs/migrate_v1_v2.rs @@ -18,7 +18,7 @@ macro_rules! migrate_table { let (key, value) = entry?; let key = key.value(); let value = value.value(); - if i > 0 && i % 100 == 0 { + if i > 0 && i % 1000 == 0 { info!(" {name} {i:>ind$}/{len}"); } new_table.insert(key, value)?; @@ -38,7 +38,7 @@ macro_rules! migrate_multimap_table { for (i, entry) in old_table.iter()?.enumerate() { let (key, values) = entry?; let key = key.value(); - if i > 0 && i % 100 == 0 { + if i > 0 && i % 1000 == 0 { info!(" {name} {i:>ind$}/{len}"); } for value in values { @@ -52,8 +52,9 @@ macro_rules! 
migrate_multimap_table { pub fn run(source: impl AsRef) -> Result { let source = source.as_ref(); - // create the database to a tempfile - let target = NamedTempFile::new()?; + let dir = source.parent().expect("database is not in root"); + // create the new database in a tempfile in the same directory as the old db + let target = NamedTempFile::with_prefix_in("docs.db.migrate", dir)?; let target = target.into_temp_path(); info!("migrate {} to {}", source.display(), target.display()); let old_db = redb_v1::Database::open(source)?;
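
Taken together, the patches above converge on a single migration pattern for both stores: detect the old format via `DatabaseError::UpgradeRequired(1)`, rewrite every table into a tempfile in the same directory, move the v1 file aside as a backup, and promote the new file with `persist_noclobber`. What follows is a minimal end-to-end sketch, not the crates' actual code: it assumes the dependency setup above (redb 2 alongside redb 1.5 renamed to `redb_v1`), and the hypothetical `copy_tables` stands in for the per-table loops shown in the real patches.

    use std::path::Path;

    use anyhow::{Context, Result};
    use redb::DatabaseError;
    use tempfile::NamedTempFile;

    /// Open a database, transparently upgrading a redb v1 file to v2.
    fn open_or_migrate(path: &Path) -> Result<redb::Database> {
        match redb::Database::create(path) {
            Ok(db) => Ok(db),
            // redb v2 reports a v1 file as a required format upgrade.
            Err(DatabaseError::UpgradeRequired(1)) => migrate(path),
            Err(err) => Err(err.into()),
        }
    }

    fn migrate(source: &Path) -> Result<redb::Database> {
        let dir = source.parent().context("database is not in root")?;
        // Build the v2 copy in a tempfile next to the old database so the
        // final rename stays on the same filesystem.
        let target = NamedTempFile::with_prefix_in("db.migrate", dir)?.into_temp_path();
        {
            let old_db = redb_v1::Database::open(source)?;
            let new_db = redb::Database::create(&target)?;
            let rtx = old_db.begin_read()?;
            let wtx = new_db.begin_write()?;
            copy_tables(&rtx, &wtx)?; // per-table loops, as in the patches above
            wtx.commit()?;
        }
        // Keep the old file as a backup, then move the new one into place.
        // persist_noclobber fails rather than overwrite an unexpected file.
        let mut backup = source.to_owned().into_os_string();
        backup.push(".backup-redb-v1");
        std::fs::rename(source, &backup)?;
        target.persist_noclobber(source)?;
        Ok(redb::Database::open(source)?)
    }

    // Placeholder for the table-by-table copy; generic so the sketch stays
    // agnostic to the exact transaction types of either redb version.
    fn copy_tables<R, W>(_rtx: &R, _wtx: &W) -> Result<()> {
        Ok(())
    }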