
Commit

Merge pull request #891 from starlake-ai/fix/timestamp-with-timezone
Fix timestamp with timezone
hayssams authored May 27, 2024
2 parents d6a5700 + 2da2c26 commit e80a169
Showing 7 changed files with 37 additions and 15 deletions.
2 changes: 2 additions & 0 deletions samples/spark/metadata/load/sales/orders.sl.yml
@@ -20,3 +20,5 @@ table:
- name: "seller_id"
type: "string"
required: false
- name: "ts"
type: "iso_instant"
@@ -0,0 +1,4 @@
order_id,customer_id,amount,seller_id,ts
12345,A009701,123.65,AQZERD,2024-02-05T21:19:15.454Z
56432,A009701,23.8,AQZERD,2024-02-05T21:19:15.454Z
10000,B308629,23.8,AQZERD,2024-02-05T21:19:15.454Z
@@ -0,0 +1,4 @@
order_id,customer_id,amount,seller_id,ts
12345,A009701,123.65,AQZERD,2024-02-05T21:19:15.454Z
56432,A009701,23.8,AQZERD,2024-02-05T21:19:15.454Z
10000,B308629,23.8,AQZERD,2024-02-05T21:19:15.454Z
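The new `ts` column is declared as `iso_instant`, and the added sample rows carry values such as `2024-02-05T21:19:15.454Z`. As a rough, hypothetical illustration (not code from this PR; the object name and the hard-coded UTC zone are assumptions), values in that shape parse directly with `java.time`:

```scala
import java.sql.Timestamp
import java.time.{Instant, LocalDateTime, ZoneId}

object IsoInstantExample extends App {
  // Sample value matching the new `ts` column in the test data.
  val raw = "2024-02-05T21:19:15.454Z"

  // ISO-8601 instants need no explicit pattern: Instant.parse uses ISO_INSTANT.
  val instant: Instant = Instant.parse(raw)

  // Render the instant as a wall-clock value in an assumed zone (UTC here).
  val ldt: LocalDateTime = LocalDateTime.ofInstant(instant, ZoneId.of("UTC"))
  println(Timestamp.valueOf(ldt)) // prints 2024-02-05 21:19:15.454
}
```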
4 changes: 4 additions & 0 deletions samples/spark/metadata/types/types.sl.yml
@@ -12,6 +12,7 @@ types:
snowflake: STRING
postgres: varchar(8000)
synapse: Varchar(8000)
duckdb: VARCHAR
- name: "customerid"
primitiveType: "string"
pattern: "[A-Z][0-9]{6}"
@@ -23,6 +24,7 @@ types:
snowflake: STRING
postgres: varchar(8000)
synapse: Varchar(8000)
duckdb: VARCHAR
- name: "sellerid"
primitiveType: "string"
pattern: "[0-9]{6}"
@@ -34,6 +36,7 @@ types:
snowflake: STRING
postgres: varchar(8000)
synapse: Varchar(8000)
duckdb: VARCHAR
- name: "longstring"
primitiveType: "string"
pattern: ".+"
@@ -45,3 +48,4 @@ types:
snowflake: STRING
postgres: varchar(8000)
synapse: Varchar(8000) # https://docs.microsoft.com/fr-fr/azure/synapse-analytics/metadata/table
duckdb: VARCHAR
8 changes: 4 additions & 4 deletions samples/spark/sample-data/sales/orders-2018-01-01.csv
@@ -1,4 +1,4 @@
order_id,customer_id,amount,seller_id
12345,A009701,123.65,AQZERD
56432,A009701,23.8,AQZERD
10000,B308629,23.8,AQZERD
order_id,customer_id,amount,seller_id,ts
12345,A009701,123.65,AQZERD,2024-02-05T21:19:15.454Z
56432,A009701,23.8,AQZERD,2024-02-05T21:19:15.454Z
10000,B308629,23.8,AQZERD,2024-02-05T21:19:15.454Z
29 changes: 18 additions & 11 deletions src/main/scala/ai/starlake/schema/model/PrimitiveType.scala
@@ -211,7 +211,7 @@ object PrimitiveType {
def sparkType(zone: Option[String]): DataType = new StructType(Array.empty[StructField])
}

private def instantFromString(str: String, pattern: String, zone: String): Instant = {
private def instantFromString(str: String, pattern: String, zoneId: ZoneId): Instant = {

def simpleDateFormat(str: String, pattern: String, zoneId: ZoneId): Instant = {
if (zoneId.getId != "UTC")
@@ -224,11 +224,6 @@
Instant.ofEpochMilli(date.getTime)
}

val zoneId = Option(zone) match {
case Some(z) => ZoneId.of(z)
case None => ZoneId.of("UTC")
}

pattern match {
case "epoch_second" =>
Instant.ofEpochSecond(str.toLong)
@@ -333,12 +328,24 @@ object PrimitiveType {
object timestamp extends PrimitiveType("timestamp") {

def fromString(str: String, timeFormat: String, zone: String): Any = {
if (str == null || str.isEmpty)
null
else {
val instant = instantFromString(str.trim, timeFormat, zone)
Timestamp.from(instant)
val res = {
if (str == null || str.isEmpty)
null
else {
val zoneId = Option(zone) match {
case Some(z) => ZoneId.of(z)
case None => ZoneId.of("UTC")
}

val instant = instantFromString(str.trim, timeFormat, zoneId)
val currentTimezoneOffset = zoneId.getRules.getOffset(instant)

val ldt = LocalDateTime.ofInstant(instant, currentTimezoneOffset)
val current = Timestamp.valueOf(ldt)
current
}
}
res
}

def sparkType(zone: Option[String]): DataType = TimestampType
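To make the effect of the rewritten `timestamp.fromString` above concrete, here is a minimal standalone sketch of the same conversion; `toTimestamp` and the object name are invented for illustration and are not part of the project's API, which instead receives the zone through `fromString(str, timeFormat, zone)`:

```scala
import java.sql.Timestamp
import java.time.{Instant, LocalDateTime, ZoneId}

object TimestampZoneSketch extends App {
  // Resolve the zone (UTC by default), look up that zone's offset at the given
  // instant, and build a java.sql.Timestamp from the resulting local date-time,
  // mirroring the new branch in PrimitiveType.timestamp.fromString.
  def toTimestamp(instant: Instant, zone: Option[String]): Timestamp = {
    val zoneId = zone.map(ZoneId.of).getOrElse(ZoneId.of("UTC"))
    val offset = zoneId.getRules.getOffset(instant)
    Timestamp.valueOf(LocalDateTime.ofInstant(instant, offset))
  }

  val instant = Instant.parse("2024-02-05T21:19:15.454Z")
  println(toTimestamp(instant, None))                 // 2024-02-05 21:19:15.454
  println(toTimestamp(instant, Some("Europe/Paris"))) // 2024-02-05 22:19:15.454
}
```

The intended difference from the previous `Timestamp.from(instant)` appears to be that the stored value now reflects the configured zone's wall-clock time (UTC when no zone is set) rather than being built directly from the raw instant.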
@@ -37,6 +37,7 @@ class LoadLocalIntegrationSpec extends IntegrationTestBase with TestHelper {
) {
new Directory(new java.io.File(settings.appConfig.datasets)).deleteRecursively()

dropTables
copyFilesToIncomingDir(sampleDataDir)
assert(
new Main().run(
