Skip to content

Commit

Permalink
ARROW-11055: [Rust] [DataFusion] Support date_trunc function
Browse files Browse the repository at this point in the history
`date_trunc` SQL function implementation and GROUP BY timestamp support.

Closes apache#9040 from paveltiunov/date-trunc

Authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>
Signed-off-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
paveltiunov authored and GeorgeAp committed Jun 7, 2021
1 parent f02d9cb commit 7dabc25
Show file tree
Hide file tree
Showing 10 changed files with 346 additions and 6 deletions.
34 changes: 34 additions & 0 deletions rust/arrow/src/csv/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,16 @@ fn parse(
&DataType::Date64(_) => {
build_primitive_array::<Date64Type>(line_number, rows, i)
}
&DataType::Timestamp(TimeUnit::Microsecond, _) => {
build_primitive_array::<TimestampMicrosecondType>(
line_number,
rows,
i,
)
}
&DataType::Timestamp(TimeUnit::Nanosecond, _) => {
build_primitive_array::<TimestampNanosecondType>(line_number, rows, i)
}
&DataType::Utf8 => Ok(Arc::new(
rows.iter().map(|row| row.get(i)).collect::<StringArray>(),
) as ArrayRef),
Expand Down Expand Up @@ -531,6 +541,30 @@ impl Parser for Date64Type {
}
}

impl Parser for TimestampNanosecondType {
    /// Parses a string accepted by `chrono::NaiveDateTime`'s `FromStr`
    /// (e.g. "2020-09-08T13:42:29.190855") into nanoseconds since the Unix
    /// epoch. Only the timezone-less (`None`) nanosecond timestamp type is
    /// supported; any other `DATA_TYPE` yields `None`, as does unparseable
    /// input.
    fn parse(string: &str) -> Option<i64> {
        if let DataType::Timestamp(TimeUnit::Nanosecond, None) = Self::DATA_TYPE {
            let date_time: chrono::NaiveDateTime = string.parse().ok()?;
            Self::Native::from_i64(date_time.timestamp_nanos())
        } else {
            None
        }
    }
}

impl Parser for TimestampMicrosecondType {
    /// Parses a string accepted by `chrono::NaiveDateTime`'s `FromStr`
    /// (e.g. "2020-09-08T13:42:29.190855") into microseconds since the Unix
    /// epoch. Only the timezone-less (`None`) microsecond timestamp type is
    /// supported; any other `DATA_TYPE` yields `None`, as does unparseable
    /// input.
    fn parse(string: &str) -> Option<i64> {
        match Self::DATA_TYPE {
            DataType::Timestamp(TimeUnit::Microsecond, None) => {
                let date_time = string.parse::<chrono::NaiveDateTime>().ok()?;
                // Compute microseconds directly instead of going through
                // `timestamp_nanos() / 1000`: the nanosecond representation
                // overflows i64 for dates past ~2262 even though such dates
                // are representable in microseconds. `timestamp()` is the
                // floored second count and `timestamp_subsec_micros()` the
                // non-negative sub-second part, so this sum is exact for
                // pre-1970 dates as well.
                let micros = date_time
                    .timestamp()
                    .checked_mul(1_000_000)?
                    .checked_add(date_time.timestamp_subsec_micros() as i64)?;
                Self::Native::from_i64(micros)
            }
            _ => None,
        }
    }
}

/// Parses `string` into `T`'s native value via the [`Parser`] trait,
/// returning `None` when the text cannot be parsed as that type.
fn parse_item<T: Parser>(string: &str) -> Option<T::Native> {
    T::parse(string)
}
Expand Down
51 changes: 51 additions & 0 deletions rust/datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,57 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn group_by_date_trunc() -> Result<()> {
    // Grouping on date_trunc('week', ...) must fold every row whose timestamp
    // falls in the same week into a single output row.
    let tmp_dir = TempDir::new()?;
    let mut ctx = ExecutionContext::new();
    let schema = Arc::new(Schema::new(vec![
        Field::new("c2", DataType::UInt64, false),
        Field::new(
            "t1",
            DataType::Timestamp(TimeUnit::Microsecond, None),
            false,
        ),
    ]));

    // Write four identical partition files; each holds ten rows whose
    // timestamps span 2020-12-10 .. 2020-12-19, i.e. two distinct weeks.
    for partition in 0..4 {
        let path = tmp_dir.path().join(format!("partition-{}.{}", partition, "csv"));
        let mut file = File::create(path)?;
        for row in 0..10 {
            let line = format!("{},2020-12-{}T00:00:00.000\n", row, row + 10);
            file.write_all(line.as_bytes())?;
        }
    }

    ctx.register_csv(
        "test",
        tmp_dir.path().to_str().unwrap(),
        CsvReadOptions::new().schema(&schema).has_header(false),
    )?;

    let results = plan_and_collect(
        &mut ctx,
        "SELECT date_trunc('week', t1) as week, SUM(c2) FROM test GROUP BY date_trunc('week', t1)"
    ).await?;
    assert_eq!(results.len(), 1);

    let batch = &results[0];
    assert_eq!(field_names(batch), vec!["week", "SUM(c2)"]);

    // Dec 10-13 (c2 = 0..=3) belong to the week of Dec 7: (0+1+2+3) * 4 = 24.
    // Dec 14-19 (c2 = 4..=9) belong to the week of Dec 14: (4+..+9) * 4 = 156.
    let expected: Vec<&str> =
        vec!["2020-12-07T00:00:00,24", "2020-12-14T00:00:00,156"];
    let mut rows = test::format_batch(&batch);
    rows.sort();
    assert_eq!(rows, expected);

    Ok(())
}

async fn run_count_distinct_integers_aggregated_scenario(
partitions: Vec<Vec<(&str, u64)>>,
) -> Result<Vec<RecordBatch>> {
Expand Down
161 changes: 161 additions & 0 deletions rust/datafusion/src/physical_plan/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use arrow::{
buffer::Buffer,
datatypes::{DataType, TimeUnit, ToByteSlice},
};
use chrono::Duration;
use chrono::{prelude::*, LocalResult};

#[inline]
Expand Down Expand Up @@ -205,6 +206,108 @@ pub fn to_timestamp(args: &[ArrayRef]) -> Result<TimestampNanosecondArray> {
Ok(TimestampNanosecondArray::from(Arc::new(data)))
}

/// date_trunc SQL function
pub fn date_trunc(args: &[ArrayRef]) -> Result<TimestampNanosecondArray> {
let granularity_array =
&args[0]
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| {
DataFusionError::Execution(
"Could not cast date_trunc granularity input to StringArray"
.to_string(),
)
})?;

let array = &args[1]
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.ok_or_else(|| {
DataFusionError::Execution(
"Could not cast date_trunc array input to TimestampNanosecondArray"
.to_string(),
)
})?;

let range = 0..array.len();
let result = range
.map(|i| {
if array.is_null(i) {
Ok(0_i64)
} else {
let date_time = match granularity_array.value(i) {
"second" => array
.value_as_datetime(i)
.and_then(|d| d.with_nanosecond(0)),
"minute" => array
.value_as_datetime(i)
.and_then(|d| d.with_nanosecond(0))
.and_then(|d| d.with_second(0)),
"hour" => array
.value_as_datetime(i)
.and_then(|d| d.with_nanosecond(0))
.and_then(|d| d.with_second(0))
.and_then(|d| d.with_minute(0)),
"day" => array
.value_as_datetime(i)
.and_then(|d| d.with_nanosecond(0))
.and_then(|d| d.with_second(0))
.and_then(|d| d.with_minute(0))
.and_then(|d| d.with_hour(0)),
"week" => array
.value_as_datetime(i)
.and_then(|d| d.with_nanosecond(0))
.and_then(|d| d.with_second(0))
.and_then(|d| d.with_minute(0))
.and_then(|d| d.with_hour(0))
.map(|d| {
d - Duration::seconds(60 * 60 * 24 * d.weekday() as i64)
}),
"month" => array
.value_as_datetime(i)
.and_then(|d| d.with_nanosecond(0))
.and_then(|d| d.with_second(0))
.and_then(|d| d.with_minute(0))
.and_then(|d| d.with_hour(0))
.and_then(|d| d.with_day0(0)),
"year" => array
.value_as_datetime(i)
.and_then(|d| d.with_nanosecond(0))
.and_then(|d| d.with_second(0))
.and_then(|d| d.with_minute(0))
.and_then(|d| d.with_hour(0))
.and_then(|d| d.with_day0(0))
.and_then(|d| d.with_month0(0)),
unsupported => {
return Err(DataFusionError::Execution(format!(
"Unsupported date_trunc granularity: {}",
unsupported
)))
}
};
date_time.map(|d| d.timestamp_nanos()).ok_or_else(|| {
DataFusionError::Execution(format!(
"Can't truncate date time: {:?}",
array.value_as_datetime(i)
))
})
}
})
.collect::<Result<Vec<_>>>()?;

let data = ArrayData::new(
DataType::Timestamp(TimeUnit::Nanosecond, None),
array.len(),
Some(array.null_count()),
array.data().null_buffer().cloned(),
0,
vec![Buffer::from(result.to_byte_slice())],
vec![],
);

Ok(TimestampNanosecondArray::from(Arc::new(data)))
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down Expand Up @@ -378,6 +481,64 @@ mod tests {
Ok(())
}

#[test]
fn date_trunc_test() -> Result<()> {
    // (input timestamp, expected truncated timestamp, granularity);
    // the leading `None` pair exercises null propagation.
    let cases: Vec<(Option<&str>, Option<&str>, &str)> = vec![
        (None, None, "second"),
        (
            Some("2020-09-08T13:42:29.190855Z"),
            Some("2020-09-08T13:42:29.000000Z"),
            "second",
        ),
        (
            Some("2020-09-08T13:42:29.190855Z"),
            Some("2020-09-08T13:42:00.000000Z"),
            "minute",
        ),
        (
            Some("2020-09-08T13:42:29.190855Z"),
            Some("2020-09-08T13:00:00.000000Z"),
            "hour",
        ),
        (
            Some("2020-09-08T13:42:29.190855Z"),
            Some("2020-09-08T00:00:00.000000Z"),
            "day",
        ),
        (
            Some("2020-09-08T13:42:29.190855Z"),
            Some("2020-09-07T00:00:00.000000Z"),
            "week",
        ),
        (
            Some("2020-09-08T13:42:29.190855Z"),
            Some("2020-09-01T00:00:00.000000Z"),
            "month",
        ),
        (
            Some("2020-09-08T13:42:29.190855Z"),
            Some("2020-01-01T00:00:00.000000Z"),
            "year",
        ),
        // Week truncation crossing a year boundary lands in the prior year.
        (
            Some("2021-01-01T13:42:29.190855Z"),
            Some("2020-12-28T00:00:00.000000Z"),
            "week",
        ),
        (
            Some("2020-01-01T13:42:29.190855Z"),
            Some("2019-12-30T00:00:00.000000Z"),
            "week",
        ),
    ];

    let mut ts_builder = StringBuilder::new(2);
    let mut truncated_builder = StringBuilder::new(2);
    let mut string_builder = StringBuilder::new(2);

    for (ts, truncated, granularity) in cases {
        match ts {
            Some(v) => ts_builder.append_value(v)?,
            None => ts_builder.append_null()?,
        };
        match truncated {
            Some(v) => truncated_builder.append_value(v)?,
            None => truncated_builder.append_null()?,
        };
        string_builder.append_value(granularity)?;
    }

    let string_array = Arc::new(string_builder.finish());
    let ts_array = Arc::new(to_timestamp(&[Arc::new(ts_builder.finish())]).unwrap());
    let date_trunc_array = date_trunc(&[string_array, ts_array])
        .expect("that to_timestamp parsed values without error");

    let expected_timestamps =
        to_timestamp(&[Arc::new(truncated_builder.finish())]).unwrap();

    assert_eq!(date_trunc_array, expected_timestamps);
    Ok(())
}

#[test]
fn to_timestamp_invalid_input_type() -> Result<()> {
// pass the wrong type of input array to to_timestamp and test
Expand Down
13 changes: 13 additions & 0 deletions rust/datafusion/src/physical_plan/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ pub enum BuiltinScalarFunction {
Array,
/// SQL NULLIF()
NullIf,
/// Date truncate
DateTrunc,
}

impl fmt::Display for BuiltinScalarFunction {
Expand Down Expand Up @@ -168,6 +170,7 @@ impl FromStr for BuiltinScalarFunction {
"trim" => BuiltinScalarFunction::Trim,
"upper" => BuiltinScalarFunction::Upper,
"to_timestamp" => BuiltinScalarFunction::ToTimestamp,
"date_trunc" => BuiltinScalarFunction::DateTrunc,
"array" => BuiltinScalarFunction::Array,
"nullif" => BuiltinScalarFunction::NullIf,
_ => {
Expand Down Expand Up @@ -247,6 +250,9 @@ pub fn return_type(
BuiltinScalarFunction::ToTimestamp => {
Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
}
BuiltinScalarFunction::DateTrunc => {
Ok(DataType::Timestamp(TimeUnit::Nanosecond, None))
}
BuiltinScalarFunction::Array => Ok(DataType::FixedSizeList(
Box::new(Field::new("item", arg_types[0].clone(), true)),
arg_types.len() as i32,
Expand Down Expand Up @@ -317,6 +323,9 @@ pub fn create_physical_expr(
BuiltinScalarFunction::ToTimestamp => {
|args| Ok(Arc::new(datetime_expressions::to_timestamp(args)?))
}
BuiltinScalarFunction::DateTrunc => {
|args| Ok(Arc::new(datetime_expressions::date_trunc(args)?))
}
BuiltinScalarFunction::Array => |args| Ok(array_expressions::array(args)?),
});
// coerce
Expand Down Expand Up @@ -355,6 +364,10 @@ fn signature(fun: &BuiltinScalarFunction) -> Signature {
Signature::Uniform(1, vec![DataType::Utf8, DataType::LargeUtf8])
}
BuiltinScalarFunction::ToTimestamp => Signature::Uniform(1, vec![DataType::Utf8]),
BuiltinScalarFunction::DateTrunc => Signature::Exact(vec![
DataType::Utf8,
DataType::Timestamp(TimeUnit::Nanosecond, None),
]),
BuiltinScalarFunction::Array => {
Signature::Variadic(array_expressions::SUPPORTED_ARRAY_TYPES.to_vec())
}
Expand Down
4 changes: 4 additions & 0 deletions rust/datafusion/src/physical_plan/group_scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub(crate) enum GroupByScalar {
Int32(i32),
Int64(i64),
Utf8(Box<String>),
TimeMicrosecond(i64),
TimeNanosecond(i64),
}

impl TryFrom<&ScalarValue> for GroupByScalar {
Expand Down Expand Up @@ -87,6 +89,8 @@ impl From<&GroupByScalar> for ScalarValue {
GroupByScalar::UInt32(v) => ScalarValue::UInt32(Some(*v)),
GroupByScalar::UInt64(v) => ScalarValue::UInt64(Some(*v)),
GroupByScalar::Utf8(v) => ScalarValue::Utf8(Some(v.to_string())),
GroupByScalar::TimeMicrosecond(v) => ScalarValue::TimeMicrosecond(Some(*v)),
GroupByScalar::TimeNanosecond(v) => ScalarValue::TimeNanosecond(Some(*v)),
}
}
}
Expand Down
Loading

0 comments on commit 7dabc25

Please sign in to comment.