Skip to content

Commit

Permalink
Create SSTables struct to encapsulate SSTable management
Browse files Browse the repository at this point in the history
  • Loading branch information
mikerhodes committed Apr 24, 2024
1 parent c5f879b commit 1c2364c
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 26 deletions.
25 changes: 6 additions & 19 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use std::{
collections::BTreeMap,
path::{Path, PathBuf},
};
use std::{collections::BTreeMap, path::Path};

use sstable::SSTableFileReader;
use sstable::SSTables;
use wal::WAL;

mod kvrecord;
Expand Down Expand Up @@ -39,10 +36,9 @@ pub struct ToyKVMetrics {

pub struct ToyKV {
/// d is the folder that the KV store owns.
d: PathBuf,
memtable: BTreeMap<Vec<u8>, Vec<u8>>,
wal: WAL,
sstables: SSTableFileReader,
sstables: SSTables,
pub metrics: ToyKVMetrics,
}

Expand All @@ -57,9 +53,8 @@ pub fn with_sync(d: &Path, sync: WALSync) -> Result<ToyKV, ToyKVError> {

let mut wal = wal::new(d, sync);
let memtable = wal.replay()?;
let sstables = sstable::new_reader(d)?;
let sstables = sstable::new_sstables(d)?;
Ok(ToyKV {
d: d.to_path_buf(),
memtable,
wal,
sstables,
Expand All @@ -71,23 +66,15 @@ pub fn with_sync(d: &Path, sync: WALSync) -> Result<ToyKV, ToyKVError> {
const WAL_WRITE_THRESHOLD: u32 = 1000;

impl ToyKV {
/// Sets key k to v.
/// Set key k to v.
pub fn set(&mut self, k: Vec<u8>, v: Vec<u8>) -> Result<(), ToyKVError> {
self.wal.write(&k, &v)?;
self.memtable.insert(k, v);
if self.wal.wal_writes >= WAL_WRITE_THRESHOLD {
let mut sst = sstable::new_writer(self.d.as_path())?;
for entry in &self.memtable {
sst.write(entry.0, entry.1)?;
}
sst.finalise()?;
self.sstables.write_new_sstable(&self.memtable)?;
self.wal.reset()?;
self.memtable.clear();
self.metrics.sst_flushes += 1;

// Need a new reader to regenerate the file list to
// read from.
self.sstables = sstable::new_reader(self.d.as_path())?;
}
self.metrics.writes += 1;
Ok(())
Expand Down
58 changes: 51 additions & 7 deletions src/sstable.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
collections::BTreeMap,
fs::{self, DirEntry, File, OpenOptions},
io::{BufReader, Error, Seek, SeekFrom, Write},
path::{Path, PathBuf},
Expand Down Expand Up @@ -34,6 +35,49 @@ use crate::kvrecord::{KVRecord, KVWriteRecord};
/// have exclusive use of a dir, whereas multiple readers
/// can safely access concurrently.

/// Manage and search a set of SSTable files on disk.
pub(crate) struct SSTables {
d: PathBuf,
sstables: SSTableFileReader,
}

/// Create a new SSTables whose files live in the directory d.
pub(crate) fn new_sstables(d: &Path) -> Result<SSTables, Error> {
let sstables = new_reader(d)?;
Ok(SSTables {
d: d.to_path_buf(),
sstables,
})
}

impl SSTables {
/// Write a new SSTable to the set managed by this
/// SSTables. After this method returns, the contents
/// of the memtable are durable on disk and are used
/// by future calls to `get`.
pub(crate) fn write_new_sstable(
&mut self,
memtable: &BTreeMap<Vec<u8>, Vec<u8>>,
) -> Result<(), Error> {
let mut sst = new_writer(self.d.as_path())?;
for entry in memtable {
sst.write(entry.0, entry.1)?;
}
sst.finalise()?;

// Need a new reader to regenerate the file list to
// read from.
self.sstables = new_reader(self.d.as_path())?;
Ok(())
}

/// Retrieve the latest value for `k` in the on disk
/// set of sstables.
pub(crate) fn get(&mut self, k: &[u8]) -> Result<Option<Vec<u8>>, Error> {
self.sstables.get(k)
}
}

/// Return the SSTables in dir, ordered newest to oldest.
///
/// The ordering is such that a get for a key should proceed
Expand Down Expand Up @@ -133,14 +177,14 @@ mod tests_sorted_sstable_files {
}

/// Provides methods to write already-sorted KVRecords to an on-disk file.
pub(crate) struct SSTableFileWriter {
struct SSTableFileWriter {
#[allow(dead_code)] // used in tests to check filepath
p: PathBuf,
f: File,
start: Instant,
}
/// Start writing a new immutable file in the dir path.
pub(crate) fn new_writer(dir: &Path) -> Result<SSTableFileWriter, Error> {
fn new_writer(dir: &Path) -> Result<SSTableFileWriter, Error> {
// So basically here we have to read 00000001.data etc and find the
// greatest number we have so far, then create a path that is inc that
// number by one.
Expand Down Expand Up @@ -186,15 +230,15 @@ impl SSTableFileWriter {
/// It is assumed that successive calls to write will always use
/// increasing keys; that is, SSTableFileWriter does not sort keys
/// itself.
pub(crate) fn write(&mut self, key: &[u8], value: &[u8]) -> Result<(), Error> {
fn write(&mut self, key: &[u8], value: &[u8]) -> Result<(), Error> {
let buf = KVWriteRecord { key, value }.serialize();
self.f.write_all(&buf)?;
Ok(())
}

/// Perform the final writes to the file. After this the writer
/// can be safely dropped.
pub(crate) fn finalise(&mut self) -> Result<(), Error> {
fn finalise(&mut self) -> Result<(), Error> {
self.f.flush()?;
self.f.sync_all()?;
let elapsed_time = self.start.elapsed();
Expand Down Expand Up @@ -231,13 +275,13 @@ mod tests_writer {
}

/// Iterate entries in an on-disk SSTable.
pub(crate) struct SSTableFileReader {
struct SSTableFileReader {
tables: Vec<BufReader<File>>,
}

/// Create a new SSTableFileReader that is able to search
/// for keys in the set of SSTables managed within a folder.
pub(crate) fn new_reader(dir: &Path) -> Result<SSTableFileReader, Error> {
fn new_reader(dir: &Path) -> Result<SSTableFileReader, Error> {
let files = sorted_sstable_files(dir)?;
let mut tables: Vec<BufReader<File>> = vec![];
for p in files {
Expand All @@ -250,7 +294,7 @@ pub(crate) fn new_reader(dir: &Path) -> Result<SSTableFileReader, Error> {
impl SSTableFileReader {
/// Search through the SSTables available to this reader for
/// a key. Return an Option with its value.
pub(crate) fn get(&mut self, k: &[u8]) -> Result<Option<Vec<u8>>, Error> {
fn get(&mut self, k: &[u8]) -> Result<Option<Vec<u8>>, Error> {
// self.tables is in the right order for scanning the sstables
// on disk. Read each until we find k. If we fall off the end,
// return None.
Expand Down

0 comments on commit 1c2364c

Please sign in to comment.