Skip to content

Commit

Permalink
Cleanup.
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Feinberg <alex@strlen.net>
  • Loading branch information
afeinberg committed Jan 23, 2024
1 parent 0401d60 commit 4d5b30e
Showing 1 changed file with 11 additions and 203 deletions.
214 changes: 11 additions & 203 deletions components/region_cache_memory_engine/src/write_batch.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
use std::ops::RangeBounds;

use bytes::Bytes;
use engine_traits::{
CacheRange, Mutable, Result, WriteBatch, WriteBatchExt, WriteOptions, CF_DEFAULT,
};
use engine_traits::{Mutable, Result, WriteBatch, WriteBatchExt, WriteOptions, CF_DEFAULT};
use tikv_util::box_err;

use crate::{
Expand All @@ -21,77 +17,24 @@ use crate::{
/// stabilizes.
type ApplyEncodedEntryCb = Box<dyn FnMut(&str, Bytes, Bytes) -> Result<()> + Send + Sync>;

/// RangeCacheWriteBatch maintains its own in-memory buffer.
pub struct RangeCacheWriteBatch {
buffer: Vec<RangeCacheWriteBatchEntry>,
apply_cb: ApplyEncodedEntryCb,
sequence_number: Option<u64>,
save_points: Vec<usize>,
}

pub struct RangeCacheWriteBatch2 {
buffer: Vec<RangeCacheWriteBatchEntry2>,
engine: RangeCacheMemoryEngine,
save_points: Vec<usize>,
sequence_number: Option<u64>,
}

impl std::fmt::Debug for RangeCacheWriteBatch2 {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RangeCacheWriteBatch2")
.field("buffer", &self.buffer)
.field("save_points", &self.save_points)
.field("sequence_number", &self.sequence_number)
.finish()
}
}

impl std::fmt::Debug for RangeCacheWriteBatch {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RangeCacheWriteBatch")
.field("buffer", &self.buffer)
.field("save_points", &self.save_points)
.field("sequence_number", &self.sequence_number)
.finish()
}
}

impl RangeCacheWriteBatch {
pub fn new(apply_cb: ApplyEncodedEntryCb) -> Self {
Self {
buffer: Vec::new(),
apply_cb,
sequence_number: None,
save_points: Vec::new(),
}
}

pub fn with_capacity(apply_cb: ApplyEncodedEntryCb, cap: usize) -> Self {
Self {
buffer: Vec::with_capacity(cap),
apply_cb,
sequence_number: None,
save_points: Vec::new(),
}
}

/// Sets the sequence number for this batch. This should only be called
/// prior to writing the batch.
pub fn set_sequence_number(&mut self, seq: u64) -> Result<()> {
if let Some(seqno) = self.sequence_number {
return Err(box_err!("Sequence number {} already set", seqno));
};
self.sequence_number = Some(seq);
Ok(())
}

fn write_impl(&mut self, seq: u64) -> Result<()> {
self.buffer
.iter()
.map(|e| (e.cf.as_str(), e.encode(seq)))
.try_for_each(|(cf, (key, value))| (self.apply_cb)(cf, key, value))
}
}

impl From<&RangeCacheMemoryEngine> for RangeCacheWriteBatch2 {
impl From<&RangeCacheMemoryEngine> for RangeCacheWriteBatch {
fn from(engine: &RangeCacheMemoryEngine) -> Self {
Self {
buffer: Vec::new(),
Expand All @@ -102,7 +45,7 @@ impl From<&RangeCacheMemoryEngine> for RangeCacheWriteBatch2 {
}
}

impl RangeCacheWriteBatch2 {
impl RangeCacheWriteBatch {
pub fn with_capacity(engine: &RangeCacheMemoryEngine, cap: usize) -> Self {
Self {
buffer: Vec::with_capacity(cap),
Expand Down Expand Up @@ -170,46 +113,12 @@ impl WriteBatchEntryInternal {

#[derive(Clone, Debug)]
struct RangeCacheWriteBatchEntry {
cf: String,
key: Bytes,
mutation: WriteBatchEntryInternal,
}

impl RangeCacheWriteBatchEntry {
pub fn put_value(cf: &str, key: &[u8], value: &[u8]) -> Self {
Self {
cf: cf.to_owned(),
key: Bytes::copy_from_slice(key),
mutation: WriteBatchEntryInternal::PutValue(Bytes::copy_from_slice(value)),
}
}

pub fn deletion(cf: &str, key: &[u8]) -> Self {
Self {
cf: cf.to_owned(),
key: Bytes::copy_from_slice(key),
mutation: WriteBatchEntryInternal::Deletion,
}
}

#[inline]
pub fn encode(&self, seq: u64) -> (Bytes, Bytes) {
self.mutation.encode(&self.key, seq)
}

pub fn data_size(&self) -> usize {
self.key.len() + std::mem::size_of::<u64>() + self.mutation.data_size()
}
}

#[derive(Clone, Debug)]
struct RangeCacheWriteBatchEntry2 {
cf: usize,
key: Bytes,
inner: WriteBatchEntryInternal,
}

impl RangeCacheWriteBatchEntry2 {
impl RangeCacheWriteBatchEntry {
pub fn put_value(cf: &str, key: &[u8], value: &[u8]) -> Self {
Self {
cf: cf_to_id(cf),
Expand Down Expand Up @@ -256,31 +165,19 @@ impl RangeCacheMemoryEngine {
}
}

impl From<&SkiplistEngine> for RangeCacheWriteBatch {
fn from(engine: &SkiplistEngine) -> Self {
let engine_clone = engine.clone();
let apply_cb = Box::new(move |cf: &'_ str, key, value| {
engine_clone.data[cf_to_id(cf)].put(key, value);
Ok(())
});
RangeCacheWriteBatch::new(apply_cb)
}
}

impl WriteBatchExt for RangeCacheMemoryEngine {
type WriteBatch = RangeCacheWriteBatch;
// todo: adjust it
const WRITE_BATCH_MAX_KEYS: usize = 256;

fn write_batch(&self) -> Self::WriteBatch {
RangeCacheWriteBatch::new(self.apply_cb())
RangeCacheWriteBatch::from(self)
}

fn write_batch_with_cap(&self, cap: usize) -> Self::WriteBatch {
RangeCacheWriteBatch::with_capacity(self.apply_cb(), cap)
RangeCacheWriteBatch::with_capacity(self, cap)
}
}

impl WriteBatch for RangeCacheWriteBatch {
fn write_opt(&mut self, _: &WriteOptions) -> Result<u64> {
self.sequence_number
Expand Down Expand Up @@ -370,95 +267,6 @@ impl Mutable for RangeCacheWriteBatch {
}
}

impl WriteBatch for RangeCacheWriteBatch2 {
fn write_opt(&mut self, _: &WriteOptions) -> Result<u64> {
self.sequence_number
.map(|seq| self.write_impl(seq).map(|()| seq))
.transpose()
.map(|o| o.ok_or_else(|| box_err!("sequence_number must be set!")))?
}

fn data_size(&self) -> usize {
self.buffer
.iter()
.map(RangeCacheWriteBatchEntry2::data_size)
.sum()
}

fn count(&self) -> usize {
self.buffer.len()
}

fn is_empty(&self) -> bool {
self.buffer.is_empty()
}

fn should_write_to_engine(&self) -> bool {
unimplemented!()
}

fn clear(&mut self) {
self.buffer.clear();
self.save_points.clear();
_ = self.sequence_number.take();
}

fn set_save_point(&mut self) {
self.save_points.push(self.buffer.len())
}

fn pop_save_point(&mut self) -> Result<()> {
self.save_points
.pop()
.map(|_| ())
.ok_or_else(|| box_err!("no save points available"))
}

fn rollback_to_save_point(&mut self) -> Result<()> {
self.save_points
.pop()
.map(|sp| {
self.buffer.truncate(sp);
})
.ok_or_else(|| box_err!("no save point available!"))
}

fn merge(&mut self, mut other: Self) -> Result<()> {
self.buffer.append(&mut other.buffer);
Ok(())
}
}

impl Mutable for RangeCacheWriteBatch2 {
fn put(&mut self, key: &[u8], val: &[u8]) -> Result<()> {
self.put_cf(CF_DEFAULT, key, val)
}

fn put_cf(&mut self, cf: &str, key: &[u8], val: &[u8]) -> Result<()> {
self.buffer
.push(RangeCacheWriteBatchEntry2::put_value(cf, key, val));
Ok(())
}

fn delete(&mut self, key: &[u8]) -> Result<()> {
self.delete_cf(CF_DEFAULT, key)
}

fn delete_cf(&mut self, cf: &str, key: &[u8]) -> Result<()> {
self.buffer
.push(RangeCacheWriteBatchEntry2::deletion(cf, key));
Ok(())
}

fn delete_range(&mut self, _: &[u8], _: &[u8]) -> Result<()> {
unimplemented!()
}

fn delete_range_cf(&mut self, _: &str, _: &[u8], _: &[u8]) -> Result<()> {
unimplemented!()
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand All @@ -477,7 +285,7 @@ mod tests {
core.mut_range_manager().set_range_readable(&r, true);
core.mut_range_manager().set_safe_ts(&r, 10);
}
let mut wb = RangeCacheWriteBatch2::from(&engine);
let mut wb = RangeCacheWriteBatch::from(&engine);
wb.put(b"aaa", b"bbb").unwrap();
wb.set_sequence_number(1).unwrap();
assert_eq!(wb.write().unwrap(), 1);
Expand All @@ -496,7 +304,7 @@ mod tests {
core.mut_range_manager().set_range_readable(&r, true);
core.mut_range_manager().set_safe_ts(&r, 10);
}
let mut wb = RangeCacheWriteBatch2::from(&engine);
let mut wb = RangeCacheWriteBatch::from(&engine);
wb.put(b"aaa", b"bbb").unwrap();
wb.set_save_point();
wb.put(b"aaa", b"ccc").unwrap();
Expand All @@ -520,7 +328,7 @@ mod tests {
core.mut_range_manager().set_range_readable(&r, true);
core.mut_range_manager().set_safe_ts(&r, 10);
}
let mut wb = RangeCacheWriteBatch2::from(&engine);
let mut wb = RangeCacheWriteBatch::from(&engine);
wb.put(b"aaa", b"bbb").unwrap();
wb.set_sequence_number(1).unwrap();
_ = wb.write().unwrap();
Expand Down

0 comments on commit 4d5b30e

Please sign in to comment.