Skip to content

Commit

Permalink
drop crossbeam requirement
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Sep 18, 2020
1 parent e39a6d1 commit 5dd9bda
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 35 deletions.
1 change: 0 additions & 1 deletion polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ itertools = "^0.9.0"
unsafe_unwrap = "^0.1.0"
rayon = "^1.3.1"
prettytable-rs = { version="^0.8.0", features=["win_crlf"], optional = true, default_features = false}
crossbeam = "^0.7"
chrono = {version = "^0.4.13", optional = true}
enum_dispatch = "^0.3.2"
parquet = {version = "1", optional = true}
Expand Down
41 changes: 21 additions & 20 deletions polars/src/frame/hash_join.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::prelude::*;
use crate::utils::Xob;
use crossbeam::thread;
use enum_dispatch::enum_dispatch;
use fnv::{FnvBuildHasher, FnvHashMap};
use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -195,7 +194,7 @@ where
}
}
});
hash_tbl.iter().for_each(|(_k, indexes_b)| {
hash_tbl.iter().for_each( |(_k, indexes_b)| {
// remaining joined values from the right table
results.extend(indexes_b.iter().map(|&idx_b| (Some(idx_b), None)))
});
Expand Down Expand Up @@ -524,13 +523,14 @@ impl DataFrame {
let s_right = other.column(right_on)?;
let join_tuples = apply_hash_join_on_series!(s_left, s_right, hash_join_inner);

let (df_left, df_right) = exec_concurrent!({ self.create_left_df(&join_tuples) }, {
let (df_left, df_right) = rayon::join(|| self.create_left_df(&join_tuples), || {
unsafe {
other.drop(right_on).unwrap().take_iter_unchecked(
join_tuples.iter().map(|(_left, right)| *right),
Some(join_tuples.len()),
)
}

});
self.finish_join(df_left, df_right)
}
Expand All @@ -550,13 +550,14 @@ impl DataFrame {
let opt_join_tuples: Vec<(usize, Option<usize>)> =
apply_hash_join_on_series!(s_left, s_right, hash_join_left);

let (df_left, df_right) = exec_concurrent!({ self.create_left_df(&opt_join_tuples) }, {
let (df_left, df_right) =rayon::join(|| self.create_left_df(&opt_join_tuples), || {
unsafe {
other.drop(right_on).unwrap().take_opt_iter_unchecked(
opt_join_tuples.iter().map(|(_left, right)| *right),
Some(opt_join_tuples.len()),
)
}

});
self.finish_join(df_left, df_right)
}
Expand Down Expand Up @@ -584,23 +585,23 @@ impl DataFrame {
apply_hash_join_on_series!(s_left, s_right, hash_join_outer);

// Take the left and right dataframes by join tuples
let (mut df_left, df_right) = exec_concurrent!(
{
unsafe {
self.drop(left_on).unwrap().take_opt_iter_unchecked(
opt_join_tuples.iter().map(|(left, _right)| *left),
Some(opt_join_tuples.len()),
)
}
},
{
unsafe {
other.drop(right_on).unwrap().take_opt_iter_unchecked(
opt_join_tuples.iter().map(|(_left, right)| *right),
Some(opt_join_tuples.len()),
)
}
let (mut df_left, df_right) = rayon::join(||
{
unsafe {
self.drop(left_on).unwrap().take_opt_iter_unchecked(
opt_join_tuples.iter().map(|(left, _right)| *left),
Some(opt_join_tuples.len()),
)
}
},
|| {
unsafe {
other.drop(right_on).unwrap().take_opt_iter_unchecked(
opt_join_tuples.iter().map(|(_left, right)| *right),
Some(opt_join_tuples.len()),
)
}
}
);
let mut s = s_left.zip_outer_join_column(s_right, &opt_join_tuples);
s.rename(left_on);
Expand Down
14 changes: 0 additions & 14 deletions polars/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,6 @@ pub(crate) fn floating_encode_f64(mantissa: u64, exponent: i16, sign: i8) -> f64
sign as f64 * mantissa as f64 * (2.0f64).powf(exponent as f64)
}

#[macro_export]
macro_rules! exec_concurrent {
($block_a:block, $block_b:block) => {{
thread::scope(|s| {
let handle_left = s.spawn(|_| $block_a);
let handle_right = s.spawn(|_| $block_b);
let return_left = handle_left.join().expect("thread panicked");
let return_right = handle_right.join().expect("thread panicked");
(return_left, return_right)
})
.expect("could not join threads or thread panicked")
}};
}

/// Just a wrapper structure. Useful for certain impl specializations
/// This is for instance use to implement
/// `impl<T> FromIterator<T::Native> for Xob<ChunkedArray<T>>`
Expand Down

0 comments on commit 5dd9bda

Please sign in to comment.