Skip to content

Commit

Permalink
fix(sql): FILL(LINEAR) now supports ALIGN TO CALENDAR (#4392)
Browse files: browse the repository at this point in the history
  • Loading branch information
nwoolmer committed Apr 19, 2024
1 parent c26ee23 commit 5cd0c4a
Show file tree
Hide file tree
Showing 6 changed files with 1,030 additions and 179 deletions.
6 changes: 5 additions & 1 deletion core/src/main/java/io/questdb/griffin/SqlCodeGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -2896,7 +2896,11 @@ private RecordCursorFactory generateSampleBy(
valueTypes,
entityColumnFilter,
groupByFunctionPositions,
timestampIndex
timestampIndex,
timezoneNameFunc,
timezoneNameFuncPos,
offsetFunc,
offsetFuncPos
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,14 @@
import io.questdb.griffin.engine.functions.columns.TimestampColumn;
import io.questdb.griffin.model.QueryModel;
import io.questdb.std.*;
import io.questdb.std.datetime.TimeZoneRules;
import io.questdb.std.datetime.microtime.TimestampFormatUtils;
import io.questdb.std.datetime.microtime.Timestamps;
import org.jetbrains.annotations.NotNull;

import static io.questdb.std.datetime.TimeZoneRuleFactory.RESOLUTION_MICROS;
import static io.questdb.std.datetime.microtime.Timestamps.MINUTE_MICROS;

public class SampleByInterpolateRecordCursorFactory extends AbstractRecordCursorFactory {

protected final RecordCursorFactory base;
Expand Down Expand Up @@ -75,7 +81,11 @@ public SampleByInterpolateRecordCursorFactory(
@Transient @NotNull ArrayColumnTypes valueTypes,
@Transient @NotNull EntityColumnFilter entityColumnFilter,
@Transient @NotNull IntList groupByFunctionPositions,
int timestampIndex
int timestampIndex,
Function timezoneNameFunc,
int timezoneNameFuncPos,
Function offsetFunc,
int offsetFuncPos
) throws SqlException {
super(metadata);
final int columnCount = model.getBottomUpColumns().size();
Expand Down Expand Up @@ -146,7 +156,7 @@ public SampleByInterpolateRecordCursorFactory(
entityColumnFilter.of(keyTypes.getColumnCount());
this.mapSink2 = RecordSinkFactory.getInstance(asm, keyTypes, entityColumnFilter, false);

this.cursor = new SampleByInterpolateRecordCursor(configuration, recordFunctions, groupByFunctions, keyTypes, valueTypes);
this.cursor = new SampleByInterpolateRecordCursor(configuration, recordFunctions, groupByFunctions, keyTypes, valueTypes, sampler, timezoneNameFunc, timezoneNameFuncPos, offsetFunc, offsetFuncPos);
}

@Override
Expand Down Expand Up @@ -231,12 +241,28 @@ private class SampleByInterpolateRecordCursor extends VirtualFunctionSkewedSymbo
private long prevSample = -1;
private long rowId;

private final Function offsetFunc;
private final int offsetFuncPos;
private final TimestampSampler timestampSampler;
private final Function timezoneNameFunc;
private final int timezoneNameFuncPos;
private long fixedOffset;
private long nextDstUtc;
private TimeZoneRules rules;
private long tzOffset;


public SampleByInterpolateRecordCursor(
CairoConfiguration configuration,
ObjList<Function> functions,
ObjList<GroupByFunction> groupByFunctions,
@Transient @NotNull ArrayColumnTypes keyTypes,
@Transient @NotNull ArrayColumnTypes valueTypes
@Transient @NotNull ArrayColumnTypes valueTypes,
TimestampSampler timestampSampler,
Function timezoneNameFunc,
int timezoneNameFuncPos,
Function offsetFunc,
int offsetFuncPos
) {
super(functions);
// this is the map itself, which we must not forget to free when factory closes
Expand All @@ -247,6 +273,12 @@ public SampleByInterpolateRecordCursor(
allocator = GroupByAllocatorFactory.createThreadUnsafeAllocator(configuration);
GroupByUtils.setAllocator(groupByFunctions, allocator);
isOpen = true;

this.timestampSampler = timestampSampler;
this.timezoneNameFunc = timezoneNameFunc;
this.timezoneNameFuncPos = timezoneNameFuncPos;
this.offsetFunc = offsetFunc;
this.offsetFuncPos = offsetFuncPos;
}

@Override
Expand All @@ -259,6 +291,10 @@ public void close() {
Misc.clearObjList(groupByFunctions);
super.close();
}
timezoneNameFunc.clear();
offsetFunc.clear();
Misc.free(timezoneNameFunc);
Misc.free(offsetFunc);
}

@Override
Expand All @@ -270,7 +306,7 @@ public boolean hasNext() {
return super.hasNext();
}

public void of(RecordCursor managedCursor, SqlExecutionContext executionContext) {
public void of(RecordCursor managedCursor, SqlExecutionContext executionContext) throws SqlException {
if (!isOpen) {
isOpen = true;
recordKeyMap.reopen();
Expand All @@ -287,6 +323,8 @@ public void of(RecordCursor managedCursor, SqlExecutionContext executionContext)
isMapInitialized = false;
isMapFilled = false;
isMapBuilt = false;
parseParams(this, executionContext);
areTimestampsInitialized = false;
}

@Override
Expand Down Expand Up @@ -320,6 +358,11 @@ private void buildMap() {
isMapInitialized = true;
}

if (!areTimestampsInitialized) {
initTimestamps();
areTimestampsInitialized = true;
}

if (!isMapFilled) {
fillMap();
isMapFilled = true;
Expand Down Expand Up @@ -485,15 +528,7 @@ private void fillMap() {
// check group for gaps and fill them with placeholder
// entries. Values for these entries will be interpolated later.

if (prevSample == -1) {
// we have data in cursor, so we can grab first value
final boolean good = managedCursor.hasNext();
assert good;
long timestamp = managedRecord.getTimestamp(timestampIndex);
sampler.setStart(timestamp);
prevSample = sampler.round(timestamp);
loSample = prevSample; // the lowest timestamp value
}
assert prevSample != -1;

do {
circuitBreaker.statefulThrowExceptionIfTripped();
Expand Down Expand Up @@ -635,5 +670,73 @@ private void nullifyRange(long lo, long hi, Record record) {
}
}
}

/**
 * Initializes the timezone and offset functions against the supplied cursor and
 * translates their string values into the cursor's alignment state.
 * <p>
 * Timezone handling:
 * <ul>
 *   <li>no timezone: {@code tzOffset} is 0 and {@code nextDstUtc} is {@code Long.MAX_VALUE};</li>
 *   <li>numeric offset: {@code tzOffset} holds the offset in micros and
 *       {@code nextDstUtc} is {@code Long.MAX_VALUE};</li>
 *   <li>anything else is looked up as a zone name and stored in {@code rules};
 *       {@code tzOffset} and {@code nextDstUtc} are not assigned on this path.</li>
 * </ul>
 * Offset handling: {@code fixedOffset} holds the parsed offset in micros, or
 * {@code Long.MIN_VALUE} when the offset function yields null.
 *
 * @param base             cursor passed to {@code init} of both functions
 * @param executionContext execution context passed to {@code init} of both functions
 * @throws SqlException when the timezone lookup raises {@code NumericException},
 *                      or when the offset string cannot be parsed
 */
protected void parseParams(RecordCursor base, SqlExecutionContext executionContext) throws SqlException {
    // factory guarantees that base cursor is not empty
    timezoneNameFunc.init(base, executionContext);
    offsetFunc.init(base, executionContext);
    rules = null;

    final CharSequence zone = timezoneNameFunc.getStrA(null);
    if (zone == null) {
        // no timezone supplied
        tzOffset = 0;
        nextDstUtc = Long.MAX_VALUE;
    } else {
        try {
            final long parsedZone = Timestamps.parseOffset(zone);
            if (parsedZone != Long.MIN_VALUE) {
                // timezone was given as a numeric offset
                tzOffset = Numbers.decodeLowInt(parsedZone) * MINUTE_MICROS;
                nextDstUtc = Long.MAX_VALUE;
            } else {
                // not a numeric offset, so treat the value as a timezone name
                final long zoneMatch = TimestampFormatUtils.EN_LOCALE.matchZone(zone, 0, zone.length());
                rules = TimestampFormatUtils.EN_LOCALE.getZoneRules(
                        Numbers.decodeLowInt(zoneMatch),
                        RESOLUTION_MICROS
                );
            }
        } catch (NumericException e) {
            throw SqlException.$(timezoneNameFuncPos, "invalid timezone: ").put(zone);
        }
    }

    final CharSequence offsetText = offsetFunc.getStrA(null);
    if (offsetText == null) {
        // marker value: no explicit offset was supplied
        fixedOffset = Long.MIN_VALUE;
        return;
    }

    final long parsedOffset = Timestamps.parseOffset(offsetText);
    if (parsedOffset == Numbers.LONG_NaN) {
        // bad value for offset
        throw SqlException.$(offsetFuncPos, "invalid offset: ").put(offsetText);
    }
    fixedOffset = Numbers.decodeLowInt(parsedOffset) * MINUTE_MICROS;
}

private boolean areTimestampsInitialized;

/**
 * Moves the managed cursor onto its first record and derives the sampling origin
 * from that record's timestamp.
 * <p>
 * Returns immediately when {@code areTimestampsInitialized} is already set. This
 * method does not set that flag itself.
 * <p>
 * When timezone {@code rules} are present, {@code tzOffset} and {@code nextDstUtc}
 * are resolved from them for the first timestamp. The sampler start is then the
 * first timestamp when there is neither a timezone offset nor a fixed offset;
 * otherwise it is the fixed offset, or 0 when no fixed offset was supplied.
 * Finally {@code prevSample} and {@code loSample} are set to the rounded first
 * timestamp.
 */
protected void initTimestamps() {
    if (areTimestampsInitialized) {
        return;
    }

    // The cursor must be advanced unconditionally. An assert expression is not
    // evaluated when assertions are disabled, so calling hasNext() inside the
    // assert would skip the advance and the timestamp read below would not see
    // the first record.
    final boolean hasFirstRecord = managedCursor.hasNext();
    assert hasFirstRecord;

    final long timestamp = managedRecord.getTimestamp(timestampIndex);
    if (rules != null) {
        tzOffset = rules.getOffset(timestamp);
        nextDstUtc = rules.getNextDST(timestamp);
    }

    if (tzOffset == 0 && fixedOffset == Long.MIN_VALUE) {
        // this is the default path, we align time intervals to the first observation
        timestampSampler.setStart(timestamp);
    } else {
        // Long.MIN_VALUE marks "no fixed offset"; fall back to 0 in that case
        timestampSampler.setStart(fixedOffset != Long.MIN_VALUE ? fixedOffset : 0L);
    }
    prevSample = sampler.round(timestamp);
    loSample = prevSample; // the lowest timestamp value
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,16 @@ public void testAggregationBySymbolWithSampling() throws Exception {
"1970-01-01T01:30:00.000000Z\tAAA\t228.55327569899347\t2000.0\n" +
"1970-01-01T02:30:00.000000Z\tAAA\t85.73439427824682\t1500.0\n" +
"1970-01-01T03:30:00.000000Z\tAAA\t157.22760372823447\t1000.0\n" +
"1970-01-01T04:30:00.000000Z\tAAA\t0.0\t1000.0\n", "select k, s, haversine_dist_deg(lat, lon, k), sum(p) from tab sample by 1h fill(linear)"
"1970-01-01T04:30:00.000000Z\tAAA\t0.0\t1000.0\n", "select k, s, haversine_dist_deg(lat, lon, k), sum(p) from tab sample by 1h fill(linear) align to first observation"
);

assertSql(
"k\ts\thaversine_dist_deg\tsum\n" +
"1970-01-01T00:00:00.000000Z\tAAA\t78.50616567866791\t1000.0\n" +
"1970-01-01T01:00:00.000000Z\tAAA\t264.19224423853797\t2000.0\n" +
"1970-01-01T02:00:00.000000Z\tAAA\t85.73439427824682\t1500.0\n" +
"1970-01-01T03:00:00.000000Z\tAAA\t121.48099900324064\t1000.0\n" +
"1970-01-01T04:00:00.000000Z\tAAA\t78.61380186411724\t1000.0\n", "select k, s, haversine_dist_deg(lat, lon, k), sum(p) from tab sample by 1h fill(linear) align to calendar"
);
}

Expand Down Expand Up @@ -369,7 +378,47 @@ public void testAggregationWithSampleFill1() throws Exception {
"VTJW\t297.90493337981263\t1970-01-03T14:31:40.000000Z\n" +
"RXGZ\t141.93824030889962\t1970-01-03T15:31:40.000000Z\n" +
"VTJW\t297.90493337981263\t1970-01-03T15:31:40.000000Z\n",
"select s, haversine_dist_deg(lat, lon, k), k from tab sample by 1h fill(linear)",
"select s, haversine_dist_deg(lat, lon, k), k from tab sample by 1h fill(linear) align to first observation",
null,
"k",
true, true);

assertQuery("s\thaversine_dist_deg\tk\n" +
"VTJW\t140.48471753024785\t1970-01-03T00:00:00.000000Z\n" +
"RXGZ\t268.93561321686246\t1970-01-03T00:00:00.000000Z\n" +
"VTJW\t297.7248158372856\t1970-01-03T01:00:00.000000Z\n" +
"RXGZ\t268.93561321686246\t1970-01-03T01:00:00.000000Z\n" +
"VTJW\t297.911239737792\t1970-01-03T02:00:00.000000Z\n" +
"RXGZ\t268.93561321686246\t1970-01-03T02:00:00.000000Z\n" +
"VTJW\t49.65838525039151\t1970-01-03T03:00:00.000000Z\n" +
"RXGZ\t268.93561321686246\t1970-01-03T03:00:00.000000Z\n" +
"VTJW\t297.950311502349\t1970-01-03T04:00:00.000000Z\n" +
"RXGZ\t268.93561321686246\t1970-01-03T04:00:00.000000Z\n" +
"VTJW\t297.950311502349\t1970-01-03T05:00:00.000000Z\n" +
"RXGZ\t268.93561321686246\t1970-01-03T05:00:00.000000Z\n" +
"VTJW\t297.950311502349\t1970-01-03T06:00:00.000000Z\n" +
"RXGZ\t268.93561321686246\t1970-01-03T06:00:00.000000Z\n" +
"VTJW\t297.950311502349\t1970-01-03T07:00:00.000000Z\n" +
"RXGZ\t268.93561321686246\t1970-01-03T07:00:00.000000Z\n" +
"VTJW\t297.950311502349\t1970-01-03T08:00:00.000000Z\n" +
"RXGZ\t268.93561321686246\t1970-01-03T08:00:00.000000Z\n" +
"VTJW\t297.950311502349\t1970-01-03T09:00:00.000000Z\n" +
"RXGZ\t268.93561321686246\t1970-01-03T09:00:00.000000Z\n" +
"VTJW\t297.950311502349\t1970-01-03T10:00:00.000000Z\n" +
"RXGZ\t268.93561321686246\t1970-01-03T10:00:00.000000Z\n" +
"VTJW\t297.950311502349\t1970-01-03T11:00:00.000000Z\n" +
"RXGZ\t268.93561321686246\t1970-01-03T11:00:00.000000Z\n" +
"VTJW\t297.950311502349\t1970-01-03T12:00:00.000000Z\n" +
"RXGZ\t268.93561321686246\t1970-01-03T12:00:00.000000Z\n" +
"VTJW\t297.950311502349\t1970-01-03T13:00:00.000000Z\n" +
"RXGZ\t268.93561321686246\t1970-01-03T13:00:00.000000Z\n" +
"VTJW\t297.950311502349\t1970-01-03T14:00:00.000000Z\n" +
"RXGZ\t268.93561321686246\t1970-01-03T14:00:00.000000Z\n" +
"RXGZ\t141.19868683690248\t1970-01-03T15:00:00.000000Z\n" +
"VTJW\t297.950311502349\t1970-01-03T15:00:00.000000Z\n" +
"RXGZ\t0.0\t1970-01-03T16:00:00.000000Z\n" +
"VTJW\t297.950311502349\t1970-01-03T16:00:00.000000Z\n",
"select s, haversine_dist_deg(lat, lon, k), k from tab sample by 1h fill(linear) align to calendar",
null,
"k",
true, true);
Expand Down Expand Up @@ -415,7 +464,15 @@ public void testAggregationWithSampleFill2_DataStartsOnTheClock() throws Excepti
"AAA\t942.1704436827788\t1970-01-01T01:00:00.000000Z\n" +
"AAA\t936.1854124136329\t1970-01-01T02:00:00.000000Z\n" +
"AAA\t155.09709548701773\t1970-01-01T03:00:00.000000Z\n"
, "select s, haversine_dist_deg(lat, lon, k), k from tab sample by 1h fill(linear)", null, "k", true, true);
, "select s, haversine_dist_deg(lat, lon, k), k from tab sample by 1h fill(linear) align to first observation", null, "k", true, true);

assertQuery("s\thaversine_dist_deg\tk\n" +
"AAA\t943.0307116486234\t1970-01-01T00:00:00.000000Z\n" +
"AAA\t942.1704436827788\t1970-01-01T01:00:00.000000Z\n" +
"AAA\t936.1854124136329\t1970-01-01T02:00:00.000000Z\n" +
"AAA\t155.09709548701773\t1970-01-01T03:00:00.000000Z\n"
, "select s, haversine_dist_deg(lat, lon, k), k from tab sample by 1h fill(linear) align to calendar" , null, "k", true, true);

}

@Test
Expand Down Expand Up @@ -460,7 +517,16 @@ public void testAggregationWithSampleFill3() throws Exception {
"AAA\t1131.6942599455483\t1970-01-01T00:00:01.000000Z\n" +
"AAA\t1128.9553037924868\t1970-01-01T01:00:01.000000Z\n" +
"AAA\t715.8340994940178\t1970-01-01T02:00:01.000000Z\n",
"select s, haversine_dist_deg(lat, lon, k), k from tab sample by 1h fill(linear)",
"select s, haversine_dist_deg(lat, lon, k), k from tab sample by 1h fill(linear) align to first observation",
null,
"k",
true, true);

assertQuery("s\thaversine_dist_deg\tk\n" +
"AAA\t1131.3799004998614\t1970-01-01T00:00:00.000000Z\n" +
"AAA\t1128.9573035397307\t1970-01-01T01:00:00.000000Z\n" +
"AAA\t716.1464591924607\t1970-01-01T02:00:00.000000Z\n",
"select s, haversine_dist_deg(lat, lon, k), k from tab sample by 1h fill(linear) align to calendar",
null,
"k",
true, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public void testSampleInterpolateRandomAccessConsistency() throws Exception {
"VTJW\t0.7732229848518976\t1970-01-03T09:18:00.000000Z\n",
"select b, max(a), k from " +
" (x where b = 'PEHN' union all x where b = 'VTJW' ) timestamp(k)" +
"sample by 3h fill(linear) order by 3, 2, 1",
"sample by 3h fill(linear) align to first observation order by 3, 2, 1",
"create table x as " +
"(" +
"select" +
Expand All @@ -303,5 +303,23 @@ public void testSampleInterpolateRandomAccessConsistency() throws Exception {
true,
true
);

assertQuery(
"b\tmax\tk\n" +
"PEHN\t0.8445258177211064\t1970-01-03T00:00:00.000000Z\n" +
"VTJW\t0.9125204540487346\t1970-01-03T00:00:00.000000Z\n" +
"PEHN\t0.7365115215570027\t1970-01-03T03:00:00.000000Z\n" +
"VTJW\t0.8660879643164553\t1970-01-03T03:00:00.000000Z\n" +
"PEHN\t0.4346135812930124\t1970-01-03T06:00:00.000000Z\n" +
"VTJW\t0.8196554745841765\t1970-01-03T06:00:00.000000Z\n" +
"PEHN\t0.13271564102902209\t1970-01-03T09:00:00.000000Z\n" +
"VTJW\t0.7732229848518976\t1970-01-03T09:00:00.000000Z\n",
"select b, max(a), k from " +
" (x where b = 'PEHN' union all x where b = 'VTJW' ) timestamp(k)" +
"sample by 3h fill(linear) align to calendar order by 3, 2, 1",
"k",
true,
true
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ public void testSampleByWithFill() throws SqlException, NumericException {
assertSql(
"ts\tmin\tmax\tfirst\tlast\tcount\n" +
"2020-01-01T00:28:47.990000Z\t0.0010\t0.0010\t0.0010\t0.0010\t1\n" +
"2020-01-01T00:29:47.990000Z\t0.0010\t0.0010\t0.0010\t0.0010\t1\n", "select ts, min(ch), max(ch), first(ch), last(ch), count() from tab sample by 1m FILL(LINEAR) LIMIT 2"
"2020-01-01T00:29:47.990000Z\t0.0010\t0.0010\t0.0010\t0.0010\t1\n", "select ts, min(ch), max(ch), first(ch), last(ch), count() from tab sample by 1m FILL(LINEAR) align to first observation LIMIT 2"
);


assertSql(
"ts\tmin\tmax\tfirst\tlast\tcount\n" +
"2020-01-01T00:28:00.000000Z\t0.0010\t0.0010\t0.0010\t0.0010\t1\n" +
"2020-01-01T00:29:00.000000Z\t0.0010\t0.0010\t0.0010\t0.0010\t1\n", "select ts, min(ch), max(ch), first(ch), last(ch), count() from tab sample by 1m FILL(LINEAR) align to calendar LIMIT 2"
);
}
}
Loading

0 comments on commit 5cd0c4a

Please sign in to comment.