Skip to content

Commit

Permalink
Update package descriptions and refactor code
Browse files Browse the repository at this point in the history
Updated the README.md to describe the new usage of tokio crate for multithreaded processing. Simultaneously, refactored main.rs to improve the code structure and renamed the "process_file" function to "convert_to_parquet" for better understanding. Some print statements were reformatted for cleaner console output.
  • Loading branch information
rayyildiz committed Mar 27, 2024
1 parent e673b52 commit b818f3b
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 22 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Expand Up @@ -13,7 +13,9 @@ polars = { version = "0.38", default-features = false, features = ["csv"] }
polars-io = { version = "0.38", default-features = false, features = ["csv", "parquet"] }
clap = { version = "4.5", features = ["derive", "std"] }
indicatif = { version = "0.17" }
tokio = { version = "1.36", default-features = false, features = ["rt","rt-multi-thread"] }
tokio = { version = "1.36", default-features = false, features = ["rt", "rt-multi-thread"] }


[profile.release]
codegen-units = 1
lto = true
Expand Down
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -40,8 +40,8 @@ Other arguments:
## Features

- Fast and reliable CSV to Parquet conversion.
- Multithreaded processing with the help of the Rayon crate.
- Progress indication during conversion with the help of the Indicatif crate.
- Multithreaded processing with the help of the [tokio](https://tokio.rs/) crate.
- Progress indication during conversion with the help of the [indicatif](https://docs.rs/indicatif) crate.

## Contributing

Expand Down
58 changes: 39 additions & 19 deletions src/main.rs
@@ -1,15 +1,14 @@
extern crate core;

use std::fs::File;
use std::time::Instant;
use clap::{arg, Parser};
use indicatif::{ProgressBar, ProgressStyle};
use polars::prelude::*;
use std::fs::File;
use std::path::PathBuf;
use std::sync::Mutex;
use std::time::Instant;
use tokio::runtime;


#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
Expand All @@ -30,7 +29,6 @@ struct Args {
worker: u8,
}


// New struct for storing file path and error data
struct ErrorData {
file_path: String,
Expand All @@ -44,7 +42,13 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let delimiter = args.delimiter.as_str().chars().next().unwrap_or(',');
let has_header = args.header;

println!("Program arguments\n path: {}\n delimiter: {}\n has header: {} \n worker count: {}", path.display(), delimiter, has_header, args.worker);
println!(
"Program arguments\n path: {}\n delimiter: {}\n has header: {} \n worker count: {}",
path.display(),
delimiter,
has_header,
args.worker
);
let errors = Arc::new(Mutex::new(Vec::<ErrorData>::new()));

let d = std::fs::read_dir(&path)?;
Expand All @@ -58,16 +62,19 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
let bar = ProgressBar::new(files.len().try_into().unwrap());

bar.set_style(ProgressStyle::with_template("[{elapsed_precise}] {bar:40.yellow/blue} {pos:>7}/{len:7} {msg}")
.unwrap());
bar.set_style(
ProgressStyle::with_template(
"[{elapsed_precise}] {bar:40.yellow/blue} {pos:>7}/{len:7} {msg}",
)
.unwrap(),
);
let bar = Arc::new(Mutex::new(bar));

let runtime = runtime::Builder::new_multi_thread()
.worker_threads(args.worker as usize)
.enable_all()
.build()?;


runtime.block_on(async {
let mut handles = vec![];

Expand All @@ -76,11 +83,15 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let bar = Arc::clone(&bar);
let errors_clone = Arc::clone(&errors);
let h = tokio::spawn(async move {

if let Err(err) = process_file(&path_clone, delimiter, has_header, file.to_str().unwrap()) {
if let Err(err) =
convert_to_parquet(&path_clone, delimiter, has_header, file.to_str().unwrap())
{
let mut errors = errors_clone.lock().unwrap();

errors.push(ErrorData { file_path: file.to_str().unwrap().to_string(), error: err.to_string() });
errors.push(ErrorData {
file_path: file.to_str().unwrap().to_string(),
error: err.to_string(),
});
}
bar.lock().unwrap().inc(1);
});
Expand All @@ -93,12 +104,14 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
});


bar.lock().unwrap().finish();

let errors = errors.lock().unwrap();
for err_data in &*errors {
println!("File: {} Error: {:?}\n", err_data.file_path, err_data.error);
println!(
"File: {} Error: {:?}\n",
err_data.file_path, err_data.error
);
}

let elapsed = start.elapsed();
Expand Down Expand Up @@ -132,17 +145,25 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let has_header = true;
/// let file_name = "data.csv";
///
/// process_file(&base, delimiter, has_header, file_name)?;
/// convert_to_parquet(&base, delimiter, has_header, file_name)?;
///
/// Ok(())
/// }
/// ```
fn process_file(base: &PathBuf, delimiter: char, has_header: bool, file_name: &str) -> Result<(), Box<dyn std::error::Error>> {
pub fn convert_to_parquet(
base: &PathBuf,
delimiter: char,
has_header: bool,
file_name: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let file_path = base.join(file_name);

let file = File::open(&file_path)?;

let mut df_posts = CsvReader::new(file).has_header(has_header).with_separator(delimiter as u8).finish()?;
let mut df_posts = CsvReader::new(file)
.has_header(has_header)
.with_separator(delimiter as u8)
.finish()?;

let target_file = file_path.with_extension("parquet");
let mut file = File::create(target_file).unwrap();
Expand All @@ -155,19 +176,18 @@ fn process_file(base: &PathBuf, delimiter: char, has_header: bool, file_name: &s
Ok(())
}


#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_process_file() {
fn test_convert_to_parquet() {
let mut base = std::env::current_dir().unwrap();
base.push("testdata");

let file_name = "sample.csv";

let result = process_file(&base, ',', true, file_name);
let result = convert_to_parquet(&base, ',', true, file_name);

// Check that the function completed successfully
assert!(result.is_ok());
Expand Down

0 comments on commit b818f3b

Please sign in to comment.