Skip to content

Commit

Permalink
fix fs storage impl (#1461)
Browse files Browse the repository at this point in the history
* fix fs storage impl

* fmt

* fix warning

---------

Co-authored-by: Lucio Franco <luciofranco14@gmail.com>
  • Loading branch information
MarinPostma and LucioFranco authored Jun 11, 2024
1 parent 818e166 commit cd586ed
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 62 deletions.
14 changes: 9 additions & 5 deletions libsql-wal/src/bottomless/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ pub(crate) struct Job<C, T> {
#[derive(Debug, AsBytes, FromZeroes, FromBytes)]
#[repr(C)]
pub struct CompactedSegmentDataHeader {
frame_count: lu64,
segment_id: lu128,
start_frame_no: lu64,
end_frame_no: lu64,
pub(crate) frame_count: lu64,
pub(crate) segment_id: lu128,
pub(crate) start_frame_no: lu64,
pub(crate) end_frame_no: lu64,
}

#[derive(Debug, AsBytes, FromZeroes, FromBytes)]
Expand Down Expand Up @@ -327,6 +327,10 @@ mod test {
/// Test stub: always returns the UUID built from the integer `0`,
/// so every call yields the same value.
fn uuid(&self) -> Uuid {
Uuid::from_u128(0)
}

/// Unimplemented stub: panics via `todo!()` if it is ever called.
///
/// The parameters are underscore-prefixed because the body does not use
/// them; this avoids `unused_variables` warnings and matches the other
/// stubbed `Io::hard_link` implementation in the test suite.
fn hard_link(&self, _src: &Path, _dst: &Path) -> std::io::Result<()> {
    todo!()
}
}

struct TestStorage {
Expand Down Expand Up @@ -369,7 +373,7 @@ mod test {
_config: &Self::Config,
_namespace: NamespaceName,
_frame_no: u64,
_dest: impl tokio::io::AsyncWrite,
_dest_path: &Path,
) -> Result<()> {
todo!()
}
Expand Down
90 changes: 46 additions & 44 deletions libsql-wal/src/bottomless/storage/fs.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use std::future::Future;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use tokio::io::{AsyncWrite, AsyncWriteExt};
use zerocopy::FromBytes;

use crate::bottomless::job::CompactedSegmentDataHeader;
use crate::bottomless::{Error, Result};
use crate::io::{FileExt, Io};
use crate::name::NamespaceName;
use crate::segment::SegmentHeader;

use super::Storage;

Expand Down Expand Up @@ -48,9 +45,9 @@ impl<I: Io> Storage for FsStorage<I> {

let path = self.prefix.join("segments").join(key);

let buf = vec![0u8; segment_data.len().unwrap() as usize];
let buf = Vec::with_capacity(dbg!(segment_data.len().unwrap()) as usize);

let f = self.io.open(true, true, true, &path).unwrap();
let f = self.io.open(true, false, true, dbg!(&path)).unwrap();
async move {
let (buf, res) = segment_data.read_exact_at_async(buf, 0).await;

Expand All @@ -66,16 +63,16 @@ impl<I: Io> Storage for FsStorage<I> {
_config: &Self::Config,
_namespace: NamespaceName,
frame_no: u64,
dest: impl AsyncWrite,
dest_path: &Path,
) -> Result<()> {
let dir = self.prefix.join("segments");

// TODO(lucio): optimization would be to cache this list, since we update the files in the
// store fn we can keep track without having to go to the OS each time.
let mut dirs = tokio::fs::read_dir(dir).await?;

while let Some(dir) = dirs.next_entry().await? {
let file = dir.file_name();
while let Some(entry) = dirs.next_entry().await? {
let file = entry.file_name();
let key = file.to_str().unwrap().split(".").next().unwrap();
let mut comp = key.split("-");

Expand All @@ -86,25 +83,28 @@ impl<I: Io> Storage for FsStorage<I> {
let end_frame: u64 = end_frame.parse().unwrap();

if start_frame <= frame_no && end_frame >= frame_no {
let file = self.io.open(false, true, true, &dir.path()).unwrap();

let len = file.len().unwrap();
#[cfg(debug_assertions)]
{
use crate::io::buf::ZeroCopyBuf;

let buf = vec![0u8; len as usize];
let (mut buf, res) = file.read_exact_at_async(buf, 0).await;
res.unwrap();
let header_buf = ZeroCopyBuf::<CompactedSegmentDataHeader>::new_uninit();
let file = self
.io
.open(false, true, false, dbg!(&entry.path()))
.unwrap();
let (header_buf, res) = file.read_exact_at_async(header_buf, 0).await;
res.unwrap();

// Assert the header from the segment matches the key in its path
let header = SegmentHeader::ref_from_prefix(&buf[..]).unwrap();
let start_frame_from_header = header.start_frame_no.get();
let end_frame_from_header = header.last_commited_frame_no.get();
let header = header_buf.get_ref();
let start_frame_from_header = header.start_frame_no.get();
let end_frame_from_header = header.end_frame_no.get();

// TODO(lucio): convert these into errors before prod
assert_eq!(start_frame, start_frame_from_header);
assert_eq!(end_frame, end_frame_from_header);
// TODO(lucio): convert these into errors before prod
assert_eq!(start_frame, start_frame_from_header);
assert_eq!(end_frame, end_frame_from_header);
}

tokio::pin!(dest);
dest.write_all(&mut buf[..]).await.unwrap();
self.io.hard_link(&entry.path(), dest_path)?;

return Ok(());
}
Expand All @@ -128,35 +128,29 @@ impl<I: Io> Storage for FsStorage<I> {

#[cfg(test)]
mod tests {
use std::io::Read;

use chrono::Utc;
use tempfile::tempdir;
use uuid::Uuid;
use zerocopy::AsBytes;
use zerocopy::{AsBytes, FromZeroes};

use super::*;
use crate::{bottomless::Storage, io::StdIO};

#[tokio::test]
async fn read_write() {
let temp_dir = Uuid::new_v4().to_string();
let dir = std::env::temp_dir().join(temp_dir);

let fs = FsStorage::new(dir, StdIO::default()).unwrap();
let dir = tempdir().unwrap();
let fs = FsStorage::new(dir.path().into(), StdIO::default()).unwrap();

let namespace = NamespaceName::from_string("".into());
let mut segment = vec![0u8; 4096];

let header = SegmentHeader {
let segment = CompactedSegmentDataHeader {
start_frame_no: 0.into(),
last_commited_frame_no: 64.into(),
db_size: 0.into(),
index_offset: 0.into(),
index_size: 0.into(),
header_cheksum: 0.into(),
flags: 0.into(),
frame_count: 10.into(),
segment_id: 0.into(),
end_frame_no: 64.into(),
};

header.write_to_prefix(&mut segment[..]);

fs.store(
&(),
crate::bottomless::storage::SegmentMeta {
Expand All @@ -166,15 +160,23 @@ mod tests {
end_frame_no: 64,
created_at: Utc::now(),
},
segment,
segment.as_bytes().to_vec(),
Vec::new(),
)
.await
.unwrap();

let mut dest = Vec::new();
fs.fetch_segment(&(), namespace.clone(), 5, &mut dest)
let path = dir.path().join("fetched_segment");
fs.fetch_segment(&(), namespace.clone(), 5, &path)
.await
.unwrap();

let mut file = std::fs::File::open(path).unwrap();
let mut header: CompactedSegmentDataHeader = CompactedSegmentDataHeader::new_zeroed();

file.read_exact(header.as_bytes_mut()).unwrap();

assert_eq!(header.start_frame_no.get(), 0);
assert_eq!(header.end_frame_no.get(), 64);
}
}
7 changes: 4 additions & 3 deletions libsql-wal/src/bottomless/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::future::Future;
use std::path::Path;
use std::sync::Arc;

use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -53,7 +54,7 @@ pub trait Storage: Send + Sync + 'static {
_config: &Self::Config,
_namespace: NamespaceName,
_frame_no: u64,
_dest: impl AsyncWrite,
_dest_path: &Path,
) -> Result<()>;

/// Fetch meta for `namespace`
Expand Down Expand Up @@ -107,10 +108,10 @@ impl<T: Storage> Storage for Arc<T> {
config: &Self::Config,
namespace: NamespaceName,
frame_no: u64,
dest: impl AsyncWrite,
dest_path: &Path,
) -> Result<()> {
self.as_ref()
.fetch_segment(config, namespace, frame_no, dest)
.fetch_segment(config, namespace, frame_no, dest_path)
.await
}

Expand Down
4 changes: 2 additions & 2 deletions libsql-wal/src/bottomless/storage/s3.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! S3 implementation of storage

use std::future::Future;
use std::{future::Future, path::Path};

use super::Storage;
use crate::{bottomless::Result, io::file::FileExt, name::NamespaceName};
Expand All @@ -17,7 +17,7 @@ impl Storage for S3Storage {
_config: &Self::Config,
_namespace: NamespaceName,
_frame_no: u64,
_dest: impl tokio::io::AsyncWrite,
_dest: &Path,
) -> Result<()> {
todo!()
}
Expand Down
31 changes: 23 additions & 8 deletions libsql-wal/src/io/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,17 +172,32 @@ impl FileExt for Vec<u8> {
mut buf: B,
offset: u64,
) -> impl Future<Output = (B, Result<()>)> + Send {
let slice = &self[offset as usize..];
async move {
let slice = &self[offset as usize..];

if slice.len() < buf.bytes_total() {
return (
buf,
Err(io::Error::new(ErrorKind::UnexpectedEof, "early eof")),
);
}

let chunk = unsafe {
let len = buf.bytes_total();
let ptr = buf.stable_mut_ptr();
std::slice::from_raw_parts_mut(ptr, len)
};

let chunk = unsafe {
let len = buf.bytes_total();
let ptr = buf.stable_mut_ptr();
std::slice::from_raw_parts_mut(ptr, len)
};
debug_assert_eq!(chunk.len(), slice.len());

chunk.clone_from_slice(slice);
chunk.clone_from_slice(slice);

async move { (buf, Ok(())) }
unsafe {
buf.set_init(chunk.len());
}

(buf, Ok(()))
}
}

async fn write_all_at_async<B: IoBuf + Send + 'static>(
Expand Down
9 changes: 9 additions & 0 deletions libsql-wal/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub trait Io: Send + Sync + 'static {
fn tempfile(&self) -> io::Result<Self::TempFile>;
fn now(&self) -> DateTime<Utc>;
fn uuid(&self) -> Uuid;
/// Creates a hard link at `dst` that refers to the same file as `src`.
///
/// Returns an `io::Error` if the link cannot be created.
fn hard_link(&self, src: &Path, dst: &Path) -> io::Result<()>;
}

#[derive(Default, Debug, Clone, Copy)]
Expand Down Expand Up @@ -65,6 +66,10 @@ impl Io for StdIO {
/// Generates a fresh random (version 4) UUID on every call.
fn uuid(&self) -> Uuid {
Uuid::new_v4()
}

/// Creates a hard link at `dst` pointing to the existing file `src` by
/// delegating to `std::fs::hard_link`; any error from that call is
/// returned unchanged.
fn hard_link(&self, src: &Path, dst: &Path) -> io::Result<()> {
std::fs::hard_link(src, dst)
}
}

impl<T: Io> Io for Arc<T> {
Expand Down Expand Up @@ -96,4 +101,8 @@ impl<T: Io> Io for Arc<T> {
/// Forwards to the `uuid` implementation of the `Io` wrapped by this `Arc`.
fn uuid(&self) -> Uuid {
self.as_ref().uuid()
}

/// Forwards to the `hard_link` implementation of the `Io` wrapped by this
/// `Arc`, passing `src` and `dst` through unchanged.
fn hard_link(&self, src: &Path, dst: &Path) -> io::Result<()> {
self.as_ref().hard_link(src, dst)
}
}
4 changes: 4 additions & 0 deletions libsql-wal/tests/flaky_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ impl Io for FlakyIo {
/// Unimplemented for `FlakyIo`: panics via `todo!()` if called.
fn uuid(&self) -> uuid::Uuid {
todo!()
}

/// Unimplemented for `FlakyIo`: panics via `todo!()` if called.
/// The underscore-prefixed parameters are intentionally unused.
fn hard_link(&self, _src: &Path, _dst: &Path) -> std::io::Result<()> {
todo!()
}
}

macro_rules! assert_not_corrupt {
Expand Down

0 comments on commit cd586ed

Please sign in to comment.