Skip to content

Commit

Permalink
Merge branch 'main' into xxchan/exuberant-cod
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] committed Oct 17, 2022
2 parents db0563d + 7072ee1 commit 98cc624
Show file tree
Hide file tree
Showing 15 changed files with 257 additions and 41 deletions.
120 changes: 120 additions & 0 deletions e2e_test/batch/basic/time_window_utc.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t1 (row_id int, uid int, v int, created_at timestamp with time zone);

statement ok
insert into t1 values
(1, 1, 4, '2022-01-01 10:00:00Z'),
(2, 3, 3, '2022-01-01 10:05:00Z'),
(3, 2, 2, '2022-01-01 10:14:00Z'),
(4, 1, 1, '2022-01-01 10:22:00Z'),
(5, 3, 5, '2022-01-01 10:33:00Z'),
(6, 2, 7, '2022-01-01 10:42:00Z'),
(7, 1, 6, '2022-01-01 10:51:00Z'),
(8, 3, 8, '2022-01-01 11:02:00Z');

query IITTT
select row_id, uid, created_at, window_start, window_end
from tumble(t1, created_at, interval '30' minute) order by row_id, window_start;
----
1 1 2022-01-01 10:00:00+00:00 2022-01-01 10:00:00+00:00 2022-01-01 10:30:00+00:00
2 3 2022-01-01 10:05:00+00:00 2022-01-01 10:00:00+00:00 2022-01-01 10:30:00+00:00
3 2 2022-01-01 10:14:00+00:00 2022-01-01 10:00:00+00:00 2022-01-01 10:30:00+00:00
4 1 2022-01-01 10:22:00+00:00 2022-01-01 10:00:00+00:00 2022-01-01 10:30:00+00:00
5 3 2022-01-01 10:33:00+00:00 2022-01-01 10:30:00+00:00 2022-01-01 11:00:00+00:00
6 2 2022-01-01 10:42:00+00:00 2022-01-01 10:30:00+00:00 2022-01-01 11:00:00+00:00
7 1 2022-01-01 10:51:00+00:00 2022-01-01 10:30:00+00:00 2022-01-01 11:00:00+00:00
8 3 2022-01-01 11:02:00+00:00 2022-01-01 11:00:00+00:00 2022-01-01 11:30:00+00:00

query IITTT
select row_id, uid, created_at, window_start, window_end
from hop(t1, created_at, interval '15' minute, interval '30' minute) order by row_id, window_start;
----
1 1 2022-01-01 10:00:00+00:00 2022-01-01 09:45:00+00:00 2022-01-01 10:15:00+00:00
1 1 2022-01-01 10:00:00+00:00 2022-01-01 10:00:00+00:00 2022-01-01 10:30:00+00:00
2 3 2022-01-01 10:05:00+00:00 2022-01-01 09:45:00+00:00 2022-01-01 10:15:00+00:00
2 3 2022-01-01 10:05:00+00:00 2022-01-01 10:00:00+00:00 2022-01-01 10:30:00+00:00
3 2 2022-01-01 10:14:00+00:00 2022-01-01 09:45:00+00:00 2022-01-01 10:15:00+00:00
3 2 2022-01-01 10:14:00+00:00 2022-01-01 10:00:00+00:00 2022-01-01 10:30:00+00:00
4 1 2022-01-01 10:22:00+00:00 2022-01-01 10:00:00+00:00 2022-01-01 10:30:00+00:00
4 1 2022-01-01 10:22:00+00:00 2022-01-01 10:15:00+00:00 2022-01-01 10:45:00+00:00
5 3 2022-01-01 10:33:00+00:00 2022-01-01 10:15:00+00:00 2022-01-01 10:45:00+00:00
5 3 2022-01-01 10:33:00+00:00 2022-01-01 10:30:00+00:00 2022-01-01 11:00:00+00:00
6 2 2022-01-01 10:42:00+00:00 2022-01-01 10:15:00+00:00 2022-01-01 10:45:00+00:00
6 2 2022-01-01 10:42:00+00:00 2022-01-01 10:30:00+00:00 2022-01-01 11:00:00+00:00
7 1 2022-01-01 10:51:00+00:00 2022-01-01 10:30:00+00:00 2022-01-01 11:00:00+00:00
7 1 2022-01-01 10:51:00+00:00 2022-01-01 10:45:00+00:00 2022-01-01 11:15:00+00:00
8 3 2022-01-01 11:02:00+00:00 2022-01-01 10:45:00+00:00 2022-01-01 11:15:00+00:00
8 3 2022-01-01 11:02:00+00:00 2022-01-01 11:00:00+00:00 2022-01-01 11:30:00+00:00

query IT
select sum(v), window_start
from tumble(t1, created_at, interval '30' minute)
group by window_start order by window_start;
----
10 2022-01-01 10:00:00+00:00
18 2022-01-01 10:30:00+00:00
8 2022-01-01 11:00:00+00:00

query IIT
select uid, sum(v), window_start
from tumble(t1, created_at, interval '30' minute)
group by window_start, uid order by window_start, uid;
----
1 5 2022-01-01 10:00:00+00:00
2 2 2022-01-01 10:00:00+00:00
3 3 2022-01-01 10:00:00+00:00
1 6 2022-01-01 10:30:00+00:00
2 7 2022-01-01 10:30:00+00:00
3 5 2022-01-01 10:30:00+00:00
3 8 2022-01-01 11:00:00+00:00

query IT
select sum(v), window_start
from hop(t1, created_at, interval '15' minute, interval '30' minute)
group by window_start order by window_start;
----
9 2022-01-01 09:45:00+00:00
10 2022-01-01 10:00:00+00:00
13 2022-01-01 10:15:00+00:00
18 2022-01-01 10:30:00+00:00
14 2022-01-01 10:45:00+00:00
8 2022-01-01 11:00:00+00:00

query IIT
select uid, sum(v), window_start
from hop(t1, created_at, interval '15' minute, interval '30' minute)
group by window_start, uid order by window_start, uid;
----
1 4 2022-01-01 09:45:00+00:00
2 2 2022-01-01 09:45:00+00:00
3 3 2022-01-01 09:45:00+00:00
1 5 2022-01-01 10:00:00+00:00
2 2 2022-01-01 10:00:00+00:00
3 3 2022-01-01 10:00:00+00:00
1 1 2022-01-01 10:15:00+00:00
2 7 2022-01-01 10:15:00+00:00
3 5 2022-01-01 10:15:00+00:00
1 6 2022-01-01 10:30:00+00:00
2 7 2022-01-01 10:30:00+00:00
3 5 2022-01-01 10:30:00+00:00
1 6 2022-01-01 10:45:00+00:00
3 8 2022-01-01 10:45:00+00:00
3 8 2022-01-01 11:00:00+00:00

statement error
select * from hop(t1, created_at, interval '0', interval '1');

statement error
select * from hop(t1, created_at, interval '1', interval '-1');

statement error
select * from hop(t1, created_at, interval '-1', interval '1');

statement error
select * from hop(t1, created_at, interval '-1', interval '-1');

statement ok
drop table t1;
9 changes: 9 additions & 0 deletions e2e_test/batch/types/timestamptz_utc.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,12 @@ select v1, v2 from t where v2 BETWEEN '2022-10-01T11:00:05Z' AND '2022-10-01T11:

statement ok
drop table t;

# Arithmetic are currently done in UTC rather than session TimeZone

# If this was done in PostgreSQL under 'US/Pacific', it will return 1 hour
# earlier because there are only 23 hours in this day due to Daylight Saving.
query T
select '2022-03-13 09:00:00Z'::timestamptz + interval '1' day - interval '24' hour;
----
2022-03-13 09:00:00+00:00
19 changes: 11 additions & 8 deletions src/batch/src/executor/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,16 @@ impl BoxedExecutorBuilder for HopWindowExecutor {
.map(|x| x as usize)
.collect_vec();

let time_col_data_type = child.schema().fields()[time_col].data_type();
let output_type = DataType::window_of(&time_col_data_type).unwrap();
let original_schema: Schema = child
.schema()
.clone()
.into_fields()
.into_iter()
.chain([
Field::with_name(DataType::Timestamp, "window_start"),
Field::with_name(DataType::Timestamp, "window_end"),
Field::with_name(output_type.clone(), "window_start"),
Field::with_name(output_type, "window_end"),
])
.collect();
let output_indices_schema: Schema = output_indices
Expand Down Expand Up @@ -152,6 +154,7 @@ impl HopWindowExecutor {
.get();

let time_col_data_type = child.schema().fields()[time_col_idx].data_type();
let output_type = DataType::window_of(&time_col_data_type).unwrap();
let time_col_ref = InputRefExpression::new(time_col_data_type, self.time_col_idx).boxed();

let window_slide_expr =
Expand All @@ -174,10 +177,10 @@ impl HopWindowExecutor {
.boxed();
let hop_expr = new_binary_expr(
expr_node::Type::TumbleStart,
risingwave_common::types::DataType::Timestamp,
output_type.clone(),
new_binary_expr(
expr_node::Type::Subtract,
DataType::Timestamp,
output_type.clone(),
time_col_ref,
window_size_sub_slide_expr,
)?,
Expand Down Expand Up @@ -212,15 +215,15 @@ impl HopWindowExecutor {
.boxed();
let window_start_expr = new_binary_expr(
expr_node::Type::Add,
DataType::Timestamp,
InputRefExpression::new(DataType::Timestamp, 0).boxed(),
output_type.clone(),
InputRefExpression::new(output_type.clone(), 0).boxed(),
window_start_offset_expr,
)?;
window_start_exprs.push(window_start_expr);
let window_end_expr = new_binary_expr(
expr_node::Type::Add,
DataType::Timestamp,
InputRefExpression::new(DataType::Timestamp, 0).boxed(),
output_type.clone(),
InputRefExpression::new(output_type.clone(), 0).boxed(),
window_end_offset_expr,
)?;
window_end_exprs.push(window_end_expr);
Expand Down
9 changes: 9 additions & 0 deletions src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,15 @@ impl DataType {
matches!(self, DataType::Int16 | DataType::Int32 | DataType::Int64)
}

/// Returns the output type of window function on a given input type.
pub fn window_of(input: &DataType) -> Option<DataType> {
match input {
DataType::Timestampz => Some(DataType::Timestampz),
DataType::Timestamp | DataType::Date => Some(DataType::Timestamp),
_ => None,
}
}

/// Checks if memcomparable encoding of datatype is equivalent to its value encoding.
pub fn mem_cmp_eq_value_enc(&self) -> bool {
use DataType::*;
Expand Down
17 changes: 15 additions & 2 deletions src/expr/src/expr/expr_binary_nonnull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use risingwave_common::array::{
Array, BoolArray, DecimalArray, I32Array, IntervalArray, ListArray, NaiveDateArray,
Array, BoolArray, DecimalArray, I32Array, I64Array, IntervalArray, ListArray, NaiveDateArray,
NaiveDateTimeArray, StructArray, Utf8Array,
};
use risingwave_common::types::*;
Expand All @@ -29,7 +29,9 @@ use crate::vector_op::extract::{extract_from_date, extract_from_timestamp};
use crate::vector_op::like::like_default;
use crate::vector_op::position::position;
use crate::vector_op::round::round_digits;
use crate::vector_op::tumble::{tumble_start_date, tumble_start_date_time};
use crate::vector_op::tumble::{
tumble_start_date, tumble_start_date_time, tumble_start_timestampz,
};
use crate::{for_all_cmp_variants, ExprError, Result};

/// This macro helps create arithmetic expression.
Expand Down Expand Up @@ -376,6 +378,8 @@ pub fn new_binary_expr(
l, r, ret,
general_add,
{
{ timestampz, interval, timestampz, timestampz_interval_add },
{ interval, timestampz, timestampz, interval_timestampz_add },
{ timestamp, interval, timestamp, timestamp_interval_add },
{ interval, timestamp, timestamp, interval_timestamp_add },
{ interval, date, timestamp, interval_date_add },
Expand All @@ -396,6 +400,7 @@ pub fn new_binary_expr(
l, r, ret,
general_sub,
{
{ timestampz, interval, timestampz, timestampz_interval_sub },
{ timestamp, timestamp, interval, timestamp_timestamp_sub },
{ timestamp, interval, timestamp, timestamp_interval_sub },
{ date, date, int32, date_date_sub },
Expand Down Expand Up @@ -550,6 +555,14 @@ fn new_tumble_start(
>::new(
expr_ia1, expr_ia2, return_type, tumble_start_date_time
)),
DataType::Timestampz => Box::new(
BinaryExpression::<I64Array, IntervalArray, I64Array, _>::new(
expr_ia1,
expr_ia2,
return_type,
tumble_start_timestampz,
),
),
_ => {
return Err(ExprError::UnsupportedFunction(format!(
"tumble_start is not supported for {:?}",
Expand Down
27 changes: 27 additions & 0 deletions src/expr/src/vector_op/arithmetic_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,33 @@ pub fn timestamp_interval_sub<T1, T2, T3>(
interval_timestamp_add::<T1, T2, T3>(r.negative(), l)
}

#[inline(always)]
pub fn timestampz_interval_add<T1, T2, T3>(l: i64, r: IntervalUnit) -> Result<i64> {
interval_timestampz_add::<T1, T2, T3>(r, l)
}

#[inline(always)]
pub fn timestampz_interval_sub<T1, T2, T3>(l: i64, r: IntervalUnit) -> Result<i64> {
interval_timestampz_add::<T1, T2, T3>(r.negative(), l)
}

#[inline(always)]
pub fn interval_timestampz_add<T1, T2, T3>(l: IntervalUnit, r: i64) -> Result<i64> {
// Without session TimeZone, we cannot add month/day in local time. See #5826.
// However, we only reject months but accept days, assuming them are always 24-hour and ignoring
// Daylight Saving.
// This is to keep consistent with `tumble_start` of RisingWave / `date_bin` of PostgreSQL.
if l.get_months() != 0 {
return Err(ExprError::UnsupportedFunction(
"timestamp with time zone +/- interval of months".into(),
));
}
let delta_usecs = l.get_days() as i64 * 24 * 60 * 60 * 1_000_000 + l.get_ms() * 1000;

r.checked_add(delta_usecs)
.ok_or(ExprError::NumericOutOfRange)
}

#[inline(always)]
pub fn interval_int_mul<T1, T2, T3>(l: IntervalUnit, r: T2) -> Result<IntervalUnit>
where
Expand Down
39 changes: 30 additions & 9 deletions src/expr/src/vector_op/tumble.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,48 @@ pub fn tumble_start_date(
tumble_start_date_time(time.into(), window)
}

// FIXME: This implementation is very crude and likely wrong. fix it later.
#[inline(always)]
pub fn tumble_start_date_time(
time: NaiveDateTimeWrapper,
window: IntervalUnit,
) -> Result<NaiveDateTimeWrapper> {
let diff = time.0.timestamp();
let diff = time.0.timestamp_micros();
let window_start = tm_diff_bin(diff, window)?;
Ok(NaiveDateTimeWrapper(NaiveDateTime::from_timestamp(
window_start / 1_000_000,
(window_start % 1_000_000 * 1000) as u32,
)))
}

#[inline(always)]
pub fn tumble_start_timestampz(time: i64, window: IntervalUnit) -> Result<i64> {
// Actually directly calls into the helper `tm_diff_bin`. But we keep the shared utility and
// enduser function separate.
let diff = time;
let window_start = tm_diff_bin(diff, window)?;
Ok(window_start)
}

/// The common part of PostgreSQL function `timestamp_bin` and `timestamptz_bin`.
#[inline(always)]
fn tm_diff_bin(diff_usecs: i64, window: IntervalUnit) -> Result<i64> {
if window.get_months() != 0 {
return Err(ExprError::InvalidParam {
name: "window",
reason: "unimplemented: tumble_start only support days or milliseconds".to_string(),
});
}
let window = window.get_days() as i64 * 24 * 60 * 60 + window.get_ms() / 1000;
let offset = diff / window;
let window_start = window * offset;
let window_usecs = window.get_days() as i64 * 24 * 60 * 60 * 1_000_000 + window.get_ms() * 1000;

Ok(NaiveDateTimeWrapper(NaiveDateTime::from_timestamp(
window_start,
0,
)))
if window_usecs <= 0 {
return Err(ExprError::InvalidParam {
name: "window",
reason: "window must be positive".to_string(),
});
}

let delta_usecs = diff_usecs - diff_usecs % window_usecs;
Ok(delta_usecs)
}

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/time_window.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
create materialized view t as select * from s;
select * from tumble(t, (country).created_at, interval '3' day);
binder_error: 'Bind error: the 2st arg of window table function should be a timestamp
or date column'
with time zone, timestamp or date column'
create_source:
row_format: protobuf
name: s
Expand All @@ -41,7 +41,7 @@
create table t1 (id int, created_at varchar);
select * from hop(t1, created_at, interval '2' day, interval '4' day);
binder_error: 'Bind error: the 2st arg of window table function should be a timestamp
or date column'
with time zone, timestamp or date column'
- sql: |
create table t1 (id int, created_at date);
select * from hop(t1, created_at, interval '2' day, interval '3' day);
Expand Down

0 comments on commit 98cc624

Please sign in to comment.