chore[rust]: remove parking lot dep (#4987)
ritchie46 committed Sep 26, 2022
1 parent 936dab7 commit 1a4f0f3
Showing 15 changed files with 239 additions and 36 deletions.
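The change is mechanical: every `parking_lot::Mutex` becomes `std::sync::Mutex`, and every `lock()` call gains an `.unwrap()`, because the standard library's `lock()` returns a `LockResult` rather than the guard itself. A minimal, standalone sketch of that difference (illustrative only, not code from this commit):

```rust
use std::sync::Mutex;

fn main() {
    let m = Mutex::new(0_u32);

    // parking_lot::Mutex::lock() returns the guard directly:
    //     *m.lock() += 1;
    // std::sync::Mutex::lock() returns a LockResult, which is Err only when a
    // previous holder of the lock panicked, hence the .unwrap() added at every
    // call site in this commit:
    *m.lock().unwrap() += 1;

    assert_eq!(*m.lock().unwrap(), 1);
}
```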
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -8,6 +8,7 @@ members = [
"polars/polars-utils",
"polars/polars-ops",
"polars/polars-algo",
"polars-sql",
"examples/read_csv",
"examples/read_parquet",
"examples/python_rust_compiled_function",
@@ -24,7 +25,6 @@ ahash = "0.7"
# todo! remove
anyhow = "1"
hashbrown = { version = "0.12", features = ["rayon"] }
parking_lot = "0.12"

[workspace.dependencies.arrow]
package = "arrow2"
2 changes: 0 additions & 2 deletions polars-sql/Cargo.toml
@@ -23,5 +23,3 @@ features = [
"cross_join",
"strings",
]

[workspace]
210 changes: 210 additions & 0 deletions polars/polars-core/src/frame/groupby/aggregations/dispatch.rs
@@ -0,0 +1,210 @@
use super::*;

impl Series {
fn slice_from_offsets(&self, first: IdxSize, len: IdxSize) -> Self {
self.slice(first as i64, len as usize)
}

fn restore_logical(&self, out: Series) -> Series {
if self.is_logical() {
out.cast(self.dtype()).unwrap()
} else {
out
}
}

#[doc(hidden)]
pub fn agg_valid_count(&self, groups: &GroupsProxy) -> Series {
match groups {
GroupsProxy::Idx(groups) => agg_helper_idx_on_all::<IdxType, _>(groups, |idx| {
debug_assert!(idx.len() <= self.len());
if idx.is_empty() {
None
} else if !self.has_validity() {
Some(idx.len() as IdxSize)
} else {
let take =
unsafe { self.take_iter_unchecked(&mut idx.iter().map(|i| *i as usize)) };
Some((take.len() - take.null_count()) as IdxSize)
}
}),
GroupsProxy::Slice { groups, .. } => {
_agg_helper_slice::<IdxType, _>(groups, |[first, len]| {
debug_assert!(len <= self.len() as IdxSize);
if len == 0 {
None
} else if !self.has_validity() {
Some(len)
} else {
let take = self.slice_from_offsets(first, len);
Some((take.len() - take.null_count()) as IdxSize)
}
})
}
}
}

#[doc(hidden)]
pub unsafe fn agg_first(&self, groups: &GroupsProxy) -> Series {
let out = match groups {
GroupsProxy::Idx(groups) => {
let mut iter = groups.iter().map(|(first, idx)| {
if idx.is_empty() {
None
} else {
Some(first as usize)
}
});
// Safety:
// groups are always in bounds
self.take_opt_iter_unchecked(&mut iter)
}
GroupsProxy::Slice { groups, .. } => {
let mut iter =
groups.iter().map(
|&[first, len]| {
if len == 0 {
None
} else {
Some(first as usize)
}
},
);
// Safety:
// groups are always in bounds
self.take_opt_iter_unchecked(&mut iter)
}
};
self.restore_logical(out)
}

#[doc(hidden)]
pub unsafe fn agg_n_unique(&self, groups: &GroupsProxy) -> Series {
match groups {
GroupsProxy::Idx(groups) => agg_helper_idx_on_all::<IdxType, _>(groups, |idx| {
debug_assert!(idx.len() <= self.len());
if idx.is_empty() {
None
} else {
let take = self.take_iter_unchecked(&mut idx.iter().map(|i| *i as usize));
take.n_unique().ok().map(|v| v as IdxSize)
}
}),
GroupsProxy::Slice { groups, .. } => {
_agg_helper_slice::<IdxType, _>(groups, |[first, len]| {
debug_assert!(len <= self.len() as IdxSize);
if len == 0 {
None
} else {
let take = self.slice_from_offsets(first, len);
take.n_unique().ok().map(|v| v as IdxSize)
}
})
}
}
}

#[doc(hidden)]
pub unsafe fn agg_median(&self, groups: &GroupsProxy) -> Series {
use DataType::*;

match self.dtype() {
Float32 => SeriesWrap(self.f32().unwrap().clone()).agg_median(groups),
Float64 => SeriesWrap(self.f64().unwrap().clone()).agg_median(groups),
dt if dt.is_numeric() || dt.is_temporal() => {
let ca = self.to_physical_repr();
let physical_type = ca.dtype();
let s = apply_method_physical_integer!(ca, agg_median, groups);
if dt.is_logical() {
// back to physical and then
// back to logical type
s.cast(physical_type).unwrap().cast(dt).unwrap()
} else {
s
}
}
_ => Series::full_null("", groups.len(), self.dtype()),
}
}

#[doc(hidden)]
pub unsafe fn agg_quantile(
&self,
groups: &GroupsProxy,
quantile: f64,
interpol: QuantileInterpolOptions,
) -> Series {
use DataType::*;

match self.dtype() {
Float32 => {
SeriesWrap(self.f32().unwrap().clone()).agg_quantile(groups, quantile, interpol)
}
Float64 => {
SeriesWrap(self.f64().unwrap().clone()).agg_quantile(groups, quantile, interpol)
}
dt if dt.is_numeric() || dt.is_temporal() => {
let ca = self.to_physical_repr();
let physical_type = ca.dtype();
let s =
apply_method_physical_integer!(ca, agg_quantile, groups, quantile, interpol);
if dt.is_logical() {
// back to physical and then
// back to logical type
s.cast(physical_type).unwrap().cast(dt).unwrap()
} else {
s
}
}
_ => Series::full_null("", groups.len(), self.dtype()),
}
}

#[doc(hidden)]
pub unsafe fn agg_mean(&self, groups: &GroupsProxy) -> Series {
use DataType::*;

match self.dtype() {
Float32 => SeriesWrap(self.f32().unwrap().clone()).agg_mean(groups),
Float64 => SeriesWrap(self.f64().unwrap().clone()).agg_mean(groups),
dt if dt.is_numeric() => {
apply_method_physical_integer!(self, agg_mean, groups)
}
dt @ Duration(_) => {
let s = self.to_physical_repr();
// agg_mean returns Float64
let out = s.agg_mean(groups);
// cast back to Int64 and then to logical duration type
out.cast(&Int64).unwrap().cast(dt).unwrap()
}
_ => Series::full_null("", groups.len(), self.dtype()),
}
}

#[doc(hidden)]
pub unsafe fn agg_last(&self, groups: &GroupsProxy) -> Series {
let out = match groups {
GroupsProxy::Idx(groups) => {
let mut iter = groups.all().iter().map(|idx| {
if idx.is_empty() {
None
} else {
Some(idx[idx.len() - 1] as usize)
}
});
self.take_opt_iter_unchecked(&mut iter)
}
GroupsProxy::Slice { groups, .. } => {
let mut iter = groups.iter().map(|&[first, len]| {
if len == 0 {
None
} else {
Some((first + len - 1) as usize)
}
});
self.take_opt_iter_unchecked(&mut iter)
}
};
self.restore_logical(out)
}
}
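The new dispatch.rs repeats one pattern: each aggregation matches on the two `GroupsProxy` layouts, taking rows either through explicit index lists (`Idx`, pairs of first row and all row indices) or through contiguous `[first, len]` slices (`Slice`), and returns `None` for empty groups. A simplified, self-contained sketch of that dispatch shape (the enum and the aggregation below are stand-ins, not the polars-core definitions):

```rust
// Stand-in for GroupsProxy: index lists vs. contiguous slices.
enum Groups {
    Idx(Vec<(u32, Vec<u32>)>), // (first row, all row indices) per group
    Slice(Vec<[u32; 2]>),      // [first row, group length] per contiguous group
}

// Count the rows in each group, None for empty groups; the same shape as
// agg_valid_count above, minus the validity handling.
fn agg_group_lengths(groups: &Groups) -> Vec<Option<u32>> {
    match groups {
        Groups::Idx(g) => g
            .iter()
            .map(|(_, idx)| {
                if idx.is_empty() {
                    None
                } else {
                    Some(idx.len() as u32)
                }
            })
            .collect(),
        Groups::Slice(g) => g
            .iter()
            .map(|&[_, len]| if len == 0 { None } else { Some(len) })
            .collect(),
    }
}

fn main() {
    let slice_groups = Groups::Slice(vec![[0, 3], [3, 0], [3, 2]]);
    assert_eq!(agg_group_lengths(&slice_groups), vec![Some(3), None, Some(2)]);

    let idx_groups = Groups::Idx(vec![(0, vec![0, 1]), (2, vec![])]);
    assert_eq!(agg_group_lengths(&idx_groups), vec![Some(2), None]);
}
```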
1 change: 0 additions & 1 deletion polars/polars-lazy/Cargo.toml
@@ -127,7 +127,6 @@ test_all = [
ahash.workspace = true
bitflags = "1.3"
glob = "0.3"
parking_lot.workspace = true
pyo3 = { version = "0.16", optional = true }
rayon.workspace = true
regex = { version = "1.5", optional = true }
7 changes: 4 additions & 3 deletions polars/polars-lazy/src/dsl/list.rs
@@ -1,4 +1,5 @@
use parking_lot::Mutex;
use std::sync::Mutex;

use polars_arrow::utils::CustomIterTools;
use polars_core::prelude::*;
use polars_core::series::ops::NullBehavior;
@@ -242,14 +243,14 @@ impl ListNameSpace {
match out {
Ok(s) => Some(s),
Err(e) => {
*m_err.lock() = Some(e);
*m_err.lock().unwrap() = Some(e);
None
}
}
})
})
.collect();
err = m_err.lock().take();
err = m_err.lock().unwrap().take();
ca
} else {
let mut df_container = DataFrame::new_no_checks(vec![]);
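In list.rs the shared `m_err` mutex exists to carry the first error out of a closure, presumably because the surrounding code can run that closure from several threads; a plain `mut` variable would not work there. A sequential sketch of the same call shape with the std mutex (the fallible operation below is hypothetical, not the polars API):

```rust
use std::sync::Mutex;

// Hypothetical fallible per-element operation standing in for the expression
// evaluation inside the real closure.
fn try_double(x: i32) -> Result<i32, String> {
    if x < 0 {
        Err(format!("negative input: {x}"))
    } else {
        Ok(x * 2)
    }
}

fn main() {
    // Only needed when the map runs in parallel; kept here for the call shape.
    let m_err: Mutex<Option<String>> = Mutex::new(None);

    let out: Vec<Option<i32>> = [1, 2, -3, 4]
        .iter()
        .map(|&x| match try_double(x) {
            Ok(v) => Some(v),
            Err(e) => {
                // Same shape as the diff's `*m_err.lock().unwrap() = Some(e);`
                *m_err.lock().unwrap() = Some(e);
                None
            }
        })
        .collect();

    // The error is pulled back out afterwards, mirroring
    // `err = m_err.lock().unwrap().take();`
    let err = m_err.lock().unwrap().take();

    assert_eq!(out, vec![Some(2), Some(4), None, Some(8)]);
    assert_eq!(err, Some("negative input: -3".to_string()));
}
```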
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/logical_plan/builder.rs
@@ -1,7 +1,7 @@
use std::io::{Read, Seek, SeekFrom};
use std::path::PathBuf;
use std::sync::Mutex;

use parking_lot::Mutex;
use polars_core::frame::explode::MeltArgs;
use polars_core::prelude::*;
use polars_core::utils::try_get_supertype;
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/logical_plan/conversion.rs
@@ -416,7 +416,7 @@ pub(crate) fn to_alp(
LogicalPlan::Error { err, .. } => {
// We just take the error. The LogicalPlan should not be used anymore once this
// is taken.
let mut err = err.lock();
let mut err = err.lock().unwrap();
return Err(err.take().unwrap());
}
LogicalPlan::ExtContext {
5 changes: 2 additions & 3 deletions polars/polars-lazy/src/logical_plan/mod.rs
@@ -3,9 +3,8 @@ use std::cell::Cell;
use std::fmt::Debug;
#[cfg(any(feature = "ipc", feature = "csv-file", feature = "parquet"))]
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use parking_lot::Mutex;
use polars_core::prelude::*;

use crate::logical_plan::LogicalPlan::DataFrameScan;
@@ -267,7 +266,7 @@ impl LogicalPlan {
}
Error { err, .. } => {
// We just take the error. The LogicalPlan should not be used anymore once this
let mut err = err.lock();
let mut err = err.lock().unwrap();
match err.take() {
Some(err) => Err(err),
None => Err(PolarsError::ComputeError(
8 changes: 4 additions & 4 deletions polars/polars-lazy/src/physical_plan/expressions/window.rs
@@ -255,7 +255,7 @@ impl PhysicalExpr for WindowExpr {
cache_key.push_str(s.name());
}

let mut gt_map = state.group_tuples.lock();
let mut gt_map = state.group_tuples.lock().unwrap();
// we run sequential and partitioned
// and every partition run the cache should be empty so we expect a max of 1.
debug_assert!(gt_map.len() <= 1);
@@ -291,7 +291,7 @@ impl PhysicalExpr for WindowExpr {
let cache_gb = |gb: GroupBy| {
if state.cache_window() {
let groups = gb.take_groups();
let mut gt_map = state.group_tuples.lock();
let mut gt_map = state.group_tuples.lock().unwrap();
gt_map.insert(cache_key.clone(), groups);
} else {
// drop the group tuples to reduce allocated memory.
@@ -449,7 +449,7 @@ impl PhysicalExpr for WindowExpr {

// try to get cached join_tuples
let join_opt_ids = if state.cache_window() {
let mut jt_map = state.join_tuples.lock();
let mut jt_map = state.join_tuples.lock().unwrap();
// we run sequential and partitioned
// and every partition run the cache should be empty so we expect a max of 1.
debug_assert!(jt_map.len() <= 1);
@@ -469,7 +469,7 @@
}

if state.cache_window() {
let mut jt_map = state.join_tuples.lock();
let mut jt_map = state.join_tuples.lock().unwrap();
jt_map.insert(cache_key, join_opt_ids);
}

7 changes: 4 additions & 3 deletions polars/polars-lazy/src/physical_plan/file_cache.rs
@@ -1,4 +1,5 @@
use parking_lot::Mutex;
use std::sync::Mutex;

use polars_core::prelude::*;

use crate::prelude::file_caching::FileFingerPrint;
@@ -29,7 +30,7 @@ impl FileCache {
#[cfg(debug_assertions)]
pub(crate) fn assert_empty(&self) {
for (_, guard) in self.inner.iter() {
let state = guard.lock();
let state = guard.lock().unwrap();
assert!(state.1.is_empty());
}
}
@@ -51,7 +52,7 @@
} else {
// should exist
let guard = self.inner.get(&finger_print).unwrap();
let mut state = guard.lock();
let mut state = guard.lock().unwrap();

// initialize df
if state.0 == 0 {
7 changes: 3 additions & 4 deletions polars/polars-lazy/src/physical_plan/node_timer.rs
@@ -1,7 +1,6 @@
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Instant;

use parking_lot::Mutex;
use polars_core::prelude::*;
use polars_core::utils::NoNull;

@@ -26,15 +25,15 @@ impl NodeTimer {
}

pub(super) fn store(&self, start: StartInstant, end: EndInstant, name: String) {
let mut data = self.data.lock();
let mut data = self.data.lock().unwrap();
let nodes = &mut data.0;
nodes.push(name);
let ticks = &mut data.1;
ticks.push((start, end))
}

pub(super) fn finish(self) -> PolarsResult<DataFrame> {
let mut data = self.data.lock();
let mut data = self.data.lock().unwrap();
let mut nodes = std::mem::take(&mut data.0);
nodes.push("optimization".to_string());

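One behavioural difference the swap accepts: unlike `parking_lot::Mutex`, the standard mutex is poisoned when a thread panics while holding it, so the `.unwrap()` calls added throughout this diff will themselves panic afterwards. A small standalone demonstration (not polars code):

```rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let m = Arc::new(Mutex::new(0_i32));

    // A thread that panics while holding the lock poisons it (the panic
    // message it prints is expected output here).
    let m2 = Arc::clone(&m);
    let _ = thread::spawn(move || {
        let _guard = m2.lock().unwrap();
        panic!("poison the lock");
    })
    .join();

    // Every later lock() now returns Err, so the .unwrap()s added in this
    // diff would panic at this point; parking_lot has no poisoning.
    assert!(m.lock().is_err());
}
```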
