From aa716f980251bfef0f7815636b6ea5d186dc614b Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Mon, 11 Aug 2025 17:29:12 +0800 Subject: [PATCH 1/4] Fix span on negative timestamp Signed-off-by: Heng Qian --- .../sql/planner/physical/collector/Rounding.java | 11 ++++++++--- .../sql/calcite/remote/CalcitePPLAggregationIT.java | 9 ++------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/collector/Rounding.java b/core/src/main/java/org/opensearch/sql/planner/physical/collector/Rounding.java index 37550c4215c..e510cdcf621 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/collector/Rounding.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/collector/Rounding.java @@ -73,7 +73,7 @@ public TimestampRounding(ExprValue interval, String unit) { public ExprValue round(ExprValue var) { Instant instant = Instant.ofEpochMilli( - dateTimeUnit.round(var.timestampValue().toEpochMilli(), interval.integerValue())); + dateTimeUnit.wideRound(var.timestampValue().toEpochMilli(), interval.integerValue())); return new ExprTimestampValue(instant); } } @@ -91,7 +91,7 @@ public DateRounding(ExprValue interval, String unit) { public ExprValue round(ExprValue var) { Instant instant = Instant.ofEpochMilli( - dateTimeUnit.round( + dateTimeUnit.wideRound( var.dateValue().atStartOfDay().atZone(ZoneOffset.UTC).toInstant().toEpochMilli(), interval.integerValue())); return new ExprDateValue(instant.atZone(ZoneOffset.UTC).toLocalDate()); @@ -116,7 +116,7 @@ public ExprValue round(ExprValue var) { Instant instant = Instant.ofEpochMilli( - dateTimeUnit.round( + dateTimeUnit.wideRound( var.timeValue().getLong(ChronoField.MILLI_OF_DAY), interval.integerValue())); return new ExprTimeValue(instant.atZone(ZoneOffset.UTC).toLocalTime()); } @@ -231,6 +231,11 @@ public long round(long utcMillis, int interval) { public abstract long round(long utcMillis, int interval); + public long wideRound(long utcMillis, int interval) { + long res = round(utcMillis, interval); + return (utcMillis < 0 && res != utcMillis) ? res - ratio * interval : res; + } + /** Resolve the date time unit. */ public static Rounding.DateTimeUnit resolve(String name) { switch (name) { diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java index efe9c10c7dc..13b6f7c621c 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java @@ -30,6 +30,7 @@ public class CalcitePPLAggregationIT extends PPLIntegTestCase { @Override public void init() throws Exception { + GlobalPushdownConfig.enabled = false; super.init(); enableCalcite(); @@ -502,13 +503,7 @@ public void testCountBySpanForCustomFormats() throws IOException { actual, schema("timestamp_span", "timestamp"), schema("count(custom_no_delimiter_ts)", "bigint")); - // TODO: Span has different behavior between pushdown and non-pushdown for timestamp before - // 1971. V2 engine will have the same issue. - // https://github.com/opensearch-project/sql/issues/3827 - verifyDataRows( - actual, - rows(1, isPushdownEnabled() ? "1961-04-12 09:00:00" : "1961-04-12 10:00:00"), - rows(1, "1984-10-20 15:00:00")); + verifyDataRows(actual, rows(1, "1961-04-12 09:00:00"), rows(1, "1984-10-20 15:00:00")); actual = executeQuery( From a24ba2ebdd0e9579713d27935556a648f9540116 Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Mon, 11 Aug 2025 17:29:12 +0800 Subject: [PATCH 2/4] Fix span on negative timestamp Signed-off-by: Heng Qian --- .../sql/planner/physical/collector/Rounding.java | 11 ++++++++--- .../sql/calcite/remote/CalcitePPLAggregationIT.java | 8 +------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/collector/Rounding.java b/core/src/main/java/org/opensearch/sql/planner/physical/collector/Rounding.java index 37550c4215c..e510cdcf621 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/collector/Rounding.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/collector/Rounding.java @@ -73,7 +73,7 @@ public TimestampRounding(ExprValue interval, String unit) { public ExprValue round(ExprValue var) { Instant instant = Instant.ofEpochMilli( - dateTimeUnit.round(var.timestampValue().toEpochMilli(), interval.integerValue())); + dateTimeUnit.wideRound(var.timestampValue().toEpochMilli(), interval.integerValue())); return new ExprTimestampValue(instant); } } @@ -91,7 +91,7 @@ public DateRounding(ExprValue interval, String unit) { public ExprValue round(ExprValue var) { Instant instant = Instant.ofEpochMilli( - dateTimeUnit.round( + dateTimeUnit.wideRound( var.dateValue().atStartOfDay().atZone(ZoneOffset.UTC).toInstant().toEpochMilli(), interval.integerValue())); return new ExprDateValue(instant.atZone(ZoneOffset.UTC).toLocalDate()); @@ -116,7 +116,7 @@ public ExprValue round(ExprValue var) { Instant instant = Instant.ofEpochMilli( - dateTimeUnit.round( + dateTimeUnit.wideRound( var.timeValue().getLong(ChronoField.MILLI_OF_DAY), interval.integerValue())); return new ExprTimeValue(instant.atZone(ZoneOffset.UTC).toLocalTime()); } @@ -231,6 +231,11 @@ public long round(long utcMillis, int interval) { public abstract long round(long utcMillis, int interval); + public long wideRound(long utcMillis, int interval) { + long res = round(utcMillis, interval); + return (utcMillis < 0 && res != utcMillis) ? res - ratio * interval : res; + } + /** Resolve the date time unit. */ public static Rounding.DateTimeUnit resolve(String name) { switch (name) { diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java index efe9c10c7dc..eb725c4eea4 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java @@ -502,13 +502,7 @@ public void testCountBySpanForCustomFormats() throws IOException { actual, schema("timestamp_span", "timestamp"), schema("count(custom_no_delimiter_ts)", "bigint")); - // TODO: Span has different behavior between pushdown and non-pushdown for timestamp before - // 1971. V2 engine will have the same issue. - // https://github.com/opensearch-project/sql/issues/3827 - verifyDataRows( - actual, - rows(1, isPushdownEnabled() ? "1961-04-12 09:00:00" : "1961-04-12 10:00:00"), - rows(1, "1984-10-20 15:00:00")); + verifyDataRows(actual, rows(1, "1961-04-12 09:00:00"), rows(1, "1984-10-20 15:00:00")); actual = executeQuery( From 5bba43a792436572555c339d58ff4548a0ede9ab Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Mon, 11 Aug 2025 20:07:01 +0800 Subject: [PATCH 3/4] typo Signed-off-by: Heng Qian --- .../opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java index 13b6f7c621c..eb725c4eea4 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java @@ -30,7 +30,6 @@ public class CalcitePPLAggregationIT extends PPLIntegTestCase { @Override public void init() throws Exception { - GlobalPushdownConfig.enabled = false; super.init(); enableCalcite(); From e9782494f410e54a88c71608eaf380738c968792 Mon Sep 17 00:00:00 2001 From: Heng Qian Date: Tue, 12 Aug 2025 11:58:29 +0800 Subject: [PATCH 4/4] Refine code Signed-off-by: Heng Qian --- .../planner/physical/collector/Rounding.java | 11 +-- .../opensearch/sql/utils/DateTimeUtils.java | 15 +++- .../sql/utils/DateTimeUtilsTest.java | 73 +++++++++++++++++++ 3 files changed, 87 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/collector/Rounding.java b/core/src/main/java/org/opensearch/sql/planner/physical/collector/Rounding.java index e510cdcf621..37550c4215c 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/collector/Rounding.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/collector/Rounding.java @@ -73,7 +73,7 @@ public TimestampRounding(ExprValue interval, String unit) { public ExprValue round(ExprValue var) { Instant instant = Instant.ofEpochMilli( - dateTimeUnit.wideRound(var.timestampValue().toEpochMilli(), interval.integerValue())); + dateTimeUnit.round(var.timestampValue().toEpochMilli(), interval.integerValue())); return new ExprTimestampValue(instant); } } @@ -91,7 +91,7 @@ public DateRounding(ExprValue interval, String unit) { public ExprValue round(ExprValue var) { Instant instant = Instant.ofEpochMilli( - dateTimeUnit.wideRound( + dateTimeUnit.round( var.dateValue().atStartOfDay().atZone(ZoneOffset.UTC).toInstant().toEpochMilli(), interval.integerValue())); return new ExprDateValue(instant.atZone(ZoneOffset.UTC).toLocalDate()); @@ -116,7 +116,7 @@ public ExprValue round(ExprValue var) { Instant instant = Instant.ofEpochMilli( - dateTimeUnit.wideRound( + dateTimeUnit.round( var.timeValue().getLong(ChronoField.MILLI_OF_DAY), interval.integerValue())); return new ExprTimeValue(instant.atZone(ZoneOffset.UTC).toLocalTime()); } @@ -231,11 +231,6 @@ public long round(long utcMillis, int interval) { public abstract long round(long utcMillis, int interval); - public long wideRound(long utcMillis, int interval) { - long res = round(utcMillis, interval); - return (utcMillis < 0 && res != utcMillis) ? res - ratio * interval : res; - } - /** Resolve the date time unit. */ public static Rounding.DateTimeUnit resolve(String name) { switch (name) { diff --git a/core/src/main/java/org/opensearch/sql/utils/DateTimeUtils.java b/core/src/main/java/org/opensearch/sql/utils/DateTimeUtils.java index 34c7a198e8c..d3ae486a17b 100644 --- a/core/src/main/java/org/opensearch/sql/utils/DateTimeUtils.java +++ b/core/src/main/java/org/opensearch/sql/utils/DateTimeUtils.java @@ -36,7 +36,8 @@ public class DateTimeUtils { * @return Rounded date/time value in utc millis */ public static long roundFloor(long utcMillis, long unitMillis) { - return utcMillis - utcMillis % unitMillis; + long res = utcMillis - utcMillis % unitMillis; + return (utcMillis < 0 && res != utcMillis) ? res - unitMillis : res; } /** @@ -65,7 +66,9 @@ public static long roundMonth(long utcMillis, int interval) { (zonedDateTime.getYear() - initDateTime.getYear()) * 12L + zonedDateTime.getMonthValue() - initDateTime.getMonthValue(); - long monthToAdd = (monthDiff / interval - 1) * interval; + long multiplier = monthDiff / interval - 1; + if (monthDiff < 0 && monthDiff % interval != 0) --multiplier; + long monthToAdd = multiplier * interval; return initDateTime.plusMonths(monthToAdd).toInstant().toEpochMilli(); } @@ -84,7 +87,9 @@ public static long roundQuarter(long utcMillis, int interval) { ((zonedDateTime.getYear() - initDateTime.getYear()) * 12L + zonedDateTime.getMonthValue() - initDateTime.getMonthValue()); - long monthToAdd = (monthDiff / (interval * 3L) - 1) * interval * 3; + long multiplier = monthDiff / (interval * 3L) - 1; + if (monthDiff < 0 && monthDiff % (interval * 3L) != 0) --multiplier; + long monthToAdd = multiplier * interval * 3; return initDateTime.plusMonths(monthToAdd).toInstant().toEpochMilli(); } @@ -99,7 +104,9 @@ public static long roundYear(long utcMillis, int interval) { ZonedDateTime initDateTime = ZonedDateTime.of(1970, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); ZonedDateTime zonedDateTime = Instant.ofEpochMilli(utcMillis).atZone(ZoneOffset.UTC); int yearDiff = zonedDateTime.getYear() - initDateTime.getYear(); - int yearToAdd = (yearDiff / interval) * interval; + int multiplier = yearDiff / interval; + if (yearDiff < 0 && yearDiff % interval != 0) --multiplier; + int yearToAdd = multiplier * interval; return initDateTime.plusYears(yearToAdd).toInstant().toEpochMilli(); } diff --git a/core/src/test/java/org/opensearch/sql/utils/DateTimeUtilsTest.java b/core/src/test/java/org/opensearch/sql/utils/DateTimeUtilsTest.java index 80e1f90eb9a..3130ac001f0 100644 --- a/core/src/test/java/org/opensearch/sql/utils/DateTimeUtilsTest.java +++ b/core/src/test/java/org/opensearch/sql/utils/DateTimeUtilsTest.java @@ -16,6 +16,7 @@ import java.time.format.DateTimeFormatter; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; +import org.opensearch.sql.planner.physical.collector.Rounding.DateTimeUnit; public class DateTimeUtilsTest { @Test @@ -105,4 +106,76 @@ void testRelativeZonedDateTimeWithWrongInput() { IllegalArgumentException.class, () -> getRelativeZonedDateTime("1d+1y", zonedDateTime)); assertEquals(e.getMessage(), "Unexpected character '1' at position 0 in input: 1d+1y"); } + + @Test + void testRoundOnTimestampBeforeEpoch() { + long actual = + LocalDateTime.parse("1961-05-12T23:40:05") + .atZone(ZoneOffset.UTC) + .toInstant() + .toEpochMilli(); + long rounded = DateTimeUnit.MINUTE.round(actual, 1); + assertEquals( + LocalDateTime.parse("1961-05-12T23:40:00") + .atZone(ZoneOffset.UTC) + .toInstant() + .toEpochMilli(), + Instant.ofEpochMilli(rounded).toEpochMilli()); + + rounded = DateTimeUnit.HOUR.round(actual, 1); + assertEquals( + LocalDateTime.parse("1961-05-12T23:00:00") + .atZone(ZoneOffset.UTC) + .toInstant() + .toEpochMilli(), + Instant.ofEpochMilli(rounded).toEpochMilli()); + + rounded = DateTimeUnit.DAY.round(actual, 1); + assertEquals( + LocalDateTime.parse("1961-05-12T00:00:00") + .atZone(ZoneOffset.UTC) + .toInstant() + .toEpochMilli(), + Instant.ofEpochMilli(rounded).toEpochMilli()); + + rounded = DateTimeUnit.DAY.round(actual, 3); + assertEquals( + LocalDateTime.parse("1961-05-12T00:00:00") + .atZone(ZoneOffset.UTC) + .toInstant() + .toEpochMilli(), + Instant.ofEpochMilli(rounded).toEpochMilli()); + + rounded = DateTimeUnit.WEEK.round(actual, 1); + assertEquals( + LocalDateTime.parse("1961-05-08T00:00:00") + .atZone(ZoneOffset.UTC) + .toInstant() + .toEpochMilli(), + Instant.ofEpochMilli(rounded).toEpochMilli()); + + rounded = DateTimeUnit.MONTH.round(actual, 1); + assertEquals( + LocalDateTime.parse("1961-05-01T00:00:00") + .atZone(ZoneOffset.UTC) + .toInstant() + .toEpochMilli(), + Instant.ofEpochMilli(rounded).toEpochMilli()); + + rounded = DateTimeUnit.QUARTER.round(actual, 1); + assertEquals( + LocalDateTime.parse("1961-04-01T00:00:00") + .atZone(ZoneOffset.UTC) + .toInstant() + .toEpochMilli(), + Instant.ofEpochMilli(rounded).toEpochMilli()); + + rounded = DateTimeUnit.YEAR.round(actual, 2); + assertEquals( + LocalDateTime.parse("1960-01-01T00:00:00") + .atZone(ZoneOffset.UTC) + .toInstant() + .toEpochMilli(), + Instant.ofEpochMilli(rounded).toEpochMilli()); + } }