Skip to content

Commit

Permalink
[SPARK-31984][SQL] Make micros rebasing functions via local timestamp…
Browse files Browse the repository at this point in the history
…s pure

### What changes were proposed in this pull request?
1. Set the given time zone as the first parameter of `RebaseDateTime`.`rebaseJulianToGregorianMicros()` and `rebaseGregorianToJulianMicros()` to Java 7 `GregorianCalendar`.
```scala
    val cal = new Calendar.Builder()
      // `gregory` is a hybrid calendar that supports both the Julian and Gregorian calendar systems
      .setCalendarType("gregory")
    ...
      .setTimeZone(tz)
      .build()
```
This makes the instance of the calendar independent from the default JVM time zone.

2. Change type of the first parameter from `ZoneId` to `TimeZone`. This allows to avoid unnecessary conversion from `TimeZone` to `ZoneId`, for example in
```scala
  def rebaseJulianToGregorianMicros(micros: Long): Long = {
    ...
      if (rebaseRecord == null || micros < rebaseRecord.switches(0)) {
        rebaseJulianToGregorianMicros(timeZone.toZoneId, micros)
```
and back to `TimeZone` inside of `rebaseJulianToGregorianMicros(zoneId: ZoneId, ...)`

3. Modify tests in `RebaseDateTimeSuite`, and set the default JVM time zone only for functions that depend on it.

### Why are the changes needed?
1. Ignoring passed parameter and using a global variable is bad practice.
2. Dependency from the global state doesn't allow to run the functions in parallel otherwise there is non-zero probability that the functions may return wrong result if the default JVM is changed during their execution.
3. This open opportunity for parallelisation of JSON files generation `gregorian-julian-rebase-micros.json` and `julian-gregorian-rebase-micros.json`. Currently, the tests `generate 'gregorian-julian-rebase-micros.json'` and `generate 'julian-gregorian-rebase-micros.json'` generate the JSON files by iterating over all time zones sequentially w/ step of 1 week. Due to the large step, we can miss some spikes in diffs between 2 calendars (Java 8 Gregorian and Java 7 hybrid calendars) as the PR apache#28787 fixed and apache#28816 should fix.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running existing tests from `RebaseDateTimeSuite`.

Closes apache#28824 from MaxGekk/pure-micros-rebasing.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
MaxGekk authored and cloud-fan committed Jun 16, 2020
1 parent d24d27f commit 6e9ff72
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -320,13 +320,14 @@ object RebaseDateTime {
* Julian calendar: 1582-01-01 00:00:00.123456 -> -12243196799876544
* The code below converts -12244061221876544 to -12243196799876544.
*
* @param zoneId The time zone ID at which the rebasing should be performed.
* @param tz The time zone at which the rebasing should be performed.
* @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'
* in Proleptic Gregorian calendar. It can be negative.
* @return The rebased microseconds since the epoch in Julian calendar.
*/
private[sql] def rebaseGregorianToJulianMicros(zoneId: ZoneId, micros: Long): Long = {
private[sql] def rebaseGregorianToJulianMicros(tz: TimeZone, micros: Long): Long = {
val instant = microsToInstant(micros)
val zoneId = tz.toZoneId
val zonedDateTime = instant.atZone(zoneId)
var ldt = zonedDateTime.toLocalDateTime
if (ldt.isAfter(julianEndTs) && ldt.isBefore(gregorianStartTs)) {
Expand All @@ -337,6 +338,7 @@ object RebaseDateTime {
.setCalendarType("gregory")
.setDate(ldt.getYear, ldt.getMonthValue - 1, ldt.getDayOfMonth)
.setTimeOfDay(ldt.getHour, ldt.getMinute, ldt.getSecond)
.setTimeZone(tz)
.build()
// A local timestamp can have 2 instants in the cases of switching from:
// 1. Summer to winter time.
Expand Down Expand Up @@ -379,7 +381,7 @@ object RebaseDateTime {
val tzId = timeZone.getID
val rebaseRecord = gregJulianRebaseMap.getOrNull(tzId)
if (rebaseRecord == null || micros < rebaseRecord.switches(0)) {
rebaseGregorianToJulianMicros(timeZone.toZoneId, micros)
rebaseGregorianToJulianMicros(timeZone, micros)
} else {
rebaseMicros(rebaseRecord, micros)
}
Expand All @@ -401,17 +403,17 @@ object RebaseDateTime {
* Proleptic Gregorian calendar: 1582-01-01 00:00:00.123456 -> -12244061221876544
* The code below converts -12243196799876544 to -12244061221876544.
*
* @param zoneId The time zone ID at which the rebasing should be performed.
* @param tz The time zone at which the rebasing should be performed.
* @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'
* in the Julian calendar. It can be negative.
* @return The rebased microseconds since the epoch in Proleptic Gregorian calendar.
*/
private[sql] def rebaseJulianToGregorianMicros(zoneId: ZoneId, micros: Long): Long = {
private[sql] def rebaseJulianToGregorianMicros(tz: TimeZone, micros: Long): Long = {
val cal = new Calendar.Builder()
// `gregory` is a hybrid calendar that supports both
// the Julian and Gregorian calendar systems
// `gregory` is a hybrid calendar that supports both the Julian and Gregorian calendar systems
.setCalendarType("gregory")
.setInstant(microsToMillis(micros))
.setTimeZone(tz)
.build()
val localDateTime = LocalDateTime.of(
cal.get(YEAR),
Expand All @@ -427,6 +429,7 @@ object RebaseDateTime {
(Math.floorMod(micros, MICROS_PER_SECOND) * NANOS_PER_MICROS).toInt)
.`with`(ChronoField.ERA, cal.get(ERA))
.plusDays(cal.get(DAY_OF_MONTH) - 1)
val zoneId = tz.toZoneId
val zonedDateTime = localDateTime.atZone(zoneId)
// In the case of local timestamp overlapping, we need to choose the correct time instant
// which is related to the original local timestamp. We look ahead of 1 day, and if the next
Expand Down Expand Up @@ -479,7 +482,7 @@ object RebaseDateTime {
val tzId = timeZone.getID
val rebaseRecord = julianGregRebaseMap.getOrNull(tzId)
if (rebaseRecord == null || micros < rebaseRecord.switches(0)) {
rebaseJulianToGregorianMicros(timeZone.toZoneId, micros)
rebaseJulianToGregorianMicros(timeZone, micros)
} else {
rebaseMicros(rebaseRecord, micros)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,48 +201,48 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper {
test("optimization of micros rebasing - Gregorian to Julian") {
outstandingZoneIds.foreach { zid =>
withClue(s"zone id = $zid") {
withDefaultTimeZone(zid) {
val start = instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0)
.atZone(zid)
.toInstant)
val end = instantToMicros(LocalDateTime.of(2100, 1, 1, 0, 0, 0)
.atZone(zid)
.toInstant)
var micros = start
do {
val rebased = rebaseGregorianToJulianMicros(zid, micros)
val rebasedAndOptimized = rebaseGregorianToJulianMicros(micros)
assert(rebasedAndOptimized === rebased)
micros += (MICROS_PER_DAY * 30 * (0.5 + Math.random())).toLong
} while (micros <= end)
}
val start = instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0)
.atZone(zid)
.toInstant)
val end = instantToMicros(LocalDateTime.of(2100, 1, 1, 0, 0, 0)
.atZone(zid)
.toInstant)
var micros = start
do {
val rebased = rebaseGregorianToJulianMicros(TimeZone.getTimeZone(zid), micros)
val rebasedAndOptimized = withDefaultTimeZone(zid) {
rebaseGregorianToJulianMicros(micros)
}
assert(rebasedAndOptimized === rebased)
micros += (MICROS_PER_DAY * 30 * (0.5 + Math.random())).toLong
} while (micros <= end)
}
}
}

test("optimization of micros rebasing - Julian to Gregorian") {
outstandingZoneIds.foreach { zid =>
withClue(s"zone id = $zid") {
withDefaultTimeZone(zid) {
val start = rebaseGregorianToJulianMicros(
instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0).atZone(zid).toInstant))
val end = rebaseGregorianToJulianMicros(
instantToMicros(LocalDateTime.of(2100, 1, 1, 0, 0, 0).atZone(zid).toInstant))
var micros = start
do {
val rebased = rebaseJulianToGregorianMicros(zid, micros)
val rebasedAndOptimized = rebaseJulianToGregorianMicros(micros)
assert(rebasedAndOptimized === rebased)
micros += (MICROS_PER_DAY * 30 * (0.5 + Math.random())).toLong
} while (micros <= end)
}
val start = rebaseGregorianToJulianMicros(
instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0).atZone(zid).toInstant))
val end = rebaseGregorianToJulianMicros(
instantToMicros(LocalDateTime.of(2100, 1, 1, 0, 0, 0).atZone(zid).toInstant))
var micros = start
do {
val rebased = rebaseJulianToGregorianMicros(TimeZone.getTimeZone(zid), micros)
val rebasedAndOptimized = withDefaultTimeZone(zid) {
rebaseJulianToGregorianMicros(micros)
}
assert(rebasedAndOptimized === rebased)
micros += (MICROS_PER_DAY * 30 * (0.5 + Math.random())).toLong
} while (micros <= end)
}
}
}

private def generateRebaseJson(
adjustFunc: Long => Long,
rebaseFunc: (ZoneId, Long) => Long,
adjustFunc: (TimeZone, Long) => Long,
rebaseFunc: (TimeZone, Long) => Long,
dir: String,
fileName: String): Unit = {
import java.nio.file.{Files, Paths}
Expand All @@ -260,14 +260,15 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper {
.sortBy(_.getId)
.foreach { zid =>
withDefaultTimeZone(zid) {
val start = adjustFunc(instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0)
.atZone(zid)
.toInstant))
val tz = TimeZone.getTimeZone(zid)
val start = adjustFunc(
tz,
instantToMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0).atZone(zid).toInstant))
// sun.util.calendar.ZoneInfo resolves DST after 2037 year incorrectly.
// See https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8073446
val end = adjustFunc(instantToMicros(LocalDateTime.of(2037, 1, 1, 0, 0, 0)
.atZone(zid)
.toInstant))
val end = adjustFunc(
tz,
instantToMicros(LocalDateTime.of(2037, 1, 1, 0, 0, 0).atZone(zid).toInstant))

var micros = start
var diff = Long.MaxValue
Expand All @@ -276,7 +277,7 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper {
val switches = new ArrayBuffer[Long]()
val diffs = new ArrayBuffer[Long]()
while (micros < end) {
val rebased = rebaseFunc(zid, micros)
val rebased = rebaseFunc(tz, micros)
val curDiff = rebased - micros
if (curDiff != diff) {
if (step > MICROS_PER_SECOND) {
Expand Down Expand Up @@ -308,7 +309,7 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper {

ignore("generate 'gregorian-julian-rebase-micros.json'") {
generateRebaseJson(
adjustFunc = identity[Long],
adjustFunc = (_: TimeZone, micros: Long) => micros,
rebaseFunc = rebaseGregorianToJulianMicros,
dir = "/Users/maximgekk/tmp",
fileName = "gregorian-julian-rebase-micros.json")
Expand Down Expand Up @@ -383,26 +384,27 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper {

test("rebase not-existed timestamps in the hybrid calendar") {
outstandingZoneIds.foreach { zid =>
withDefaultTimeZone(zid) {
Seq(
"1582-10-04T23:59:59.999999" -> "1582-10-04 23:59:59.999999",
"1582-10-05T00:00:00.000000" -> "1582-10-15 00:00:00.000000",
"1582-10-06T01:02:03.000001" -> "1582-10-15 01:02:03.000001",
"1582-10-07T00:00:00.000000" -> "1582-10-15 00:00:00.000000",
"1582-10-08T23:59:59.999999" -> "1582-10-15 23:59:59.999999",
"1582-10-09T23:59:59.001001" -> "1582-10-15 23:59:59.001001",
"1582-10-10T00:11:22.334455" -> "1582-10-15 00:11:22.334455",
"1582-10-11T11:12:13.111111" -> "1582-10-15 11:12:13.111111",
"1582-10-12T10:11:12.131415" -> "1582-10-15 10:11:12.131415",
"1582-10-13T00:00:00.123321" -> "1582-10-15 00:00:00.123321",
"1582-10-14T23:59:59.999999" -> "1582-10-15 23:59:59.999999",
"1582-10-15T00:00:00.000000" -> "1582-10-15 00:00:00.000000"
).foreach { case (gregTs, hybridTs) =>
withClue(s"tz = ${zid.getId} greg ts = $gregTs hybrid ts = $hybridTs") {
val hybridMicros = parseToJulianMicros(hybridTs)
val gregorianMicros = parseToGregMicros(gregTs, zid)

assert(rebaseGregorianToJulianMicros(zid, gregorianMicros) === hybridMicros)
Seq(
"1582-10-04T23:59:59.999999" -> "1582-10-04 23:59:59.999999",
"1582-10-05T00:00:00.000000" -> "1582-10-15 00:00:00.000000",
"1582-10-06T01:02:03.000001" -> "1582-10-15 01:02:03.000001",
"1582-10-07T00:00:00.000000" -> "1582-10-15 00:00:00.000000",
"1582-10-08T23:59:59.999999" -> "1582-10-15 23:59:59.999999",
"1582-10-09T23:59:59.001001" -> "1582-10-15 23:59:59.001001",
"1582-10-10T00:11:22.334455" -> "1582-10-15 00:11:22.334455",
"1582-10-11T11:12:13.111111" -> "1582-10-15 11:12:13.111111",
"1582-10-12T10:11:12.131415" -> "1582-10-15 10:11:12.131415",
"1582-10-13T00:00:00.123321" -> "1582-10-15 00:00:00.123321",
"1582-10-14T23:59:59.999999" -> "1582-10-15 23:59:59.999999",
"1582-10-15T00:00:00.000000" -> "1582-10-15 00:00:00.000000"
).foreach { case (gregTs, hybridTs) =>
withClue(s"tz = ${zid.getId} greg ts = $gregTs hybrid ts = $hybridTs") {
val hybridMicros = withDefaultTimeZone(zid) { parseToJulianMicros(hybridTs) }
val gregorianMicros = parseToGregMicros(gregTs, zid)

val tz = TimeZone.getTimeZone(zid)
assert(rebaseGregorianToJulianMicros(tz, gregorianMicros) === hybridMicros)
withDefaultTimeZone(zid) {
assert(rebaseGregorianToJulianMicros(gregorianMicros) === hybridMicros)
}
}
Expand All @@ -416,38 +418,39 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper {
// clocks were moved backward to become Sunday, 18 November, 1945 01:00:00 AM.
// In this way, the overlap happened w/o Daylight Saving Time.
val hkZid = getZoneId("Asia/Hong_Kong")
var expected = "1945-11-18 01:30:00.0"
var ldt = LocalDateTime.of(1945, 11, 18, 1, 30, 0)
var earlierMicros = instantToMicros(ldt.atZone(hkZid).withEarlierOffsetAtOverlap().toInstant)
var laterMicros = instantToMicros(ldt.atZone(hkZid).withLaterOffsetAtOverlap().toInstant)
var overlapInterval = MICROS_PER_HOUR
if (earlierMicros + overlapInterval != laterMicros) {
// Old JDK might have an outdated time zone database.
// See https://bugs.openjdk.java.net/browse/JDK-8228469: "Hong Kong ... Its 1945 transition
// from JST to HKT was on 11-18 at 02:00, not 09-15 at 00:00"
expected = "1945-09-14 23:30:00.0"
ldt = LocalDateTime.of(1945, 9, 14, 23, 30, 0)
earlierMicros = instantToMicros(ldt.atZone(hkZid).withEarlierOffsetAtOverlap().toInstant)
laterMicros = instantToMicros(ldt.atZone(hkZid).withLaterOffsetAtOverlap().toInstant)
// If time zone db doesn't have overlapping at all, set the overlap interval to zero.
overlapInterval = laterMicros - earlierMicros
}
val hkTz = TimeZone.getTimeZone(hkZid)
val rebasedEarlierMicros = rebaseGregorianToJulianMicros(hkTz, earlierMicros)
val rebasedLaterMicros = rebaseGregorianToJulianMicros(hkTz, laterMicros)
assert(rebasedEarlierMicros + overlapInterval === rebasedLaterMicros)
withDefaultTimeZone(hkZid) {
var expected = "1945-11-18 01:30:00.0"
var ldt = LocalDateTime.of(1945, 11, 18, 1, 30, 0)
var earlierMicros = instantToMicros(ldt.atZone(hkZid).withEarlierOffsetAtOverlap().toInstant)
var laterMicros = instantToMicros(ldt.atZone(hkZid).withLaterOffsetAtOverlap().toInstant)
var overlapInterval = MICROS_PER_HOUR
if (earlierMicros + overlapInterval != laterMicros) {
// Old JDK might have an outdated time zone database.
// See https://bugs.openjdk.java.net/browse/JDK-8228469: "Hong Kong ... Its 1945 transition
// from JST to HKT was on 11-18 at 02:00, not 09-15 at 00:00"
expected = "1945-09-14 23:30:00.0"
ldt = LocalDateTime.of(1945, 9, 14, 23, 30, 0)
earlierMicros = instantToMicros(ldt.atZone(hkZid).withEarlierOffsetAtOverlap().toInstant)
laterMicros = instantToMicros(ldt.atZone(hkZid).withLaterOffsetAtOverlap().toInstant)
// If time zone db doesn't have overlapping at all, set the overlap interval to zero.
overlapInterval = laterMicros - earlierMicros
}
val rebasedEarlierMicros = rebaseGregorianToJulianMicros(hkZid, earlierMicros)
val rebasedLaterMicros = rebaseGregorianToJulianMicros(hkZid, laterMicros)
def toTsStr(micros: Long): String = toJavaTimestamp(micros).toString
assert(toTsStr(rebasedEarlierMicros) === expected)
assert(toTsStr(rebasedLaterMicros) === expected)
assert(rebasedEarlierMicros + overlapInterval === rebasedLaterMicros)
// Check optimized rebasing
assert(rebaseGregorianToJulianMicros(earlierMicros) === rebasedEarlierMicros)
assert(rebaseGregorianToJulianMicros(laterMicros) === rebasedLaterMicros)
// Check reverse rebasing
assert(rebaseJulianToGregorianMicros(rebasedEarlierMicros) === earlierMicros)
assert(rebaseJulianToGregorianMicros(rebasedLaterMicros) === laterMicros)
// Check reverse not-optimized rebasing
assert(rebaseJulianToGregorianMicros(hkZid, rebasedEarlierMicros) === earlierMicros)
assert(rebaseJulianToGregorianMicros(hkZid, rebasedLaterMicros) === laterMicros)
}
// Check reverse not-optimized rebasing
assert(rebaseJulianToGregorianMicros(hkTz, rebasedEarlierMicros) === earlierMicros)
assert(rebaseJulianToGregorianMicros(hkTz, rebasedLaterMicros) === laterMicros)
}
}

0 comments on commit 6e9ff72

Please sign in to comment.