python use buffered writers (#3086)
ritchie46 committed Apr 7, 2022
1 parent 32b1e99 commit bbb8703
Showing 1 changed file with 78 additions and 28 deletions.
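
Note: every writer touched in this commit follows the same idea — when the Python caller passes a plain path string, open the file directly and wrap it in std::io::BufWriter so the many small writes issued by the serializers are coalesced into large ones instead of hitting the OS one call at a time. A minimal sketch of that pattern, independent of polars (the write_numbers helper and the numbers.txt path are made up for illustration):

    use std::fs::File;
    use std::io::{BufWriter, Write};

    // Many small writes go through an in-memory buffer and reach the file
    // in large chunks, instead of one syscall per write.
    fn write_numbers(path: &str) -> std::io::Result<()> {
        let file = File::create(path)?;
        let mut writer = BufWriter::new(file);
        for i in 0..10_000 {
            writeln!(writer, "{}", i)?;
        }
        writer.flush()?; // also happens on drop, but an explicit flush surfaces errors
        Ok(())
    }

    fn main() -> std::io::Result<()> {
        write_numbers("numbers.txt")
    }
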
106 changes: 78 additions & 28 deletions py-polars/src/dataframe.rs
@@ -1,7 +1,7 @@
 use numpy::IntoPyArray;
 use pyo3::types::{PyList, PyTuple};
 use pyo3::{exceptions::PyRuntimeError, prelude::*};
-use std::io::{BufReader, Cursor, Read};
+use std::io::{BufReader, BufWriter, Cursor, Read};
 
 use polars::frame::groupby::GroupBy;
 use polars::prelude::*;
@@ -251,7 +251,7 @@ impl PyDataFrame {
     }
 
     #[cfg(feature = "avro")]
-    pub fn to_avro(&mut self, py_f: PyObject, compression: &str) -> PyResult<()> {
+    pub fn to_avro(&mut self, py: Python, py_f: PyObject, compression: &str) -> PyResult<()> {
         use polars::io::avro::{AvroCompression, AvroWriter};
         let compression = match compression {
             "uncompressed" => None,
@@ -260,11 +260,20 @@ impl PyDataFrame {
             s => return Err(PyPolarsErr::Other(format!("compression {} not supported", s)).into()),
         };
 
-        let mut buf = get_file_like(py_f, true)?;
-        AvroWriter::new(&mut buf)
-            .with_compression(compression)
-            .finish(&mut self.df)
-            .map_err(PyPolarsErr::from)?;
+        if let Ok(s) = py_f.extract::<&str>(py) {
+            let f = std::fs::File::create(s).unwrap();
+            let f = BufWriter::new(f);
+            AvroWriter::new(f)
+                .with_compression(compression)
+                .finish(&mut self.df)
+                .map_err(PyPolarsErr::from)?;
+        } else {
+            let mut buf = get_file_like(py_f, true)?;
+            AvroWriter::new(&mut buf)
+                .with_compression(compression)
+                .finish(&mut self.df)
+                .map_err(PyPolarsErr::from)?;
+        }
 
         Ok(())
     }
@@ -314,7 +323,7 @@ impl PyDataFrame {
         row_oriented: bool,
         json_lines: bool,
     ) -> PyResult<()> {
-        let file = get_file_like(py_f, true)?;
+        let file = BufWriter::new(get_file_like(py_f, true)?);
 
         let r = match (pretty, row_oriented, json_lines) {
             (_, true, true) => panic!("{}", "only one of {row_oriented, json_lines} should be set"),
@@ -358,31 +367,60 @@ impl PyDataFrame {
         Ok(pydf)
     }
 
-    pub fn to_csv(&mut self, py_f: PyObject, has_header: bool, sep: u8, quote: u8) -> PyResult<()> {
-        let mut buf = get_file_like(py_f, true)?;
-        CsvWriter::new(&mut buf)
-            .has_header(has_header)
-            .with_delimiter(sep)
-            .with_quoting_char(quote)
-            .finish(&mut self.df)
-            .map_err(PyPolarsErr::from)?;
+    pub fn to_csv(
+        &mut self,
+        py: Python,
+        py_f: PyObject,
+        has_header: bool,
+        sep: u8,
+        quote: u8,
+    ) -> PyResult<()> {
+        if let Ok(s) = py_f.extract::<&str>(py) {
+            let f = std::fs::File::create(s).unwrap();
+            let f = BufWriter::new(f);
+            CsvWriter::new(f)
+                .has_header(has_header)
+                .with_delimiter(sep)
+                .with_quoting_char(quote)
+                .finish(&mut self.df)
+                .map_err(PyPolarsErr::from)?;
+        } else {
+            let mut buf = get_file_like(py_f, true)?;
+            CsvWriter::new(&mut buf)
+                .has_header(has_header)
+                .with_delimiter(sep)
+                .with_quoting_char(quote)
+                .finish(&mut self.df)
+                .map_err(PyPolarsErr::from)?;
+        }
 
         Ok(())
     }
 
     #[cfg(feature = "ipc")]
-    pub fn to_ipc(&mut self, py_f: PyObject, compression: &str) -> PyResult<()> {
+    pub fn to_ipc(&mut self, py: Python, py_f: PyObject, compression: &str) -> PyResult<()> {
         let compression = match compression {
             "uncompressed" => None,
             "lz4" => Some(IpcCompression::LZ4),
             "zstd" => Some(IpcCompression::ZSTD),
             s => return Err(PyPolarsErr::Other(format!("compression {} not supported", s)).into()),
         };
-        let mut buf = get_file_like(py_f, true)?;
-
-        IpcWriter::new(&mut buf)
-            .with_compression(compression)
-            .finish(&mut self.df)
-            .map_err(PyPolarsErr::from)?;
+        if let Ok(s) = py_f.extract::<&str>(py) {
+            let f = std::fs::File::create(s).unwrap();
+            let f = BufWriter::new(f);
+            IpcWriter::new(f)
+                .with_compression(compression)
+                .finish(&mut self.df)
+                .map_err(PyPolarsErr::from)?;
+        } else {
+            let mut buf = get_file_like(py_f, true)?;
+
+            IpcWriter::new(&mut buf)
+                .with_compression(compression)
+                .finish(&mut self.df)
+                .map_err(PyPolarsErr::from)?;
+        }
         Ok(())
     }

@@ -475,6 +513,7 @@ impl PyDataFrame {
     #[cfg(feature = "parquet")]
     pub fn to_parquet(
         &mut self,
+        py: Python,
         py_f: PyObject,
         compression: &str,
         statistics: bool,
@@ -489,13 +528,24 @@ impl PyDataFrame {
             "zstd" => ParquetCompression::Zstd,
             s => return Err(PyPolarsErr::Other(format!("compression {} not supported", s)).into()),
         };
-        let buf = get_file_like(py_f, true)?;
-
-        ParquetWriter::new(buf)
-            .with_compression(compression)
-            .with_statistics(statistics)
-            .finish(&mut self.df)
-            .map_err(PyPolarsErr::from)?;
+        if let Ok(s) = py_f.extract::<&str>(py) {
+            let f = std::fs::File::create(s).unwrap();
+            let f = BufWriter::new(f);
+            ParquetWriter::new(f)
+                .with_compression(compression)
+                .with_statistics(statistics)
+                .finish(&mut self.df)
+                .map_err(PyPolarsErr::from)?;
+        } else {
+            let buf = get_file_like(py_f, true)?;
+            ParquetWriter::new(buf)
+                .with_compression(compression)
+                .with_statistics(statistics)
+                .finish(&mut self.df)
+                .map_err(PyPolarsErr::from)?;
+        }
 
         Ok(())
     }
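
Each writer now dispatches on whether the Python object extracts as a path string: a path gets a freshly created File wrapped in BufWriter, while an already opened file-like object falls back to the existing get_file_like handle. A rough sketch of that shape reduced to plain std I/O (the Sink enum, write_report, and emit names are illustrative stand-ins, not polars APIs):

    use std::fs::File;
    use std::io::{self, BufWriter, Cursor, Write};

    // Illustrative stand-in for the dispatch: a path gets a buffered File,
    // anything else is treated as an existing writer and used as-is.
    enum Sink {
        Path(String),
        Buffer(Cursor<Vec<u8>>),
    }

    fn write_report(sink: Sink) -> io::Result<()> {
        match sink {
            Sink::Path(p) => {
                let f = BufWriter::new(File::create(p)?);
                emit(f)
            }
            Sink::Buffer(b) => emit(b),
        }
    }

    fn emit<W: Write>(mut w: W) -> io::Result<()> {
        for i in 0..1_000 {
            writeln!(w, "row {}", i)?;
        }
        w.flush()
    }

    fn main() -> io::Result<()> {
        write_report(Sink::Path("report.txt".into()))?;
        write_report(Sink::Buffer(Cursor::new(Vec::new())))
    }
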
