Skip to content

Commit

Permalink
drop schema in DataFrame. The Series/ ChunkedArray's are now the sour…
Browse files Browse the repository at this point in the history
…ce of truth wrt the schema.
  • Loading branch information
ritchie46 committed Sep 5, 2020
1 parent 3252691 commit be46ebe
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 61 deletions.
15 changes: 8 additions & 7 deletions polars/src/frame/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,17 +474,18 @@ impl DataFrame {
fn finish_join(&self, mut df_left: DataFrame, mut df_right: DataFrame) -> Result<DataFrame> {
let mut left_names =
HashSet::with_capacity_and_hasher(df_left.width(), FnvBuildHasher::default());
for field in df_left.schema.fields() {
left_names.insert(field.name());
}

df_left.columns.iter().for_each(|series| {
left_names.insert(series.name());
});

let mut rename_strs = Vec::with_capacity(df_right.width());

for field in df_right.schema.fields() {
if left_names.contains(field.name()) {
rename_strs.push(field.name().to_owned())
df_right.columns.iter().for_each(|series| {
if left_names.contains(series.name()) {
rename_strs.push(series.name().to_owned())
}
}
});

for name in rename_strs {
df_right.rename(&name, &format!("{}_right", name))?;
Expand Down
78 changes: 29 additions & 49 deletions polars/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::prelude::*;
use arrow::datatypes::{Field, Schema};
use arrow::record_batch::RecordBatch;
use itertools::Itertools;
use itertools::__std_iter::FromIterator;
use rayon::prelude::*;
use std::marker::Sized;
use std::mem;
Expand All @@ -14,6 +13,7 @@ pub mod group_by;
pub mod hash_join;
pub mod select;
pub mod ser;
mod upstream_traits;

pub trait IntoSeries {
fn into_series(self) -> Series
Expand All @@ -39,7 +39,6 @@ type DfColumns = Vec<DfSeries>;

#[derive(Clone)]
pub struct DataFrame {
schema: DfSchema,
columns: DfColumns,
}

Expand Down Expand Up @@ -71,21 +70,12 @@ impl DataFrame {
/// let df = DataFrame::new(vec![s0, s1]).unwrap();
/// ```
pub fn new(columns: Vec<Series>) -> Result<Self> {
let fields = Self::create_fields(&columns);
let schema = Arc::new(Schema::new(fields));

let mut df = Self::new_with_schema(schema, columns)?;
df.rechunk()?;
Ok(df)
}

/// Create a new DataFrame from a schema and a vec of series.
/// Only for crate use as schema is not checked.
fn new_with_schema(schema: DfSchema, columns: DfColumns) -> Result<Self> {
if !columns.iter().map(|s| s.len()).all_equal() {
return Err(PolarsError::ShapeMisMatch);
}
Ok(DataFrame { schema, columns })
let mut df = DataFrame { columns };
df.rechunk()?;
Ok(df)
}

/// Ensure all the chunks in the DataFrame are aligned.
Expand Down Expand Up @@ -120,8 +110,9 @@ impl DataFrame {
}

/// Get a reference to the DataFrame schema.
pub fn schema(&self) -> &DfSchema {
&self.schema
pub fn schema(&self) -> Schema {
let fields = Self::create_fields(&self.columns);
Schema::new(fields)
}

/// Get a reference to the DataFrame columns.
Expand Down Expand Up @@ -156,15 +147,13 @@ impl DataFrame {

/// This method should be called after every mutable addition/ deletion of columns
fn register_mutation(&mut self) -> Result<()> {
let fields = Self::create_fields(&self.columns);
self.schema = Arc::new(Schema::new(fields));
self.rechunk()?;
Ok(())
}

/// Get a reference the schema fields of the DataFrame.
pub fn fields(&self) -> &Vec<Field> {
self.schema.fields()
/// Get a reference to the schema fields of the DataFrame.
pub fn fields(&self) -> Vec<Field> {
self.columns.iter().map(|s| s.field().clone()).collect()
}

/// Get (width x height)
Expand Down Expand Up @@ -327,11 +316,10 @@ impl DataFrame {

/// Get column index of a series by name.
pub fn find_idx_by_name(&self, name: &str) -> Option<usize> {
self.schema
.fields()
self.columns
.iter()
.enumerate()
.filter(|(_idx, field)| field.name() == name)
.filter(|(_idx, series)| series.name() == name)
.map(|(idx, _)| idx)
.next()
}
Expand Down Expand Up @@ -425,7 +413,7 @@ impl DataFrame {
s.take_iter(&mut i, capacity)
})
.collect::<Result<Vec<_>>>()?;
DataFrame::new_with_schema(self.schema.clone(), new_col)
DataFrame::new(new_col)
}

/// Take DataFrame values by indexes from an iterator. This doesn't do any bound checking.
Expand All @@ -451,7 +439,7 @@ impl DataFrame {
s.take_iter_unchecked(&mut i, capacity)
})
.collect::<Vec<_>>();
DataFrame::new_with_schema(self.schema.clone(), new_col).unwrap()
DataFrame::new(new_col).unwrap()
}

/// Take DataFrame values by indexes from an iterator that may contain None values.
Expand All @@ -477,7 +465,7 @@ impl DataFrame {
s.take_opt_iter(&mut i, capacity)
})
.collect::<Result<Vec<_>>>()?;
DataFrame::new_with_schema(self.schema.clone(), new_col)
DataFrame::new(new_col)
}

/// Take DataFrame values by indexes from an iterator that may contain None values.
Expand All @@ -504,7 +492,7 @@ impl DataFrame {
s.take_opt_iter_unchecked(&mut i, capacity)
})
.collect::<Vec<_>>();
DataFrame::new_with_schema(self.schema.clone(), new_col).unwrap()
DataFrame::new(new_col).unwrap()
}

/// Take DataFrame rows by index values.
Expand All @@ -525,7 +513,7 @@ impl DataFrame {
.map(|s| s.take(indices))
.collect::<Result<Vec<_>>>()?;

DataFrame::new_with_schema(self.schema.clone(), new_col)
DataFrame::new(new_col)
}

/// Rename a column in the DataFrame
Expand Down Expand Up @@ -803,7 +791,7 @@ impl DataFrame {
.par_iter()
.map(|s| s.slice(offset, length))
.collect::<Result<Vec<_>>>()?;
DataFrame::new_with_schema(self.schema.clone(), col)
DataFrame::new(col)
}

/// Get the head of the DataFrame
Expand All @@ -813,7 +801,7 @@ impl DataFrame {
.iter()
.map(|s| s.head(length))
.collect::<Vec<_>>();
DataFrame::new_with_schema(self.schema.clone(), col).unwrap()
DataFrame::new(col).unwrap()
}

/// Get the tail of the DataFrame
Expand All @@ -823,14 +811,16 @@ impl DataFrame {
.iter()
.map(|s| s.tail(length))
.collect::<Vec<_>>();
DataFrame::new_with_schema(self.schema.clone(), col).unwrap()
DataFrame::new(col).unwrap()
}

/// Transform the underlying chunks in the DataFrame to Arrow RecordBatches
pub fn as_record_batches(&self) -> Result<Vec<RecordBatch>> {
let n_chunks = self.n_chunks()?;
let width = self.width();

let schema = Arc::new(self.schema());

let mut record_batches = Vec::with_capacity(n_chunks);
for i in 0..n_chunks {
// the columns of a single recorbatch
Expand All @@ -839,7 +829,7 @@ impl DataFrame {
for col in &self.columns {
rb_cols.push(Arc::clone(&col.chunks()[i]))
}
let rb = RecordBatch::try_new(Arc::clone(&self.schema), rb_cols)?;
let rb = RecordBatch::try_new(Arc::clone(&schema), rb_cols)?;
record_batches.push(rb)
}
Ok(record_batches)
Expand All @@ -862,7 +852,7 @@ impl DataFrame {
}
RecordBatchIter {
columns: &self.columns,
schema: &self.schema,
schema: Arc::new(self.schema()),
buffer_size,
idx: 0,
len: self.height(),
Expand All @@ -872,7 +862,7 @@ impl DataFrame {
/// Get a DataFrame with all the columns in reversed order
pub fn reverse(&self) -> Self {
let col = self.columns.iter().map(|s| s.reverse()).collect::<Vec<_>>();
DataFrame::new_with_schema(self.schema.clone(), col).unwrap()
DataFrame::new(col).unwrap()
}

/// Shift the values by a given period and fill the parts that will be empty due to this operation
Expand All @@ -885,7 +875,7 @@ impl DataFrame {
.iter()
.map(|s| s.shift(periods))
.collect::<Result<Vec<_>>>()?;
DataFrame::new_with_schema(self.schema.clone(), col)
DataFrame::new(col)
}

/// Replace None values with one of the following strategies:
Expand All @@ -902,7 +892,7 @@ impl DataFrame {
.iter()
.map(|s| s.fill_none(strategy))
.collect::<Result<Vec<_>>>()?;
DataFrame::new_with_schema(self.schema.clone(), col)
DataFrame::new(col)
}

pub fn pipe<F, B>(self, f: F) -> Result<B>
Expand All @@ -922,7 +912,7 @@ impl DataFrame {

pub struct RecordBatchIter<'a> {
columns: &'a Vec<Series>,
schema: &'a Arc<Schema>,
schema: Arc<Schema>,
buffer_size: usize,
idx: usize,
len: usize,
Expand All @@ -949,22 +939,12 @@ impl<'a> Iterator for RecordBatchIter<'a> {
let slice = s.slice(self.idx, length).unwrap();
rb_cols.push(Arc::clone(&slice.chunks()[0]))
});
let rb = RecordBatch::try_new(Arc::clone(self.schema), rb_cols).unwrap();
let rb = RecordBatch::try_new(Arc::clone(&self.schema), rb_cols).unwrap();
self.idx += length;
Some(rb)
}
}

impl FromIterator<Series> for DataFrame {
/// # Panics
///
/// Panics if Series have different lengths.
fn from_iter<T: IntoIterator<Item = Series>>(iter: T) -> Self {
let v = iter.into_iter().collect();
DataFrame::new(v).expect("could not create DataFrame from iterator")
}
}

#[cfg(test)]
mod test {
use crate::prelude::*;
Expand Down
2 changes: 1 addition & 1 deletion polars/src/frame/ser/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ where
}

fn finish(self, df: &mut DataFrame) -> Result<()> {
let mut ipc_writer = ArrowIPCFileWriter::try_new(self.writer, &df.schema)?;
let mut ipc_writer = ArrowIPCFileWriter::try_new(self.writer, &df.schema())?;

let iter = df.iter_record_batches(self.batch_size);

Expand Down
5 changes: 1 addition & 4 deletions polars/src/frame/ser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,5 @@ pub fn finish_reader<R: ArrowReader>(mut reader: R, rechunk: bool) -> Result<Dat
.collect::<Result<Vec<_>>>()?;
}

Ok(DataFrame {
schema: reader.schema(),
columns,
})
Ok(DataFrame { columns })
}

0 comments on commit be46ebe

Please sign in to comment.