split: added --chunks option #1587

Merged 1 commit on Feb 4, 2024
50 changes: 42 additions & 8 deletions src/cmd/split.rs
@@ -1,6 +1,8 @@
static USAGE: &str = r#"
Splits the given CSV data into chunks.

Uses multithreading to go faster if the given CSV data has an index.

The files are written to the directory given with the name '{start}.csv',
where {start} is the index of the first record of the chunk (starting at 0).
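
As a concrete illustration of that naming scheme, here is a minimal standalone sketch (the row count and chunk size are hypothetical, not part of this PR) of the start indices that become file names:

// Sketch of the '{start}.csv' naming described above; numbers are hypothetical.
fn main() {
    let (record_count, chunk_size) = (2_500usize, 1_000usize);
    let mut start = 0;
    while start < record_count {
        println!("{start}.csv"); // prints 0.csv, 1000.csv, 2000.csv
        start += chunk_size;
    }
}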

@@ -11,12 +13,16 @@ Examples:

qsv split . -s 100 input.csv

cat in.csv | qsv split outdir -s 1000

qsv split outdir --chunks 10 input.csv

qsv split outdir -c 10 -j 4 input.csv

For more examples, see https://github.com/jqnatividad/qsv/blob/master/tests/test_split.rs.

Usage:
qsv split [options] <outdir> [<input>]
qsv split [options] (--size <arg> | --chunks <arg>) <outdir> [<input>]
qsv split --help

split arguments:
@@ -28,6 +34,14 @@
split options:
-s, --size <arg> The number of records to write into each chunk.
[default: 500]
-c, --chunks <arg> The number of chunks to split the data into.
This option is mutually exclusive with --size.
The number of rows in each chunk is determined by
the number of records in the CSV data and the number
of desired chunks. If the number of records is not evenly
divisible by the number of chunks, the last chunk will
have fewer records.

-j, --jobs <arg> The number of splitting jobs to run in parallel.
This only works when the given CSV data has
an index already created. Note that a file handle
@@ -69,6 +83,7 @@ struct Args {
arg_input: Option<String>,
arg_outdir: String,
flag_size: usize,
flag_chunks: Option<usize>,
flag_jobs: Option<usize>,
flag_filename: FilenameTemplate,
flag_pad: usize,
@@ -101,11 +116,22 @@ impl Args {
let mut rdr = rconfig.reader()?;
let headers = rdr.byte_headers()?.clone();

let chunk_size = if self.flag_chunks.is_some() {
let count = util::count_rows(&rconfig)?;
let chunk = self.flag_chunks.unwrap();
if chunk == 0 {
return fail_incorrectusage_clierror!("--chunks must be greater than 0.");
}
(count as f64 / chunk as f64).ceil() as usize
} else {
self.flag_size
};

let mut wtr = self.new_writer(&headers, 0, self.flag_pad)?;
let mut i = 0;
let mut row = csv::ByteRecord::new();
while rdr.read_byte_record(&mut row)? {
if i > 0 && i % self.flag_size == 0 {
if i > 0 && i % chunk_size == 0 {
wtr.flush()?;
wtr = self.new_writer(&headers, i, self.flag_pad)?;
}
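
The --chunks path above derives the per-file row count by ceiling division of the total record count by the requested number of chunks. A minimal standalone sketch of that arithmetic, with hypothetical record counts:

// Sketch of the chunk_size derivation used in sequential_split above.
fn chunk_size_from_chunks(record_count: usize, chunks: usize) -> usize {
    assert!(chunks > 0, "--chunks must be greater than 0");
    (record_count as f64 / chunks as f64).ceil() as usize
}

fn main() {
    // 1,000 records into 3 chunks -> 334 rows per file; the last file
    // holds the 332-row remainder, matching the help text above.
    assert_eq!(chunk_size_from_chunks(1_000, 3), 334);
    // 1,000 records into 4 chunks -> an even 250 rows per file.
    assert_eq!(chunk_size_from_chunks(1_000, 4), 250);
}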
@@ -117,13 +143,21 @@
}

fn parallel_split(&self, idx: &Indexed<fs::File, fs::File>) -> CliResult<()> {
let nchunks = util::num_of_chunks(idx.count() as usize, self.flag_size);
let args = self.clone();
let chunk_size;
let nchunks = if let Some(flag_chunks) = args.flag_chunks {
chunk_size = idx.count() as usize / flag_chunks;
flag_chunks
} else {
chunk_size = args.flag_size;
util::num_of_chunks(idx.count() as usize, self.flag_size)
};
if nchunks == 1 {
// there's only one chunk, we can just do a sequential split
// which has less overhead and better error handling
return self.sequential_split();
}
let args = self.clone();

util::njobs(args.flag_jobs);

// safety: we cannot use ? here because we're in a closure
Expand All @@ -138,13 +172,13 @@ impl Args {

let mut wtr = args
// safety: the only way this can fail is if we cannot create a file
.new_writer(headers, i * args.flag_size, args.flag_pad)
.new_writer(headers, i * chunk_size, args.flag_pad)
.unwrap();

// safety: we know that there is more than one chunk, so we can safely
// seek to the start of the chunk
idx.seek((i * args.flag_size) as u64).unwrap();
for row in idx.byte_records().take(args.flag_size) {
idx.seek((i * chunk_size) as u64).unwrap();
for row in idx.byte_records().take(chunk_size) {
let row = row.unwrap();
wtr.write_byte_record(&row).unwrap();
}
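
In the parallel path above, each job claims its own slice of the file via the index: job i seeks to record i * chunk_size and writes up to chunk_size records into its own output file. A minimal sketch of those per-job ranges, using hypothetical counts and no CSV index:

// Sketch of the per-job record ranges used by parallel_split above.
fn main() {
    let record_count = 12usize;
    let flag_chunks = 3usize;
    let chunk_size = record_count / flag_chunks; // rows handled per job
    for i in 0..flag_chunks {
        let start = i * chunk_size;
        // Each job would seek the index to `start`, then copy
        // `chunk_size` records into its own output file.
        println!("job {i}: records {start}..{}", start + chunk_size);
    }
    // prints: job 0: records 0..4, job 1: records 4..8, job 2: records 8..12
}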