csv-reader accept &Schema instead of Arc<Schema>
ritchie46 committed Aug 5, 2021
1 parent 4767915 commit bec832b
Showing 5 changed files with 40 additions and 58 deletions.
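In short: `CsvReader::with_schema` and `build_csv_reader` now take `&Schema` instead of `Arc<Schema>`, and `CoreReader` gains a lifetime so it can hold the borrow as a `Cow<'a, Schema>`. A minimal caller-side sketch of the before/after, modeled on the updated tests in this diff (column names and data are illustrative only):

```rust
use polars_core::prelude::*; // Schema, Field, DataType
use polars_io::prelude::*; // CsvReader, SerReader
use std::io::Cursor;

fn main() {
    let csv = "foo,bar\n1,2\n3,4\n";
    let schema = Schema::new(vec![
        Field::new("foo", DataType::UInt32),
        Field::new("bar", DataType::UInt32),
    ]);

    // Before this commit: .with_schema(Arc::new(schema))
    // After it: a plain borrow is enough; no reference counting.
    let df = CsvReader::new(Cursor::new(csv))
        .has_header(true)
        .with_schema(&schema)
        .finish()
        .unwrap();

    assert_eq!(df.column("foo").unwrap().len(), 2);
}
```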
21 changes: 10 additions & 11 deletions polars/polars-io/src/csv.rs
@@ -48,9 +48,8 @@ use crate::{SerReader, SerWriter};
 pub use arrow::io::csv::write;
 use polars_core::prelude::*;
 use std::fs::File;
-use std::io::{Read, Seek, Write};
+use std::io::Write;
 use std::path::PathBuf;
-use std::sync::Arc;
 
 /// Write a DataFrame to csv.
 pub struct CsvWriter<W: Write> {
@@ -183,7 +182,7 @@ impl NullValues {
 /// ```
 pub struct CsvReader<'a, R>
 where
-    R: Read + Seek + MmapBytesReader,
+    R: MmapBytesReader,
 {
     /// File or Stream object
     reader: R,
@@ -201,7 +200,7 @@ where
     delimiter: Option<u8>,
     has_header: bool,
     ignore_parser_errors: bool,
-    schema: Option<Arc<Schema>>,
+    schema: Option<&'a Schema>,
     encoding: CsvEncoding,
     n_threads: Option<usize>,
     path: Option<PathBuf>,
@@ -215,7 +214,7 @@
 
 impl<'a, R> CsvReader<'a, R>
 where
-    R: Read + Seek + Sync + Send + MmapBytesReader,
+    R: MmapBytesReader,
 {
     /// Sets the chunk size used by the parser. This influences performance
     pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
@@ -246,7 +245,7 @@ where
     /// in the csv parser and expects a complete Schema.
     ///
     /// It is recommended to use [with_dtypes](Self::with_dtypes) instead.
-    pub fn with_schema(mut self, schema: Arc<Schema>) -> Self {
+    pub fn with_schema(mut self, schema: &'a Schema) -> Self {
         self.schema = Some(schema);
         self
     }
@@ -349,7 +348,7 @@ where
         self
     }
 
-    pub fn build_inner_reader(self) -> Result<CoreReader<R>> {
+    pub fn build_inner_reader(self) -> Result<CoreReader<'a, R>> {
         build_csv_reader(
             self.reader,
             self.stop_after_n_rows,
@@ -385,7 +384,7 @@ impl<'a> CsvReader<'a, File> {
 
 impl<'a, R> SerReader<R> for CsvReader<'a, R>
 where
-    R: Read + Seek + Sync + Send + MmapBytesReader,
+    R: MmapBytesReader,
 {
     /// Create a new CsvReader from a file/ stream
     fn new(reader: R) -> Self {
@@ -817,7 +816,7 @@ id090,id048,id0000067778,24,2,51862,4,9,
                 .map(|s| s.name().to_string())
                 .collect(),
         ))
-        .with_schema(Arc::new(schema))
+        .with_schema(&schema)
         .finish();
     assert!(result.is_ok())
 }
@@ -843,11 +842,11 @@
     let file = Cursor::new(csv);
     let df = CsvReader::new(file)
         .has_header(true)
-        .with_schema(Arc::new(Schema::new(vec![
+        .with_schema(&Schema::new(vec![
             Field::new("foo", DataType::UInt32),
             Field::new("bar", DataType::UInt32),
             Field::new("ham", DataType::UInt32),
-        ])))
+        ]))
         .finish()
         .unwrap();
     assert_eq!(df.column("ham").unwrap().len(), 3)
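One subtlety of the borrowed API, visible in the updated test above: `.with_schema(&Schema::new(...))` borrows a temporary, which only compiles because the whole builder chain, `.finish()` included, is a single statement, so the temporary schema lives long enough. If the chain is split up, the schema needs a named binding that outlives the reader, as in this sketch (the helper name is hypothetical):

```rust
use polars_core::prelude::*;
use polars_io::prelude::*;
use std::io::Cursor;

fn read_with_fixed_schema(csv: &str) -> DataFrame {
    // The borrowed schema must outlive the reader, so bind it
    // first when the builder chain spans several statements.
    let schema = Schema::new(vec![Field::new("foo", DataType::UInt32)]);
    let reader = CsvReader::new(Cursor::new(csv))
        .has_header(true)
        .with_schema(&schema); // `reader` now borrows `schema`
    reader.finish().unwrap()
}
```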
2 changes: 1 addition & 1 deletion polars/polars-io/src/csv_core/buffer.rs
@@ -256,7 +256,7 @@ impl ParsedBuffer<BooleanType> for BooleanChunkedBuilder {
 pub(crate) fn init_buffers(
     projection: &[usize],
     capacity: usize,
-    schema: &SchemaRef,
+    schema: &Schema,
     // The running statistic of the amount of bytes we must allocate per str column
     str_capacities: &[RunningSize],
     delimiter: u8,
49 changes: 17 additions & 32 deletions polars/polars-io/src/csv_core/csv.rs
@@ -10,18 +10,18 @@ use polars_core::utils::accumulate_dataframes_vertical;
 use polars_core::{prelude::*, POOL};
 use rayon::prelude::*;
 use rayon::ThreadPoolBuilder;
+use std::borrow::Cow;
 use std::fmt;
 #[cfg(feature = "decompress")]
 use std::io::SeekFrom;
-use std::io::{Read, Seek};
 use std::path::PathBuf;
 use std::sync::atomic::Ordering;
 use std::sync::{atomic::AtomicUsize, Arc};
 
 /// CSV file reader
-pub struct CoreReader<R: Read + MmapBytesReader> {
+pub struct CoreReader<'a, R: MmapBytesReader> {
     /// Explicit schema for the CSV file
-    schema: SchemaRef,
+    schema: Cow<'a, Schema>,
     /// Optional projection for which columns to load (zero-based column indices)
     projection: Option<Vec<usize>>,
     /// File reader
@@ -46,9 +46,9 @@ pub struct CoreReader<R: Read + MmapBytesReader> {
     inferred_schema: bool,
 }
 
-impl<R> fmt::Debug for CoreReader<R>
+impl<'a, R> fmt::Debug for CoreReader<'a, R>
 where
-    R: Read + MmapBytesReader,
+    R: MmapBytesReader,
 {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         f.debug_struct("Reader")
@@ -107,23 +107,8 @@ impl RunningSize {
     }
 }
 
-impl<R: Read + Sync + Send + MmapBytesReader> CoreReader<R> {
-    /// Returns the schema of the reader, useful for getting the schema without reading
-    /// record batches
-    pub fn schema(&self) -> SchemaRef {
-        match &self.projection {
-            Some(projection) => {
-                let fields = self.schema.fields();
-                let projected_fields: Vec<Field> =
-                    projection.iter().map(|i| fields[*i].clone()).collect();
-
-                Arc::new(Schema::new(projected_fields))
-            }
-            None => self.schema.clone(),
-        }
-    }
-
-    fn find_starting_point<'a>(&self, mut bytes: &'a [u8]) -> Result<&'a [u8]> {
+impl<'a, R: MmapBytesReader> CoreReader<'a, R> {
+    fn find_starting_point<'b>(&self, mut bytes: &'b [u8]) -> Result<&'b [u8]> {
         // Skip all leading white space and the occasional utf8-bom
         bytes = skip_line_ending(skip_whitespace(skip_bom(bytes)).0).0;
 
@@ -447,13 +432,13 @@ impl<R: Read + Sync + Send + MmapBytesReader> CoreReader<R> {
     ) -> Result<DataFrame> {
         if let Some(bytes) = decompress(bytes) {
             if self.inferred_schema {
-                self.schema = bytes_to_schema(
+                self.schema = Cow::Owned(bytes_to_schema(
                     &bytes,
                     self.delimiter,
                     self.has_header,
                     self.skip_rows,
                     self.comment_char,
-                )?;
+                )?);
             }
 
             self.parse_csv(n_threads, &bytes, predicate.as_ref())
@@ -520,7 +505,7 @@ impl<R: Read + Sync + Send + MmapBytesReader> CoreReader<R> {
 }
 
 #[allow(clippy::too_many_arguments)]
-pub fn build_csv_reader<R: Read + Seek + Sync + Send + MmapBytesReader>(
+pub fn build_csv_reader<'a, R: MmapBytesReader>(
     mut reader: R,
     n_rows: Option<usize>,
     skip_rows: usize,
@@ -529,26 +514,26 @@ pub fn build_csv_reader<R: Read + Seek + Sync + Send + MmapBytesReader>(
     delimiter: Option<u8>,
     has_header: bool,
     ignore_parser_errors: bool,
-    schema: Option<SchemaRef>,
+    schema: Option<&'a Schema>,
     columns: Option<Vec<String>>,
     encoding: CsvEncoding,
     n_threads: Option<usize>,
     path: Option<PathBuf>,
-    schema_overwrite: Option<&Schema>,
+    schema_overwrite: Option<&'a Schema>,
     sample_size: usize,
     chunk_size: usize,
     low_memory: bool,
     comment_char: Option<u8>,
     null_values: Option<NullValues>,
-) -> Result<CoreReader<R>> {
+) -> Result<CoreReader<'a, R>> {
     // check if schema should be inferred
     let delimiter = delimiter.unwrap_or(b',');
 
     #[cfg(feature = "decompress")]
     let mut inferred_schema = false;
 
     let schema = match schema {
-        Some(schema) => schema,
+        Some(schema) => Cow::Borrowed(schema),
         None => {
             #[cfg(feature = "decompress")]
             {
@@ -563,7 +548,7 @@ pub fn build_csv_reader<R: Read + Seek + Sync + Send + MmapBytesReader>(
                 // restore position
                 reader.seek(SeekFrom::Current(-(N as i64)))?;
                 if decompress(&bytes).is_some() {
-                    Arc::new(Schema::default())
+                    Cow::Owned(Schema::default())
                 } else {
                     let (inferred_schema, _) = infer_file_schema(
                         &mut reader,
@@ -574,7 +559,7 @@ pub fn build_csv_reader<R: Read + Seek + Sync + Send + MmapBytesReader>(
                         skip_rows,
                         comment_char,
                     )?;
-                    Arc::new(inferred_schema)
+                    Cow::Owned(inferred_schema)
                 }
             }
             #[cfg(not(feature = "decompress"))]
@@ -588,7 +573,7 @@ pub fn build_csv_reader<R: Read + Seek + Sync + Send + MmapBytesReader>(
                     skip_rows,
                     comment_char,
                 )?;
-                Arc::new(inferred_schema)
+                Cow::Owned(inferred_schema)
             }
         }
     };
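The `Cow<'a, Schema>` field is what lets `build_csv_reader` accept `Option<&'a Schema>` without cloning: a caller-supplied schema is stored as `Cow::Borrowed`, an inferred one as `Cow::Owned`, and the rest of the reader sees a `&Schema` either way. A self-contained sketch of the pattern, with a hypothetical stand-in for polars' `Schema`:

```rust
use std::borrow::Cow;

// Hypothetical stand-in for polars_core's Schema; Cow needs Clone.
#[derive(Clone, Debug, Default)]
struct Schema {
    fields: Vec<String>,
}

struct Reader<'a> {
    schema: Cow<'a, Schema>,
}

fn infer_schema() -> Schema {
    Schema { fields: vec!["inferred".to_string()] }
}

fn build_reader(schema: Option<&Schema>) -> Reader<'_> {
    let schema = match schema {
        // Borrow the caller's schema: no clone, no Arc.
        Some(schema) => Cow::Borrowed(schema),
        // No schema given: infer one and own it.
        None => Cow::Owned(infer_schema()),
    };
    Reader { schema }
}

fn main() {
    let user_schema = Schema { fields: vec!["foo".to_string()] };
    let borrowed = build_reader(Some(&user_schema));
    let owned = build_reader(None);
    // Deref gives a &Schema in both cases.
    println!("{:?} / {:?}", &*borrowed.schema, &*owned.schema);
}
```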
24 changes: 11 additions & 13 deletions polars/polars-io/src/csv_core/utils.rs
@@ -282,20 +282,18 @@ pub(crate) fn bytes_to_schema(
     has_header: bool,
     skip_rows: usize,
     comment_char: Option<u8>,
-) -> Result<SchemaRef> {
+) -> Result<Schema> {
     let mut r = std::io::Cursor::new(&bytes);
-    Ok(Arc::from(
-        infer_file_schema(
-            &mut r,
-            delimiter,
-            Some(100),
-            has_header,
-            None,
-            skip_rows,
-            comment_char,
-        )?
-        .0,
-    ))
+    Ok(infer_file_schema(
+        &mut r,
+        delimiter,
+        Some(100),
+        has_header,
+        None,
+        skip_rows,
+        comment_char,
+    )?
+    .0)
 }
 
 #[cfg(test)]
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/physical_plan/executors/scan.rs
@@ -162,7 +162,7 @@ impl Executor for CsvExec {
         let reader = CsvReader::from_path(&self.path)
             .unwrap()
             .has_header(self.options.has_header)
-            .with_schema(self.schema.clone())
+            .with_schema(&self.schema)
             .with_delimiter(self.options.delimiter)
             .with_ignore_parser_errors(self.options.ignore_errors)
             .with_skip_rows(self.options.skip_rows)
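`CsvExec` can keep sharing its schema through an `Arc` (before this change it passed `self.schema.clone()`, a clone of an `Arc<Schema>`): at the call site, `&self.schema` deref-coerces from `&Arc<Schema>` to the `&Schema` the new signature wants, so only the clone goes away. A minimal sketch of that coercion:

```rust
use std::sync::Arc;

#[derive(Debug)]
struct Schema;

// Stands in for the new with_schema(&Schema) signature.
fn takes_ref(schema: &Schema) {
    println!("{:?}", schema);
}

fn main() {
    let shared: Arc<Schema> = Arc::new(Schema);
    // &Arc<Schema> deref-coerces to &Schema automatically.
    takes_ref(&shared);
}
```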
