Skip to content

Commit

Permalink
fix outer join on floats and remove dtype-u64 flag
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 3, 2021
1 parent 43fa784 commit 080b351
Show file tree
Hide file tree
Showing 25 changed files with 129 additions and 139 deletions.
4 changes: 1 addition & 3 deletions polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ cross_join = ["polars-core/cross_join", "polars-lazy/cross_join"]
dot_product = ["polars-core/dot_product", "polars-lazy/dot_product"]
concat_str = ["polars-core/concat_str", "polars-lazy/concat_str"]
row_hash = ["polars-core/row_hash"]
reinterpret = ["polars-core/reinterpret", "polars-core/dtype-u64"]
reinterpret = ["polars-core/reinterpret"]
decompress = ["polars-io/decompress"]
decompress-fast = ["polars-io/decompress-fast"]
mode = ["polars-core/mode", "polars-lazy/mode"]
Expand Down Expand Up @@ -107,7 +107,6 @@ dtype-full = [
"dtype-i16",
"dtype-u8",
"dtype-u16",
"dtype-u64",
]

# sensible minimal set of opt-in datatypes
Expand All @@ -123,7 +122,6 @@ dtype-i8 = ["polars-core/dtype-i8", "polars-lazy/dtype-i8"]
dtype-i16 = ["polars-core/dtype-i16", "polars-lazy/dtype-i16"]
dtype-u8 = ["polars-core/dtype-u8", "polars-lazy/dtype-u8"]
dtype-u16 = ["polars-core/dtype-u16", "polars-lazy/dtype-u16"]
dtype-u64 = ["polars-core/dtype-u64", "polars-lazy/dtype-u64", "polars-io/dtype-u64"]
dtype-categorical = ["polars-core/dtype-categorical"]

docs-selection = [
Expand Down
1 change: 0 additions & 1 deletion polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ dtype-i8 = []
dtype-i16 = []
dtype-u8 = []
dtype-u16 = []
dtype-u64 = []
dtype-categorical = []

parquet = ["arrow/io_parquet"]
Expand Down
1 change: 0 additions & 1 deletion polars/polars-core/src/chunked_array/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ macro_rules! cast_with_dtype {
#[cfg(feature = "dtype-u16")]
UInt16 => ChunkCast::cast::<UInt16Type>($self).map(|ca| ca.into_series()),
UInt32 => ChunkCast::cast::<UInt32Type>($self).map(|ca| ca.into_series()),
#[cfg(feature = "dtype-u64")]
UInt64 => ChunkCast::cast::<UInt64Type>($self).map(|ca| ca.into_series()),
#[cfg(feature = "dtype-i8")]
Int8 => ChunkCast::cast::<Int8Type>($self).map(|ca| ca.into_series()),
Expand Down
64 changes: 64 additions & 0 deletions polars/polars-core/src/chunked_array/ops/bit_repr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,67 @@ impl Reinterpret for Int64Chunked {
self.bit_repr_large().into_series()
}
}

impl UInt64Chunked {
pub(crate) fn reinterpret_float(&self) -> Series {
let chunks = self
.downcast_iter()
.map(|array| {
let buf = array.values().clone();
// Safety
// same bit length u64 <-> f64
let buf = unsafe { std::mem::transmute::<_, Buffer<f64>>(buf) };
Arc::new(PrimitiveArray::from_data(
ArrowDataType::Float64,
buf,
array.validity().cloned(),
)) as Arc<dyn Array>
})
.collect::<Vec<_>>();
Float64Chunked::new_from_chunks(self.name(), chunks).into()
}
}
impl UInt32Chunked {
pub(crate) fn reinterpret_float(&self) -> Series {
let chunks = self
.downcast_iter()
.map(|array| {
let buf = array.values().clone();
// Safety
// same bit length u32 <-> f32
let buf = unsafe { std::mem::transmute::<_, Buffer<f32>>(buf) };
Arc::new(PrimitiveArray::from_data(
ArrowDataType::Float32,
buf,
array.validity().cloned(),
)) as Arc<dyn Array>
})
.collect::<Vec<_>>();
Float32Chunked::new_from_chunks(self.name(), chunks).into()
}
}

/// Used to save compilation paths. Use carefully. Although this is safe,
/// if misused it can lead to incorrect results.
impl Float32Chunked {
pub(crate) fn apply_as_ints<F>(&self, f: F) -> Series
where
F: Fn(&Series) -> Series,
{
let s = self.bit_repr_small().into_series();
let out = f(&s);
let out = out.u32().unwrap();
out.reinterpret_float()
}
}
impl Float64Chunked {
pub(crate) fn apply_as_ints<F>(&self, f: F) -> Series
where
F: Fn(&Series) -> Series,
{
let s = self.bit_repr_large().into_series();
let out = f(&s);
let out = out.u64().unwrap();
out.reinterpret_float()
}
}
30 changes: 3 additions & 27 deletions polars/polars-core/src/chunked_array/ops/unique/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,38 +451,14 @@ impl ChunkUnique<Float64Type> for Float64Chunked {
}

fn arg_unique(&self) -> Result<UInt32Chunked> {
#[cfg(feature = "dtype-u64")]
{
self.bit_repr_large().arg_unique()
}

#[cfg(not(feature = "dtype-u64"))]
{
panic!("activate feature dtype-u64")
}
self.bit_repr_large().arg_unique()
}

fn is_unique(&self) -> Result<BooleanChunked> {
#[cfg(feature = "dtype-u64")]
{
self.bit_repr_large().is_unique()
}

#[cfg(not(feature = "dtype-u64"))]
{
panic!("activate feature dtype-u64")
}
self.bit_repr_large().is_unique()
}
fn is_duplicated(&self) -> Result<BooleanChunked> {
#[cfg(feature = "dtype-u64")]
{
self.bit_repr_large().is_duplicated()
}

#[cfg(not(feature = "dtype-u64"))]
{
panic!("activate feature dtype-u64")
}
self.bit_repr_large().is_duplicated()
}
fn value_counts(&self) -> Result<DataFrame> {
impl_value_counts!(self)
Expand Down
1 change: 0 additions & 1 deletion polars/polars-core/src/frame/groupby/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,6 @@ impl DataFrame {
Categorical | Int8 | UInt8 | Int16 | UInt16 => s.cast::<UInt32Type>().unwrap(),
Float32 => s.bit_repr_small().into_series(),
// otherwise we use the vec hash for float
#[cfg(feature = "dtype-u64")]
Float64 => s.bit_repr_large().into_series(),
_ => {
// is date like
Expand Down
77 changes: 48 additions & 29 deletions polars/polars-core/src/frame/hash_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,6 @@ impl HashJoin<Float64Type> for Float64Chunked {
}
}

impl HashJoin<ListType> for ListChunked {}
impl HashJoin<CategoricalType> for CategoricalChunked {
fn hash_join_inner(&self, other: &CategoricalChunked) -> Vec<(u32, u32)> {
self.deref().hash_join_inner(&other.cast().unwrap())
Expand Down Expand Up @@ -920,18 +919,11 @@ where
}
}
})
.collect::<ChunkedArray<T>>()
.collect_trusted::<ChunkedArray<T>>()
.into_series()
}
}

impl ZipOuterJoinColumn for Float32Chunked {}
impl ZipOuterJoinColumn for Float64Chunked {}
impl ZipOuterJoinColumn for ListChunked {}
impl ZipOuterJoinColumn for CategoricalChunked {}
#[cfg(feature = "object")]
impl<T> ZipOuterJoinColumn for ObjectChunked<T> {}

macro_rules! impl_zip_outer_join {
($chunkedtype:ident) => {
impl ZipOuterJoinColumn for $chunkedtype {
Expand Down Expand Up @@ -966,6 +958,36 @@ macro_rules! impl_zip_outer_join {
impl_zip_outer_join!(BooleanChunked);
impl_zip_outer_join!(Utf8Chunked);

impl ZipOuterJoinColumn for Float32Chunked {
fn zip_outer_join_column(
&self,
right_column: &Series,
opt_join_tuples: &[(Option<u32>, Option<u32>)],
) -> Series {
self.apply_as_ints(|s| {
s.zip_outer_join_column(
&right_column.bit_repr_small().into_series(),
opt_join_tuples,
)
})
}
}

impl ZipOuterJoinColumn for Float64Chunked {
fn zip_outer_join_column(
&self,
right_column: &Series,
opt_join_tuples: &[(Option<u32>, Option<u32>)],
) -> Series {
self.apply_as_ints(|s| {
s.zip_outer_join_column(
&right_column.bit_repr_large().into_series(),
opt_join_tuples,
)
})
}
}

impl DataFrame {
/// Utility method to finish a join.
pub(crate) fn finish_join(
Expand Down Expand Up @@ -1080,25 +1102,6 @@ impl DataFrame {
}
}

// check if we have floats, in that case we have to coerce them
if selected_left
.iter()
.any(|s| matches!(s.dtype(), DataType::Float32 | DataType::Float64))
{
#[cfg(feature = "dtype-u64")]
for selected in &mut [&mut selected_left, &mut selected_right] {
selected.iter_mut().for_each(|s| match s.dtype().clone() {
DataType::Float64 => *s = s.bit_repr_large().into_series(),
DataType::Float32 => *s = s.bit_repr_small().into_series(),
_ => {}
});
}
#[cfg(not(feature = "dtype-u64"))]
{
panic!("activate dtype-u64 feature to be able to join on floats")
}
}

// multiple keys
match how {
JoinType::Inner => {
Expand Down Expand Up @@ -1656,7 +1659,6 @@ mod test {

#[test]
#[cfg_attr(miri, ignore)]
#[cfg(feature = "dtype-u64")]
fn test_join_floats() -> Result<()> {
let df_a = df! {
"a" => &[1.0, 2.0, 1.0, 1.0],
Expand All @@ -1681,6 +1683,23 @@ mod test {
Vec::from(out.column("ham")?.utf8()?),
&[None, Some("var"), None, None]
);

let out = df_a.join(
&df_b,
vec!["a", "c"],
vec!["foo", "bar"],
JoinType::Outer,
None,
)?;
assert_eq!(
out.dtypes(),
&[
DataType::Utf8,
DataType::Float64,
DataType::Float64,
DataType::Utf8
]
);
Ok(())
}
}
6 changes: 0 additions & 6 deletions polars/polars-core/src/frame/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ pub(crate) enum Buffer {
Int32(PrimitiveChunkedBuilder<Int32Type>),
Int64(PrimitiveChunkedBuilder<Int64Type>),
UInt32(PrimitiveChunkedBuilder<UInt32Type>),
#[cfg(feature = "dtype-u64")]
UInt64(PrimitiveChunkedBuilder<UInt64Type>),
#[cfg(feature = "dtype-date32")]
Date32(PrimitiveChunkedBuilder<Date32Type>),
Expand All @@ -246,7 +245,6 @@ impl Debug for Buffer {
Int32(_) => f.write_str("i32"),
Int64(_) => f.write_str("i64"),
UInt32(_) => f.write_str("u32"),
#[cfg(feature = "dtype-u64")]
UInt64(_) => f.write_str("u64"),
#[cfg(feature = "dtype-date32")]
Date32(_) => f.write_str("date32"),
Expand All @@ -271,9 +269,7 @@ impl Buffer {
(Int64(builder), AnyValue::Null) => builder.append_null(),
(UInt32(builder), AnyValue::UInt32(v)) => builder.append_value(v),
(UInt32(builder), AnyValue::Null) => builder.append_null(),
#[cfg(feature = "dtype-u64")]
(UInt64(builder), AnyValue::UInt64(v)) => builder.append_value(v),
#[cfg(feature = "dtype-u64")]
(UInt64(builder), AnyValue::Null) => builder.append_null(),
#[cfg(feature = "dtype-date32")]
(Date32(builder), AnyValue::Null) => builder.append_null(),
Expand All @@ -296,7 +292,6 @@ impl Buffer {
Int32(b) => b.finish().into_series(),
Int64(b) => b.finish().into_series(),
UInt32(b) => b.finish().into_series(),
#[cfg(feature = "dtype-u64")]
UInt64(b) => b.finish().into_series(),
#[cfg(feature = "dtype-date32")]
Date32(b) => b.finish().into_series(),
Expand All @@ -319,7 +314,6 @@ impl From<(&DataType, usize)> for Buffer {
Int32 => Buffer::Int32(PrimitiveChunkedBuilder::new("", len)),
Int64 => Buffer::Int64(PrimitiveChunkedBuilder::new("", len)),
UInt32 => Buffer::UInt32(PrimitiveChunkedBuilder::new("", len)),
#[cfg(feature = "dtype-u64")]
UInt64 => Buffer::UInt64(PrimitiveChunkedBuilder::new("", len)),
#[cfg(feature = "dtype-date32")]
Date32 => Buffer::Date32(PrimitiveChunkedBuilder::new("", len)),
Expand Down
1 change: 0 additions & 1 deletion polars/polars-core/src/serde/series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ impl<'de> Deserialize<'de> for Series {
let values: Vec<Option<i64>> = map.next_value()?;
Ok(Series::new(&name, values))
}
#[cfg(feature = "dtype-u64")]
DeDataType::UInt64 => {
let values: Vec<Option<u64>> = map.next_value()?;
Ok(Series::new(&name, values))
Expand Down
1 change: 0 additions & 1 deletion polars/polars-core/src/series/arithmetic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ pub mod checked {
.unwrap()
.apply_on_opt(|opt_v| opt_v.and_then(|v| v.checked_div(rhs.to_i32().unwrap())))
.into_series(),
#[cfg(feature = "dtype-u64")]
UInt64 => s
.u64()
.unwrap()
Expand Down
6 changes: 5 additions & 1 deletion polars/polars-core/src/series/implementations/categorical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,11 @@ impl private::PrivateSeries for SeriesWrap<CategoricalChunked> {
right_column: &Series,
opt_join_tuples: &[(Option<u32>, Option<u32>)],
) -> Series {
ZipOuterJoinColumn::zip_outer_join_column(&self.0, right_column, opt_join_tuples)
let ca = self.0.cast::<UInt32Type>().unwrap();
let right = right_column.cast_with_dtype(&DataType::UInt32).unwrap();
ZipOuterJoinColumn::zip_outer_join_column(&ca, &right, opt_join_tuples)
.cast_with_dtype(&DataType::Categorical)
.unwrap()
}
fn group_tuples(&self, multithreaded: bool) -> GroupTuples {
IntoGroupTuples::group_tuples(&self.0, multithreaded)
Expand Down
18 changes: 0 additions & 18 deletions polars/polars-core/src/series/implementations/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use crate::frame::asof_join::JoinAsof;
#[cfg(feature = "pivot")]
use crate::frame::groupby::pivot::*;
use crate::frame::groupby::*;
use crate::frame::hash_join::{HashJoin, ZipOuterJoinColumn};
use crate::prelude::*;
use crate::series::implementations::SeriesWrap;
use ahash::RandomState;
Expand Down Expand Up @@ -112,23 +111,6 @@ impl private::PrivateSeries for SeriesWrap<ListChunked> {
) -> Result<DataFrame> {
self.0.pivot_count(pivot_series, keys, groups)
}
fn hash_join_inner(&self, other: &Series) -> Vec<(u32, u32)> {
HashJoin::hash_join_inner(&self.0, other.as_ref().as_ref())
}
fn hash_join_left(&self, other: &Series) -> Vec<(u32, Option<u32>)> {
HashJoin::hash_join_left(&self.0, other.as_ref().as_ref())
}
fn hash_join_outer(&self, other: &Series) -> Vec<(Option<u32>, Option<u32>)> {
HashJoin::hash_join_outer(&self.0, other.as_ref().as_ref())
}
fn zip_outer_join_column(
&self,
right_column: &Series,
opt_join_tuples: &[(Option<u32>, Option<u32>)],
) -> Series {
ZipOuterJoinColumn::zip_outer_join_column(&self.0, right_column, opt_join_tuples)
}

fn group_tuples(&self, multithreaded: bool) -> GroupTuples {
IntoGroupTuples::group_tuples(&self.0, multithreaded)
}
Expand Down

0 comments on commit 080b351

Please sign in to comment.