diff --git a/Cargo.lock b/Cargo.lock
index 56d037e..5c8d8fa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -513,6 +513,7 @@ dependencies = [
  "serde_sqlite",
  "toml",
  "ureq",
+ "utils",
 ]
 
 [[package]]
diff --git a/examples/Cargo.lock b/examples/Cargo.lock
index bd0b237..de31357 100644
--- a/examples/Cargo.lock
+++ b/examples/Cargo.lock
@@ -544,6 +544,7 @@ dependencies = [
  "chrono",
  "serde",
  "serde_sqlite",
+ "tokio",
 ]
 
 [[package]]
diff --git a/examples/sync-backend/Cargo.toml b/examples/sync-backend/Cargo.toml
index 33aa863..bf32f31 100644
--- a/examples/sync-backend/Cargo.toml
+++ b/examples/sync-backend/Cargo.toml
@@ -6,7 +6,7 @@ publish = false
 
 [dependencies]
 libsqlite-sys = { path = "../../libsqlite-sys" }
-journal = { path = "../../journal" }
+journal = { path = "../../journal", features = ["async"] }
 serde_sqlite = { path = "../../serde_sqlite" }
 tokio = { version = "1", features = ["full"] }
diff --git a/examples/sync-backend/src/main.rs b/examples/sync-backend/src/main.rs
index a7a9fb4..b6d9960 100644
--- a/examples/sync-backend/src/main.rs
+++ b/examples/sync-backend/src/main.rs
@@ -2,14 +2,7 @@
 //!
 //! ** Strictly for the demo purposes only **
 //! ** Known issues **:
-//! - It works only for single database and single client
-//! - There is no Sync procotol implemented here at all – direct stream of journal.
-//! - The server assumes the client sends only new snapshots so the local version is not checked, and it's
-//!   possible to write the same snapshots multiple times.
-//! - No sanity checks that the client actually sends valid data not random garbage
-//! - Calling Journal::add_page directly is a hack and rewrites snapshot timestamps/numbers.
-//! - The Journal API doesn't allow to write headers directly (yet).
-//! - The Journal is experimental, it only supports blocking IO so the scheduler is blocked on the journal IO ops.
+//! - It works only for a single database
 //!
 //! Run with
 //!
@@ -19,21 +12,20 @@
 use axum::{
     extract::{BodyStream, Path, State, Query},
+    http::StatusCode,
+    body,
     response,
     routing::get,
     Router, Server,
 };
 use futures::StreamExt;
-use journal::{Journal, Protocol, Stream};
-use serde_sqlite::de;
-use std::io::Read;
-use std::sync::Arc;
-use tokio::sync::Mutex;
+use journal::{Journal, AsyncReadJournalStream, AsyncWriteJournalStream};
+use tokio::io::AsyncWriteExt;
 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
 use serde::Deserialize;
 
-fn to_error<T: std::fmt::Debug>(e: T) -> String {
-    format!("{e:?}")
+fn to_error<T: std::fmt::Debug>(_e: T) -> StatusCode {
+    StatusCode::INTERNAL_SERVER_ERROR
 }
 
 #[derive(Debug, Default, Deserialize)]
@@ -48,38 +40,12 @@ async fn post_snapshot(
     State(state): State<AppState>,
     Path(_domain): Path<String>,
     mut stream: BodyStream,
-) -> Result<&'static str, String> {
-    let mut whole_body = vec![];
+) -> Result<&'static str, StatusCode> {
+    let mut write_stream = AsyncWriteJournalStream::new(state.journal_path).spawn();
     while let Some(chunk) = stream.next().await {
-        whole_body.extend(chunk.map_err(to_error)?);
-    }
-    if whole_body.is_empty() {
-        return Ok("");
-    }
-    let mut whole_body = std::io::Cursor::new(whole_body);
-    let mut journal = state.journal.lock().await;
-
-    loop {
-        match de::from_reader::<Protocol>(&mut whole_body) {
-            Ok(Protocol::SnapshotHeader(snapshot_header)) => {
-                journal.commit().map_err(to_error)?;
-                journal.add_snapshot(&snapshot_header).map_err(to_error)?;
-                tracing::info!("snapshot: {:?}", snapshot_header.id);
-            }
-            Ok(Protocol::PageHeader(page_header)) => {
-                let mut page = vec![0; page_header.page_size as usize];
-                whole_body.read_exact(page.as_mut_slice()).map_err(to_error)?;
-                journal.add_page(&page_header, page.as_slice()).map_err(to_error)?;
-                tracing::info!(" page: {:?}", page_header.page_num);
-            },
-            Ok(Protocol::EndOfStream(_)) => {
-                journal.commit().map_err(to_error)?;
-                tracing::info!("end of stream");
-                break;
-            },
-            Err(e) => return Err(to_error(e)),
-        }
-    }
+        let chunk = chunk.map_err(to_error)?;
+        write_stream.write_all(&chunk).await.map_err(to_error)?;
+    };
     Ok("OK")
 }
 
@@ -87,13 +53,14 @@ async fn post_snapshot(
 async fn head_snapshot(
     State(state): State<AppState>,
     Path(_domain): Path<String>,
-) -> Result<impl response::IntoResponse, String> {
-    let journal = state.journal.lock().await;
-    let snapshot_id = match journal.current_snapshot() {
-        Some(v) => format!("{v}"),
-        None => "".into(),
-    };
-    let headers = response::AppendHeaders([("x-snapshot-id", snapshot_id)]);
+) -> Result<impl response::IntoResponse, StatusCode> {
+    let res = tokio::task::spawn_blocking(move || {
+        let journal = Journal::try_from(state.journal_path)
+            .or_else(|_e| Journal::create(state.journal_path))?;
+        Ok::<_, journal::Error>(journal.get_header().snapshot_counter)
+    });
+    let snapshot_id = res.await.map_err(to_error)?.map_err(to_error)?;
+    let headers = response::AppendHeaders([("x-snapshot-id", snapshot_id.to_string())]);
     Ok((headers, "head"))
 }
 
@@ -102,30 +69,23 @@ async fn get_snapshot(
     State(state): State<AppState>,
     Path(_domain): Path<String>,
     params: Option<Query<Params>>,
-) -> Result<impl response::IntoResponse, String> {
-    let snapshot_id: u64 = params.unwrap_or_default().snapshot_id;
-    let mut journal = state.journal.lock().await;
-    let iter = journal
-        .into_iter()
-        .skip_snapshots(snapshot_id);
-    let mut buf = vec![];
-    Stream::new(iter).read_to_end(&mut buf).map_err(to_error)?;
-    Ok(buf)
+) -> Result<impl response::IntoResponse, StatusCode> {
+    let stream = AsyncReadJournalStream::new(
+        state.journal_path,
+        params.map(|p| p.snapshot_id).unwrap_or(0)
+    ).spawn();
+    Ok(body::StreamBody::new(tokio_util::io::ReaderStream::new(stream)))
 }
 
 #[derive(Debug, Clone)]
 struct AppState {
-    journal: Arc<Mutex<Journal>>,
+    journal_path: &'static str
 }
 
 impl AppState {
     fn new() -> Self {
-        let journal_path = "/tmp/journal";
-        let journal = Journal::try_from(journal_path)
-            .or_else(|_e| Journal::create(journal_path))
-            .unwrap();
         Self {
-            journal: Arc::new(Mutex::new(journal)),
+            journal_path: "/tmp/journal"
         }
     }
 }
diff --git a/journal/src/async_wrap.rs b/journal/src/async_bridge.rs
similarity index 61%
rename from journal/src/async_wrap.rs
rename to journal/src/async_bridge.rs
index d32cf71..513e316 100644
--- a/journal/src/async_wrap.rs
+++ b/journal/src/async_bridge.rs
@@ -3,6 +3,7 @@
 use crate::{Error as JournalError, Journal, Protocol, Stream as JournalStream};
 use serde_sqlite::de;
 use std::io::{BufRead, Read, Write};
+use std::path::PathBuf;
 use std::pin::Pin;
 use std::task::{Context, Poll, Waker};
 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
@@ -13,55 +14,63 @@ fn to_err(err: E) -> std::io::Erro
 }
 
 pub struct AsyncReadJournalStream {
-    rx: Receiver<Waker>,
-    tx: Sender<Vec<u8>>,
     snapshot_id: u64,
-    journal: Journal,
+    journal_path: PathBuf,
 }
 
 impl AsyncReadJournalStream {
-    pub fn new(
-        journal_path: &str,
-        snapshot_id: u64,
-    ) -> Result<(Self, AsyncReadJournalStreamHandle), JournalError> {
-        let journal = Journal::try_from(&journal_path)?;
-        let (waker_tx, waker_rx) = channel::<Waker>(1);
-        let (buffer_tx, buffer_rx) = channel::<Vec<u8>>(1);
-        Ok((
-            AsyncReadJournalStream {
-                tx: buffer_tx,
-                rx: waker_rx,
-                journal,
-                snapshot_id,
-            },
-            AsyncReadJournalStreamHandle {
-                tx: waker_tx,
-                rx: buffer_rx,
-                buf: None,
-                read: 0,
-            },
-        ))
+    pub fn new<P: Into<PathBuf>>(journal_path: P, snapshot_id: u64) -> Self {
+        AsyncReadJournalStream {
+            journal_path: journal_path.into(),
+            snapshot_id,
+        }
     }
 
-    pub fn enter_loop(mut self) {
-        let mut stream =
-            JournalStream::new(self.journal.into_iter().skip_snapshots(self.snapshot_id));
+    pub fn spawn(self) -> AsyncReadJournalStreamHandle {
+        let (waker_tx, mut waker_rx) = channel::<Waker>(1);
+        let (mut buffer_tx, buffer_rx) = channel::<Vec<u8>>(1);
+        let join_handle =
+            tokio::task::spawn_blocking(move || self.enter_loop(&mut waker_rx, &mut buffer_tx));
+        AsyncReadJournalStreamHandle {
+            tx: waker_tx,
+            rx: buffer_rx,
+            buf: None,
+            read: 0,
+            join_handle,
+        }
+    }
 
-        while let Some(waker) = self.rx.blocking_recv() {
+    pub fn enter_loop(
+        self,
+        rx: &mut Receiver<Waker>,
+        tx: &mut Sender<Vec<u8>>,
+    ) -> Result<(), JournalError> {
+        let mut journal = Journal::try_from(self.journal_path.as_path())?;
+        let version = journal.get_header().version;
+        let mut stream = JournalStream::new(
+            journal.into_iter().skip_snapshots(self.snapshot_id),
+            version,
+        );
+
+        while let Some(waker) = rx.blocking_recv() {
            let mut buf = Vec::<u8>::with_capacity(0x0001_0000); // 65kb buffer
            unsafe { buf.set_len(buf.capacity()) };
            let read = match stream.read(buf.as_mut_slice()) {
                Ok(read) => read,
-                // FIXME: ?
-                Err(_) => {
+                Err(e) => {
                     waker.wake();
-                    return;
+                    return Err(e.into());
                 }
             };
             unsafe { buf.set_len(read) };
-            self.tx.blocking_send(buf).ok();
+            let res = tx.blocking_send(buf);
             waker.wake();
+            if let Err(tokio::sync::mpsc::error::SendError(_)) = res {
+                let err = std::io::Error::new(std::io::ErrorKind::Other, "channel closed");
+                return Err(err.into());
+            }
         }
+        Ok(())
     }
 }
 
@@ -71,6 +80,13 @@ pub struct AsyncReadJournalStreamHandle {
     read: usize,
     rx: Receiver<Vec<u8>>,
     tx: Sender<Waker>,
+    join_handle: tokio::task::JoinHandle<Result<(), JournalError>>,
+}
+
+impl AsyncReadJournalStreamHandle {
+    pub async fn join(self) -> Result<Result<(), JournalError>, tokio::task::JoinError> {
+        self.join_handle.await
+    }
 }
 
 impl AsyncRead for AsyncReadJournalStreamHandle {
@@ -131,7 +147,7 @@ enum AsyncWriteProto {
     B(Vec<u8>),
 }
 
-struct ReadReceiver {
+pub struct ReadReceiver {
     buf: Vec<u8>,
     buf_pos: usize,
     waker: Option<Waker>,
@@ -215,55 +231,85 @@ impl Read for ReadReceiver {
     }
 }
 
+impl Drop for ReadReceiver {
+    fn drop(&mut self) {
+        self.rx.close();
+        self.waker.take().map(|waker| waker.wake());
+        while let Ok(message) = self.rx.try_recv() {
+            if let AsyncWriteProto::W(waker) = message {
+                waker.wake()
+            }
+        }
+    }
+}
+
 pub struct AsyncWriteJournalStream {
-    journal: Journal,
-    read_receiver: ReadReceiver,
+    journal_path: PathBuf,
 }
 
 impl AsyncWriteJournalStream {
-    pub fn new(journal_path: &str) -> Result<(Self, AsyncWriteJournalStreamHandle), JournalError> {
-        let journal = match Journal::try_from(&journal_path) {
-            Ok(j) => j,
-            Err(e) if e.journal_not_exists() => Journal::create(&journal_path)?,
-            Err(e) => return Err(e),
-        };
+    pub fn new<P: Into<PathBuf>>(journal_path: P) -> Self {
+        Self {
+            journal_path: journal_path.into(),
+        }
+    }
+
+    pub fn spawn(mut self) -> AsyncWriteJournalStreamHandle {
         let (buf_tx, buf_rx) = channel(2); // enough space to store waker and buf
         let (req_tx, req_rx) = channel(1);
-        Ok((
-            Self {
-                journal,
-                read_receiver: ReadReceiver::new(buf_rx, req_tx),
-            },
-            AsyncWriteJournalStreamHandle {
-                tx: buf_tx,
-                rx: req_rx,
-            },
-        ))
+        let read_receiver = ReadReceiver::new(buf_rx, req_tx);
+        let join_handle = tokio::task::spawn_blocking(move || self.enter_loop(read_receiver));
+        AsyncWriteJournalStreamHandle {
+            tx: buf_tx,
+            rx: req_rx,
+            join_handle,
+        }
     }
 
-    pub fn enter_loop(&mut self) -> std::io::Result<()> {
+    pub fn enter_loop(&mut self, mut read_receiver: ReadReceiver) -> Result<(), JournalError> {
+        let mut journal = match Journal::try_from(self.journal_path.as_path()) {
+            Ok(j) => j,
+            Err(e) if e.journal_not_exists() => Journal::create(self.journal_path.as_path())?,
+            Err(e) => return Err(e),
+        };
+
+        let expected = Protocol::JournalVersion(1.into());
+        match de::from_reader::<Protocol>(&mut read_receiver).map_err(to_err)? {
+            msg if msg == expected => (),
+            other => {
+                let err = std::io::Error::new(
+                    std::io::ErrorKind::Other,
+                    format!("expected {}, got: {}", expected, other),
+                );
+                return Err(err.into());
+            }
+        }
         loop {
-            match de::from_reader::<Protocol>(&mut self.read_receiver) {
-                Ok(Protocol::SnapshotHeader(snapshot_header)) => {
-                    self.journal.commit().map_err(to_err)?;
-                    self.journal
-                        .add_snapshot(&snapshot_header)
-                        .map_err(to_err)?;
+            match de::from_reader::<Protocol>(&mut read_receiver).map_err(to_err)? {
+                Protocol::SnapshotHeader(snapshot_header) => {
+                    journal.commit().map_err(to_err)?;
+                    journal.add_snapshot(&snapshot_header).map_err(to_err)?;
                 }
-                Ok(Protocol::PageHeader(page_header)) => {
-                    let mut page = vec![0; page_header.page_size as usize];
-                    self.read_receiver
-                        .read_exact(page.as_mut_slice())
+                Protocol::BlobHeader(blob_header) => {
+                    let mut blob = vec![0; blob_header.blob_size as usize];
+                    read_receiver
+                        .read_exact(blob.as_mut_slice())
                         .map_err(to_err)?;
-                    self.journal
-                        .add_page(&page_header, page.as_slice())
+                    journal
+                        .add_blob(&blob_header, blob.as_slice())
                         .map_err(to_err)?;
                 }
-                Ok(Protocol::EndOfStream(_)) => {
-                    self.journal.commit().map_err(to_err)?;
+                Protocol::EndOfStream(_) => {
+                    journal.commit().map_err(to_err)?;
                     return Ok(());
                 }
-                Err(e) => return Err(to_err(e)),
+                msg => {
+                    return Err(std::io::Error::new(
+                        std::io::ErrorKind::Other,
+                        format!("unexpected message: {msg:?}"),
+                    )
+                    .into())
+                }
             }
         }
     }
@@ -273,6 +319,13 @@ impl AsyncWriteJournalStream {
 pub struct AsyncWriteJournalStreamHandle {
     tx: Sender<AsyncWriteProto>,
     rx: Receiver<()>,
+    join_handle: tokio::task::JoinHandle<Result<(), JournalError>>,
+}
+
+impl AsyncWriteJournalStreamHandle {
+    pub async fn join(self) -> Result<Result<(), JournalError>, tokio::task::JoinError> {
+        self.join_handle.await
+    }
 }
 
 impl AsyncWrite for AsyncWriteJournalStreamHandle {
diff --git a/journal/src/error.rs b/journal/src/error.rs
index 131d025..dd38244 100644
--- a/journal/src/error.rs
+++ b/journal/src/error.rs
@@ -16,11 +16,15 @@ pub enum Error {
         snapshot_id: u64,
         journal_snapshot_id: u64,
     },
-    /// attemt to add out of order page
-    OutOfOrderPage {
-        page_num: u32,
-        page_count: Option<u32>,
+    /// Snapshot not started
+    SnapshotNotStarted,
+    /// Attempt to add an out-of-order blob
+    OutOfOrderBlob {
+        blob_num: u32,
+        blob_count: Option<u32>,
     },
+    /// Unexpected Journal Version
+    UnexpectedJournalVersion { expected: u32, got: u32 },
 }
 
 impl From for Error {
diff --git a/journal/src/journal.rs b/journal/src/journal.rs
index ab65ecc..2e89936 100644
--- a/journal/src/journal.rs
+++ b/journal/src/journal.rs
@@ -23,7 +23,7 @@ where
     /// Wrapped into Fd reader/writer/seeker
     fd: Fd<F, BufReader<F>>,
     /// snapshot page count
-    page_count: Option<u32>,
+    blob_count: Option<u32>,
     /// Buffer size
     buffer_sz: usize,
 }
@@ -143,17 +143,17 @@ impl Journal {
 impl<F: Read + Write + Seek> Journal<F> {
     /// Instantiate journal & force header write
-    pub fn new(header: Header, mut fd: F, page_count: Option<u32>) -> Result<Self> {
+    pub fn new(header: Header, mut fd: F, blob_count: Option<u32>) -> Result<Self> {
         Self::write_header(&mut fd, &header)?;
-        Ok(Self::from(header, fd, page_count))
+        Ok(Self::from(header, fd, blob_count))
     }
 
     /// Instantiate journal
-    pub fn from(header: Header, fd: F, page_count: Option<u32>) -> Self {
+    pub fn from(header: Header, fd: F, blob_count: Option<u32>) -> Self {
         Self {
             header,
             fd: Fd::Raw(fd),
-            page_count,
+            blob_count,
             buffer_sz: DEFAULT_BUFFER_SIZE,
         }
     }
@@ -170,32 +170,31 @@ impl Journal {
 
     /// Initiate new snapshot
     ///
-    /// * udpate journal header to correctly setup offset
+    /// * update journal header to correctly setup offset
     /// * to initiate snapshot we seek to current end of the file (value stored in header)
     /// * switch fd to buffered mode
     /// * write snapshot header with current header counter number
-    pub fn new_snapshot(&mut self) -> Result<()> {
-        if self.page_count.is_some() {
+    pub fn new_snapshot(&mut self, page_size: u32) -> Result<()> {
+        if self.blob_count.is_some() {
             return Ok(());
         }
         self.update_header()?;
         let snapshot_header = SnapshotHeader::new(
             self.header.snapshot_counter,
             chrono::Utc::now().timestamp_micros(),
+            Some(page_size),
         );
         self.write_snapshot(&snapshot_header)
     }
 
-    /// Add new sqlite page
-    ///
-    /// Automatically starts new snapshot if there is none
-    pub fn new_page(&mut self, offset: u64, page: &[u8]) -> Result<()> {
-        if !self.snapshot_started() {
-            self.new_snapshot()?;
+    /// Add new blob
+    pub fn new_blob(&mut self, offset: u64, blob: &[u8]) -> Result<()> {
+        let blob_num = match self.blob_count {
+            Some(c) => c,
+            None => return Err(Error::SnapshotNotStarted),
         };
-        let page_num = self.page_count.unwrap();
-        let page_header = PageHeader::new(offset, page_num, page.len() as u32);
-        self.add_page(&page_header, page)
+        let blob_header = BlobHeader::new(offset, blob_num, blob.len() as u32);
+        self.add_blob(&blob_header, blob)
     }
 
     /// Add existing snapshot
@@ -219,24 +218,24 @@
         self.fd.seek(SeekFrom::Start(self.header.eof))?;
         self.fd.as_writer(self.buffer_sz);
         self.fd.write_all(&to_bytes(snapshot_header)?)?;
-        self.page_count = Some(0);
+        self.blob_count = Some(0);
         Ok(())
     }
 
-    /// Add page
-    pub fn add_page(&mut self, page_header: &PageHeader, page: &[u8]) -> Result<()> {
-        if Some(page_header.page_num) != self.page_count {
-            return Err(Error::OutOfOrderPage {
-                page_num: page_header.page_num,
-                page_count: self.page_count,
+    /// Add blob
+    pub fn add_blob(&mut self, blob_header: &BlobHeader, blob: &[u8]) -> Result<()> {
+        if Some(blob_header.blob_num) != self.blob_count {
+            return Err(Error::OutOfOrderBlob {
+                blob_num: blob_header.blob_num,
+                blob_count: self.blob_count,
             });
         }
-        self.page_count.as_mut().map(|x| {
+        self.blob_count.as_mut().map(|x| {
             *x += 1;
             *x
         });
-        self.fd.write_all(&to_bytes(page_header)?)?;
-        self.fd.write_all(page)?;
+        self.fd.write_all(&to_bytes(blob_header)?)?;
+        self.fd.write_all(blob)?;
         Ok(())
     }
 
@@ -252,8 +251,8 @@
             return Ok(());
         }
         // commit snapshot by writting final empty page
-        self.fd.write_all(&to_bytes(&PageHeader::last())?)?;
-        self.page_count = None;
+        self.fd.write_all(&to_bytes(&BlobHeader::last())?)?;
+        self.blob_count = None;
 
         self.header.snapshot_counter += 1;
         self.header.eof = self.fd.stream_position()?;
@@ -304,7 +303,7 @@ impl Journal {
 
     /// Check if snapshot was already started
     fn snapshot_started(&self) -> bool {
-        self.page_count.is_some()
+        self.blob_count.is_some()
     }
 }
 
@@ -350,7 +349,7 @@ impl<'a, F> Iterator for IntoIter<'a, F>
 where
     F: Read + Write + Seek,
 {
-    type Item = Result<(SnapshotHeader, PageHeader, Vec<u8>)>;
+    type Item = Result<(SnapshotHeader, BlobHeader, Vec<u8>)>;
 
     fn next(&mut self) -> Option<Self::Item> {
         if !self.initialized {
@@ -384,14 +383,14 @@
                 }
             };
         }
-        let page_header = match from_reader::<PageHeader>(&mut self.journal.fd) {
+        let blob_header = match from_reader::<BlobHeader>(&mut self.journal.fd) {
             Ok(p) => p,
             Err(e) => {
                 self.eoi = true;
                 return Some(Err(e.into()));
             }
         };
-        if page_header.is_last() {
+        if blob_header.is_last() {
             if self.current_snapshot.as_ref().unwrap().id + 1
                 == self.journal.header.snapshot_counter
             {
@@ -403,14 +402,14 @@
             }
         }
         let mut buf = vec![];
-        match buf.try_reserve(page_header.page_size as usize) {
+        match buf.try_reserve(blob_header.blob_size as usize) {
             Ok(_) => (),
             Err(e) => {
                 self.eoi = true;
                 return Some(Err(e.into()));
             }
         }
-        buf.resize(page_header.page_size as usize, 0);
+        buf.resize(blob_header.blob_size as usize, 0);
         match self.journal.fd.read_exact(buf.as_mut_slice()) {
             Ok(_) => (),
             Err(e) => {
@@ -420,7 +419,7 @@
             }
         }
         Some(Ok((
             self.current_snapshot.as_ref().unwrap().clone(),
-            page_header,
+            blob_header,
             buf,
         )))
     }
@@ -457,29 +456,38 @@ impl Default for Header {
 pub struct SnapshotHeader {
     pub id: u64,
     pub timestamp: i64,
+    #[serde(
+        serialize_with = "serde_sqlite::se::none_as_zero",
+        deserialize_with = "serde_sqlite::de::zero_as_none"
+    )]
+    pub page_size: Option<u32>,
 }
 
 impl SnapshotHeader {
-    pub fn new(id: u64, timestamp: i64) -> Self {
-        Self { id, timestamp }
+    pub fn new(id: u64, timestamp: i64, page_size: Option<u32>) -> Self {
+        Self {
+            id,
+            timestamp,
+            page_size,
+        }
     }
 }
 
-/// Page Header
+/// Blob Header
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 #[block(16)]
-pub struct PageHeader {
+pub struct BlobHeader {
     pub offset: u64,
-    pub page_num: u32,
-    pub page_size: u32,
+    pub blob_num: u32,
+    pub blob_size: u32,
 }
 
-impl PageHeader {
-    fn new(offset: u64, page_num: u32, page_size: u32) -> Self {
+impl BlobHeader {
+    fn new(offset: u64, blob_num: u32, blob_size: u32) -> Self {
         Self {
             offset,
-            page_num,
-            page_size,
+            blob_num,
+            blob_size,
         }
     }
 
@@ -487,13 +495,13 @@ impl PageHeader {
     pub fn last() -> Self {
         Self {
             offset: 0,
-            page_num: 0,
-            page_size: 0,
+            blob_num: 0,
+            blob_size: 0,
         }
     }
 
     // FIXME: should not be public
     pub fn is_last(&self) -> bool {
-        self.offset == 0 && self.page_num == 0 && self.page_size == 0
+        self.offset == 0 && self.blob_num == 0 && self.blob_size == 0
     }
 }
diff --git a/journal/src/lib.rs b/journal/src/lib.rs
index 0d37387..0fd1245 100644
--- a/journal/src/lib.rs
+++ b/journal/src/lib.rs
@@ -1,14 +1,14 @@
 #[cfg(feature = "async")]
-mod async_wrap;
+mod async_bridge;
 mod error;
 mod journal;
 mod stream;
 
 #[cfg(feature = "async")]
-pub use crate::async_wrap::{
+pub use crate::async_bridge::{
     AsyncReadJournalStream, AsyncReadJournalStreamHandle, AsyncWriteJournalStream,
     AsyncWriteJournalStreamHandle,
 };
 pub use crate::error::Error;
-pub use crate::journal::{Header, Journal, PageHeader, SnapshotHeader};
-pub use crate::stream::{Protocol, Stream};
+pub use crate::journal::{BlobHeader, Header, Journal, SnapshotHeader};
+pub use crate::stream::{JournalVersion, Protocol, Stream};
diff --git a/journal/src/stream.rs b/journal/src/stream.rs
index fcb3773..d7b1318 100644
--- a/journal/src/stream.rs
+++ b/journal/src/stream.rs
@@ -1,7 +1,7 @@
 //! Streaming protocol for journal
 use crate::error::Error as JournalError;
-use crate::journal::{IntoIter, Journal, PageHeader, SnapshotHeader};
+use crate::journal::{BlobHeader, IntoIter, Journal, SnapshotHeader};
 use block::{block, Block};
 use serde::{Deserialize, Serialize};
 use serde_sqlite::to_writer;
@@ -15,8 +15,39 @@ pub struct End {}
 #[block]
 pub enum Protocol {
     SnapshotHeader(SnapshotHeader),
-    PageHeader(PageHeader),
+    BlobHeader(BlobHeader),
     EndOfStream(End),
+    JournalVersion(JournalVersion),
+}
+
+impl std::fmt::Display for Protocol {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        match self {
+            Self::SnapshotHeader(_) => write!(f, "SnapshotHeader"),
+            Self::BlobHeader(_) => write!(f, "BlobHeader"),
+            Self::EndOfStream(_) => write!(f, "EndOfStream"),
+            Self::JournalVersion(v) => write!(f, "JournalVersion({})", v.version),
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Copy)]
+#[repr(transparent)]
+#[block(4)]
+pub struct JournalVersion {
+    version: u32,
+}
+
+impl From<u32> for JournalVersion {
+    fn from(version: u32) -> Self {
+        Self { version }
+    }
+}
+
+impl From<JournalVersion> for u32 {
+    fn from(val: JournalVersion) -> Self {
+        val.version
+    }
 }
 
 impl From<SnapshotHeader> for Protocol {
@@ -25,9 +56,15 @@ impl From<SnapshotHeader> for Protocol {
     }
 }
 
-impl From<PageHeader> for Protocol {
-    fn from(p: PageHeader) -> Self {
-        Self::PageHeader(p)
+impl From<BlobHeader> for Protocol {
+    fn from(p: BlobHeader) -> Self {
+        Self::BlobHeader(p)
+    }
+}
+
+impl From<JournalVersion> for Protocol {
+    fn from(v: JournalVersion) -> Self {
+        Self::JournalVersion(v)
     }
 }
 
@@ -41,6 +78,8 @@ impl Protocol {
 /// Converts iteration over journal into serialized Protocol stream
 pub struct Stream<'a, I: Iterator<Item = <IntoIter<'a> as Iterator>::Item>> {
     iter: I,
+    version: u32,
+    version_written: bool,
     buf: Vec<u8>,
     read: usize,
     cur_snapshot_id: Option<u64>,
@@ -51,22 +90,25 @@ pub struct Stream<'a, I: Iterator<Item = <IntoIter<'a> as Iterator>::Item>> {
 // stream, which starts from 'scratch'
 impl<'a, F: Read + Write + Seek> From<&'a mut Journal<F>> for Stream<'a, IntoIter<'a, F>> {
     fn from(journal: &'a mut Journal<F>) -> Self {
-        Stream::new(journal.into_iter())
+        let version = journal.get_header().version;
+        Stream::new(journal.into_iter(), version)
     }
 }
 
 // stream with any iterator with same Item type
-impl<'a, I: Iterator<Item = <IntoIter<'a> as Iterator>::Item>> From<I> for Stream<'a, I> {
-    fn from(iter: I) -> Self {
-        Stream::new(iter)
+impl<'a, I: Iterator<Item = <IntoIter<'a> as Iterator>::Item>> From<(u32, I)> for Stream<'a, I> {
+    fn from((version, iter): (u32, I)) -> Self {
+        Stream::new(iter, version)
     }
 }
 
 impl<'a, I: Iterator<Item = <IntoIter<'a> as Iterator>::Item>> Stream<'a, I> {
-    pub fn new(iter: I) -> Self {
+    pub fn new(iter: I, version: u32) -> Self {
         Self {
             iter,
-            buf: vec![],
+            version,
+            version_written: false,
+            buf: Vec::with_capacity(8192),
             read: 0,
             cur_snapshot_id: None,
             finished: false,
@@ -104,6 +146,17 @@ impl<'a, I: Iterator<Item = <IntoIter<'a> as Iterator>::Item>> BufRead for Strea
             self.read = 0;
             self.buf.clear();
         }
+
+        // always write version first
+        if !self.version_written {
+            let version: Protocol = JournalVersion::from(self.version).into();
+            self.resize_buf(version.iblock_size());
+            to_writer(self.buf.as_mut_slice(), &version).map_err(Self::to_io_error)?;
+            self.version_written = true;
+            return Ok(self.buf.as_slice());
+        }
+
+        // body write
         match self.iter.next() {
             Some(Ok((snapshot_h, page_h, page))) => {
                 let snapshot_id = snapshot_h.id;
@@ -115,6 +168,7 @@ impl<'a, I: Iterator<Item = <IntoIter<'a> as Iterator>::Item>> BufRead for Strea
                 self.resize_buf(total_len);
                 let mut read_buf = Cursor::new(self.buf.as_mut_slice());
+
                 if self.cur_snapshot_id != Some(snapshot_id) {
                     to_writer(&mut read_buf, &snapshot_h).map_err(Self::to_io_error)?;
                     self.cur_snapshot_id = Some(snapshot_id)
@@ -129,6 +183,7 @@ impl<'a, I: Iterator<Item = <IntoIter<'a> as Iterator>::Item>> BufRead for Strea
             Some(Err(e)) => return Err(Self::to_io_error(e)),
             None if !self.finished => {
                 self.finished = true;
+
                 let eos = Protocol::end();
                 self.resize_buf(eos.iblock_size());
                 to_writer(self.buf.as_mut_slice(), &eos).map_err(Self::to_io_error)?;
diff --git a/journal/tests/journal_tests.rs b/journal/tests/journal_tests.rs
index ee429ff..00469e4 100644
--- a/journal/tests/journal_tests.rs
+++ b/journal/tests/journal_tests.rs
@@ -19,12 +19,12 @@ fn test_journal_not_exists() {
 }
 
 #[derive(Debug, Clone, PartialEq)]
-struct TestPage {
+struct TestBlob {
     offset: u64,
     data: Vec<u8>,
 }
 
-impl Arbitrary for TestPage {
+impl Arbitrary for TestBlob {
     fn arbitrary(gen: &mut Gen) -> Self {
         Self {
             offset: u64::arbitrary(gen),
@@ -35,38 +35,38 @@
 
 #[derive(Debug, Clone, PartialEq)]
 struct TestSnapshot {
-    pages: Vec<TestPage>,
+    blobs: Vec<TestBlob>,
 }
 
 impl Arbitrary for TestSnapshot {
     fn arbitrary(gen: &mut Gen) -> Self {
-        // limit min/max pages per snapshot
-        let page_count = 1 + usize::arbitrary(gen) % 49;
-        let pages = (0..page_count)
+        // limit min/max blobs per snapshot
+        let blob_count = 1 + usize::arbitrary(gen) % 49;
+        let blobs = (0..blob_count)
             .enumerate()
             .fold(vec![], |mut acc, (pos, _)| {
-                let mut page = TestPage::arbitrary(gen);
+                let mut blob = TestBlob::arbitrary(gen);
                 // *edge case*
-                // quickcheck is able to quickly find a way to insert 'last page' as a first page of snapshot
-                // last page is a page where all values are set to 0 and technically it's not possible
-                // to insert such page from sqlite calls
-                // for now we just override such scenario, but pages with zero sizes are still part of
-                // the test case, even though empty page as a concept doesn't make sense.
-                if pos == 0 && page.data.is_empty() {
-                    page.data = vec![0];
+                // quickcheck is able to quickly find a way to insert the 'last blob' as the first blob of a snapshot
+                // the last blob is a blob where all values are set to 0, and technically it's not possible
+                // to insert such a blob from sqlite calls
+                // for now we just override such a scenario, but blobs with zero sizes are still part of
+                // the test case, even though an empty blob as a concept doesn't make sense.
+                if pos == 0 && blob.data.is_empty() {
+                    blob.data = vec![0];
                 }
-                acc.push(page);
+                acc.push(blob);
                 acc
             });
-        TestSnapshot { pages }
+        TestSnapshot { blobs }
     }
 
     fn shrink(&self) -> Box<dyn Iterator<Item = Self>> {
         Box::new(
-            self.pages
+            self.blobs
                 .shrink()
-                .filter(|pages| !pages.is_empty()) // snapshot with no pages is not valid input
-                .map(|pages| TestSnapshot { pages }),
+                .filter(|blobs| !blobs.is_empty()) // snapshot with no blobs is not valid input
+                .map(|blobs| TestSnapshot { blobs }),
         )
     }
 }
@@ -76,8 +76,9 @@ fn test_journal_snapshotting() {
     fn check(input: Vec<TestSnapshot>) {
         let mut journal = Journal::new(Header::default(), Cursor::new(vec![]), None).unwrap();
         for snapshot in input.iter() {
-            for page in snapshot.pages.iter() {
-                journal.new_page(page.offset, page.data.as_slice()).unwrap();
+            for blob in snapshot.blobs.iter() {
+                journal.new_snapshot(0).unwrap();
+                journal.new_blob(blob.offset, blob.data.as_slice()).unwrap();
             }
             journal.commit().unwrap();
         }
@@ -87,14 +88,14 @@
             .map(Result::unwrap)
             .fold(
                 (vec![], None),
-                |(mut acc, mut snapshot_id), (snapshot_h, page_h, page)| {
+                |(mut acc, mut snapshot_id), (snapshot_h, blob_h, blob)| {
                     if snapshot_id != Some(snapshot_h.id) {
                         snapshot_id = Some(snapshot_h.id);
-                        acc.push(TestSnapshot { pages: vec![] });
+                        acc.push(TestSnapshot { blobs: vec![] });
                     };
-                    acc.last_mut().unwrap().pages.push(TestPage {
-                        offset: page_h.offset,
-                        data: page,
+                    acc.last_mut().unwrap().blobs.push(TestBlob {
+                        offset: blob_h.offset,
+                        data: blob,
                     });
                     (acc, snapshot_id)
                 },
@@ -137,12 +138,13 @@ impl Arbitrary for XorShift {
 fn test_journal_stream() {
     fn check(input: Vec<TestSnapshot>, mut prng: XorShift) -> TestResult {
         let mut journal = Journal::new(Header::default(), Cursor::new(vec![]), None).unwrap();
-        let mut expected_len = 4; // end of stream
+        let mut expected_len = 12; // version + end of stream
         for snapshot in input.iter() {
             expected_len += journal::SnapshotHeader::block_size() + 4;
-            for page in snapshot.pages.iter() {
-                expected_len += journal::PageHeader::block_size() + 4 + page.data.len();
-                journal.new_page(page.offset, page.data.as_slice()).unwrap();
+            for blob in snapshot.blobs.iter() {
+                expected_len += journal::BlobHeader::block_size() + 4 + blob.data.len();
+                journal.new_snapshot(0).unwrap();
+                journal.new_blob(blob.offset, blob.data.as_slice()).unwrap();
             }
             journal.commit().unwrap();
         }
@@ -170,18 +172,23 @@
         let mut reader = Cursor::new(buf.as_slice());
         let mut expected = vec![];
+        assert_eq!(
+            serde_sqlite::from_reader::<Protocol>(&mut reader).unwrap(),
+            Protocol::JournalVersion(1.into())
+        );
         loop {
             match serde_sqlite::from_reader::<Protocol>(&mut reader) {
-                Ok(Protocol::SnapshotHeader(_)) => expected.push(TestSnapshot { pages: vec![] }),
-                Ok(Protocol::PageHeader(p)) => {
-                    let mut buf = vec![0; p.page_size as usize];
+                Ok(Protocol::SnapshotHeader(_)) => expected.push(TestSnapshot { blobs: vec![] }),
+                Ok(Protocol::BlobHeader(p)) => {
+                    let mut buf = vec![0; p.blob_size as usize];
                     reader.read_exact(buf.as_mut_slice()).unwrap();
-                    expected.last_mut().unwrap().pages.push(TestPage {
+                    expected.last_mut().unwrap().blobs.push(TestBlob {
                         offset: p.offset,
                         data: buf,
                     });
                 }
                 Ok(Protocol::EndOfStream(_)) => break,
+                Ok(msg) => panic!("unexpected {msg:?}"),
                 Err(e) => return TestResult::error(format!("unexpected error: {e}")),
             }
         }
@@ -197,22 +204,23 @@
         // init journal
         let mut journal = Journal::new(Header::default(), Cursor::new(vec![]), None).unwrap();
         for snapshot in input.iter() {
-            for page in snapshot.pages.iter() {
-                journal.new_page(page.offset, page.data.as_slice()).unwrap();
+            for blob in snapshot.blobs.iter() {
+                journal.new_snapshot(0).unwrap();
+                journal.new_blob(blob.offset, blob.data.as_slice()).unwrap();
             }
             journal.commit().unwrap();
         }
         // count how many serialized bytes are expected
         let skip = prng.next() % input.len().max(1) as u64;
-        let mut expected_len = 4; // end of stream
+        let mut expected_len = 12; // version + end of stream
         for snapshot in input.iter().skip(skip as usize) {
             expected_len += journal::SnapshotHeader::block_size() + 4;
-            for page in snapshot.pages.iter() {
-                expected_len += journal::PageHeader::block_size() + 4 + page.data.len();
+            for blob in snapshot.blobs.iter() {
+                expected_len += journal::BlobHeader::block_size() + 4 + blob.data.len();
             }
         }
-        let mut stream: Stream<_> = Stream::from(journal.into_iter().skip_snapshots(skip));
+        let mut stream: Stream<_> = Stream::from((1, journal.into_iter().skip_snapshots(skip)));
         let mut writer = Cursor::new(vec![]);
         loop {
             let buf_size = (prng.next() % 100) as usize;
@@ -235,18 +243,24 @@
         let mut reader = Cursor::new(buf.as_slice());
         let mut expected = vec![];
+
+        assert_eq!(
+            serde_sqlite::from_reader::<Protocol>(&mut reader).unwrap(),
+            Protocol::JournalVersion(1.into())
+        );
         loop {
             match serde_sqlite::from_reader::<Protocol>(&mut reader) {
-                Ok(Protocol::SnapshotHeader(_)) => expected.push(TestSnapshot { pages: vec![] }),
-                Ok(Protocol::PageHeader(p)) => {
-                    let mut buf = vec![0; p.page_size as usize];
+                Ok(Protocol::SnapshotHeader(_)) => expected.push(TestSnapshot { blobs: vec![] }),
+                Ok(Protocol::BlobHeader(p)) => {
+                    let mut buf = vec![0; p.blob_size as usize];
                     reader.read_exact(buf.as_mut_slice()).unwrap();
-                    expected.last_mut().unwrap().pages.push(TestPage {
+                    expected.last_mut().unwrap().blobs.push(TestBlob {
                         offset: p.offset,
                         data: buf,
                     });
                 }
                 Ok(Protocol::EndOfStream(_)) => break,
+                Ok(msg) => panic!("unexpected {msg:?}"),
                 Err(e) => return TestResult::error(format!("unexpected error: {e}")),
             }
         }
@@ -262,8 +276,9 @@ fn test_journal_rebuild_from_stream() {
     fn check(input: Vec<TestSnapshot>, mut prng: XorShift) {
         let mut journal = Journal::new(Header::default(), Cursor::new(vec![]), None).unwrap();
         for snapshot in input.iter() {
-            for page in snapshot.pages.iter() {
-                journal.new_page(page.offset, page.data.as_slice()).unwrap();
+            for blob in snapshot.blobs.iter() {
+                journal.new_snapshot(0).unwrap();
+                journal.new_blob(blob.offset, blob.data.as_slice()).unwrap();
             }
             journal.commit().unwrap();
         }
@@ -285,21 +300,29 @@
         let mut reader = Cursor::new(buf.as_slice());
         let mut recovered_journal =
             Journal::new(Header::default(), Cursor::new(vec![]), None).unwrap();
+
+        assert_eq!(
+            serde_sqlite::from_reader::<Protocol>(&mut reader).unwrap(),
+            Protocol::JournalVersion(1.into())
+        );
         loop {
             match serde_sqlite::from_reader::<Protocol>(&mut reader) {
                 Ok(Protocol::SnapshotHeader(s)) => {
                     recovered_journal.commit().unwrap();
                     recovered_journal.add_snapshot(&s).unwrap();
                 }
-                Ok(Protocol::PageHeader(p)) => {
-                    let mut buf = vec![0; p.page_size as usize];
+                Ok(Protocol::BlobHeader(p)) => {
+                    let mut buf = vec![0; p.blob_size as usize];
                     reader.read_exact(buf.as_mut_slice()).unwrap();
-                    recovered_journal.add_page(&p, buf.as_slice()).unwrap();
+                    recovered_journal.add_blob(&p, buf.as_slice()).unwrap();
                 }
                 Ok(Protocol::EndOfStream(_)) => {
                     recovered_journal.commit().unwrap();
                     break;
                 }
+                Ok(Protocol::JournalVersion(_)) => {
+                    panic!("version header should not appear in loop")
                }
                Err(e) => panic!("unexpected stream error: {e}"),
            }
        }
@@ -420,10 +443,11 @@ fn test_journal_concurrent_updates() {
     // test concurrent snapshot creation
     std::thread::scope(|s| {
         s.spawn(|| {
-            s1.iter().for_each(|page| {
+            s1.iter().for_each(|blob| {
                 let guard = lock.lock().unwrap();
+                journal_1.new_snapshot(0).unwrap();
                 journal_1
-                    .new_page(page.len() as u64, page.as_slice())
+                    .new_blob(blob.len() as u64, blob.as_slice())
                     .unwrap();
                 journal_1.commit().unwrap();
                 drop(guard);
@@ -431,10 +455,11 @@
             });
         });
         s.spawn(|| {
-            s2.iter().for_each(|page| {
+            s2.iter().for_each(|blob| {
                 let guard = lock.lock().unwrap();
+                journal_2.new_snapshot(0).unwrap();
                 journal_2
-                    .new_page(page.len() as u64, page.as_slice())
+                    .new_blob(blob.len() as u64, blob.as_slice())
                     .unwrap();
                 journal_2.commit().unwrap();
                 drop(guard);
@@ -449,7 +474,7 @@
         .all(|(left, right)| left.unwrap() == right.unwrap()));
     assert_eq!(journal_1.into_iter().count(), journal_2.into_iter().count());
 
-    // it's matches only because we have one page per snapshot
+    // it matches only because we have one blob per snapshot
     assert_eq!(journal_1.into_iter().count(), size);
 
     assert_eq!(journal_1.get_header().snapshot_counter, size as u64);
@@ -463,9 +488,9 @@
         s.spawn(|| loop {
             let mut i = iter.lock().unwrap();
             if let Some(res) = i.next() {
-                let (snapshot_h, page_h, page) = res.unwrap();
+                let (snapshot_h, blob_h, blob) = res.unwrap();
                 journal_1_re.add_snapshot(&snapshot_h).unwrap();
-                journal_1_re.add_page(&page_h, page.as_slice()).unwrap();
+                journal_1_re.add_blob(&blob_h, blob.as_slice()).unwrap();
                 journal_1_re.commit().unwrap();
             } else {
                 break;
@@ -476,9 +501,9 @@
         s.spawn(|| loop {
             let mut i = iter.lock().unwrap();
             if let Some(res) = i.next() {
-                let (snapshot_h, page_h, page) = res.unwrap();
+                let (snapshot_h, blob_h, blob) = res.unwrap();
                 journal_2_re.add_snapshot(&snapshot_h).unwrap();
-                journal_2_re.add_page(&page_h, page.as_slice()).unwrap();
+                journal_2_re.add_blob(&blob_h, blob.as_slice()).unwrap();
                 journal_2_re.commit().unwrap();
             } else {
                 break;
diff --git a/mycelite/Cargo.toml b/mycelite/Cargo.toml
index dbc7a79..06c6ad8 100644
--- a/mycelite/Cargo.toml
+++ b/mycelite/Cargo.toml
@@ -16,6 +16,7 @@ replicator = ["dep:ureq", "dep:base64"]
 [dependencies]
 libsqlite-sys = { path = "../libsqlite-sys" }
 journal = { path = "../journal" }
+utils = { path = "../utils" }
 page_parser = { path = "../page_parser" }
 serde_sqlite = { path = "../serde_sqlite" }
 once_cell = "1"
diff --git a/mycelite/src/replicator/replicator.rs b/mycelite/src/replicator/http_replicator.rs
similarity index 90%
rename from mycelite/src/replicator/replicator.rs
rename to mycelite/src/replicator/http_replicator.rs
index c8de646..dbb12a2 100644
--- a/mycelite/src/replicator/replicator.rs
+++ b/mycelite/src/replicator/http_replicator.rs
@@ -114,7 +114,12 @@ impl Replicator {
         if let Some(b) = self.get_basic_auth_header(client_id.as_deref(), secret.as_deref()) {
             req = req.set("Authorization", &b)
         }
-        let stream = Stream::from(self.journal.into_iter().skip_snapshots(remote_snapshot_id));
+
+        let version = self.journal.get_header().version;
+        let stream = Stream::from((
+            version,
+            self.journal.into_iter().skip_snapshots(remote_snapshot_id),
+        ));
 
         // FIXME: status code are not checked
         req.send(stream)?;
@@ -148,21 +153,30 @@ impl Replicator {
         let res = req.call()?;
         let mut reader = res.into_reader();
+
+        match de::from_reader::<Protocol>(&mut reader)? {
+            Protocol::JournalVersion(v) if v == 1_u32.into() => (),
+            Protocol::JournalVersion(v) => {
+                return Err(format!("unexpected journal version: {v:?}").into())
+            }
+            _ => return Err("expected version header".into()),
+        };
         loop {
             match de::from_reader::<Protocol>(&mut reader)? {
                 Protocol::SnapshotHeader(snapshot_header) => {
                     self.journal.commit()?;
                     self.journal.add_snapshot(&snapshot_header)?
                 }
-                Protocol::PageHeader(page_header) => {
-                    let mut page = vec![0; page_header.page_size as usize];
-                    reader.read_exact(page.as_mut_slice())?;
-                    self.journal.add_page(&page_header, page.as_slice())?;
+                Protocol::BlobHeader(blob_header) => {
+                    let mut blob = vec![0; blob_header.blob_size as usize];
+                    reader.read_exact(blob.as_mut_slice())?;
+                    self.journal.add_blob(&blob_header, blob.as_slice())?;
                 }
                 Protocol::EndOfStream(_) => {
                     self.journal.commit()?;
                     break;
                 }
+                Protocol::JournalVersion(_) => return Err("version header was not expected".into()),
             }
         }
         Ok((local_snapshot_id, self.journal.current_snapshot()))
diff --git a/mycelite/src/replicator/mod.rs b/mycelite/src/replicator/mod.rs
index 4715056..1cc64c6 100644
--- a/mycelite/src/replicator/mod.rs
+++ b/mycelite/src/replicator/mod.rs
@@ -1,10 +1,5 @@
-#[cfg(not(feature = "replicator"))]
-mod noop_replicator;
-#[cfg(feature = "replicator")]
-mod replicator;
+#[cfg_attr(not(feature = "replicator"), path = "noop_replicator.rs")]
+#[cfg_attr(feature = "replicator", path = "http_replicator.rs")]
+mod replicator_impl;
 
-#[cfg(feature = "replicator")]
-pub use replicator::*;
-
-#[cfg(not(feature = "replicator"))]
-pub use noop_replicator::*;
+pub use replicator_impl::*;
diff --git a/mycelite/src/vfs.rs b/mycelite/src/vfs.rs
index c37bf37..8282cd9 100644
--- a/mycelite/src/vfs.rs
+++ b/mycelite/src/vfs.rs
@@ -148,9 +148,13 @@ impl MclVFSFile {
         for res in iter {
             let (offset, page) = res?;
             let page = page.as_slice();
-            journal.new_page(offset, page)?;
+            // it's ok to call new_snapshot multiple times: if a snapshot is already started, it's a no-op
+            journal.new_snapshot(page.len() as u32)?;
+            for (diff_offset, blob) in utils::get_diff(page, &[]) {
+                journal.new_blob(offset + diff_offset as u64, blob)?;
+            }
         }
-        journal.commit().map_err(Into::into)
+        Ok(journal.commit()?)
     }
 
     fn setup_journal(
@@ -378,16 +382,30 @@ unsafe extern "C" fn mvfs_io_write(
             return ffi::SQLITE_READONLY;
         }
     }
-    let result = file.journal.as_mut().map(|journal| {
-        journal.new_page(
-            offset as u64,
-            std::slice::from_raw_parts(buf as *const u8, amt as usize),
-        )
-    });
-    match result {
-        None | Some(Ok(_)) => (),
-        Some(Err(_e)) => return ffi::SQLITE_ERROR,
+    let result = match file.journal.as_mut() {
+        Some(journal) => {
+            let new_page = std::slice::from_raw_parts(buf.cast::<u8>(), amt as usize);
+            let mut old_page = vec![0_u8; amt as usize];
+            let mut iter =
+                match MclVFSIO.xRead.unwrap()(pfile, old_page.as_mut_ptr().cast(), amt, offset) {
+                    // existing page
+                    ffi::SQLITE_OK => utils::get_diff(new_page, &old_page),
+                    // new page
+                    ffi::SQLITE_IOERR_SHORT_READ => utils::get_diff(new_page, &[]),
+                    _other => return ffi::SQLITE_ERROR,
+                };
+            iter.try_for_each(|(diff_offset, diff)| {
+                let diff_offset = diff_offset as i64 + offset;
+                journal
+                    .new_snapshot(amt as u32)
+                    .and_then(|_| journal.new_blob(diff_offset as u64, diff))
+            })
+        }
+        None => Ok(()),
     };
+    if let Err(_e) = result {
+        return ffi::SQLITE_ERROR;
+    }
     (*file.real.pMethods).xWrite.unwrap()(&mut file.real, buf, amt, offset)
 }
diff --git a/utils/src/lib.rs b/utils/src/lib.rs
index e24bdf5..460c5b4 100644
--- a/utils/src/lib.rs
+++ b/utils/src/lib.rs
@@ -32,8 +32,9 @@ where
     I: Iterator,
 {
     type Item = (usize, usize);
+
     fn next(&mut self) -> Option<Self::Item> {
-        while let Some(item) = self.iter.next() {
+        for item in self.iter.by_ref() {
             match item {
                 (i, (old, new)) if old != new => {
                     self.range = match self.range {
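
Editor's note (not part of the patch): a minimal usage sketch of the async bridge this diff introduces. It assumes a tokio runtime, the `journal` crate built with the `async` feature, and placeholder paths; `tokio::io::copy`, the top-level error type, and the EOF behavior after the end-of-stream frame are assumptions for illustration, not code from the repository. The handles returned by `spawn()` implement tokio's `AsyncRead`/`AsyncWrite`: the read side emits the `Protocol::JournalVersion` frame first, and the write side validates that frame before replaying snapshots and blobs.

    use journal::{AsyncReadJournalStream, AsyncWriteJournalStream};
    use tokio::io::AsyncWriteExt;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Read side: a spawn_blocking task serializes the source journal
        // (starting from snapshot 0) into the Protocol byte stream.
        let mut reader = AsyncReadJournalStream::new("/tmp/source-journal", 0).spawn();

        // Write side: a spawn_blocking task checks the version frame, then
        // replays SnapshotHeader/BlobHeader/EndOfStream into its journal.
        let mut writer = AsyncWriteJournalStream::new("/tmp/replica-journal").spawn();

        // Journal IO stays blocking, but it now runs on tokio's blocking
        // thread pool instead of stalling the async scheduler.
        tokio::io::copy(&mut reader, &mut writer).await?;
        writer.shutdown().await?;

        // join() surfaces any JournalError from the blocking write loop.
        writer.join().await??;
        Ok(())
    }

This mirrors the design point of the patch as a whole: per-request stream handles replace the single Arc<Mutex<Journal>> the demo server previously held, so handlers no longer serialize on one lock or block the scheduler on journal IO.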