refactor csv modules
ritchie46 committed Jul 14, 2022
1 parent d46e32c commit 5367833
Showing 13 changed files with 183 additions and 180 deletions.
@@ -1,7 +1,7 @@
+use crate::csv::parser::{is_whitespace, skip_whitespace};
+use crate::csv::read_impl::RunningSize;
+use crate::csv::utils::escape_field;
 use crate::csv::CsvEncoding;
-use crate::csv_core::csv::RunningSize;
-use crate::csv_core::parser::{is_whitespace, skip_whitespace};
-use crate::csv_core::utils::escape_field;
use arrow::array::Utf8Array;
use arrow::bitmap::MutableBitmap;
use polars_arrow::prelude::FromDataUtf8;
76 changes: 76 additions & 0 deletions polars/polars-io/src/csv/mod.rs
@@ -0,0 +1,76 @@
//! # (De)serializing CSV files
//!
//! ## Maximal performance
//! Currently [CsvReader::new](CsvReader::new) has an extra copy. If you want optimal performance in CSV parsing/
//! reading, it is advised to use [CsvReader::from_path](CsvReader::from_path).
//!
//! ## Write a DataFrame to a csv file.
//!
//! ## Example
//!
//! ```
//! use polars_core::prelude::*;
//! use polars_io::prelude::*;
//! use std::fs::File;
//!
//! fn example(df: &mut DataFrame) -> Result<()> {
//!     let mut file = File::create("example.csv").expect("could not create file");
//!
//!     CsvWriter::new(&mut file)
//!         .has_header(true)
//!         .with_delimiter(b',')
//!         .finish(df)
//! }
//! ```
//!
//! ## Read a csv file to a DataFrame
//!
//! ## Example
//!
//! ```
//! use polars_core::prelude::*;
//! use polars_io::prelude::*;
//! use std::fs::File;
//!
//! fn example() -> Result<DataFrame> {
//!     // always prefer `from_path` as that is fastest.
//!     CsvReader::from_path("iris_csv")?
//!         .has_header(true)
//!         .finish()
//! }
//! ```
//!
pub(crate) mod buffer;
pub(crate) mod parser;
pub mod read_impl;

mod read;
#[cfg(not(feature = "private"))]
pub(crate) mod utils;
#[cfg(feature = "private")]
pub mod utils;
mod write;
pub(super) mod write_impl;

use crate::aggregations::ScanAggregation;
use crate::csv::read_impl::{cast_columns, CoreReader};
use crate::csv::utils::get_reader_bytes;
use crate::mmap::MmapBytesReader;
use crate::predicates::PhysicalIoExpr;
use crate::utils::resolve_homedir;
use crate::{RowCount, SerReader, SerWriter};

use polars_core::prelude::*;
#[cfg(feature = "temporal")]
use polars_time::prelude::*;
#[cfg(feature = "temporal")]
use rayon::prelude::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;

pub use read::{CsvEncoding, CsvReader, NullValues};
pub use write::CsvWriter;
File renamed without changes.
152 changes: 1 addition & 151 deletions polars/polars-io/src/csv.rs → polars/polars-io/src/csv/read.rs
@@ -1,154 +1,4 @@
//! # (De)serializing CSV files
//!
//! ## Maximal performance
//! Currently [CsvReader::new](CsvReader::new) has an extra copy. If you want optimal performance in CSV parsing/
//! reading, it is advised to use [CsvReader::from_path](CsvReader::from_path).
//!
//! ## Write a DataFrame to a csv file.
//!
//! ## Example
//!
//! ```
//! use polars_core::prelude::*;
//! use polars_io::prelude::*;
//! use std::fs::File;
//!
//! fn example(df: &mut DataFrame) -> Result<()> {
//!     let mut file = File::create("example.csv").expect("could not create file");
//!
//!     CsvWriter::new(&mut file)
//!         .has_header(true)
//!         .with_delimiter(b',')
//!         .finish(df)
//! }
//! ```
//!
//! ## Read a csv file to a DataFrame
//!
//! ## Example
//!
//! ```
//! use polars_core::prelude::*;
//! use polars_io::prelude::*;
//! use std::fs::File;
//!
//! fn example() -> Result<DataFrame> {
//!     // always prefer `from_path` as that is fastest.
//!     CsvReader::from_path("iris_csv")?
//!         .has_header(true)
//!         .finish()
//! }
//! ```
//!
use crate::aggregations::ScanAggregation;
use crate::csv_core::csv::{cast_columns, CoreReader};
use crate::csv_core::utils::get_reader_bytes;
use crate::mmap::MmapBytesReader;
use crate::predicates::PhysicalIoExpr;
use crate::utils::resolve_homedir;
use crate::{RowCount, SerReader, SerWriter};

use polars_core::prelude::*;
#[cfg(feature = "temporal")]
use polars_time::prelude::*;
#[cfg(feature = "temporal")]
use rayon::prelude::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;

use super::csv_core::write;

/// Write a DataFrame to csv.
///
/// Don't use a `Buffered` writer, the `CsvWriter` internally already buffers writes.
#[must_use]
pub struct CsvWriter<W: Write> {
    /// File or Stream handler
    buffer: W,
    options: write::SerializeOptions,
    header: bool,
    batch_size: usize,
}

impl<W> SerWriter<W> for CsvWriter<W>
where
    W: Write,
{
    fn new(buffer: W) -> Self {
        // 9f: all nanoseconds
        let options = write::SerializeOptions {
            time_format: Some("%T%.9f".to_string()),
            datetime_format: Some("%FT%H:%M:%S.%9f".to_string()),
            ..Default::default()
        };

        CsvWriter {
            buffer,
            options,
            header: true,
            batch_size: 1024,
        }
    }

    fn finish(&mut self, df: &mut DataFrame) -> Result<()> {
        df.as_single_chunk_par();
        let names = df.get_column_names();
        if self.header {
            write::write_header(&mut self.buffer, &names, &self.options)?;
        }
        write::write(&mut self.buffer, df, self.batch_size, &self.options)
    }
}

impl<W> CsvWriter<W>
where
    W: Write,
{
    /// Set whether to write headers
    pub fn has_header(mut self, has_header: bool) -> Self {
        self.header = has_header;
        self
    }

    /// Set the CSV file's column delimiter as a byte character
    pub fn with_delimiter(mut self, delimiter: u8) -> Self {
        self.options.delimiter = delimiter;
        self
    }

    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Set the CSV file's date format
    pub fn with_date_format(mut self, format: Option<String>) -> Self {
        self.options.date_format = format;
        self
    }

    /// Set the CSV file's time format
    pub fn with_time_format(mut self, format: Option<String>) -> Self {
        self.options.time_format = format;
        self
    }

    /// Set the CSV file's datetime format
    pub fn with_datetime(mut self, format: Option<String>) -> Self {
        self.options.datetime_format = format;
        self
    }

    /// Set the single byte character used for quoting
    pub fn with_quoting_char(mut self, char: u8) -> Self {
        self.options.quote = char;
        self
    }
}
use super::*;

#[derive(Copy, Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
@@ -1,7 +1,7 @@
use crate::aggregations::ScanAggregation;
+use crate::csv::utils::*;
+use crate::csv::{buffer::*, parser::*};
 use crate::csv::{CsvEncoding, NullValues};
-use crate::csv_core::utils::*;
-use crate::csv_core::{buffer::*, parser::*};
use crate::mmap::ReaderBytes;
use crate::predicates::PhysicalIoExpr;
use crate::utils::update_row_counts;
@@ -1,9 +1,7 @@
-use crate::csv::CsvEncoding;
 #[cfg(any(feature = "decompress", feature = "decompress-fast"))]
-use crate::csv_core::parser::next_line_position_naive;
-use crate::csv_core::parser::{
-    next_line_position, skip_bom, skip_line_ending, SplitFields, SplitLines,
-};
+use crate::csv::parser::next_line_position_naive;
+use crate::csv::parser::{next_line_position, skip_bom, skip_line_ending, SplitFields, SplitLines};
+use crate::csv::CsvEncoding;
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::prelude::NullValues;
use once_cell::sync::Lazy;
89 changes: 89 additions & 0 deletions polars/polars-io/src/csv/write.rs
@@ -0,0 +1,89 @@
use super::*;

/// Write a DataFrame to csv.
///
/// Don't use a `Buffered` writer, the `CsvWriter` internally already buffers writes.
#[must_use]
pub struct CsvWriter<W: Write> {
    /// File or Stream handler
    buffer: W,
    options: write_impl::SerializeOptions,
    header: bool,
    batch_size: usize,
}

impl<W> SerWriter<W> for CsvWriter<W>
where
    W: Write,
{
    fn new(buffer: W) -> Self {
        // 9f: all nanoseconds
        let options = write_impl::SerializeOptions {
            time_format: Some("%T%.9f".to_string()),
            datetime_format: Some("%FT%H:%M:%S.%9f".to_string()),
            ..Default::default()
        };

        CsvWriter {
            buffer,
            options,
            header: true,
            batch_size: 1024,
        }
    }

    fn finish(&mut self, df: &mut DataFrame) -> Result<()> {
        df.as_single_chunk_par();
        let names = df.get_column_names();
        if self.header {
            write_impl::write_header(&mut self.buffer, &names, &self.options)?;
        }
        write_impl::write(&mut self.buffer, df, self.batch_size, &self.options)
    }
}

impl<W> CsvWriter<W>
where
    W: Write,
{
    /// Set whether to write headers
    pub fn has_header(mut self, has_header: bool) -> Self {
        self.header = has_header;
        self
    }

    /// Set the CSV file's column delimiter as a byte character
    pub fn with_delimiter(mut self, delimiter: u8) -> Self {
        self.options.delimiter = delimiter;
        self
    }

    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Set the CSV file's date format
    pub fn with_date_format(mut self, format: Option<String>) -> Self {
        self.options.date_format = format;
        self
    }

    /// Set the CSV file's time format
    pub fn with_time_format(mut self, format: Option<String>) -> Self {
        self.options.time_format = format;
        self
    }

    /// Set the CSV file's datetime format
    pub fn with_datetime(mut self, format: Option<String>) -> Self {
        self.options.datetime_format = format;
        self
    }

    /// Set the single byte character used for quoting
    pub fn with_quoting_char(mut self, char: u8) -> Self {
        self.options.quote = char;
        self
    }
}
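The builder methods above chain in the usual way. A small sketch, not from this commit, of combining the formatting options; the output path and format strings are illustrative only:

```rust
use polars_core::prelude::*;
use polars_io::csv::CsvWriter;
use polars_io::SerWriter;
use std::fs::File;

fn write_with_options(df: &mut DataFrame) -> Result<()> {
    // CsvWriter buffers internally, so the File is passed directly
    // rather than being wrapped in a BufWriter.
    let mut file = File::create("out.csv").expect("could not create file");

    CsvWriter::new(&mut file)
        .has_header(true)
        .with_delimiter(b'\t')
        .with_date_format(Some("%Y-%m-%d".to_string()))
        .with_datetime(Some("%Y-%m-%dT%H:%M:%S".to_string()))
        .with_quoting_char(b'\'')
        .with_batch_size(1024)
        .finish(df)
}
```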
File renamed without changes.
9 changes: 0 additions & 9 deletions polars/polars-io/src/csv_core/mod.rs

This file was deleted.

5 changes: 2 additions & 3 deletions polars/polars-io/src/lib.rs
@@ -11,9 +11,8 @@ pub mod avro
#[cfg(feature = "csv-file")]
#[cfg_attr(docsrs, doc(cfg(feature = "csv-file")))]
pub mod csv;
#[cfg(feature = "csv-file")]
#[cfg_attr(docsrs, doc(cfg(feature = "csv-file")))]
pub mod csv_core;
#[cfg(feature = "parquet")]
#[cfg_attr(docsrs, doc(cfg(feature = "parquet")))]
pub mod export;
#[cfg(any(feature = "ipc", feature = "ipc_streaming"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "ipc", feature = "ipc_streaming"))))]
4 changes: 2 additions & 2 deletions polars/polars-io/src/ndjson_core/ndjson.rs
@@ -1,5 +1,5 @@
-use crate::csv_core::parser::*;
-use crate::csv_core::utils::*;
+use crate::csv::parser::*;
+use crate::csv::utils::*;
use crate::mmap::ReaderBytes;
use crate::ndjson_core::buffer::*;
use crate::prelude::*;
4 changes: 2 additions & 2 deletions polars/polars-lazy/src/frame/csv.rs
@@ -1,8 +1,8 @@
use crate::prelude::*;
use polars_core::prelude::*;
+use polars_io::csv::utils::get_reader_bytes;
+use polars_io::csv::utils::infer_file_schema;
 use polars_io::csv::{CsvEncoding, NullValues};
-use polars_io::csv_core::utils::get_reader_bytes;
-use polars_io::csv_core::utils::infer_file_schema;
use polars_io::RowCount;

#[derive(Clone)]
