Skip to content

Commit

Permalink
feat(parallel): add feature to enable/disable parallel processing
Browse files Browse the repository at this point in the history
  • Loading branch information
uhmarcel committed Nov 20, 2022
1 parent f9d3ee4 commit a78a8d4
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 30 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ readme = "README.md"

[dependencies]
clap = { version = "4.0", features = ["derive"], optional = true }
rayon = "1.6.0"
rayon = { version = "1.6.0", optional = true }

[dev-dependencies]
assert_cmd = "2.0"
Expand All @@ -23,8 +23,9 @@ criterion = "0.4.0"
rand = { version = "0.8.5", features = ["small_rng"] }

[features]
default = ["cli"]
default = ["cli", "parallel"]
cli = ["clap"]
parallel = ["rayon"]

[[bin]]
name = "rbase64"
Expand Down
1 change: 1 addition & 0 deletions src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub const DEC_CHUNK_SIZE: usize = 2;
pub const ENC_U128_OFFSET: usize = (ENC_CHUNK_SIZE * 3 - 1) * 8;
pub const DEC_U64_OFFSET: usize = (DEC_CHUNK_SIZE * 4 - 1) * 6;

#[cfg(feature = "parallel")]
pub const PARALLEL_THRESHOLD_BYTES: usize = 2 << 16; // 128 KiB

const fn construct_decode_map() -> [u8; 256] {
Expand Down
22 changes: 20 additions & 2 deletions src/decode.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
use crate::common::*;
use rayon::prelude::*;
use std::iter::zip;

#[inline(always)]
#[cfg(not(feature = "parallel"))]
pub(crate) fn decode_u64_chunks(input: &[u8], buffer: &mut [u8], total_chunks: usize) {
decode_u64_chunks_sync(input, buffer, total_chunks);
}

#[inline(always)]
#[cfg(feature = "parallel")]
pub(crate) fn decode_u64_chunks(input: &[u8], buffer: &mut [u8], total_chunks: usize) {
if input.len() < PARALLEL_THRESHOLD_BYTES {
decode_u64_chunks_sync(input, buffer, total_chunks);
} else {
decode_u64_chunks_parallel(input, buffer, total_chunks);
};
}

#[inline(always)]
fn decode_u64_chunks_sync(input: &[u8], buffer: &mut [u8], total_chunks: usize) {
let in_chunks = input.chunks_exact(DEC_CHUNK_SIZE * 4);
let out_chunks = buffer.chunks_exact_mut(DEC_CHUNK_SIZE * 3);

Expand All @@ -13,7 +28,10 @@ pub(crate) fn decode_u64_chunks(input: &[u8], buffer: &mut [u8], total_chunks: u
}

#[inline(always)]
pub(crate) fn decode_u64_chunks_parallel(input: &[u8], buffer: &mut [u8], total_chunks: usize) {
#[cfg(feature = "parallel")]
fn decode_u64_chunks_parallel(input: &[u8], buffer: &mut [u8], total_chunks: usize) {
use rayon::prelude::*;

let in_chunks = input.par_chunks_exact(DEC_CHUNK_SIZE * 4);
let out_chunks = buffer.par_chunks_exact_mut(DEC_CHUNK_SIZE * 3);

Expand Down
42 changes: 30 additions & 12 deletions src/encode.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,60 @@
use crate::common::*;
use rayon::prelude::*;
use std::cmp::min;
use std::iter::zip;

#[inline(always)]
#[cfg(not(feature = "parallel"))]
pub(crate) fn encode_u64_chunks(input: &[u8], buffer: &mut [u8]) {
encode_u64_chunks_sync(input, buffer);
}

#[inline(always)]
#[cfg(feature = "parallel")]
pub(crate) fn encode_u64_chunks(input: &[u8], buffer: &mut [u8]) {
if input.len() < PARALLEL_THRESHOLD_BYTES {
encode_u64_chunks_sync(input, buffer);
} else {
encode_u64_chunks_parallel(input, buffer);
};
}

#[inline(always)]
fn encode_u64_chunks_sync(input: &[u8], buffer: &mut [u8]) {
let in_chunks = input.chunks_exact(ENC_CHUNK_SIZE * 3);
let out_chunks = buffer.chunks_exact_mut(ENC_CHUNK_SIZE * 4);

zip(in_chunks, out_chunks).for_each(|(in_chunk, out_chunk)| {
encode_u128(in_chunk, out_chunk);
encode_u64(in_chunk, out_chunk);
});
}

#[inline(always)]
pub(crate) fn encode_u64_chunks_parallel(input: &[u8], buffer: &mut [u8]) {
#[cfg(feature = "parallel")]
fn encode_u64_chunks_parallel(input: &[u8], buffer: &mut [u8]) {
use rayon::prelude::*;

let in_chunks = input.par_chunks_exact(ENC_CHUNK_SIZE * 3);
let out_chunks = buffer.par_chunks_exact_mut(ENC_CHUNK_SIZE * 4);

in_chunks.zip(out_chunks).for_each(|(in_chunk, out_chunk)| {
encode_u128(in_chunk, out_chunk);
encode_u64(in_chunk, out_chunk);
});
}

#[inline(always)]
pub(crate) fn encode_u128_remainder(input: &[u8], buffer: &mut [u8]) -> usize {
let in_u128 = read_u128_partial(input);
pub(crate) fn encode_u64_remainder(input: &[u8], buffer: &mut [u8]) -> usize {
let in_u64 = read_u64_partial(input);
let mut in_bits = 8 * input.len();
let mut out_index = 0;

while in_bits >= 6 {
in_bits -= 6;
buffer[out_index] = encode_byte(((in_u128 >> in_bits) & SIX_BIT_MASK) as u8);
buffer[out_index] = encode_byte(((in_u64 >> in_bits) & SIX_BIT_MASK) as u8);
out_index += 1;
}

if in_bits > 0 {
buffer[out_index] = encode_byte(((in_u128 << (6 - in_bits)) & SIX_BIT_MASK) as u8);
buffer[out_index] = encode_byte(((in_u64 << (6 - in_bits)) & SIX_BIT_MASK) as u8);
out_index += 1;
}

Expand All @@ -48,16 +66,16 @@ pub(crate) fn encode_u128_remainder(input: &[u8], buffer: &mut [u8]) -> usize {
}

#[inline(always)]
fn encode_u128(input: &[u8], buffer: &mut [u8]) {
let in_u128 = read_u128_partial(input);
fn encode_u64(input: &[u8], buffer: &mut [u8]) {
let in_u64 = read_u64_partial(input);

buffer.iter_mut().enumerate().for_each(|(i, out_b)| {
*out_b = encode_byte(((in_u128 >> (2 + ENC_U128_OFFSET - 6 * i)) & SIX_BIT_MASK) as u8);
*out_b = encode_byte(((in_u64 >> (2 + ENC_U128_OFFSET - 6 * i)) & SIX_BIT_MASK) as u8);
});
}

#[inline(always)]
fn read_u128_partial(bytes: &[u8]) -> u64 {
fn read_u64_partial(bytes: &[u8]) -> u64 {
let mut buffer = [0u8; 8];
let size = min(bytes.len(), 8);

Expand Down
18 changes: 4 additions & 14 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use crate::common::*;
use crate::decode::*;
use crate::encode::*;

mod common;
mod decode;
Expand All @@ -10,13 +8,9 @@ pub fn encode(input: &[u8]) -> String {
let mut buffer = vec![0; ((input.len() / 3) + 1) * 4];
let total_chunks = input.len() / (ENC_CHUNK_SIZE * 3);

if input.len() < PARALLEL_THRESHOLD_BYTES {
encode::encode_u64_chunks(input, &mut buffer);
} else {
encode::encode_u64_chunks_parallel(input, &mut buffer);
};
encode::encode_u64_chunks(input, &mut buffer);

let bytes_rem = encode_u128_remainder(
let bytes_rem = encode::encode_u64_remainder(
&input[ENC_CHUNK_SIZE * total_chunks * 3..],
&mut buffer[ENC_CHUNK_SIZE * total_chunks * 4..],
);
Expand All @@ -36,13 +30,9 @@ pub fn decode(encoded: &str) -> Vec<u8> {
.saturating_sub(DEC_CHUNK_SIZE)
.saturating_div(DEC_CHUNK_SIZE * 4);

if input.len() < PARALLEL_THRESHOLD_BYTES {
decode_u64_chunks(input, &mut buffer, total_chunks);
} else {
decode_u64_chunks_parallel(input, &mut buffer, total_chunks);
}
decode::decode_u64_chunks(input, &mut buffer, total_chunks);

let bytes_rem = decode_u64_remainder(
let bytes_rem = decode::decode_u64_remainder(
&input[DEC_CHUNK_SIZE * total_chunks * 4..],
&mut buffer[DEC_CHUNK_SIZE * total_chunks * 3..],
);
Expand Down

0 comments on commit a78a8d4

Please sign in to comment.