Skip to content

Commit

Permalink
[Python]: use python file handlers; closes #76
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 3, 2020
1 parent ce8cab0 commit b6bb61c
Show file tree
Hide file tree
Showing 7 changed files with 295 additions and 24 deletions.
1 change: 1 addition & 0 deletions polars/src/chunked_array/par/utf8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ impl<'a> Producer for Utf8Producer<'a> {
type IntoIter = Utf8Iter<'a>;

fn into_iter(self) -> Self::IntoIter {
// TODO: slice and create normal iterator?
let iter = (0..self.len).into_iter();
Utf8Iter { ca: self.ca, iter }
}
Expand Down
1 change: 1 addition & 0 deletions py-polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pyo3 = {version = "0.11", features = ["extension-module"] }
thiserror = "1.0.20"
numpy = "0.11"
ndarray = "0.13.1"
parquet = "1.0.1"


[lib]
Expand Down
61 changes: 55 additions & 6 deletions py-polars/pypolars/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Dict, Sequence, List, Tuple, Optional, Union
from .series import Series, wrap_s
import numpy as np
from typing import TextIO, BinaryIO


def wrap_df(df: PyDataFrame) -> DataFrame:
Expand All @@ -25,28 +26,76 @@ def from_pydf(df: PyDataFrame) -> DataFrame:

@staticmethod
def from_csv(
path: str,
file: Union[str, TextIO],
infer_schema_length: int = 100,
batch_size: int = 100000,
has_headers: bool = True,
ignore_errors: bool = False,
) -> DataFrame:
"""
Read into a DataFrame from a csv file.
Parameters
----------
file
Path to a file or a file like object.
infer_schema_length
Maximum number of lines to read to infer schema.
batch_size
Number of lines to read into the buffer at once. Modify this to change performance.
has_headers
If the CSV file has headers or not.
ignore_errors
Try to keep reading lines if some lines yield errors.
Returns
-------
DataFrame
"""
self = DataFrame.__new__(DataFrame)
self._df = PyDataFrame.from_csv(
path, infer_schema_length, batch_size, has_headers, ignore_errors
file, infer_schema_length, batch_size, has_headers, ignore_errors
)
return self

@staticmethod
def from_parquet(path: str, batch_size: int = 250000,) -> DataFrame:
def from_parquet(
file: Union[str, BinaryIO], batch_size: int = 250000,
) -> DataFrame:
"""
Read into a DataFrame from a parquet file.
Parameters
----------
file
Path to a file or a file like object.
batch_size
Number of lines to read into the buffer at once. Modify this to change performance.
Returns
-------
DataFrame
"""
self = DataFrame.__new__(DataFrame)
self._df = PyDataFrame.from_parquet(path, batch_size)
self._df = PyDataFrame.from_parquet(file, batch_size)
return self

@staticmethod
def from_ipc(path: str) -> DataFrame:
def from_ipc(file: Union[str, BinaryIO]) -> DataFrame:
"""
Read into a DataFrame from Arrow IPC stream format.
Parameters
----------
file
Path to a file or a file like object.
Returns
-------
DataFrame
"""
self = DataFrame.__new__(DataFrame)
self._df = PyDataFrame.from_ipc(path)
self._df = PyDataFrame.from_ipc(file)
return self

def to_csv(
Expand Down
34 changes: 16 additions & 18 deletions py-polars/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use pyo3::{exceptions::RuntimeError, prelude::*};

use crate::{
error::PyPolarsEr,
file::{get_either_file, get_file_like, EitherRustPythonFile},
series::{to_pyseries_collection, to_series_collection, PySeries},
};

Expand All @@ -29,16 +30,13 @@ impl PyDataFrame {

#[staticmethod]
pub fn from_csv(
path: &str,
py_f: PyObject,
infer_schema_length: usize,
batch_size: usize,
has_header: bool,
ignore_errors: bool,
) -> PyResult<Self> {
// TODO: use python file objects:
// https://github.com/mre/hyperjson/blob/e1a0515f8d033f24b9fba64a0a4c77df841bbd1b/src/lib.rs#L20
let file = std::fs::File::open(path)?;

let file = get_file_like(py_f, false)?;
let reader = CsvReader::new(file)
.infer_schema(Some(infer_schema_length))
.has_header(has_header)
Expand All @@ -54,31 +52,31 @@ impl PyDataFrame {
}

#[staticmethod]
pub fn from_parquet(path: &str, batch_size: usize) -> PyResult<Self> {
let file = std::fs::File::open(path)?;
let df = ParquetReader::new(file)
.with_batch_size(batch_size)
.finish()
.map_err(PyPolarsEr::from)?;
pub fn from_parquet(py_f: PyObject, batch_size: usize) -> PyResult<Self> {
use EitherRustPythonFile::*;
let result = match get_either_file(py_f, false)? {
Py(f) => ParquetReader::new(f).with_batch_size(batch_size).finish(),
Rust(f) => ParquetReader::new(f).with_batch_size(batch_size).finish(),
};
let df = result.map_err(PyPolarsEr::from)?;
Ok(PyDataFrame::new(df))
}

#[staticmethod]
pub fn from_ipc(path: &str) -> PyResult<Self> {
let file = std::fs::File::open(path)?;
pub fn from_ipc(py_f: PyObject) -> PyResult<Self> {
let file = get_file_like(py_f, false)?;
let df = IPCReader::new(file).finish().map_err(PyPolarsEr::from)?;
Ok(PyDataFrame::new(df))
}

pub fn to_csv(
&mut self,
path: &str,
py_f: PyObject,
batch_size: usize,
has_headers: bool,
delimiter: u8,
) -> PyResult<()> {
// TODO: use python file objects:
let mut buf = std::fs::File::create(path)?;
let mut buf = get_file_like(py_f, true)?;
CsvWriter::new(&mut buf)
.has_headers(has_headers)
.with_delimiter(delimiter)
Expand All @@ -88,8 +86,8 @@ impl PyDataFrame {
Ok(())
}

pub fn to_ipc(&mut self, path: &str, batch_size: usize) -> PyResult<()> {
let mut buf = std::fs::File::create(path)?;
pub fn to_ipc(&mut self, py_f: PyObject, batch_size: usize) -> PyResult<()> {
let mut buf = get_file_like(py_f, true)?;
IPCWriter::new(&mut buf)
.with_batch_size(batch_size)
.finish(&mut self.df)
Expand Down
206 changes: 206 additions & 0 deletions py-polars/src/file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Credits to https://github.com/omerbenamram/pyo3-file
use parquet::{
errors::ParquetError,
file::reader::{Length, TryClone},
};
use pyo3::exceptions::TypeError;
use pyo3::prelude::*;
use pyo3::types::{PyBytes, PyString};
use std::borrow::Borrow;
use std::fs::File;
use std::io;
use std::io::{Read, Seek, SeekFrom, Write};

#[derive(Clone)]
pub struct PyFileLikeObject {
inner: PyObject,
}

/// Wraps a `PyObject`, and implements read, seek, and write for it.
impl PyFileLikeObject {
/// Creates an instance of a `PyFileLikeObject` from a `PyObject`.
/// To assert the object has the required methods methods,
/// instantiate it with `PyFileLikeObject::require`
pub fn new(object: PyObject) -> Self {
PyFileLikeObject { inner: object }
}

/// Same as `PyFileLikeObject::new`, but validates that the underlying
/// python object has a `read`, `write`, and `seek` methods in respect to parameters.
/// Will return a `TypeError` if object does not have `read`, `seek`, and `write` methods.
pub fn with_requirements(
object: PyObject,
read: bool,
write: bool,
seek: bool,
) -> PyResult<Self> {
let gil = Python::acquire_gil();
let py = gil.python();

if read {
if let Err(_) = object.getattr(py, "read") {
return Err(PyErr::new::<TypeError, _>(
"Object does not have a .read() method.",
));
}
}

if seek {
if let Err(_) = object.getattr(py, "seek") {
return Err(PyErr::new::<TypeError, _>(
"Object does not have a .seek() method.",
));
}
}

if write {
if let Err(_) = object.getattr(py, "write") {
return Err(PyErr::new::<TypeError, _>(
"Object does not have a .write() method.",
));
}
}

Ok(PyFileLikeObject::new(object))
}
}

/// Extracts a string repr from, and returns an IO error to send back to rust.
fn pyerr_to_io_err(e: PyErr) -> io::Error {
let gil = Python::acquire_gil();
let py = gil.python();
let e_as_object: PyObject = e.into_py(py);

match e_as_object.call_method(py, "__str__", (), None) {
Ok(repr) => match repr.extract::<String>(py) {
Ok(s) => io::Error::new(io::ErrorKind::Other, s),
Err(_e) => io::Error::new(io::ErrorKind::Other, "An unknown error has occurred"),
},
Err(_) => io::Error::new(io::ErrorKind::Other, "Err doesn't have __str__"),
}
}

impl Read for PyFileLikeObject {
fn read(&mut self, mut buf: &mut [u8]) -> Result<usize, io::Error> {
let gil = Python::acquire_gil();
let py = gil.python();

let bytes = self
.inner
.call_method(py, "read", (buf.len(),), None)
.map_err(pyerr_to_io_err)?;

let bytes: &PyBytes = bytes
.cast_as(py)
.expect("Expecting to be able to downcast into bytes from read result.");

&buf.write(bytes.as_bytes())?;

Ok(bytes.len().map_err(pyerr_to_io_err)?)
}
}

impl Write for PyFileLikeObject {
fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
let gil = Python::acquire_gil();
let py = gil.python();

let pybytes = PyBytes::new(py, buf);

let number_bytes_written = self
.inner
.call_method(py, "write", (pybytes,), None)
.map_err(pyerr_to_io_err)?;

Ok(number_bytes_written.extract(py).map_err(pyerr_to_io_err)?)
}

fn flush(&mut self) -> Result<(), io::Error> {
let gil = Python::acquire_gil();
let py = gil.python();

self.inner
.call_method(py, "flush", (), None)
.map_err(pyerr_to_io_err)?;

Ok(())
}
}

impl Seek for PyFileLikeObject {
fn seek(&mut self, pos: SeekFrom) -> Result<u64, io::Error> {
let gil = Python::acquire_gil();
let py = gil.python();

let (whence, offset) = match pos {
SeekFrom::Start(i) => (0, i as i64),
SeekFrom::Current(i) => (1, i as i64),
SeekFrom::End(i) => (2, i as i64),
};

let new_position = self
.inner
.call_method(py, "seek", (offset, whence), None)
.map_err(pyerr_to_io_err)?;

Ok(new_position.extract(py).map_err(pyerr_to_io_err)?)
}
}

pub trait FileLike: Read + Write + Seek {}

// Needed for arrow parquet
impl Length for PyFileLikeObject {
fn len(&self) -> u64 {
let gil = Python::acquire_gil();
let py = gil.python();

let size = self
.inner
.call_method0(py, "__sizeof__")
.expect("Could not read size of buffer");
size.extract(py)
.expect("did not get an int as result of __sizeof__")
}
}

impl TryClone for PyFileLikeObject {
fn try_clone(&self) -> std::result::Result<Self, ParquetError> {
Ok(self.clone())
}
}

impl FileLike for File {}
impl FileLike for PyFileLikeObject {}

pub enum EitherRustPythonFile {
Py(PyFileLikeObject),
Rust(File),
}

pub fn get_either_file(py_f: PyObject, truncate: bool) -> PyResult<EitherRustPythonFile> {
let gil = Python::acquire_gil();
let py = gil.python();

if let Ok(pstring) = py_f.cast_as::<PyString>(py) {
let rstring = pstring.to_string()?;
let str_slice: &str = rstring.borrow();
let f = if truncate {
File::create(str_slice)?
} else {
File::open(str_slice)?
};
Ok(EitherRustPythonFile::Rust(f))
} else {
let f = PyFileLikeObject::with_requirements(py_f, true, true, true)?;
Ok(EitherRustPythonFile::Py(f))
}
}

pub fn get_file_like(f: PyObject, truncate: bool) -> PyResult<Box<dyn FileLike>> {
use EitherRustPythonFile::*;
match get_either_file(f, truncate)? {
Py(f) => Ok(Box::new(f)),
Rust(f) => Ok(Box::new(f)),
}
}

0 comments on commit b6bb61c

Please sign in to comment.