Merge pull request #1615 from jqnatividad/1609-split-refactor
`split`: refactored to actually create chunks <= the desired `--kb-size`, obviating the need for the hacky `--sep-factor` option
jqnatividad committed Feb 22, 2024
2 parents 336dc03 + 2c25c77 commit 3d4b81e
Showing 2 changed files with 80 additions and 39 deletions.
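
In practice, this means `--kb-size` is now a hard ceiling rather than a best-effort target: an invocation such as `qsv split --kb-size 2 outdir data.csv` (the 2 KB target, `outdir`, and `data.csv` are illustrative) writes chunks named by their starting row offset (`0.csv`, `11.csv`, ..., per the tests below), each guaranteed not to exceed 2 KB, repeated header included, and no `--sep-factor` fudge is needed.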
85 changes: 62 additions & 23 deletions src/cmd/split.rs
@@ -11,8 +11,8 @@ chunks. The number of rows in each chunk is determined by the number of records in
 the CSV data and the number of desired chunks. If the number of records is not evenly
 divisible by the number of chunks, the last chunk will have fewer records.

-When splitting by size, the CSV data is split into chunks of the given size in kilobytes.
-The number of rows in each chunk may vary, but the size of each chunk will be close to the
+When splitting by kb-size, the CSV data is split into chunks of the given size in kilobytes.
+The number of rows in each chunk may vary, but the size of each chunk will not exceed the
 desired size.

 Uses multithreading to go faster if the CSV has an index when splitting by rowcount or
@@ -70,12 +70,8 @@ split options:
                            have fewer records.
     -k, --kb-size <arg>    The size of each chunk in kilobytes. The number of rows
                            in each chunk may vary, but the size of each chunk will
-                           be close to the desired size.
+                           not exceed the desired size.
                            This option is mutually exclusive with --size and --chunks.
-    --sep-factor <arg>     The factor to use when estimating the size of the
-                           separators (delimiters, quotes & spaces) in the CSV data
-                           when splitting by --kb-size. This is multiplied by the
-                           number of fields in the header. [default: 1.5]
     -j, --jobs <arg>       The number of splitting jobs to run in parallel.
                            This only works when the given CSV data has
@@ -111,7 +107,7 @@ use crate::{
     config::{Config, Delimiter},
     index::Indexed,
     util::{self, FilenameTemplate},
-    CliResult,
+    CliError, CliResult,
 };

 #[derive(Clone, Deserialize)]
@@ -121,7 +117,6 @@ struct Args {
     flag_size: usize,
     flag_chunks: Option<usize>,
     flag_kb_size: Option<usize>,
-    flag_sep_factor: f32,
     flag_jobs: Option<usize>,
     flag_filename: FilenameTemplate,
     flag_pad: usize,
@@ -130,6 +125,8 @@ struct Args {
     flag_quiet: bool,
 }

+static UTF8_ERROR: &str = "UTF-8 Encoding error";
+
 pub fn run(argv: &[&str]) -> CliResult<()> {
     let args: Args = util::get_args(USAGE, argv)?;
     if args.flag_size == 0 {
@@ -154,39 +151,81 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
     }
 }

+impl From<simdutf8::basic::Utf8Error> for CliError {
+    fn from(_: simdutf8::basic::Utf8Error) -> Self {
+        CliError::Encoding(UTF8_ERROR.to_string())
+    }
+}
+
 impl Args {
     fn split_by_kb_size(&self) -> CliResult<()> {
         let rconfig = self.rconfig();
         let mut rdr = rconfig.reader()?;
         let headers = rdr.byte_headers()?.clone();
-        let num_fields = headers.len();
-
-        // estimate the size of the separators
-        // the sep_factor is to account for delimiters, quotes and spaces
-        #[allow(clippy::cast_precision_loss)]
-        let separators_byte_size =
-            ((num_fields as f32 - 1.0) * self.flag_sep_factor).ceil() as usize;
+        let mut headerbuf_wtr = csv::WriterBuilder::new().from_writer(vec![]);

-        let header_byte_size = headers.as_slice().len() + separators_byte_size;
+        headerbuf_wtr.write_byte_record(&headers)?;
+        // safety: we know the inner vec is valid
+        let header_string =
+            simdutf8::basic::from_utf8(&headerbuf_wtr.into_inner().unwrap())?.to_string();
+        let header_byte_size = header_string.len();

         // safety: we know that the flag is set
         let chunk_size = self.flag_kb_size.unwrap();

         let mut wtr = self.new_writer(&headers, 0, self.flag_pad)?;
         let mut i = 0;
         let mut num_chunks = 0;
         let mut row = csv::ByteRecord::new();
         let chunk_size_bytes = chunk_size * 1024;
+        let mut buf_curr_string = String::with_capacity(chunk_size_bytes);
+        let mut buf_next_string = String::with_capacity(chunk_size_bytes);
         let mut chunk_size_bytes_left = chunk_size_bytes - header_byte_size;
-        while rdr.read_byte_record(&mut row)? {
-            let row_size_bytes = row.as_slice().len() + separators_byte_size;
-            if row_size_bytes >= chunk_size_bytes_left {
+
+        let mut not_empty = rdr.read_byte_record(&mut row)?;
+        let mut curr_size_bytes = buf_curr_string.len();
+        chunk_size_bytes_left -= curr_size_bytes;
+        wtr.write_byte_record(&row)?;
+
+        while not_empty {
+            let mut buf_curr_wtr = csv::WriterBuilder::new().from_writer(vec![]);
+            buf_curr_wtr.write_byte_record(&row)?;
+            buf_curr_string.clear();
+            buf_curr_string.push_str(simdutf8::basic::from_utf8(
+                &buf_curr_wtr
+                    .into_inner()
+                    .map_err(|_| CliError::Encoding(UTF8_ERROR.to_string()))?,
+            )?);
+            curr_size_bytes = buf_curr_string.len();
+
+            not_empty = rdr.read_byte_record(&mut row)?;
+            let next_size_bytes = if not_empty {
+                let mut buf_next_wtr = csv::WriterBuilder::new().from_writer(vec![]);
+                buf_next_wtr.write_byte_record(&row)?;
+                buf_next_string.clear();
+                buf_next_string.push_str(simdutf8::basic::from_utf8(
+                    &buf_next_wtr
+                        .into_inner()
+                        .map_err(|_| CliError::Encoding(UTF8_ERROR.to_string()))?,
+                )?);
+                buf_next_string.len()
+            } else {
+                buf_next_string.clear();
+                0
+            };
+
+            if curr_size_bytes + next_size_bytes >= chunk_size_bytes_left {
                 wtr.flush()?;
                 wtr = self.new_writer(&headers, i, self.flag_pad)?;
-                chunk_size_bytes_left = chunk_size_bytes;
+                chunk_size_bytes_left = chunk_size_bytes - header_byte_size;
                 num_chunks += 1;
             }
-            wtr.write_byte_record(&row)?;
-            chunk_size_bytes_left -= row_size_bytes;
-            i += 1;
+            if next_size_bytes > 0 {
+                wtr.write_byte_record(&row)?;
+                chunk_size_bytes_left -= curr_size_bytes;
+                i += 1;
+            }
         }
         wtr.flush()?;

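The heart of the refactor: instead of estimating separator overhead with `--sep-factor`, each record is round-tripped through an in-memory `csv::Writer` to get its exact serialized size, and the next record is peeked before deciding whether the current chunk is full. A standalone sketch of that idea (simplified from the diff above; `serialized_len`, the in-memory rows, and the 2 KB target are illustrative, not qsv's actual code):

```rust
use csv::{ByteRecord, WriterBuilder};

/// Exact on-disk size of one record, including delimiter, quoting,
/// and line-terminator bytes -- no `--sep-factor` estimate needed.
fn serialized_len(record: &ByteRecord) -> Result<usize, Box<dyn std::error::Error>> {
    let mut wtr = WriterBuilder::new().from_writer(vec![]);
    wtr.write_byte_record(record)?;
    // flushing into the backing Vec<u8> cannot realistically fail
    let buf = wtr.into_inner().map_err(|_| "in-memory flush failed")?;
    Ok(buf.len())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let chunk_size_bytes: usize = 2 * 1024; // e.g. --kb-size 2
    let header = ByteRecord::from(vec!["id", "name", "notes"]);
    let header_len = serialized_len(&header)?;

    // illustrative rows; in qsv these stream from the CSV reader
    let rows: Vec<ByteRecord> = (0..100)
        .map(|i| ByteRecord::from(vec![i.to_string(), format!("name {i}"), "x".repeat(50)]))
        .collect();

    let mut bytes_left = chunk_size_bytes - header_len;
    let mut chunk = 0;
    let mut iter = rows.iter().peekable();
    while let Some(row) = iter.next() {
        let curr = serialized_len(row)?;
        // look ahead: if this row plus the next would overflow, rotate to a
        // new chunk, so no chunk ever exceeds the kb-size ceiling
        let next = iter.peek().map_or(0, |r| serialized_len(r).unwrap_or(0));
        if curr + next >= bytes_left {
            chunk += 1;
            bytes_left = chunk_size_bytes - header_len;
        }
        bytes_left = bytes_left.saturating_sub(curr);
        println!("row of {curr} bytes -> chunk {chunk}");
    }
    Ok(())
}
```

Measuring the serialized bytes rather than `row.as_slice().len()` is what turns the chunk size from an approximation into a guarantee: quoting and delimiters are counted exactly instead of scaled by a heuristic factor.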
34 changes: 18 additions & 16 deletions tests/test_split.rs
@@ -792,15 +792,16 @@ fn split_kbsize_boston_2k() {

     assert!(wrk.path("0.csv").exists());
     assert!(wrk.path("11.csv").exists());
-    assert!(wrk.path("20.csv").exists());
-    assert!(wrk.path("29.csv").exists());
-    assert!(wrk.path("39.csv").exists());
-    assert!(wrk.path("48.csv").exists());
-    assert!(wrk.path("57.csv").exists());
-    assert!(wrk.path("68.csv").exists());
+    assert!(wrk.path("19.csv").exists());
+    assert!(wrk.path("27.csv").exists());
+    assert!(wrk.path("36.csv").exists());
+    assert!(wrk.path("45.csv").exists());
+    assert!(wrk.path("52.csv").exists());
+    assert!(wrk.path("61.csv").exists());
+    assert!(wrk.path("70.csv").exists());
     assert!(wrk.path("78.csv").exists());
-    assert!(wrk.path("88.csv").exists());
-    assert!(wrk.path("98.csv").exists());
+    assert!(wrk.path("86.csv").exists());
+    assert!(wrk.path("95.csv").exists());
 }

#[test]
@@ -818,13 +819,14 @@ fn split_kbsize_boston_2k_padded() {

     assert!(wrk.path("testme-000.csv").exists());
     assert!(wrk.path("testme-011.csv").exists());
-    assert!(wrk.path("testme-020.csv").exists());
-    assert!(wrk.path("testme-029.csv").exists());
-    assert!(wrk.path("testme-039.csv").exists());
-    assert!(wrk.path("testme-048.csv").exists());
-    assert!(wrk.path("testme-057.csv").exists());
-    assert!(wrk.path("testme-068.csv").exists());
+    assert!(wrk.path("testme-019.csv").exists());
+    assert!(wrk.path("testme-027.csv").exists());
+    assert!(wrk.path("testme-036.csv").exists());
+    assert!(wrk.path("testme-045.csv").exists());
+    assert!(wrk.path("testme-052.csv").exists());
+    assert!(wrk.path("testme-061.csv").exists());
+    assert!(wrk.path("testme-070.csv").exists());
     assert!(wrk.path("testme-078.csv").exists());
-    assert!(wrk.path("testme-088.csv").exists());
-    assert!(wrk.path("testme-098.csv").exists());
+    assert!(wrk.path("testme-086.csv").exists());
+    assert!(wrk.path("testme-095.csv").exists());
 }
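
The regenerated fixtures above only encode the new guarantee indirectly, through shifted chunk boundaries. A hedged way to check the invariant directly on any output directory, using plain `std::fs` rather than the test harness's `wrk` helper (the `outdir` path and 2 KB limit are placeholders):

```rust
use std::fs;

fn main() -> std::io::Result<()> {
    let kb_size: u64 = 2; // matches --kb-size 2
    for entry in fs::read_dir("outdir")? {
        let entry = entry?;
        let len = entry.metadata()?.len();
        // every chunk, repeated header included, must fit under the ceiling
        assert!(
            len <= kb_size * 1024,
            "{} is {} bytes, over the {} KB limit",
            entry.path().display(),
            len,
            kb_size
        );
    }
    Ok(())
}
```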
