Skip to content

Commit

Permalink
add column to existing dataframe methods
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Sep 6, 2020
1 parent 3e92266 commit abf96f2
Showing 1 changed file with 65 additions and 26 deletions.
91 changes: 65 additions & 26 deletions polars/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,33 +100,44 @@ impl DataFrame {

/// Ensure all the chunks in the DataFrame are aligned.
fn rechunk(&mut self) -> Result<&mut Self> {
let chunk_lens = self
.columns
.iter()
.map(|s| s.n_chunks())
.collect::<Vec<_>>();

let argmin = chunk_lens
.iter()
.position_min()
.ok_or(PolarsError::NoData)?;
let min_chunks = chunk_lens[argmin];

let to_rechunk = chunk_lens
.into_iter()
.enumerate()
.filter_map(|(idx, len)| if len > min_chunks { Some(idx) } else { None })
.collect::<Vec<_>>();

// clone shouldn't be too expensive as we expect the nr. of chunks to be close to 1.
let chunk_id = self.columns[argmin].chunk_lengths().clone();
let mut chunk_lens = Vec::with_capacity(self.columns.len());
let mut all_equal = true;
for series in &self.columns {
let current_len = series.len();
if chunk_lens.len() > 1 {
if current_len != chunk_lens[0] {
all_equal = false
}
}
chunk_lens.push(series.len())
}

for idx in to_rechunk {
let col = &self.columns[idx];
let new_col = col.rechunk(Some(&chunk_id))?;
self.columns[idx] = new_col;
// fast path
if all_equal {
Ok(self)
} else {
let argmin = chunk_lens
.iter()
.position_min()
.ok_or(PolarsError::NoData)?;
let min_chunks = chunk_lens[argmin];

let to_rechunk = chunk_lens
.into_iter()
.enumerate()
.filter_map(|(idx, len)| if len > min_chunks { Some(idx) } else { None })
.collect::<Vec<_>>();

// clone shouldn't be too expensive as we expect the nr. of chunks to be close to 1.
let chunk_id = self.columns[argmin].chunk_lengths().clone();

for idx in to_rechunk {
let col = &self.columns[idx];
let new_col = col.rechunk(Some(&chunk_id))?;
self.columns[idx] = new_col;
}
Ok(self)
}
Ok(self)
}

/// Get a reference to the DataFrame schema.
Expand Down Expand Up @@ -299,7 +310,25 @@ impl DataFrame {
Ok(DataFrame::new_no_checks(new_cols))
}

/// Get a row in the dataframe. Beware this is slow.
/// Append `column` to this `DataFrame` in place.
///
/// The argument is converted with `into_series` and pushed onto the
/// existing columns only when its length equals `self.height()`.
///
/// # Errors
///
/// Returns `PolarsError::ShapeMisMatch` when the lengths differ; in that
/// case no column is pushed.
pub fn add_column<S: IntoSeries>(&mut self, column: S) -> Result<&mut Self> {
    let new_col = column.into_series();
    if new_col.len() != self.height() {
        return Err(PolarsError::ShapeMisMatch);
    }
    self.columns.push(new_col);
    Ok(self)
}

/// Return a clone of this `DataFrame` with `column` appended.
///
/// `self` is only read; the column is added to the clone through
/// `add_column`, whose error is propagated unchanged.
pub fn with_column<S: IntoSeries>(&self, column: S) -> Result<Self> {
    let mut extended = self.clone();
    extended.add_column(column)?;
    Ok(extended)
}

/// Get a row in the `DataFrame`. Beware this is slow.
///
/// # Example
///
Expand Down Expand Up @@ -915,19 +944,29 @@ impl DataFrame {
Ok(DataFrame::new_no_checks(col))
}

/// Pipe different functions/ closure operations that work on a DataFrame together.
///
/// Consumes `self`, hands it to `f` and returns whatever `f` returns.
/// `f` is called exactly once, so any `FnOnce` is accepted (every `Fn`
/// closure or function still qualifies).
pub fn pipe<F, B>(self, f: F) -> Result<B>
where
    F: FnOnce(DataFrame) -> Result<B>,
{
    f(self)
}

/// Pipe different functions/ closure operations that mutate a DataFrame together.
///
/// Passes a mutable borrow of `self` to `f` and returns whatever `f`
/// returns. `f` is called exactly once, so any `FnOnce` is accepted (every
/// `Fn` closure or function still qualifies).
pub fn pipe_mut<F, B>(&mut self, f: F) -> Result<B>
where
    F: FnOnce(&mut DataFrame) -> Result<B>,
{
    f(self)
}

/// Pipe different functions/ closure operations that work on a DataFrame together,
/// forwarding an extra argument.
///
/// Consumes `self`, calls `f(self, args)` and returns its result. Bundle
/// several values into a tuple to pass more than one argument. `f` is
/// called exactly once, so any `FnOnce` is accepted (every `Fn` closure or
/// function still qualifies).
pub fn pipe_with_args<F, B, Args>(self, f: F, args: Args) -> Result<B>
where
    F: FnOnce(DataFrame, Args) -> Result<B>,
{
    f(self, args)
}
}

pub struct RecordBatchIter<'a> {
Expand Down

0 comments on commit abf96f2

Please sign in to comment.