In [0]:
# Pull the run parameters published by the upstream "Bronze" task.
bronze_output = dbutils.jobs.taskValues.get(taskKey="Bronze", key="bronze_output")

# Fail fast instead of defaulting to "": an empty value would silently build
# paths such as "/_earthquake.json" for the read and "" for the Parquet write,
# producing confusing failures (or reads of the wrong object) further down.
_required_keys = ("start_date", "bronze_s3", "silver_s3")
_missing_keys = [k for k in _required_keys if not (bronze_output or {}).get(k)]
if _missing_keys:
    raise ValueError(f"Bronze task output is missing required values: {_missing_keys}")

start_date = bronze_output["start_date"]
bronze_s3 = bronze_output["bronze_s3"]
silver_s3 = bronze_output["silver_s3"]

In [0]:
# Load this run's Bronze snapshot. The file is read in multi-line mode, so a
# single JSON document may span several lines.
bronze_path = f"{bronze_s3}/{start_date}_earthquake.json"
df = spark.read.json(bronze_path, multiLine="true")

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Flatten the nested records: split the `geometry.coordinates` array into three
# scalar columns and lift the needed `properties.*` fields to the top level.
# NOTE(review): in USGS earthquake GeoJSON the third coordinate is usually depth
# (km) rather than elevation -- confirm before relying on the column name.
_coordinates = col('geometry.coordinates')
df = df.select(
    'id',
    _coordinates.getItem(0).alias("longitude"),
    _coordinates.getItem(1).alias("latitude"),
    _coordinates.getItem(2).alias("elevation"),
    *[
        col(source).alias(target)
        for source, target in (
            ('properties.title', 'title'),
            ('properties.place', 'place_description'),
            ('properties.sig', 'sig'),
            ('properties.mag', 'mag'),
            ('properties.magType', 'magType'),
            ('properties.time', 'time'),
            ('properties.updated', 'updated'),
        )
    ],
)

In [0]:
# Default null longitude, latitude and time values to 0.
# NOTE(review): 0/0 is a real-looking coordinate and time 0 becomes the Unix
# epoch once converted to a timestamp -- confirm this sentinel is intended
# rather than filtering such rows out or leaving them null.
for _name in ("longitude", "latitude", "time"):
    df = df.withColumn(_name, when(col(_name).isNull(), 0).otherwise(col(_name)))

In [0]:
# Convert the epoch-millisecond `time` and `updated` values into TimestampType.
# `timestamp_millis` works on the integer value directly. The previous
# `(ms / 1000).cast(TimestampType())` went through a double, and Spark truncates
# `seconds * 1e6` to a long when casting, so a timestamp could land one
# microsecond before the true instant.
# NOTE(review): assumes both columns are integral epoch milliseconds (the
# original divided by 1000); confirm the inferred JSON schema if that changes.
df = (
    df
    .withColumn('time', expr("timestamp_millis(`time`)"))
    .withColumn('updated', expr("timestamp_millis(`updated`)"))
)

In [0]:
# Append this run's Silver rows to the Parquet dataset at `silver_s3`.
# NOTE(review): append mode is not idempotent -- re-running the job for the same
# start_date writes the same events again; consider a partition overwrite or MERGE.
df.write.parquet(silver_s3, mode='append')