Skip to content

Commit

Permalink
Use Polars ThreadPool (#1927)
Browse files Browse the repository at this point in the history
  • Loading branch information
ibENPC committed Nov 29, 2021
1 parent d889304 commit 6dd9ded
Showing 1 changed file with 64 additions and 54 deletions.
118 changes: 64 additions & 54 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ impl DataFrame {
/// Aggregate all the chunks in the DataFrame to a single chunk in parallel.
/// This may lead to more peak memory consumption.
pub fn as_single_chunk_par(&mut self) -> &mut Self {
self.columns = self.columns.par_iter().map(|s| s.rechunk()).collect();
self.columns = POOL.install(|| self.columns.par_iter().map(|s| s.rechunk()).collect());
self
}

Expand Down Expand Up @@ -1266,14 +1266,15 @@ impl DataFrame {
where
I: Iterator<Item = usize> + Clone + Sync,
{
let new_col = self
.columns
.par_iter()
.map(|s| {
let mut i = iter.clone();
s.take_iter(&mut i)
})
.collect::<Result<_>>()?;
let new_col = POOL.install(|| {
self.columns
.par_iter()
.map(|s| {
let mut i = iter.clone();
s.take_iter(&mut i)
})
.collect::<Result<_>>()
})?;
Ok(DataFrame::new_no_checks(new_col))
}

Expand Down Expand Up @@ -1312,13 +1313,15 @@ impl DataFrame {
.map(|s| s.take_iter_unchecked(&mut iter))
.collect::<Vec<_>>()
} else {
self.columns
.par_iter()
.map(|s| {
let mut i = iter.clone();
s.take_iter_unchecked(&mut i)
})
.collect::<Vec<_>>()
POOL.install(|| {
self.columns
.par_iter()
.map(|s| {
let mut i = iter.clone();
s.take_iter_unchecked(&mut i)
})
.collect::<Vec<_>>()
})
};
DataFrame::new_no_checks(new_col)
}
Expand Down Expand Up @@ -1359,13 +1362,15 @@ impl DataFrame {
.map(|s| s.take_opt_iter_unchecked(&mut iter))
.collect::<Vec<_>>()
} else {
self.columns
.par_iter()
.map(|s| {
let mut i = iter.clone();
s.take_opt_iter_unchecked(&mut i)
})
.collect::<Vec<_>>()
POOL.install(|| {
self.columns
.par_iter()
.map(|s| {
let mut i = iter.clone();
s.take_opt_iter_unchecked(&mut i)
})
.collect::<Vec<_>>()
})
};

DataFrame::new_no_checks(new_col)
Expand Down Expand Up @@ -1988,7 +1993,7 @@ impl DataFrame {
///
/// See the method on [Series](../series/enum.Series.html#method.shift) for more info on the `shift` operation.
pub fn shift(&self, periods: i64) -> Self {
let col = self.columns.par_iter().map(|s| s.shift(periods)).collect();
let col = POOL.install(|| self.columns.par_iter().map(|s| s.shift(periods)).collect());
DataFrame::new_no_checks(col)
}

Expand All @@ -2001,11 +2006,12 @@ impl DataFrame {
///
/// See the method on [Series](../series/enum.Series.html#method.fill_null) for more info on the `fill_null` operation.
pub fn fill_null(&self, strategy: FillNullStrategy) -> Result<Self> {
let col = self
.columns
.par_iter()
.map(|s| s.fill_null(strategy))
.collect::<Result<Vec<_>>>()?;
let col = POOL.install(|| {
self.columns
.par_iter()
.map(|s| s.fill_null(strategy))
.collect::<Result<Vec<_>>>()
})?;
Ok(DataFrame::new_no_checks(col))
}

Expand Down Expand Up @@ -2038,7 +2044,7 @@ impl DataFrame {
/// +---------+---------+
/// ```
pub fn max(&self) -> Self {
let columns = self.columns.par_iter().map(|s| s.max_as_series()).collect();
let columns = POOL.install(|| self.columns.par_iter().map(|s| s.max_as_series()).collect());
DataFrame::new_no_checks(columns)
}

Expand Down Expand Up @@ -2071,7 +2077,7 @@ impl DataFrame {
/// +-------------------+--------------------+
/// ```
pub fn std(&self) -> Self {
let columns = self.columns.par_iter().map(|s| s.std_as_series()).collect();
let columns = POOL.install(|| self.columns.par_iter().map(|s| s.std_as_series()).collect());
DataFrame::new_no_checks(columns)
}
/// Aggregate the columns to their variation values.
Expand Down Expand Up @@ -2103,7 +2109,7 @@ impl DataFrame {
/// +---------+---------+
/// ```
pub fn var(&self) -> Self {
let columns = self.columns.par_iter().map(|s| s.var_as_series()).collect();
let columns = POOL.install(|| self.columns.par_iter().map(|s| s.var_as_series()).collect());
DataFrame::new_no_checks(columns)
}

Expand Down Expand Up @@ -2136,7 +2142,7 @@ impl DataFrame {
/// +---------+---------+
/// ```
pub fn min(&self) -> Self {
let columns = self.columns.par_iter().map(|s| s.min_as_series()).collect();
let columns = POOL.install(|| self.columns.par_iter().map(|s| s.min_as_series()).collect());
DataFrame::new_no_checks(columns)
}

Expand Down Expand Up @@ -2169,7 +2175,7 @@ impl DataFrame {
/// +---------+---------+
/// ```
pub fn sum(&self) -> Self {
let columns = self.columns.par_iter().map(|s| s.sum_as_series()).collect();
let columns = POOL.install(|| self.columns.par_iter().map(|s| s.sum_as_series()).collect());
DataFrame::new_no_checks(columns)
}

Expand Down Expand Up @@ -2202,11 +2208,12 @@ impl DataFrame {
/// +---------+---------+
/// ```
pub fn mean(&self) -> Self {
let columns = self
.columns
.par_iter()
.map(|s| s.mean_as_series())
.collect();
let columns = POOL.install(|| {
self.columns
.par_iter()
.map(|s| s.mean_as_series())
.collect()
});
DataFrame::new_no_checks(columns)
}

Expand Down Expand Up @@ -2239,21 +2246,23 @@ impl DataFrame {
/// +---------+---------+
/// ```
pub fn median(&self) -> Self {
let columns = self
.columns
.par_iter()
.map(|s| s.median_as_series())
.collect();
let columns = POOL.install(|| {
self.columns
.par_iter()
.map(|s| s.median_as_series())
.collect()
});
DataFrame::new_no_checks(columns)
}

/// Aggregate the columns to their quantile values.
pub fn quantile(&self, quantile: f64) -> Result<Self> {
let columns = self
.columns
.par_iter()
.map(|s| s.quantile_as_series(quantile))
.collect::<Result<Vec<_>>>()?;
let columns = POOL.install(|| {
self.columns
.par_iter()
.map(|s| s.quantile_as_series(quantile))
.collect::<Result<Vec<_>>>()
})?;
Ok(DataFrame::new_no_checks(columns))
}

Expand Down Expand Up @@ -2436,11 +2445,12 @@ impl DataFrame {
/// +------+------+------+--------+--------+--------+---------+---------+---------+
/// ```
pub fn to_dummies(&self) -> Result<Self> {
let cols = self
.columns
.par_iter()
.map(|s| s.to_dummies())
.collect::<Result<Vec<_>>>()?;
let cols = POOL.install(|| {
self.columns
.par_iter()
.map(|s| s.to_dummies())
.collect::<Result<Vec<_>>>()
})?;

accumulate_dataframes_horizontal(cols)
}
Expand Down

0 comments on commit 6dd9ded

Please sign in to comment.