In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import *


In [0]:
%run ../utility/delta_merge

In [0]:
# Read the bronze-layer earthquakes table into a DataFrame and preview it.
bronze_source = (
    spark.read
    .table('lakehouse.`02_bronze`.earthquakes')
)
bronze_source.display()

In [0]:
# Explode the `features` column so every element becomes its own row;
# the resulting column keeps the name `features`.
bronze_source_exploded = bronze_source.select(
    F.explode(F.col('features')).alias('features')
)
bronze_source_exploded.display()

In [0]:
# Flatten each exploded feature into top-level columns:
#   * every field under `features.properties`
#   * the feature `id`
#   * elements 0, 1 and 2 of `features.geometry.coordinates`, exposed as
#     longitude, latitude and depth respectively
bronze_source_exploded_1 = bronze_source_exploded.select(
    "features.properties.*",
    "features.id",
    *[
        F.col("features.geometry.coordinates")[position].alias(label)
        for position, label in enumerate(("longitude", "latitude", "depth"))
    ],
)
bronze_source_exploded_1.display()

In [0]:
# Normalise column types on the flattened frame.
#
# `time` and `updated` are divided by 1000 before conversion, i.e. they are
# treated as epoch milliseconds and turned into whole-second timestamps.
# The conversion casts through `long` instead of formatting with
# `from_unixtime` and re-parsing the text:
#   * `from_unixtime` renders a wall-clock string in the session time zone,
#     and parsing that string back is ambiguous during a DST fall-back hour
#     when the session time zone is not UTC;
#   * the explicit `cast("long")` drops the sub-second part on purpose, so the
#     resulting values (and anything derived from `time`, such as the
#     `hash_id` built from `id` and `time` further down) keep whole-second
#     precision.
#
# `nst`, `sig`, `tsunami` and `felt` are cast to double.
source_transform = (
    bronze_source_exploded_1
    .withColumn("time", (F.col("time") / 1000).cast("long").cast("timestamp"))
    .withColumn("updated", (F.col("updated") / 1000).cast("long").cast("timestamp"))
    .withColumn("nst", F.col("nst").cast("double"))
    .withColumn("sig", F.col("sig").cast("double"))
    .withColumn("tsunami", F.col("tsunami").cast("double"))
    .withColumn("felt", F.col("felt").cast("double"))
)

source_transform.display()

In [0]:
# Keep only the columns carried into the silver layer, in this order, and
# remove rows that are identical across all of them.
silver_source = source_transform.select(
    "id", "time",
    "longitude", "latitude", "depth",
    "mag", "magType", "place",
    "gap", "dmin", "rms",
    "net", "code", "ids", "sources", "types",
    "nst", "title", "status",
    "tsunami", "sig", "felt", "updated",
).dropDuplicates()

silver_source.display()

## Testing

In [0]:
# silver_source.groupBy("id").count().filter("count > 1").display()

In [0]:
# silver_source.filter("id = 'se60617271'").display()

#### TODO: Get the latest updates
- partition by id
- order by `updated` timestamp desc
- add row_number
- filter row_number = 1

In [0]:
from pyspark.sql.window import Window

# One window per event id, ordered so the most recently `updated` row comes first.
# NOTE(review): rows of the same id that tie on `updated` are numbered in an
# arbitrary order by row_number — confirm such ties cannot occur upstream.
window_split = Window.partitionBy("id").orderBy(F.col("updated").desc())

# Keep only the first-ranked row per id, then add `hash_id`: the SHA-256 of
# `id` and `time` joined with an underscore.
silver_source_deduped_final = (
    silver_source
    .withColumn("row_number", F.row_number().over(window_split))
    .filter(F.col("row_number") == 1)
    .drop("row_number")
    .withColumn('hash_id', F.sha2(F.concat_ws('_', F.col('id'), F.col('time')), 256))
)
silver_source_deduped_final.display()

In [0]:
# silver_source_deduped_final.filter("id = 'se60617271'").display()

In [0]:
# parameters
# Fully qualified name of the silver table that the write/merge cell below targets.
# NOTE(review): the `03_silver` segment is unquoted here, while the bronze read
# and the %sql history cell wrap their digit-leading schema names in backticks —
# confirm the unquoted form resolves to the same table.
silver_table_name = "lakehouse.03_silver.earthquake_events"

In [0]:
# First run: create the silver table from the de-duplicated frame.
# Later runs: hand the frame to `delta_merge` with "hash_id" as its third argument.
# `check_delta_table` and `delta_merge` are not defined in this notebook —
# presumably they come from the `%run ../utility/delta_merge` cell; verify there.
if not check_delta_table(silver_table_name):
    print(f"{silver_table_name} table does not exist in the catalog, Writing it for the first time")
    (
        silver_source_deduped_final.write
        .format("delta")
        .mode("overwrite")
        .option("delta.enableChangeDataFeed", "true")
        .saveAsTable(silver_table_name)
    )
else:
    print(f"{silver_table_name} table exists in the catalog, Updating it")
    delta_merge(silver_source_deduped_final, silver_table_name, "hash_id")

In [0]:
# # silver_source_deduped_final.columns

# cols_map = {col : f"source.{col}" for col in silver_source_deduped_final.columns}
# cols_map

In [0]:
%sql
-- Show the history of the silver earthquake events table.
DESCRIBE HISTORY lakehouse.`03_silver`.earthquake_events