Skip to content

Commit

Permalink
introduce iovec
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <qupeng@pingcap.com>
  • Loading branch information
hicqu committed Oct 20, 2020
1 parent b23f403 commit 29c68df
Show file tree
Hide file tree
Showing 8 changed files with 647 additions and 185 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ fxhash = "0.2"
nix = "0.18.0"
crossbeam = "0.7"
thiserror = "1.0"
libc = "0.2"

[dev-dependencies]
raft = { git = "https://github.com/tikv/raft-rs", branch = "master", default-features = false, features = ["protobuf-codec"] }
tempfile = "3.1"
toml = "0.5"
ctor = "0.1"
env_logger = "0.8"

[patch.crates-io]
protobuf = { git = "https://github.com/pingcap/rust-protobuf", rev = "ec745253ab847481647887bc4c7cac3949449cfe" }
protobuf = { git = "https://github.com/pingcap/rust-protobuf", rev = "ac10abf324a6f2b3e19e10f82b568a293ca5bd3d" }
176 changes: 176 additions & 0 deletions src/compression.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
use std::i32;
use std::ptr::{copy_nonoverlapping, read_unaligned};

use libc::c_int;
use lz4_sys::{
LZ4StreamEncode, LZ4_compressBound, LZ4_compress_default, LZ4_createStreamDecode,
LZ4_decompress_safe, LZ4_decompress_safe_continue, LZ4_freeStreamDecode,
};

// Layout of single block compression:
// header + decoded_size + content + cap(tail).
//
// The output vector reserves `head_reserve` uninitialized bytes at the front
// (for the caller to fill in a header) and `tail_alloc` spare bytes of
// capacity at the end. The 4 bytes after the header hold the little-endian
// decoded length, followed by the LZ4-compressed payload.
pub fn encode_block(src: &[u8], head_reserve: usize, tail_alloc: usize) -> Vec<u8> {
    // Validate the length *before* casting to i32. Casting first would let an
    // input larger than i32::MAX wrap into a bogus (possibly positive) value
    // that gets handed to the C API.
    assert!(
        src.len() <= i32::MAX as usize,
        "block is too big: {}",
        src.len()
    );
    unsafe {
        let bound = LZ4_compressBound(src.len() as i32);
        assert!(bound > 0);

        let capacity = head_reserve + 4 + bound as usize + tail_alloc;
        let mut output: Vec<u8> = Vec::with_capacity(capacity);

        // Little-endian decoded size; fits in 4 bytes per the assert above.
        let le_len = (src.len() as u32).to_le_bytes();
        copy_nonoverlapping(le_len.as_ptr(), output.as_mut_ptr().add(head_reserve), 4);

        let size = LZ4_compress_default(
            src.as_ptr() as _,
            output.as_mut_ptr().add(head_reserve + 4) as _,
            src.len() as i32,
            bound,
        );
        assert!(size > 0);
        // SAFETY: bytes head_reserve..head_reserve + 4 + size were written
        // above; the first head_reserve bytes are intentionally left for the
        // caller to initialize.
        output.set_len(head_reserve + 4 + size as usize);
        output
    }
}

/// Decodes a single block produced by `encode_block` (with zero
/// `head_reserve`): a 4-byte little-endian decoded length followed by the
/// LZ4-compressed payload. Panics if the input is malformed.
pub fn decode_block(src: &[u8]) -> Vec<u8> {
    assert!(src.len() > 4, "data is too short: {} <= 4", src.len());
    assert!(
        src.len() <= i32::MAX as usize,
        "data is too big: {}",
        src.len()
    );
    unsafe {
        let len = u32::from_le(read_unaligned(src.as_ptr() as *const u32));
        // Reject lengths the C API cannot represent. Without this check a
        // corrupted `len > i32::MAX` becomes negative in the cast below, so a
        // *failed* decompression (which returns a negative value) could pass
        // the equality assert and feed a bogus size to `set_len` (UB).
        assert!(
            len <= i32::MAX as u32,
            "decoded length is too big: {}",
            len
        );
        let mut dst = Vec::with_capacity(len as usize);
        let l = LZ4_decompress_safe(
            src.as_ptr().add(4) as _,
            dst.as_mut_ptr() as _,
            src.len() as i32 - 4,
            dst.capacity() as i32,
        );
        // Negative return means LZ4 detected corrupted input.
        assert!(l >= 0, "data is corrupted: decompress failed ({})", l);
        assert_eq!(l as u32, len);
        // SAFETY: LZ4_decompress_safe initialized exactly `l` bytes of `dst`.
        dst.set_len(l as usize);
        dst
    }
}

// Layout of multi blocks compression:
// header + decoded_size + vec[encoded_len_and_content] + cap(tail).
//
// `inputs` is a factory because the buffers are iterated twice: once to size
// the output, once to compress. NOTE(review): the two iterations MUST yield
// identical buffers — the unsafe writes below rely on the sizes computed in
// the first pass.
pub fn encode_blocks<'a, F, I>(inputs: F, head_reserve: usize, tail_alloc: usize) -> Vec<u8>
where
    F: Fn() -> I,
    I: Iterator<Item = &'a [u8]>,
{
    // First pass: worst-case encoded size, so the buffer never needs to grow.
    let (mut encoded_len, mut decoded_len) = (0, 0u64);
    for buffer in inputs() {
        let len = buffer.len();
        assert!(len <= i32::MAX as usize, "block is too big: {}", len);
        decoded_len += len as u64;
        let size = unsafe { lz4_sys::LZ4_compressBound(len as i32) };
        assert!(size > 0);
        encoded_len += (4 + size) as usize; // Length prefix and content.
    }

    let capacity = head_reserve + 8 + encoded_len + tail_alloc;
    let mut output: Vec<u8> = Vec::with_capacity(capacity);
    unsafe {
        // Total decoded size, little-endian, right after the reserved header.
        copy_nonoverlapping(
            decoded_len.to_le_bytes().as_ptr(),
            output.as_mut_ptr().add(head_reserve),
            8,
        );

        let stream = lz4_sys::LZ4_createStream();
        assert!(!stream.is_null(), "LZ4_createStream failed");
        let mut offset = head_reserve + 8;
        for buffer in inputs() {
            let bytes = LZ4_compress_fast_continue(
                stream,
                buffer.as_ptr() as _,
                output.as_mut_ptr().add(offset + 4),
                buffer.len() as i32,
                // The content starts 4 bytes past `offset`, so the writable
                // room is `capacity - offset - 4`, not `capacity - offset`.
                // The first pass guarantees this still covers compressBound.
                (capacity - offset - 4) as i32,
                1, /* acceleration */
            );
            assert!(bytes > 0);
            // Write the 4-byte little-endian length prefix of this block.
            copy_nonoverlapping(
                (bytes as u32).to_le_bytes().as_ptr(),
                output.as_mut_ptr().add(offset),
                4,
            );
            offset += (bytes + 4) as usize;
        }

        lz4_sys::LZ4_freeStream(stream);
        // SAFETY: bytes head_reserve..offset were all written above.
        output.set_len(offset);
    }
    output
}

/// Decodes a multi-block stream produced by `encode_blocks` (with zero
/// `head_reserve`): an 8-byte little-endian total decoded length, then a
/// sequence of (4-byte little-endian length, content) pairs. Panics on
/// malformed input instead of reading out of bounds.
pub fn decode_blocks(mut src: &[u8]) -> Vec<u8> {
    assert!(src.len() > 8, "data is too short: {} <= 8", src.len());
    unsafe {
        let decoded_len = u64::from_le(read_unaligned(src.as_ptr() as *const u64));
        let mut dst: Vec<u8> = Vec::with_capacity(decoded_len as usize);
        src = &src[8..];

        let decoder = LZ4_createStreamDecode();
        assert!(!decoder.is_null(), "LZ4_createStreamDecode failed");
        let mut offset = 0;
        while !src.is_empty() {
            // Guard every raw read: a truncated tail must panic here rather
            // than let `read_unaligned` read past the end of `src`.
            assert!(
                src.len() >= 4,
                "data is corrupted: truncated block length"
            );
            let len = u32::from_le(read_unaligned(src.as_ptr() as *const u32)) as usize;
            // A lying length prefix must not make LZ4 read beyond `src` or
            // overflow the i32 cast below.
            assert!(
                len <= i32::MAX as usize && src.len() - 4 >= len,
                "data is corrupted: block length {} out of range",
                len
            );
            let bytes = LZ4_decompress_safe_continue(
                decoder,
                src.as_ptr().add(4) as _,
                dst.as_mut_ptr().add(offset) as _,
                len as i32,
                (dst.capacity() - offset) as i32,
            );
            assert!(bytes >= 0, "data is corrupted: decompress failed");
            offset += bytes as usize;
            src = &src[(4 + len)..];
        }
        LZ4_freeStreamDecode(decoder);
        assert_eq!(offset, decoded_len as usize);
        // SAFETY: exactly `offset` bytes of `dst` were initialized by LZ4.
        dst.set_len(offset);
        dst
    }
}

extern "C" {
    // It's not in lz4_sys.
    //
    // Streaming (dictionary-chained) compression from the LZ4 C API: each
    // call compresses `source` using previously compressed blocks on
    // `LZ4_stream` as the dictionary. Returns the number of bytes written to
    // `dest`, or a value <= 0 on failure — callers here assert the result is
    // positive. Safety: `dest` must have at least `dest_capacity` writable
    // bytes and `source` must stay valid/unmoved between chained calls.
    fn LZ4_compress_fast_continue(
        LZ4_stream: *mut LZ4StreamEncode,
        source: *const u8,
        dest: *mut u8,
        input_size: c_int,
        dest_capacity: c_int,
        acceleration: c_int,
    ) -> c_int;
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic() {
        // Round-trip a few single blocks, including the empty input.
        for &input in &[b"" as &'static [u8], b"123", b"12345678910"] {
            let encoded = encode_block(input, 0, 0);
            assert!(encoded.len() > 4);
            assert_eq!(decode_block(&encoded), input);
        }
    }

    #[test]
    fn test_blocks() {
        // Round-trip a multi-block stream and compare the result against the
        // plain concatenation of all input buffers.
        let blocks: Vec<Vec<u8>> = vec![
            b"".to_vec(),
            b"123".to_vec(),
            b"12345678910".to_vec(),
            vec![b'x'; 99999],
            vec![0; 33333],
        ];

        let expected: Vec<u8> = blocks.iter().flat_map(|b| b.iter().copied()).collect();

        let encoded = encode_blocks(|| blocks.iter().map(Vec::as_slice), 0, 0);
        assert_eq!(decode_blocks(&encoded), expected);
    }
}
17 changes: 11 additions & 6 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use protobuf::Message;
use crate::cache_evict::{
CacheSubmitor, CacheTask, Runner as CacheEvictRunner, DEFAULT_CACHE_CHUNK_SIZE,
};
use crate::compression::{decode_block, decode_blocks};
use crate::config::{Config, RecoveryMode};
use crate::log_batch::{
self, Command, CompressionType, EntryExt, LogBatch, LogItemContent, OpType, CHECKSUM_LEN,
Expand Down Expand Up @@ -520,15 +521,19 @@ where
let offset = base_offset + offset;
pipe_log.fread(queue, file_num, offset, len)?
}
CompressionType::Lz4 => {
let read_len = batch_len + HEADER_LEN as u64;
c_type @ CompressionType::Lz4 | c_type @ CompressionType::Lz4Blocks => {
let read_len = batch_len + HEADER_LEN as u64 + CHECKSUM_LEN as u64;
let compressed = pipe_log.fread(queue, file_num, base_offset, read_len)?;
let mut reader = compressed.as_ref();
log_batch::test_batch_checksum(compressed.as_slice())?;

let mut reader = compressed.as_slice();
let header = codec::decode_u64(&mut reader)?;
assert_eq!(header >> 8, batch_len);

log_batch::test_batch_checksum(reader)?;
let buf = log_batch::decompress(&reader[..batch_len as usize - CHECKSUM_LEN]);
let buf = match c_type {
CompressionType::Lz4 => decode_block(&reader[..batch_len as usize]),
CompressionType::Lz4Blocks => decode_blocks(&reader[..batch_len as usize]),
_ => unreachable!(),
};
let start = offset as usize - HEADER_LEN;
let end = (offset + len) as usize - HEADER_LEN;
buf[start..end].to_vec()
Expand Down

0 comments on commit 29c68df

Please sign in to comment.