In [0]:
# Retrieve the values published by the previous task ("bronze").
# debugValue is what taskValues.get returns when the notebook is run outside a
# job; it has to be a dict so the .get() lookups below still work (the former
# "default" string has no .get method and raised AttributeError).
bronze_output = dbutils.jobs.taskValues.get(taskKey="bronze", key="bronze_output", debugValue={})

# Access the individual variables, falling back to "" when a key is absent.
start_date = bronze_output.get("start_date", "")
bronze_path = bronze_output.get("bronze_path", "")
silver_path = bronze_output.get("silver_path", "")

# f-string so the actual values are printed instead of the literal placeholders.
print(f"Start Date:{start_date},Bronze path:{bronze_path}")

In [0]:
from pyspark.sql.functions import col, isnull, when
from pyspark.sql.types import TimestampType
from datetime import date, timedelta

In [0]:
# Load this run's bronze JSON file. The file name is built from the bronze
# path and start date received from the upstream task.
bronze_file = f"{bronze_path}{start_date}_earthquake_data.json"
json_reader = spark.read.option("multiline", "true")
df = json_reader.json(bronze_file)


In [0]:
# Bare expression: the notebook renders the DataFrame's repr as this cell's output.
df

In [0]:
# Reshape the earthquake data: flatten the nested records into one column per
# attribute.
# The coordinates array is read positionally: 0 -> longitude, 1 -> latitude,
# 2 -> elevation.
coordinates = col('geometry.coordinates')

# (field under `properties`, output column name)
property_aliases = [
    ('title', 'title'),
    ('place', 'place_description'),
    ('sig', 'sig'),
    ('mag', 'mag'),
    ('magtype', 'magtype'),
    ('time', 'time'),
    ('updated', 'updated'),
]

reshaped_columns = [
    'id',
    coordinates.getItem(0).alias('longitude'),
    coordinates.getItem(1).alias('latitude'),
    coordinates.getItem(2).alias('elevation'),
]
for source_field, output_name in property_aliases:
    reshaped_columns.append(col(f'properties.{source_field}').alias(output_name))

df = df.select(*reshaped_columns)

In [0]:
# Peek at the first row to sanity-check the reshaped columns.
df.head()

In [0]:
# Handle missing values: a null longitude, latitude or time is replaced with 0;
# non-null values are kept as they are.
# NOTE(review): a null time becomes 0 here and is later converted to a
# timestamp — confirm that this placeholder is intended.
for column_name in ('longitude', 'latitude', 'time'):
    current_value = col(column_name)
    df = df.withColumn(
        column_name,
        when(isnull(current_value), 0).otherwise(current_value),
    )

In [0]:
# Convert 'time' and 'updated' to timestamps: each value is divided by 1000
# and then cast (presumably epoch milliseconds -> seconds; confirm against the
# source feed).
divisor = 1000
time_as_timestamp = (col('time') / divisor).cast(TimestampType())
updated_as_timestamp = (col('updated') / divisor).cast(TimestampType())

df = df.withColumn('time', time_as_timestamp)
df = df.withColumn('updated', updated_as_timestamp)

# Show the first row so the converted columns can be inspected.
df.head()

In [0]:
# Output location of the silver dataset. The folder name is appended directly,
# so this assumes silver_path already ends with a path separator — TODO confirm.
silver_output_path=f"{silver_path}earthquake_events_silver/"

In [0]:
# Persist the transformed rows as Parquet, appending to whatever already
# exists at the silver output path.
# NOTE(review): append mode means re-running for the same input adds the rows
# again — confirm that this is acceptable for the pipeline.
df.write.parquet(silver_output_path, mode='append')

In [0]:
# Publish the silver output path as a task value under the key "silver_output"
# (presumably read by a downstream task — verify against the job definition).
dbutils.jobs.taskValues.set(key="silver_output",value=silver_output_path)