Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support for adding interval. (#417)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Sep 17, 2021
1 parent 1655f53 commit 4701f10
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 99 deletions.
69 changes: 22 additions & 47 deletions src/array/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,56 +66,31 @@ pub fn get_value_display<'a>(array: &'a dyn Array) -> Box<dyn Fn(usize) -> Strin
dyn_primitive!(array, i64, temporal_conversions::time64ns_to_time)
}
Time64(_) => unreachable!(), // remaining are not valid
Timestamp(TimeUnit::Second, tz) => {
Timestamp(time_unit, tz) => {
if let Some(tz) = tz {
let offset = temporal_conversions::parse_offset(tz).unwrap();
dyn_primitive!(array, i64, |x| {
chrono::DateTime::<chrono::FixedOffset>::from_utc(
temporal_conversions::timestamp_s_to_datetime(x),
offset,
)
})
} else {
dyn_primitive!(array, i64, temporal_conversions::timestamp_s_to_datetime)
}
}
Timestamp(TimeUnit::Millisecond, tz) => {
if let Some(tz) = tz {
let offset = temporal_conversions::parse_offset(tz).unwrap();
dyn_primitive!(array, i64, |x| {
chrono::DateTime::<chrono::FixedOffset>::from_utc(
temporal_conversions::timestamp_ms_to_datetime(x),
offset,
)
})
} else {
dyn_primitive!(array, i64, temporal_conversions::timestamp_ms_to_datetime)
}
}
Timestamp(TimeUnit::Microsecond, tz) => {
if let Some(tz) = tz {
let offset = temporal_conversions::parse_offset(tz).unwrap();
dyn_primitive!(array, i64, |x| {
chrono::DateTime::<chrono::FixedOffset>::from_utc(
temporal_conversions::timestamp_us_to_datetime(x),
offset,
)
})
let timezone = temporal_conversions::parse_offset(tz);
match timezone {
Ok(timezone) => {
dyn_primitive!(array, i64, |time| {
temporal_conversions::timestamp_to_datetime(time, *time_unit, &timezone)
})
}
#[cfg(feature = "chrono-tz")]
Err(_) => {
let timezone = temporal_conversions::parse_offset_tz(tz).unwrap();
dyn_primitive!(array, i64, |time| {
temporal_conversions::timestamp_to_datetime(time, *time_unit, &timezone)
})
}
#[cfg(not(feature = "chrono-tz"))]
_ => panic!(
"Invalid Offset format (must be [-]00:00) or chrono-tz feature not active"
),
}
} else {
dyn_primitive!(array, i64, temporal_conversions::timestamp_us_to_datetime)
}
}
Timestamp(TimeUnit::Nanosecond, tz) => {
if let Some(tz) = tz {
let offset = temporal_conversions::parse_offset(tz).unwrap();
dyn_primitive!(array, i64, |x| {
chrono::DateTime::<chrono::FixedOffset>::from_utc(
temporal_conversions::timestamp_ns_to_datetime(x),
offset,
)
dyn_primitive!(array, i64, |time| {
temporal_conversions::timestamp_to_naive_datetime(time, *time_unit)
})
} else {
dyn_primitive!(array, i64, temporal_conversions::timestamp_ns_to_datetime)
}
}
Interval(IntervalUnit::YearMonth) => {
Expand Down
9 changes: 8 additions & 1 deletion src/compute/arithmetics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use std::ops::{Add, Div, Mul, Neg, Rem, Sub};

use num_traits::{NumCast, Zero};

use crate::datatypes::{DataType, TimeUnit};
use crate::datatypes::{DataType, IntervalUnit, TimeUnit};
use crate::error::{ArrowError, Result};
use crate::types::NativeType;
use crate::{array::*, bitmap::Bitmap};
Expand Down Expand Up @@ -145,6 +145,11 @@ pub fn arithmetic(lhs: &dyn Array, op: Operator, rhs: &dyn Array) -> Result<Box<
let rhs = rhs.as_any().downcast_ref().unwrap();
time::add_duration::<i64>(lhs, rhs).map(|x| Box::new(x) as Box<dyn Array>)
}
(Timestamp(_, _), Add, Interval(IntervalUnit::MonthDayNano)) => {
let lhs = lhs.as_any().downcast_ref().unwrap();
let rhs = rhs.as_any().downcast_ref().unwrap();
time::add_interval(lhs, rhs).map(|x| Box::new(x) as Box<dyn Array>)
}
(Time64(TimeUnit::Microsecond), Subtract, Duration(_))
| (Time64(TimeUnit::Nanosecond), Subtract, Duration(_))
| (Date64, Subtract, Duration(_))
Expand Down Expand Up @@ -214,6 +219,7 @@ pub fn can_arithmetic(lhs: &DataType, op: Operator, rhs: &DataType) -> bool {
| (Time64(TimeUnit::Nanosecond), Add, Duration(_))
| (Timestamp(_, _), Subtract, Duration(_))
| (Timestamp(_, _), Add, Duration(_))
| (Timestamp(_, _), Add, Interval(IntervalUnit::MonthDayNano))
| (Timestamp(_, None), Subtract, Timestamp(_, None))
)
}
Expand Down Expand Up @@ -462,6 +468,7 @@ mod tests {
Duration(TimeUnit::Millisecond),
Duration(TimeUnit::Microsecond),
Duration(TimeUnit::Nanosecond),
Interval(IntervalUnit::MonthDayNano),
];
let operators = vec![
Operator::Add,
Expand Down
71 changes: 65 additions & 6 deletions src/compute/arithmetics/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use crate::{
compute::arity::binary,
datatypes::{DataType, TimeUnit},
error::{ArrowError, Result},
temporal_conversions::{timeunit_scale, SECONDS_IN_DAY},
types::NativeType,
temporal_conversions,
types::{months_days_ns, NativeType},
};

/// Creates the scale required to add or subtract a Duration to a time array
Expand All @@ -36,20 +36,21 @@ fn create_scale(lhs: &DataType, rhs: &DataType) -> Result<f64> {
| (DataType::Time32(timeunit_a), DataType::Duration(timeunit_b))
| (DataType::Time64(timeunit_a), DataType::Duration(timeunit_b)) => {
// The scale is based on the TimeUnit that each of the numbers have.
timeunit_scale(*timeunit_a, *timeunit_b)
temporal_conversions::timeunit_scale(*timeunit_a, *timeunit_b)
}
(DataType::Date32, DataType::Duration(timeunit)) => {
// Date32 represents the time elapsed time since UNIX epoch
// (1970-01-01) in days (32 bits). The duration value has to be
// scaled to days to be able to add the value to the Date.
timeunit_scale(TimeUnit::Second, *timeunit) / SECONDS_IN_DAY as f64
temporal_conversions::timeunit_scale(TimeUnit::Second, *timeunit)
/ temporal_conversions::SECONDS_IN_DAY as f64
}
(DataType::Date64, DataType::Duration(timeunit)) => {
// Date64 represents the time elapsed time since UNIX epoch
// (1970-01-01) in milliseconds (64 bits). The duration value has
// to be scaled to milliseconds to be able to add the value to the
// Date.
timeunit_scale(TimeUnit::Millisecond, *timeunit)
temporal_conversions::timeunit_scale(TimeUnit::Millisecond, *timeunit)
}
_ => {
return Err(ArrowError::InvalidArgumentError(
Expand Down Expand Up @@ -216,7 +217,7 @@ pub fn subtract_timestamps(
(DataType::Timestamp(timeunit_a, None), DataType::Timestamp(timeunit_b, None)) => {
// Closure for the binary operation. The closure contains the scale
// required to calculate the difference between the timestamps.
let scale = timeunit_scale(*timeunit_a, *timeunit_b);
let scale = temporal_conversions::timeunit_scale(*timeunit_a, *timeunit_b);
let op = move |a, b| a - (b as f64 * scale) as i64;

binary(lhs, rhs, DataType::Duration(*timeunit_a), op)
Expand All @@ -227,6 +228,64 @@ pub fn subtract_timestamps(
}
}

/// Adds an interval to a [`DataType::Timestamp`].
pub fn add_interval(
timestamp: &PrimitiveArray<i64>,
interval: &PrimitiveArray<months_days_ns>,
) -> Result<PrimitiveArray<i64>> {
match timestamp.data_type().to_logical_type() {
DataType::Timestamp(time_unit, Some(timezone_str)) => {
let time_unit = *time_unit;
let timezone = temporal_conversions::parse_offset(timezone_str);
match timezone {
Ok(timezone) => binary(
timestamp,
interval,
timestamp.data_type().clone(),
|timestamp, interval| {
temporal_conversions::add_interval(
timestamp, time_unit, interval, &timezone,
)
},
),
#[cfg(feature = "chrono-tz")]
Err(_) => {
let timezone = temporal_conversions::parse_offset_tz(timezone_str)?;
binary(
timestamp,
interval,
timestamp.data_type().clone(),
|timestamp, interval| {
temporal_conversions::add_interval(
timestamp, time_unit, interval, &timezone,
)
},
)
}
#[cfg(not(feature = "chrono-tz"))]
_ => Err(ArrowError::InvalidArgumentError(format!(
"timezone \"{}\" cannot be parsed (feature chrono-tz is not active)",
timezone_str
))),
}
}
DataType::Timestamp(time_unit, None) => {
let time_unit = *time_unit;
binary(
timestamp,
interval,
timestamp.data_type().clone(),
|timestamp, interval| {
temporal_conversions::add_naive_interval(timestamp, time_unit, interval)
},
)
}
_ => Err(ArrowError::InvalidArgumentError(
"Adding an interval is only supported for `DataType::Timestamp`".to_string(),
)),
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
17 changes: 5 additions & 12 deletions src/compute/cast/primitive_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::{
bitmap::Bitmap,
compute::arity::unary,
datatypes::{DataType, TimeUnit},
error::ArrowError,
temporal_conversions::*,
types::NativeType,
};
Expand Down Expand Up @@ -329,17 +328,10 @@ fn chrono_tz_timestamp_to_utf8<O: Offset>(
time_unit: TimeUnit,
timezone_str: &str,
) -> Result<Utf8Array<O>> {
let timezone = parse_offset_tz(timezone_str);
if let Some(timezone) = timezone {
Ok(timestamp_to_utf8_impl::<O, chrono_tz::Tz>(
from, time_unit, timezone,
))
} else {
Err(ArrowError::InvalidArgumentError(format!(
"timezone \"{}\" cannot be parsed",
timezone_str
)))
}
let timezone = parse_offset_tz(timezone_str)?;
Ok(timestamp_to_utf8_impl::<O, chrono_tz::Tz>(
from, time_unit, timezone,
))
}

#[cfg(not(feature = "chrono-tz"))]
Expand All @@ -348,6 +340,7 @@ fn chrono_tz_timestamp_to_utf8<O: Offset>(
_: TimeUnit,
timezone_str: &str,
) -> Result<Utf8Array<O>> {
use crate::error::ArrowError;
Err(ArrowError::InvalidArgumentError(format!(
"timezone \"{}\" cannot be parsed (feature chrono-tz is not active)",
timezone_str
Expand Down
22 changes: 4 additions & 18 deletions src/compute/temporal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,8 @@ fn chrono_tz_hour(
time_unit: TimeUnit,
timezone_str: &str,
) -> Result<PrimitiveArray<u32>> {
let timezone = parse_offset_tz(timezone_str);
if let Some(timezone) = timezone {
Ok(extract_impl(array, time_unit, timezone, |x| x.hour()))
} else {
Err(ArrowError::InvalidArgumentError(format!(
"timezone \"{}\" cannot be parsed",
timezone_str
)))
}
let timezone = parse_offset_tz(timezone_str)?;
Ok(extract_impl(array, time_unit, timezone, |x| x.hour()))
}

#[cfg(not(feature = "chrono-tz"))]
Expand All @@ -112,15 +105,8 @@ fn chrono_tz_year(
time_unit: TimeUnit,
timezone_str: &str,
) -> Result<PrimitiveArray<i32>> {
let timezone = parse_offset_tz(timezone_str);
if let Some(timezone) = timezone {
Ok(extract_impl(array, time_unit, timezone, |x| x.year()))
} else {
Err(ArrowError::InvalidArgumentError(format!(
"timezone \"{}\" cannot be parsed",
timezone_str
)))
}
let timezone = parse_offset_tz(timezone_str)?;
Ok(extract_impl(array, time_unit, timezone, |x| x.year()))
}

#[cfg(not(feature = "chrono-tz"))]
Expand Down
Loading

0 comments on commit 4701f10

Please sign in to comment.