Skip to content

Commit

Permalink
refactor(rust): streaming improvements (#5541)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Nov 18, 2022
1 parent dc49eaf commit 9377116
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 136 deletions.
11 changes: 11 additions & 0 deletions polars/polars-core/src/datatypes/any_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,52 +430,62 @@ impl<'a> Hash for AnyValue<'a> {
impl<'a> Eq for AnyValue<'a> {}

impl From<f64> for AnyValue<'_> {
#[inline]
fn from(a: f64) -> Self {
AnyValue::Float64(a)
}
}
impl From<f32> for AnyValue<'_> {
#[inline]
fn from(a: f32) -> Self {
AnyValue::Float32(a)
}
}
impl From<u32> for AnyValue<'_> {
#[inline]
fn from(a: u32) -> Self {
AnyValue::UInt32(a)
}
}
impl From<u64> for AnyValue<'_> {
#[inline]
fn from(a: u64) -> Self {
AnyValue::UInt64(a)
}
}
impl From<i64> for AnyValue<'_> {
#[inline]
fn from(a: i64) -> Self {
AnyValue::Int64(a)
}
}
impl From<i32> for AnyValue<'_> {
#[inline]
fn from(a: i32) -> Self {
AnyValue::Int32(a)
}
}
impl From<i16> for AnyValue<'_> {
#[inline]
fn from(a: i16) -> Self {
AnyValue::Int16(a)
}
}
impl From<u16> for AnyValue<'_> {
#[inline]
fn from(a: u16) -> Self {
AnyValue::UInt16(a)
}
}

impl From<i8> for AnyValue<'_> {
#[inline]
fn from(a: i8) -> Self {
AnyValue::Int8(a)
}
}
impl From<u8> for AnyValue<'_> {
#[inline]
fn from(a: u8) -> Self {
AnyValue::UInt8(a)
}
Expand All @@ -485,6 +495,7 @@ impl<'a, T> From<Option<T>> for AnyValue<'a>
where
T: Into<AnyValue<'a>>,
{
#[inline]
fn from(a: Option<T>) -> Self {
match a {
None => AnyValue::Null,
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/frame/hash_join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub(crate) fn check_categorical_src(l: &DataType, r: &DataType) -> PolarsResult<
(DataType::Categorical(Some(l)), DataType::Categorical(Some(r))) => {
if !l.same_src(r) {
return Err(PolarsError::ComputeError("Joins/or comparisons on categorical dtypes can only happen if they are created under the same global string cache.\
Hint: set a global StringCache".into()));
Hint: set a global StringCache".into()));
}
Ok(())
}
Expand Down
12 changes: 8 additions & 4 deletions polars/polars-core/src/vector_hasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,14 @@ impl VecHash for Utf8Chunked {
buf.reserve(self.len());
let null_h = get_null_hash_value(random_state.clone());
self.downcast_iter().for_each(|arr| {
buf.extend(arr.into_iter().map(|opt_v| match opt_v {
Some(v) => random_state.hash_single(v),
None => null_h,
}))
if arr.null_count() == 0 {
buf.extend(arr.values_iter().map(|v| random_state.hash_single(v)))
} else {
buf.extend(arr.into_iter().map(|opt_v| match opt_v {
Some(v) => random_state.hash_single(v),
None => null_h,
}))
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ where
{
match expr_arena.get(node) {
AExpr::Alias(input, _) => convert_to_hash_agg(*input, expr_arena, schema, to_physical),
AExpr::Count => (
AExpr::Count | AExpr::Agg(AAggExpr::Count(_)) => (
Arc::new(Count {}),
AggregateFunction::Count(CountAgg::new()),
),
Expand Down Expand Up @@ -143,7 +143,7 @@ where
let dtype = phys_expr.field(schema).unwrap().dtype;
(phys_expr, AggregateFunction::Last(LastAgg::new(dtype)))
}
_ => todo!(),
agg => panic!("{:?} not yet implemented.", agg),
},
_ => todo!(),
}
Expand Down

0 comments on commit 9377116

Please sign in to comment.