Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Apr 17, 2024
1 parent ad077fd commit 05080cb
Show file tree
Hide file tree
Showing 1,599 changed files with 65,371 additions and 18,873 deletions.
15 changes: 13 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ members = [
"vendored/sqlite3-parser",

"xtask", "libsql-hrana",
"libsql-wal",
"libsql-wal", "libsql-sqlite3/test/extractor",
]

exclude = [
Expand Down
4 changes: 4 additions & 0 deletions libsql-wal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ zerocopy = { version = "0.7.32", features = ["derive"] }

[dev-dependencies]
criterion = "0.5.1"
hex = "0.4.3"
once_cell = "1.19.0"
rand_chacha = "0.3.1"
regex = "1.10.4"
tempfile = "3.10.1"

[[bench]]
Expand Down
134 changes: 87 additions & 47 deletions libsql-wal/src/log.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::fs::File;
use std::io::{BufWriter, IoSlice, Write};
use std::mem::size_of;
use std::num::NonZeroU64;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -85,6 +86,35 @@ pub struct LogHeader {
index_size: U64,
}

impl LogHeader {
    /// Returns `true` when no frame has been committed to this log yet.
    fn is_empty(&self) -> bool {
        self.last_commited_frame_no.get() == 0
    }

    /// Number of frames committed to this log so far (0 for an empty log).
    fn count_committed(&self) -> usize {
        let last = self.last_commited_frame_no.get();
        let before_start = self.start_frame_no.get() - 1;
        last.saturating_sub(before_start) as usize
    }

    /// Frame_no of the most recent commit visible through this log.
    ///
    /// When the log is empty, this is the last frame_no committed on the
    /// previous log (`start_frame_no - 1`); otherwise it is this log's own
    /// last committed frame_no.
    fn last_committed(&self) -> u64 {
        match self.is_empty() {
            true => self.start_frame_no.get() - 1,
            false => self.last_commited_frame_no.get(),
        }
    }

    /// Frame_no that the next committed frame will be assigned.
    pub(crate) fn next_frame_no(&self) -> NonZeroU64 {
        let next = if self.is_empty() {
            self.start_frame_no.get()
        } else {
            self.last_commited_frame_no.get() + 1
        };
        // start_frame_no is always >= 1, so `next` is never zero here
        NonZeroU64::new(next).unwrap()
    }
}

/// split the index entry value into its components: (frame_no, offset)
pub fn index_entry_split(k: u64) -> (u32, u32) {
let offset = (k & u32::MAX as u64) as u32;
Expand Down Expand Up @@ -118,15 +148,15 @@ fn byte_offset(offset: u32) -> u64 {
impl Log {
/// Create a new log from the given path and metadata. The file pointed to by path must not
/// exist.
pub fn create(path: &Path, start_frame_no: u64, db_size: u32) -> Result<Self> {
pub fn create(path: &Path, start_frame_no: NonZeroU64, db_size: u32) -> Result<Self> {
let log_file = std::fs::OpenOptions::new()
.create_new(true)
.write(true)
.read(true)
.open(path)?;

let header = LogHeader {
start_frame_no: start_frame_no.into(),
start_frame_no: start_frame_no.get().into(),
last_commited_frame_no: 0.into(),
db_size: db_size.into(),
index_offset: 0.into(),
Expand All @@ -145,25 +175,26 @@ impl Log {
})
}

pub fn len(&self) -> usize {
let header = self.header.lock();
(header.last_commited_frame_no.get() - header.start_frame_no.get()) as usize
pub fn is_empty(&self) -> bool {
self.count_committed() == 0
}

/// Returns the last committed frame_no and the db size
#[tracing::instrument(skip_all)]
pub fn begin_read_infos(&self) -> (u64, u32) {
let header = self.header.lock();
(header.last_commited_frame_no.get(), header.db_size.get())
(header.last_committed(), header.db_size.get())
}

pub fn last_commited(&self) -> u64 {
self.header.lock().last_commited_frame_no.get()
pub fn last_committed(&self) -> u64 {
self.header.lock().last_committed()
}

pub fn frames_in_log(&self) -> u64 {
let header = self.header.lock();
header.last_commited_frame_no.get() - header.start_frame_no.get()
pub fn next_frame_no(&self) -> NonZeroU64 {
self.header.lock().next_frame_no()
}

pub fn count_committed(&self) -> usize {
self.header.lock().count_committed()
}

pub fn db_size(&self) -> u32 {
Expand All @@ -181,36 +212,44 @@ impl Log {
tx.enter(move |tx| {
let mut new_index = fst::map::MapBuilder::memory();
let mut pages = pages.peekable();
let mut commit_frame_written = false;
// let mut commit_frame_written = false;
let current_savepoint = tx.savepoints.last_mut().expect("no savepoints initialized");
while let Some((page_no, page)) = pages.next() {
tracing::trace!(page_no, "inserting page");
match current_savepoint
.index
.as_ref()
.and_then(|i| i.get(&page_no.to_be_bytes()))
{
Some(x) => {
let header = FrameHeader {
page_no: page_no.into(),
// set the size_after if it's the last frame in a commit batch
size_after: 0.into(),
};
// there is already an occurence of this page in the current transaction, replace it
let (_, offset) = index_entry_split(x);
let slices = &[IoSlice::new(header.as_bytes()), IoSlice::new(page)];

self.file
.write_at_vectored(slices, byte_offset(offset) as u64)?;
}
None => {
// match current_savepoint
// .index
// .as_ref()
// .and_then(|i| i.get(&page_no.to_be_bytes()))
// {
// Some(x) => {
// let header = FrameHeader {
// page_no: page_no.into(),
// // set the size_after if it's the last frame in a commit batch
// size_after: 0.into(),
// };
// // there is already an occurence of this page in the current transaction, replace it
// let (fno_offset, offset) = index_entry_split(x);
// let fno = self.header.lock().start_frame_no.get() + fno_offset as u64;
// let fno_bytes = &fno.to_be_bytes()[..];
// let slices = &[
// IoSlice::new(header.as_bytes()),
// IoSlice::new(&page[..4096 - 8]),
// // store the replication index in big endian as per SQLite convention,
// // at the end of the page
// IoSlice::new(fno_bytes),
// ];
//
// self.file
// .write_at_vectored(slices, byte_offset(offset) as u64)?;
// }
// None => {
let size_after = if let Some(size) = size_after {
pages.peek().is_none().then_some(size).unwrap_or(0)
} else {
0
};

commit_frame_written = size_after != 0;
// commit_frame_written = size_after != 0;

let header = FrameHeader {
page_no: page_no.into(),
Expand All @@ -237,8 +276,8 @@ impl Log {
),
)?;
}
}
}
// }
// }

if let Some(ref old_index) = current_savepoint.index {
let indexes = &[old_index, &new_index.into_map()];
Expand Down Expand Up @@ -266,10 +305,10 @@ impl Log {
header.last_commited_frame_no = last_frame_no.into();
header.db_size = size_after.into();

if !commit_frame_written {
// need to patch the last frame header
self.patch_frame_size_after(tx.next_offset - 1, size_after)?;
}
// if !commit_frame_written {
// // need to patch the last frame header
// self.patch_frame_size_after(tx.next_offset - 1, size_after)?;
// }

self.file.write_all_at(header.as_bytes(), 0)?;
// self.file.sync_data().unwrap();
Expand All @@ -289,11 +328,11 @@ impl Log {
// println!("full_insert: {}", before.elapsed().as_micros());1
}

fn patch_frame_size_after(&self, offset: u32, size_after: u32) -> Result<()> {
let offset = byte_offset(offset) + memoffset::offset_of!(FrameHeader, size_after) as u64;
self.file.write_all_at(&size_after.to_le_bytes(), offset)?;
Ok(())
}
// fn patch_frame_size_after(&self, offset: u32, size_after: u32) -> Result<()> {
// let offset = byte_offset(offset) + memoffset::offset_of!(FrameHeader, size_after) as u64;
// self.file.write_all_at(&size_after.to_le_bytes(), offset)?;
// Ok(())
// }

/// return the offset of the frame for page_no, with frame_no no larger that max_frame_no, if
/// it exists
Expand Down Expand Up @@ -347,8 +386,8 @@ impl Log {
"attempt to seal an already sealed log"
);
let mut header = self.header.lock();
let index_offset = header.last_commited_frame_no.get() - header.start_frame_no.get();
let index_byte_offset = byte_offset(index_offset as u32);
let index_offset = header.count_committed() as u32;
let index_byte_offset = byte_offset(index_offset);
let mut cursor = self.file.cursor(index_byte_offset);
let mut writer = BufWriter::new(&mut cursor);
self.index.merge_all(&mut writer)?;
Expand Down Expand Up @@ -442,8 +481,9 @@ impl SealedLog {

let index = self.index();
if let Some(value) = index.get(page_no.to_be_bytes()) {
let (_, offset) = index_entry_split(value);
let (f, offset) = index_entry_split(value);
self.read_offset(offset, buf)?;

return Ok(true);
}

Expand Down
17 changes: 13 additions & 4 deletions libsql-wal/src/registry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(dead_code, unused_variables, unreachable_code)]
use std::fs::OpenOptions;
use std::num::NonZeroU64;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -91,10 +92,10 @@ impl WalRegistry {
let header = log.header();
(
header.db_size.get(),
header.last_commited_frame_no.get() + 1,
header.next_frame_no(),
)
})
.unwrap_or((1, 0));
.unwrap_or((1, NonZeroU64::new(1).unwrap()));

let current_path = path.join(format!("{namespace}:{start_frame_no:020}.log"));
let current = arc_swap::ArcSwap::new(Arc::new(Log::create(
Expand All @@ -112,7 +113,7 @@ impl WalRegistry {
// end of the file to store the replication index
let mut header: Sqlite3DbHeader = Sqlite3DbHeader::new_zeroed();
db_file.read_exact_at(header.as_bytes_mut(), 0)?;
assert_eq!(header.reserved_in_page, 8, "bad db");
// assert_eq!(header.reserved_in_page, 8, "bad db");

let shared = Arc::new(SharedWal {
current,
Expand Down Expand Up @@ -144,7 +145,10 @@ impl WalRegistry {

// we have the lock, now create a new log
let current = shared.current.load();
let start_frame_no = current.last_commited() + 1;
if current.is_empty() {
return Ok(())
}
let start_frame_no = current.next_frame_no();
let path = self
.path
.join(shared.namespace.as_str())
Expand All @@ -167,11 +171,16 @@ impl WalRegistry {
self.shutdown.store(true, Ordering::SeqCst);
let mut openned = self.openned.write();
for (_, shared) in openned.drain() {
dbg!(&shared.namespace);
let mut tx = Transaction::Read(shared.begin_read(u64::MAX));
shared.upgrade(&mut tx)?;
dbg!();
tx.commit();
dbg!();
self.swap_current(&shared, &mut tx.as_write_mut().unwrap())?;
dbg!();
shared.current.load().seal()?;
dbg!();
drop(tx);
shared.segments.checkpoint(&shared.db_file)?;
}
Expand Down
Loading

0 comments on commit 05080cb

Please sign in to comment.