Skip to content

Commit

Permalink
throw error when scanning compressed csv
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Nov 18, 2021
1 parent dd70f31 commit 200f769
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 26 deletions.
5 changes: 5 additions & 0 deletions polars/polars-io/src/csv_core/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ impl<'a> CoreReader<'a> {
#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
let mut reader_bytes = reader_bytes;

#[cfg(not(any(feature = "decompress", feature = "decompress-fast")))]
if is_compressed(&reader_bytes) {
return Err(PolarsError::ComputeError("cannot read compressed csv file; compile with feature 'decompress' or 'decompress-fast'".into()));
}

// check if schema should be inferred
let delimiter = delimiter.unwrap_or(b',');

Expand Down
24 changes: 16 additions & 8 deletions polars/polars-io/src/csv_core/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,20 +280,28 @@ pub fn infer_file_schema(
Ok((Schema::new(fields), records_count))
}

// Magic numbers: the leading two bytes used to recognise compressed input.
/// Two-byte prefix treated as the start of a gzip stream.
const GZIP: [u8; 2] = [0x1F, 0x8B];
/// First of the three two-byte prefixes treated as the start of a zlib stream.
const ZLIB0: [u8; 2] = [0x78, 0x01];
/// Second of the three two-byte prefixes treated as the start of a zlib stream.
const ZLIB1: [u8; 2] = [0x78, 0x9C];
/// Third of the three two-byte prefixes treated as the start of a zlib stream.
const ZLIB2: [u8; 2] = [0x78, 0xDA];

/// Report whether `bytes` looks like compressed data.
///
/// Only the leading two bytes are inspected: the result is `true` when they
/// equal one of the known gzip/zlib magic numbers and `false` otherwise.
/// Inputs shorter than two bytes can never match, so they yield `false`.
pub fn is_compressed(bytes: &[u8]) -> bool {
    [ZLIB0, ZLIB1, ZLIB2, GZIP]
        .iter()
        .any(|magic| bytes.starts_with(&magic[..]))
}

#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
pub(crate) fn decompress(bytes: &[u8]) -> Option<Vec<u8>> {
// magic numbers
let gzip: [u8; 2] = [31, 139];
let zlib0: [u8; 2] = [0x78, 0x01];
let zlib1: [u8; 2] = [0x78, 0x9C];
let zlib2: [u8; 2] = [0x78, 0xDA];

if bytes.starts_with(&gzip) {
if bytes.starts_with(&GZIP) {
let mut out = Vec::with_capacity(bytes.len());
let mut decoder = flate2::read::MultiGzDecoder::new(bytes);
decoder.read_to_end(&mut out).ok()?;
Some(out)
} else if bytes.starts_with(&zlib0) || bytes.starts_with(&zlib1) || bytes.starts_with(&zlib2) {
} else if bytes.starts_with(&ZLIB0) || bytes.starts_with(&ZLIB1) || bytes.starts_with(&ZLIB2) {
let mut out = Vec::with_capacity(bytes.len());
let mut decoder = flate2::read::ZlibDecoder::new(bytes);
decoder.read_to_end(&mut out).ok()?;
Expand Down
6 changes: 3 additions & 3 deletions polars/polars-lazy/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl<'a> LazyCsvReader<'a> {
self
}

pub fn finish(self) -> LazyFrame {
pub fn finish(self) -> Result<LazyFrame> {
let mut lf: LazyFrame = LogicalPlanBuilder::scan_csv(
self.path,
self.delimiter,
Expand All @@ -165,11 +165,11 @@ impl<'a> LazyCsvReader<'a> {
self.quote_char,
self.null_values,
self.infer_schema_length,
)
)?
.build()
.into();
lf.opt_state.agg_scan_projection = true;
lf
Ok(lf)
}
}

Expand Down
19 changes: 14 additions & 5 deletions polars/polars-lazy/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::utils::{
};
use crate::{prelude::*, utils};
use polars_io::csv::NullValues;
use polars_io::csv_core::utils::get_reader_bytes;
use polars_io::csv_core::utils::{get_reader_bytes, is_compressed};

pub(crate) mod aexpr;
pub(crate) mod alp;
Expand All @@ -37,6 +37,7 @@ mod projection;
#[cfg(feature = "ipc")]
use polars_io::ipc::IpcReader;
use projection::*;
use std::io::{Read, Seek, SeekFrom};

// Set and unset during the fetch operation to communicate an override of the number of rows to scan.
thread_local! {pub(crate) static FETCH_ROWS: Cell<Option<usize>> = Cell::new(None)}
Expand Down Expand Up @@ -871,9 +872,17 @@ impl LogicalPlanBuilder {
quote_char: Option<u8>,
null_values: Option<NullValues>,
infer_schema_length: Option<usize>,
) -> Self {
) -> Result<Self> {
let path = path.into();
let mut file = std::fs::File::open(&path).expect("could not open file");
let mut file = std::fs::File::open(&path)?;
let mut magic_nr = [0u8; 2];
file.read_exact(&mut magic_nr)?;
if is_compressed(&magic_nr) {
return Err(PolarsError::ComputeError(
"cannot scan compressed csv; use read_csv for compressed data".into(),
));
}
file.seek(SeekFrom::Start(0))?;
let reader_bytes = get_reader_bytes(&mut file).expect("could not mmap file");

let schema = schema.unwrap_or_else(|| {
Expand All @@ -890,7 +899,7 @@ impl LogicalPlanBuilder {
.expect("could not read schema");
Arc::new(schema)
});
LogicalPlan::CsvScan {
Ok(LogicalPlan::CsvScan {
path,
schema,
options: CsvParserOptions {
Expand All @@ -909,7 +918,7 @@ impl LogicalPlanBuilder {
predicate: None,
aggregate: vec![],
}
.into()
.into())
}

pub fn cache(self) -> Self {
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::iter::FromIterator;

fn scan_foods_csv() -> LazyFrame {
let path = "../../examples/aggregate_multiple_files_in_chunks/datasets/foods1.csv";
LazyCsvReader::new(path.to_string()).finish()
LazyCsvReader::new(path.to_string()).finish().unwrap()
}

pub(crate) fn fruits_cars() -> DataFrame {
Expand Down
2 changes: 1 addition & 1 deletion py-polars/src/conversion.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::dataframe::PyDataFrame;
use crate::error::PyPolarsEr;
use crate::lazy::dataframe::PyLazyFrame;
use crate::prelude::*;
use crate::series::PySeries;
use polars::chunked_array::object::PolarsObjectSafe;
Expand All @@ -16,7 +17,6 @@ use pyo3::types::{PyDict, PySequence};
use pyo3::{PyAny, PyResult};
use std::fmt::{Display, Formatter};
use std::hash::{Hash, Hasher};
use crate::lazy::dataframe::PyLazyFrame;

#[repr(transparent)]
pub struct Wrap<T>(pub T);
Expand Down
22 changes: 16 additions & 6 deletions py-polars/src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl PyLazyFrame {
quote_char: Option<&str>,
null_values: Option<Wrap<NullValues>>,
infer_schema_length: Option<usize>,
) -> Self {
) -> PyResult<Self> {
let null_values = null_values.map(|w| w.0);
let comment_char = comment_char.map(|s| s.as_bytes()[0]);
let quote_char = quote_char.map(|s| s.as_bytes()[0]);
Expand All @@ -118,7 +118,7 @@ impl PyLazyFrame {
Schema::new(fields)
});

LazyCsvReader::new(path)
Ok(LazyCsvReader::new(path)
.with_infer_schema_length(infer_schema_length)
.with_delimiter(delimiter)
.has_header(has_header)
Expand All @@ -132,18 +132,28 @@ impl PyLazyFrame {
.with_quote_char(quote_char)
.with_null_values(null_values)
.finish()
.into()
.map_err(PyPolarsEr::from)?
.into())
}

#[staticmethod]
#[cfg(feature = "parquet")]
pub fn new_from_parquet(path: String, stop_after_n_rows: Option<usize>, cache: bool) -> PyResult<Self> {
let lf = LazyFrame::scan_parquet(path, stop_after_n_rows, cache).map_err(PyPolarsEr::from)?;
pub fn new_from_parquet(
path: String,
stop_after_n_rows: Option<usize>,
cache: bool,
) -> PyResult<Self> {
let lf =
LazyFrame::scan_parquet(path, stop_after_n_rows, cache).map_err(PyPolarsEr::from)?;
Ok(lf.into())
}

#[staticmethod]
pub fn new_from_ipc(path: String, stop_after_n_rows: Option<usize>, cache: bool) -> PyResult<Self> {
pub fn new_from_ipc(
path: String,
stop_after_n_rows: Option<usize>,
cache: bool,
) -> PyResult<Self> {
let lf = LazyFrame::scan_ipc(path, stop_after_n_rows, cache).map_err(PyPolarsEr::from)?;
Ok(lf.into())
}
Expand Down
3 changes: 1 addition & 2 deletions py-polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub mod prelude;
pub mod series;
pub mod utils;

use crate::conversion::{get_df, get_pyseq, get_series, Wrap, get_lf};
use crate::conversion::{get_df, get_lf, get_pyseq, get_series, Wrap};
use crate::error::PyPolarsEr;
use crate::file::get_either_file;
use crate::prelude::{DataType, PyDataType};
Expand Down Expand Up @@ -197,7 +197,6 @@ fn concat_lf(lfs: &PyAny, rechunk: bool) -> PyResult<PyLazyFrame> {
let (seq, len) = get_pyseq(lfs)?;
let mut lfs = Vec::with_capacity(len);


for res in seq.iter()? {
let item = res?;
let lf = get_lf(item)?;
Expand Down

0 comments on commit 200f769

Please sign in to comment.