Added PartitionedWriter for disk partitioning. (#3331)
illumination-k committed May 9, 2022
1 parent b9dfab9 commit 106a0c3
Showing 7 changed files with 239 additions and 12 deletions.
4 changes: 4 additions & 0 deletions polars/polars-io/Cargo.toml
@@ -28,6 +28,7 @@ fmt = ["polars-core/fmt"]
decompress = ["flate2/miniz_oxide"]
decompress-fast = ["flate2/zlib-ng-compat"]
temporal = ["dtype-datetime", "dtype-date", "dtype-time"]
partition = ["polars-core/partition_by"]
# don't use this
private = ["polars-time/private"]

@@ -55,6 +56,9 @@ serde = { version = "1", features = ["derive"], optional = true }
serde_json = { version = "1", optional = true }
simdutf8 = "0.1"

[dev-dependencies]
tempdir = "0.3.7"

[package.metadata.docs.rs]
all-features = true
# defines the configuration attribute `docsrs`
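Downstream crates opt into the new module through the `partition` feature, which in turn enables `polars-core/partition_by`. A minimal sketch of enabling it from a dependent crate (the version number is illustrative, not taken from this commit):

```toml
# Cargo.toml of a dependent crate; version is illustrative
[dependencies]
polars-io = { version = "0.21", features = ["partition", "ipc"] }
```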
2 changes: 1 addition & 1 deletion polars/polars-io/src/avro.rs
@@ -181,7 +181,7 @@ where
}
}

fn finish(mut self, df: &mut DataFrame) -> Result<()> {
fn finish(&mut self, df: &mut DataFrame) -> Result<()> {
let schema = df.schema().to_arrow();
let avro_fields = write::to_avro_schema(&schema)?;

2 changes: 1 addition & 1 deletion polars/polars-io/src/csv.rs
@@ -89,7 +89,7 @@ where
}
}

fn finish(mut self, df: &mut DataFrame) -> Result<()> {
fn finish(&mut self, df: &mut DataFrame) -> Result<()> {
df.rechunk();
let names = df.get_column_names();
let iter = df.iter_chunks();
54 changes: 48 additions & 6 deletions polars/polars-io/src/ipc.rs
@@ -34,11 +34,14 @@
//! ```
use super::{finish_reader, ArrowReader, ArrowResult};
use crate::predicates::PhysicalIoExpr;
use crate::prelude::*;
use crate::{prelude::*, WriterFactory};
use arrow::io::ipc::write::WriteOptions;
use arrow::io::ipc::{read, write};
use polars_core::prelude::*;

use std::io::{Read, Seek, Write};

use std::path::PathBuf;
use std::sync::Arc;

/// Read Arrow's IPC format into a DataFrame
@@ -232,10 +235,7 @@ use crate::RowCount;
use polars_core::frame::ArrowChunk;
pub use write::Compression as IpcCompression;

impl<W> IpcWriter<W>
where
W: Write,
{
impl<W> IpcWriter<W> {
/// Set the compression used. Defaults to None.
pub fn with_compression(mut self, compression: Option<write::Compression>) -> Self {
self.compression = compression;
@@ -254,7 +254,7 @@ where
}
}

fn finish(mut self, df: &mut DataFrame) -> Result<()> {
fn finish(&mut self, df: &mut DataFrame) -> Result<()> {
let mut ipc_writer = write::FileWriter::try_new(
&mut self.writer,
&df.schema().to_arrow(),
@@ -274,6 +274,48 @@ where
}
}

pub struct IpcWriterOption {
compression: Option<write::Compression>,
extension: PathBuf,
}

impl IpcWriterOption {
pub fn new() -> Self {
Self {
compression: None,
extension: PathBuf::from(".ipc"),
}
}

/// Set the compression used. Defaults to None.
pub fn with_compression(mut self, compression: Option<write::Compression>) -> Self {
self.compression = compression;
self
}

/// Set the extension. Defaults to ".ipc".
pub fn with_extension(mut self, extension: PathBuf) -> Self {
self.extension = extension;
self
}
}

impl Default for IpcWriterOption {
fn default() -> Self {
Self::new()
}
}

impl WriterFactory for IpcWriterOption {
fn create_writer<W: Write + 'static>(&self, writer: W) -> Box<dyn SerWriter<W>> {
Box::new(IpcWriter::new(writer).with_compression(self.compression))
}

fn extension(&self) -> PathBuf {
self.extension.to_owned()
}
}

#[cfg(test)]
mod test {
use crate::prelude::*;
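`IpcWriterOption` packages the writer configuration so a `PartitionedWriter` can mint a fresh `IpcWriter` for every partition file. A sketch of using the factory directly, assuming an in-memory `Cursor` as the sink (inside `PartitionedWriter` the sink is a `BufWriter<File>` instead):

```rust
use std::io::Cursor;

use polars_core::prelude::*;
use polars_io::ipc::{IpcCompression, IpcWriterOption};
use polars_io::{SerWriter, WriterFactory};

fn write_one(df: &mut DataFrame) -> Result<()> {
    // Configure once; the factory then creates a boxed writer per sink.
    let factory = IpcWriterOption::new()
        .with_compression(Some(IpcCompression::LZ4))
        .with_extension(".arrow".into());

    let sink = Cursor::new(Vec::<u8>::new());
    let mut writer = factory.create_writer(sink);
    writer.finish(df)
}
```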
2 changes: 1 addition & 1 deletion polars/polars-io/src/json.rs
@@ -105,7 +105,7 @@ where
}
}

fn finish(mut self, df: &mut DataFrame) -> Result<()> {
fn finish(&mut self, df: &mut DataFrame) -> Result<()> {
df.rechunk();
let fields = df.iter().map(|s| s.field().to_arrow()).collect::<Vec<_>>();
let batches = df
17 changes: 14 additions & 3 deletions polars/polars-io/src/lib.rs
@@ -36,6 +36,9 @@ pub mod prelude
mod tests;
pub(crate) mod utils;

#[cfg(feature = "partition")]
pub mod partition;

pub use options::*;

#[cfg(any(feature = "ipc", feature = "json", feature = "avro"))]
@@ -48,6 +51,7 @@ use arrow::error::Result as ArrowResult;
use polars_core::frame::ArrowChunk;
use polars_core::prelude::*;
use std::io::{Read, Seek, Write};
use std::path::PathBuf;

pub trait SerReader<R>
where
@@ -59,7 +63,7 @@ where
#[must_use]
fn set_rechunk(self, _rechunk: bool) -> Self
where
Self: std::marker::Sized,
Self: Sized,
{
self
}
@@ -72,8 +76,15 @@ pub trait SerWriter<W>
where
W: Write,
{
fn new(writer: W) -> Self;
fn finish(self, df: &mut DataFrame) -> Result<()>;
fn new(writer: W) -> Self
where
Self: Sized;
fn finish(&mut self, df: &mut DataFrame) -> Result<()>;
}

pub trait WriterFactory {
fn create_writer<W: Write + 'static>(&self, writer: W) -> Box<dyn SerWriter<W>>;
fn extension(&self) -> PathBuf;
}

pub trait ArrowReader {
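Both trait changes are what make `WriterFactory` possible: `new` now carries a `Self: Sized` bound and `finish` takes `&mut self`, so `SerWriter` stays object-safe and can be handed out as `Box<dyn SerWriter<W>>`. A sketch of a custom factory under these assumptions; `CsvWriterOption` is hypothetical and not part of this commit:

```rust
use std::io::Write;
use std::path::PathBuf;

use polars_io::csv::CsvWriter;
use polars_io::{SerWriter, WriterFactory};

/// Hypothetical CSV counterpart to IpcWriterOption; not in this commit.
pub struct CsvWriterOption {
    extension: PathBuf,
}

impl WriterFactory for CsvWriterOption {
    fn create_writer<W: Write + 'static>(&self, writer: W) -> Box<dyn SerWriter<W>> {
        // With the old `fn finish(self, ..)` a boxed trait object could
        // never call finish; `&mut self` makes this dynamic dispatch legal.
        Box::new(CsvWriter::new(writer))
    }

    fn extension(&self) -> PathBuf {
        self.extension.to_owned()
    }
}
```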
170 changes: 170 additions & 0 deletions polars/polars-io/src/partition.rs
@@ -0,0 +1,170 @@
use crate::{utils::resolve_homedir, WriterFactory};
use polars_core::prelude::*;
use rayon::iter::IndexedParallelIterator;
use std::{
fs::File,
io::BufWriter,
path::{Path, PathBuf},
};

/// `partition_df` must have been produced by `partition_by` with the same `by` keys.
fn resolve_partition_dir<I, S>(rootdir: &Path, by: I, partition_df: &DataFrame) -> PathBuf
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
let mut path = PathBuf::new();
path.push(resolve_homedir(rootdir));

for key in by.into_iter() {
let value = partition_df[key.as_ref()].get(0).to_string();
path.push(format!("{}={}", key.as_ref(), value))
}
path
}

/// Write a DataFrame with disk partitioning
///
/// # Example
/// ```
/// use polars_core::prelude::*;
/// use polars_io::ipc::IpcWriterOption;
/// use polars_io::partition::PartitionedWriter;
///
/// fn example(df: &mut DataFrame) -> Result<()> {
/// let option = IpcWriterOption::default();
/// PartitionedWriter::new(option, "./rootdir", ["a", "b"])
/// .finish(df)
/// }
/// ```
///
pub struct PartitionedWriter<F> {
option: F,
rootdir: PathBuf,
by: Vec<String>,
parallel: bool,
}

impl<F> PartitionedWriter<F>
where
F: WriterFactory + Send + Sync,
{
pub fn new<P, I, S>(option: F, rootdir: P, by: I) -> Self
where
P: Into<PathBuf>,
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
Self {
option,
rootdir: rootdir.into(),
by: by.into_iter().map(|s| s.as_ref().to_string()).collect(),
parallel: true,
}
}

/// Write the partitions in parallel (default).
pub fn with_parallel(mut self, parallel: bool) -> Self {
self.parallel = parallel;
self
}

fn write_partition_df(&self, partition_df: &mut DataFrame, i: usize) -> Result<()> {
let mut path = resolve_partition_dir(&self.rootdir, &self.by, partition_df);
std::fs::create_dir_all(&path)?;

path.push(format!(
"data-{:04}.{}",
i,
self.option.extension().display()
));

let file = std::fs::File::create(path)?;
let writer = BufWriter::new(file);

self.option
.create_writer::<BufWriter<File>>(writer)
.finish(partition_df)
}

pub fn finish(self, df: &DataFrame) -> Result<()> {
use polars_core::POOL;
use rayon::iter::IntoParallelIterator;
use rayon::iter::ParallelIterator;

let partitioned = df.partition_by(&self.by)?;

if self.parallel {
POOL.install(|| {
partitioned
.into_par_iter()
.enumerate()
.map(|(i, mut partition_df)| self.write_partition_df(&mut partition_df, i))
.collect::<Result<Vec<_>>>()
})?;
} else {
for (i, mut partition_df) in partitioned.into_iter().enumerate() {
self.write_partition_df(&mut partition_df, i)?;
}
}

Ok(())
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
#[cfg(feature = "ipc")]
fn test_ipc_partition() -> Result<()> {
use crate::ipc::IpcReader;
use crate::SerReader;
use std::{io::BufReader, path::PathBuf};

use tempdir::TempDir;

use crate::prelude::IpcWriterOption;

let tempdir = TempDir::new("ipc-partition")?;

let df = df!("a" => [1, 1, 2, 3], "b" => [2, 2, 3, 4], "c" => [2, 3, 4, 5]).unwrap();
let by = ["a", "b"];
let rootdir = tempdir.path();

let option = IpcWriterOption::new();

PartitionedWriter::new(option, &rootdir, by).finish(&df)?;

let expected_dfs = [
df!("a" => [1, 1], "b" => [2, 2], "c" => [2, 3])?,
df!("a" => [2], "b" => [3], "c" => [4])?,
df!("a" => [3], "b" => [4], "c" => [5])?,
];

let expected: Vec<(PathBuf, DataFrame)> = ["a=1/b=2", "a=2/b=3", "a=3/b=4"]
.into_iter()
.zip(expected_dfs.into_iter())
.map(|(p, df)| (PathBuf::from(rootdir.join(p)), df))
.collect();

for (expected_dir, expected_df) in expected.iter() {
assert!(expected_dir.exists());

let ipc_paths = std::fs::read_dir(&expected_dir)?
.map(|e| {
let entry = e?;
Ok(entry.path())
})
.collect::<Result<Vec<_>>>()?;

assert_eq!(ipc_paths.len(), 1);
let reader = BufReader::new(std::fs::File::open(&ipc_paths[0])?);
let df = IpcReader::new(reader).finish()?;
assert!(expected_df.frame_equal(&df));
}

Ok(())
}
}
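In short, `finish` splits the frame with `partition_by`, then writes each partition under Hive-style `key=value` directories, optionally fanning the writes out over the shared rayon pool. A sketch of end-to-end usage, assuming `polars-io` is built with the `partition` and `ipc` features (`/tmp/out` is an illustrative path):

```rust
use polars_core::prelude::*;
use polars_io::ipc::IpcWriterOption;
use polars_io::partition::PartitionedWriter;

fn example() -> Result<()> {
    let df = df!(
        "a" => [1, 1, 2],
        "b" => [2, 2, 3],
        "c" => [10, 20, 30]
    )?;

    // Sequential write; drop `.with_parallel(false)` to write partitions
    // in parallel on the rayon thread pool (the default).
    PartitionedWriter::new(IpcWriterOption::default(), "/tmp/out", ["a", "b"])
        .with_parallel(false)
        .finish(&df)?;

    // Resulting layout: one data file per partition directory, e.g.
    //   /tmp/out/a=1/b=2/  (rows where a == 1 && b == 2)
    //   /tmp/out/a=2/b=3/  (the row where a == 2 && b == 3)
    Ok(())
}
```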
