Adding asof joins for timevector #635

Merged
merged 1 commit into from Dec 1, 2022
7 changes: 5 additions & 2 deletions Changelog.md
@@ -7,8 +7,7 @@ This changelog should be updated as part of a PR if the work is worth noting (mo
## Next Release (Date TBD)

#### New experimental features
- [#615](https://github.com/timescale/timescaledb-toolkit/pull/615): Heartbeat aggregate

- [#615](https://github.com/timescale/timescaledb-toolkit/pull/615): Heatbeat aggregate
Contributor

Hehe we caught the same thing! Also "heatbeat" though, and also there are some rendering problems without blank lines. I sent a pull request for these.

Users can use the new `heartbeat_agg(timestamp, start_time, agg_interval, heartbeat_interval)` to track the liveness of a system in the range (`start_time`, `start_time` + `agg_interval`). Each timestamp seen in that range is assumed to indicate system liveness for the following `heartbeat_interval`.

Once constructed, users can query heartbeat aggregates for `uptime` and `downtime`, as well as query for `live_ranges` or `dead_ranges`. Users can also check for `live_at(timestamp)`.
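To make the liveness semantics above concrete, here is a minimal standalone sketch. It is not the toolkit's implementation: it assumes timestamps are plain sorted `i64` values, ignores heartbeats that fall outside the aggregation window, and clips each live range at the window end. The free functions `live_ranges` and `uptime` are hypothetical stand-ins for illustration, not the SQL accessors.

```rust
/// Hypothetical sketch: merge heartbeats into live ranges inside
/// [start, start + agg_interval). Assumes `heartbeats` is sorted ascending.
fn live_ranges(
    heartbeats: &[i64],
    start: i64,
    agg_interval: i64,
    heartbeat_interval: i64,
) -> Vec<(i64, i64)> {
    let end = start + agg_interval;
    let mut ranges: Vec<(i64, i64)> = Vec::new();
    for &t in heartbeats {
        // Assumption: heartbeats outside the window are simply skipped.
        if t < start || t >= end {
            continue;
        }
        // Each heartbeat vouches for the following interval, clipped to the window.
        let live_until = (t + heartbeat_interval).min(end);
        if let Some(last) = ranges.last_mut() {
            if t <= last.1 {
                // Overlaps or touches the previous live range: extend it.
                last.1 = last.1.max(live_until);
                continue;
            }
        }
        ranges.push((t, live_until));
    }
    ranges
}

/// Total live time is the sum of the live range lengths.
fn uptime(ranges: &[(i64, i64)]) -> i64 {
    ranges.iter().map(|(s, e)| e - s).sum()
}
```

Under these assumptions, downtime is the window length minus `uptime`, and the dead ranges are the gaps between consecutive live ranges.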
@@ -23,6 +22,10 @@ This changelog should be updated as part of a PR if the work is worth noting (mo

[Examples](docs/examples/)

- [#635](https://github.com/timescale/timescaledb-toolkit/pull/635): AsOf joins for timevectors

This allows users to join two timevectors with the following semantics: `timevectorA -> asof(timevectorB)`. This returns records with the LOCF (last observation carried forward) value from timevectorA at the timestamps from timevectorB. Specifically, the returned records contain, for each value in timevectorB, {the LOCF value from timevectorA, the value from timevectorB, the timestamp from timevectorB}.
Contributor

Without a blank line between lines 15 and 16 here, GitHub squishes it all into one paragraph. The blank does not disrupt the list.

Contributor Author

Fixed this and merged your fix for the above. Will merge once I verify everything looks good.
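The changelog entry's semantics can be illustrated with a small standalone sketch over plain slices. This is an assumption-laden illustration, not the code from this PR: `asof_locf` is a hypothetical helper, timestamps are plain `i64` values (minutes since midnight in the example), and both inputs are assumed to be sorted by time.

```rust
/// Hypothetical LOCF as-of join: for each point in `into`, carry forward the
/// most recent value from `from` at or before that point's time.
/// Returns (LOCF value from `from`, value from `into`, time from `into`).
fn asof_locf(from: &[(i64, f64)], into: &[(i64, f64)]) -> Vec<(Option<f64>, f64, i64)> {
    let mut out = Vec::with_capacity(into.len());
    // Number of `from` points whose time is at or before the current `into` time.
    let mut seen = 0;
    for &(time, value) in into {
        while seen < from.len() && from[seen].0 <= time {
            seen += 1;
        }
        // No earlier observation exists yet, so there is nothing to carry forward.
        let carried = if seen == 0 { None } else { Some(from[seen - 1].1) };
        out.push((carried, value, time));
    }
    out
}
```

With `from = [(60, 20.0), (120, 30.0), (180, 40.0)]` and `into = [(30, 15.0), (120, 45.0), (210, 60.0)]`, which mirrors the data in this PR's `test_asof_join`, the result is `[(None, 15.0, 30), (Some(30.0), 45.0, 120), (Some(40.0), 60.0, 210)]`.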


#### Bug fixes

#### Other notable changes
173 changes: 172 additions & 1 deletion extension/src/time_vector.rs
@@ -4,7 +4,7 @@ use pgx::{iter::TableIterator, *};

use crate::{
    aggregate_utils::in_aggregate_context,
-    build,
+    build, flatten,
    palloc::{Inner, Internal, InternalAsValue, ToInternal},
    pg_type, ron_inout_funcs,
};
@@ -368,6 +368,95 @@ CREATE AGGREGATE rollup(\n\
],
);

#[pg_schema]
pub mod toolkit_experimental {
    use super::*;

    // Only making this available through the arrow operator right now, as the semantics are cleaner that way
    pub fn asof_join<'a, 'b>(
        from: Timevector_TSTZ_F64<'a>,
        into: Timevector_TSTZ_F64<'b>,
    ) -> TableIterator<
        'a,
        (
            name!(value1, Option<f64>),
            name!(value2, f64),
            name!(time, crate::raw::TimestampTz),
        ),
    > {
        assert!(
            from.num_points > 0 && into.num_points > 0,
            "both timevectors must be populated for an asof join"
Contributor

Can we test these two errors?

Contributor Author

Added...it's actually surprisingly difficult to create an empty timevector.

        );
        let mut from = from
            .into_iter()
            .map(|points| (points.ts.into(), points.val))
            .peekable();
        let into = into.into_iter().map(|points| (points.ts, points.val));
        let (mut from_time, mut from_val) = from.next().unwrap();
Member

@syvb, Nov 28, 2022

The default error message from .unwrap() isn't particularly user friendly:

Suggested change
- let (mut from_time, mut from_val) = from.next().unwrap();
+ let (mut from_time, mut from_val) = from.next().expect("Must have at least one point in joining column");

Contributor Author

Good point, however it's also going to have potentially ugly behavior if the into vector is empty, so I added an assert that neither is empty to the start of this function.
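The trade-off discussed in this thread can be sketched in isolation. The snippet below is only an illustration under stated assumptions: `first_points` is a hypothetical helper over plain slices, not a function in this PR. It shows a single up-front check covering both inputs, after which later accesses cannot fail, in contrast to attaching a message to one `unwrap` site.

```rust
/// Hypothetical helper: validate both inputs once, with one user-facing message,
/// then read the first point of each without further checks.
fn first_points(from: &[(i64, f64)], into: &[(i64, f64)]) -> ((i64, f64), (i64, f64)) {
    assert!(
        !from.is_empty() && !into.is_empty(),
        "both timevectors must be populated for an asof join"
    );
    // Indexing is safe here because the assert above rejected empty inputs.
    (from[0], into[0])
}
```

A per-call `expect("...")` would give a readable message for the `from` side only; the single assert gives the same message regardless of which input is empty.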


        let mut results = vec![];
        for (into_time, into_val) in into {
            // Handle case where into starts before from
            if into_time < from_time {
                results.push((None, into_val, crate::raw::TimestampTz::from(into_time)));
                continue;
            }

            while let Some((peek_time, _)) = from.peek() {
                if *peek_time > into_time {
                    break;
                }
                (from_time, from_val) = from.next().unwrap();
            }

            results.push((
                Some(from_val),
                into_val,
                crate::raw::TimestampTz::from(into_time),
            ));
        }

        TableIterator::new(results.into_iter())
    }

    pg_type! {
        #[derive(Debug)]
        struct AccessorAsof<'input> {
            into: Timevector_TSTZ_F64Data<'input>,
        }
    }

    ron_inout_funcs!(AccessorAsof);

    #[pg_extern(immutable, parallel_safe, name = "asof")]
    pub fn accessor_asof<'a>(tv: Timevector_TSTZ_F64<'a>) -> AccessorAsof<'static> {
        unsafe {
            flatten! {
                AccessorAsof {
                    into: tv.0
                }
            }
        }
    }
}

#[pg_operator(immutable, parallel_safe)]
#[opname(->)]
pub fn arrow_timevector_asof<'a>(
    series: Timevector_TSTZ_F64<'a>,
    accessor: toolkit_experimental::AccessorAsof<'a>,
) -> TableIterator<
    'a,
    (
        name!(value1, Option<f64>),
        name!(value2, f64),
        name!(time, crate::raw::TimestampTz),
    ),
> {
    toolkit_experimental::asof_join(series, accessor.into.clone().into())
}

#[cfg(any(test, feature = "pg_test"))]
#[pg_schema]
mod tests {
@@ -644,4 +733,86 @@ mod tests {
            assert_eq!(tvec, expected);
        })
    }

    #[pg_test]
    fn test_asof_join() {
        Spi::execute(|client| {
            client.select("SET timezone TO 'UTC'", None, None);

            let mut result = client.select(
                "WITH s as (
                    SELECT timevector(time, value) AS v1 FROM
                    (VALUES
                        ('2022-10-1 1:00 UTC'::TIMESTAMPTZ, 20.0),
                        ('2022-10-1 2:00 UTC'::TIMESTAMPTZ, 30.0),
                        ('2022-10-1 3:00 UTC'::TIMESTAMPTZ, 40.0)
                    ) as v(time, value)),
                t as (
                    SELECT timevector(time, value) AS v2 FROM
                    (VALUES
                        ('2022-10-1 0:30 UTC'::TIMESTAMPTZ, 15.0),
                        ('2022-10-1 2:00 UTC'::TIMESTAMPTZ, 45.0),
                        ('2022-10-1 3:30 UTC'::TIMESTAMPTZ, 60.0)
                    ) as v(time, value))
                SELECT (v1 -> toolkit_experimental.asof(v2))::TEXT
                FROM s, t;",
                None,
                None,
            );

            assert_eq!(
                result.next().unwrap()[1].value(),
                Some("(,15,\"2022-10-01 00:30:00+00\")")
            );
            assert_eq!(
                result.next().unwrap()[1].value(),
                Some("(30,45,\"2022-10-01 02:00:00+00\")")
            );
            assert_eq!(
                result.next().unwrap()[1].value(),
                Some("(40,60,\"2022-10-01 03:30:00+00\")")
            );
            assert!(result.next().is_none());
        })
    }

    #[pg_test(error = "both timevectors must be populated for an asof join")]
    fn test_asof_none() {
        Spi::execute(|client| {
            client.select("SET timezone TO 'UTC'", None, None);

            client.select(
                "WITH s as (
                    SELECT timevector(now(), 0) -> toolkit_experimental.filter($$ $value != 0 $$) AS empty),
                t as (
                    SELECT timevector(time, value) AS valid FROM
                    (VALUES
                        ('2022-10-1 0:30 UTC'::TIMESTAMPTZ, 15.0),
                        ('2022-10-1 2:00 UTC'::TIMESTAMPTZ, 45.0),
                        ('2022-10-1 3:30 UTC'::TIMESTAMPTZ, 60.0)
                    ) as v(time, value))
                SELECT (valid -> toolkit_experimental.asof(empty))
                FROM s, t;", None, None);
        })
    }

    #[pg_test(error = "both timevectors must be populated for an asof join")]
    fn test_none_asof() {
        Spi::execute(|client| {
            client.select("SET timezone TO 'UTC'", None, None);

            client.select(
                "WITH s as (
                    SELECT timevector(now(), 0) -> toolkit_experimental.filter($$ $value != 0 $$) AS empty),
                t as (
                    SELECT timevector(time, value) AS valid FROM
                    (VALUES
                        ('2022-10-1 0:30 UTC'::TIMESTAMPTZ, 15.0),
                        ('2022-10-1 2:00 UTC'::TIMESTAMPTZ, 45.0),
                        ('2022-10-1 3:30 UTC'::TIMESTAMPTZ, 60.0)
                    ) as v(time, value))
                SELECT (empty -> toolkit_experimental.asof(valid))
                FROM s, t;", None, None);
        })
    }
}