Skip to content

Commit

Permalink
unify asyncIO & syncIO.
Browse files Browse the repository at this point in the history
Signed-off-by: root <1019495690@qq.com>
  • Loading branch information
ustc-wxy committed Mar 25, 2023
1 parent 198ea08 commit a732d52
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 116 deletions.
155 changes: 57 additions & 98 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,49 +301,34 @@ where
let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
if let Some(memtable) = self.memtables.get(region_id) {
let mut ents_idx: Vec<EntryIndex> = Vec::with_capacity((end - begin) as usize);
memtable
.read()
.fetch_entries_to(begin, end, max_size, &mut ents_idx)?;
for i in ents_idx.iter() {
vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i)?);
}
ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64);

return Ok(ents_idx.len());
}
Ok(0)
}

pub fn fetch_entries_to_aio<M: Message + MessageExt<Entry = M>>(
&self,
region_id: u64,
begin: u64,
end: u64,
max_size: Option<usize>,
vec: &mut Vec<M::Entry>,
) -> Result<usize> {
let _t = StopWatch::new(&*ENGINE_READ_ENTRY_DURATION_HISTOGRAM);
if let Some(memtable) = self.memtables.get(region_id) {
let length = (end - begin) as usize;
let mut ents_idx: Vec<EntryIndex> = Vec::with_capacity(length);
memtable
.read()
.fetch_entries_to(begin, end, max_size, &mut ents_idx)?;

let mut blocks: Vec<FileBlockHandle> = Vec::new();
let mut total_bytes = 0;
for (t, i) in ents_idx.iter().enumerate() {
if t == 0 || (i.entries.unwrap() != ents_idx[t - 1].entries.unwrap()) {
blocks.push(i.entries.unwrap());
total_bytes += i.entries.unwrap().len;
}
}

if blocks.len() > 5 && total_bytes > 1024 * 1024 {
//Async IO
let bytes = self.pipe_log.async_read_bytes(blocks)?;
parse_entries_from_bytes::<M>(bytes, &mut ents_idx, vec)?;
} else {
//Sync IO
for i in ents_idx.iter() {
vec.push(read_entry_from_file::<M, _>(self.pipe_log.as_ref(), i)?);
}
}
let bytes = self.pipe_log.async_read_bytes(blocks)?;
parse_entries_from_bytes::<M>(bytes, &mut ents_idx, vec)?;

ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64);

return Ok(ents_idx.len());
}

Ok(0)
}

Expand Down Expand Up @@ -578,32 +563,39 @@ impl BlockCache {
thread_local! {
static BLOCK_CACHE: BlockCache = BlockCache::new();
}

pub(crate) fn parse_entries_from_bytes<M: MessageExt>(
bytes: Vec<Vec<u8>>,
ents_idx: &mut [EntryIndex],
vec: &mut Vec<M::Entry>,
) -> Result<()> {
let mut decode_buf = vec![];
let mut seq: i32 = -1;
for (t, idx) in ents_idx.iter().enumerate() {
decode_buf =
match t == 0 || ents_idx[t - 1].entries.unwrap() != ents_idx[t].entries.unwrap() {
true => {
seq += 1;
bytes[seq as usize].to_vec()
}
false => decode_buf,
};
vec.push(parse_from_bytes(
&LogBatch::decode_entries_block(
&decode_buf,
idx.entries.unwrap(),
idx.compression_type,
)?[idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize],
)?);
let mut seq = 0;
for idx in ents_idx {
BLOCK_CACHE.with(|cache| {
if cache.key.get() != idx.entries.unwrap() {
cache.insert(
idx.entries.unwrap(),
LogBatch::decode_entries_block(
&bytes[seq],
idx.entries.unwrap(),
idx.compression_type,
)
.unwrap(),
);
seq += 1;
}
let e = parse_from_bytes(
&cache.block.borrow()
[idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize],
)
.unwrap();
assert_eq!(M::index(&e), idx.index);
vec.push(e);
});
}
Ok(())
}

pub(crate) fn read_entry_from_file<M, P>(pipe_log: &P, idx: &EntryIndex) -> Result<M::Entry>
where
M: MessageExt,
Expand Down Expand Up @@ -742,41 +734,6 @@ mod tests {
reader(e.index, entry_index.entries.unwrap().id.queue, &e.data);
}
}
/// Test helper: reads a region's entries through `fetch_entries_to_aio`,
/// checks them against the expected `[start, end)` index range, and passes
/// each entry to `reader`.
///
/// Note that the fetch itself always covers the region's whole range
/// (`first_index(rid)` up to and including `last_index(rid)`); `start` and
/// `end` are only used by the assertions below, so they must describe that
/// same full range for the checks to pass.
///
/// `reader` is called once per fetched entry with the entry's index, the
/// queue recorded in the entry's memtable index, and the entry's data.
///
/// Panics (via `unwrap`/`assert_eq!`) if the region or any entry is missing,
/// if the fetch fails, or if any of the consistency checks does not hold.
fn scan_entries_aio<FR: Fn(u64, LogQueue, &[u8])>(
&self,
rid: u64,
start: u64,
end: u64,
reader: FR,
) {
let mut entries = Vec::new();
// Fetch everything the region currently holds, with no size limit (`None`).
self.fetch_entries_to_aio::<Entry>(
rid,
self.first_index(rid).unwrap(),
self.last_index(rid).unwrap() + 1,
None,
&mut entries,
)
.unwrap();
// The fetched entries must match the caller's expected `[start, end)` range.
assert_eq!(entries.len(), (end - start) as usize);
assert_eq!(entries.first().unwrap().index, start);
assert_eq!(
entries.last().unwrap().index,
self.decode_last_index(rid).unwrap()
);
assert_eq!(entries.last().unwrap().index + 1, end);
for e in entries.iter() {
// Look up the memtable's index record for this entry to learn its queue.
let entry_index = self
.memtables
.get(rid)
.unwrap()
.read()
.get_entry(e.index)
.unwrap();
// Cross-check: the single-entry read path must return the same entry.
assert_eq!(&self.get_entry::<Entry>(rid, e.index).unwrap().unwrap(), e);
reader(e.index, entry_index.entries.unwrap().id.queue, &e.data);
}
}

fn file_count(&self, queue: Option<LogQueue>) -> usize {
if let Some(queue) = queue {
Expand Down Expand Up @@ -854,12 +811,12 @@ mod tests {
}

#[test]
fn test_async_get_entry() {
let normal_batch_size = 10;
let compressed_batch_size = 5120;
for &entry_size in &[normal_batch_size, compressed_batch_size] {
fn test_multi_read_entry() {
let sync_batch_size = 1024;
let async_batch_size = 1024 * 1024;
for &entry_size in &[sync_batch_size, async_batch_size] {
let dir = tempfile::Builder::new()
.prefix("test_get_entry")
.prefix("test_multi_read_entry")
.tempdir()
.unwrap();
let cfg = Config {
Expand All @@ -875,15 +832,17 @@ mod tests {
.unwrap();
assert_eq!(engine.path(), dir.path().to_str().unwrap());
let data = vec![b'x'; entry_size];
for i in 10..20 {
let rid = i;
let index = i;
engine.append(rid, index, index + 2, Some(&data));

for i in 0..10 {
for rid in 10..20 {
let index = i + rid;
engine.append(rid, index, index + 1, Some(&data));
}
}
for i in 10..20 {
let rid = i;
let index = i;
engine.scan_entries_aio(rid, index, index + 2, |_, q, d| {
engine.scan_entries(rid, index, index + 10, |_, q, d| {
assert_eq!(q, LogQueue::Append);
assert_eq!(d, &data);
});
Expand All @@ -894,7 +853,7 @@ mod tests {
for i in 10..20 {
let rid = i;
let index = i;
engine.scan_entries_aio(rid, index, index + 2, |_, q, d| {
engine.scan_entries(rid, index, index + 10, |_, q, d| {
assert_eq!(q, LogQueue::Append);
assert_eq!(d, &data);
});
Expand Down Expand Up @@ -2137,18 +2096,18 @@ mod tests {
type Handle = <ObfuscatedFileSystem as FileSystem>::Handle;
type Reader = <ObfuscatedFileSystem as FileSystem>::Reader;
type Writer = <ObfuscatedFileSystem as FileSystem>::Writer;
type AsyncIoContext = <ObfuscatedFileSystem as FileSystem>::AsyncIoContext;
type MultiReadContext = <ObfuscatedFileSystem as FileSystem>::MultiReadContext;

fn async_read(
fn multi_read(
&self,
ctx: &mut Self::AsyncIoContext,
ctx: &mut Self::MultiReadContext,
handle: Arc<Self::Handle>,
block: &FileBlockHandle,
) -> std::io::Result<()> {
self.inner.async_read(ctx, handle, block)
self.inner.multi_read(ctx, handle, block)
}

fn async_finish(&self, ctx: Self::AsyncIoContext) -> std::io::Result<Vec<Vec<u8>>> {
fn async_finish(&self, ctx: Self::MultiReadContext) -> std::io::Result<Vec<Vec<u8>>> {
self.inner.async_finish(ctx)
}

Expand Down Expand Up @@ -2214,7 +2173,7 @@ mod tests {
self.inner.new_writer(h)
}

fn new_async_io_context(&self) -> std::io::Result<Self::AsyncIoContext> {
fn new_async_io_context(&self) -> std::io::Result<Self::MultiReadContext> {
self.inner.new_async_io_context()
}
}
Expand Down
11 changes: 6 additions & 5 deletions src/env/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,11 @@ impl FileSystem for DefaultFileSystem {
type Handle = LogFd;
type Reader = LogFile;
type Writer = LogFile;
type AsyncIoContext = AioContext;
type MultiReadContext = AioContext;

fn async_read(
fn multi_read(
&self,
ctx: &mut Self::AsyncIoContext,
ctx: &mut Self::MultiReadContext,
handle: Arc<Self::Handle>,
block: &FileBlockHandle,
) -> IoResult<()> {
Expand All @@ -298,7 +298,8 @@ impl FileSystem for DefaultFileSystem {

Ok(())
}
fn async_finish(&self, mut ctx: Self::AsyncIoContext) -> IoResult<Vec<Vec<u8>>> {

fn async_finish(&self, mut ctx: Self::MultiReadContext) -> IoResult<Vec<Vec<u8>>> {
for seq in 0..ctx.aio_vec.len() {
let buf_len = ctx.buf_vec[seq].len();
aio_suspend(&[&*ctx.aio_vec[seq]], None)?;
Expand Down Expand Up @@ -333,7 +334,7 @@ impl FileSystem for DefaultFileSystem {
Ok(LogFile::new(handle))
}

fn new_async_io_context(&self) -> IoResult<Self::AsyncIoContext> {
fn new_async_io_context(&self) -> IoResult<Self::MultiReadContext> {
Ok(AioContext::default())
}
}
10 changes: 5 additions & 5 deletions src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ pub trait FileSystem: Send + Sync {
type Handle: Send + Sync + Handle;
type Reader: Seek + Read + Send;
type Writer: Seek + Write + Send + WriteExt;
type AsyncIoContext;
type MultiReadContext;

fn async_read(
fn multi_read(
&self,
ctx: &mut Self::AsyncIoContext,
ctx: &mut Self::MultiReadContext,
handle: Arc<Self::Handle>,
block: &FileBlockHandle,
) -> Result<()>;
fn async_finish(&self, ctx: Self::AsyncIoContext) -> Result<Vec<Vec<u8>>>;
fn async_finish(&self, ctx: Self::MultiReadContext) -> Result<Vec<Vec<u8>>>;

fn create<P: AsRef<Path>>(&self, path: P) -> Result<Self::Handle>;

Expand Down Expand Up @@ -66,7 +66,7 @@ pub trait FileSystem: Send + Sync {

fn new_writer(&self, handle: Arc<Self::Handle>) -> Result<Self::Writer>;

fn new_async_io_context(&self) -> Result<Self::AsyncIoContext>;
fn new_async_io_context(&self) -> Result<Self::MultiReadContext>;
}

pub trait Handle {
Expand Down
12 changes: 6 additions & 6 deletions src/env/obfuscated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,18 @@ impl FileSystem for ObfuscatedFileSystem {
type Handle = <DefaultFileSystem as FileSystem>::Handle;
type Reader = ObfuscatedReader;
type Writer = ObfuscatedWriter;
type AsyncIoContext = <DefaultFileSystem as FileSystem>::AsyncIoContext;
type MultiReadContext = <DefaultFileSystem as FileSystem>::MultiReadContext;

fn async_read(
fn multi_read(
&self,
ctx: &mut Self::AsyncIoContext,
ctx: &mut Self::MultiReadContext,
handle: Arc<Self::Handle>,
block: &FileBlockHandle,
) -> IoResult<()> {
self.inner.async_read(ctx, handle, block)
self.inner.multi_read(ctx, handle, block)
}

fn async_finish(&self, ctx: Self::AsyncIoContext) -> IoResult<Vec<Vec<u8>>> {
fn async_finish(&self, ctx: Self::MultiReadContext) -> IoResult<Vec<Vec<u8>>> {
let mut base = self.inner.async_finish(ctx).unwrap();

for v in base.iter_mut() {
Expand Down Expand Up @@ -151,7 +151,7 @@ impl FileSystem for ObfuscatedFileSystem {
Ok(ObfuscatedWriter(self.inner.new_writer(handle)?))
}

fn new_async_io_context(&self) -> IoResult<Self::AsyncIoContext> {
fn new_async_io_context(&self) -> IoResult<Self::MultiReadContext> {
self.inner.new_async_io_context()
}
}
4 changes: 2 additions & 2 deletions src/file_pipe_log/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,11 +255,11 @@ impl<F: FileSystem> SinglePipe<F> {
reader.read(handle)
}

fn async_read(&self, ctx: &mut F::AsyncIoContext, blocks: Vec<FileBlockHandle>) {
fn async_read(&self, ctx: &mut F::MultiReadContext, blocks: Vec<FileBlockHandle>) {
for block in blocks.iter() {
let fd = self.get_fd(block.id.seq).unwrap();
self.file_system
.async_read(ctx, fd, block)
.multi_read(ctx, fd, block)
.expect("Async read failed.");
}
}
Expand Down

0 comments on commit a732d52

Please sign in to comment.