In [0]:
# Retrieve the task value from the previous task (bronze)
bronze_output = dbutils.jobs.taskValues.get(
    taskKey="Bronze", 
    key="bronze_output", 
    debugValue={"start_date": "", "bronze_adls": "", "silver_adls": ""}
)
# Access individual variables\
start_date = bronze_output.get("start_date", "")
bronze_adls = bronze_output.get("bronze_adls", "")
silver_adls = bronze_output.get("silver_adls", "")

print(f"start_date : {start_date}, Bronze_adls : {bronze_adls}")


start_date : , Bronze_adls : 


In [0]:

from pyspark.sql.functions import col, isnull, when
from pyspark.sql.types import TimestampType
from datetime import date, timedelta

In [0]:
    df = spark.read.option("multiline", "true").json(f"{bronze_adls}{start_date}_earthquake_data.json")

In [0]:
# Reshape earthquake data
df = df.select(
               'id',
               col('geometry.coordinates').getItem(0).alias('longitude'),
               col('geometry.coordinates').getItem(1).alias('latitude'),
               col('geometry.coordinates').getItem(3).alias('elevation'),
               col('properties.title').alias('title'),             
               col('properties.place').alias('place_description'),
               col('properties.sig').alias('sig'),
               col('properties.mag').alias('mag'), 
               col('properties.magType').alias('magType'),
               col('properties.time').alias('time'),
               col('properties.updated').alias('updated')          
)

In [0]:
df.head()


Row(id='av93037249', longitude=-155.112333333333, latitude=58.2336666666667, elevation=None, title='M -0.6 - 83 km NNW of Karluk, Alaska', place_description='83 km NNW of Karluk, Alaska', sig=0, mag=-0.61, magType='ml', time=1743119514350, updated=1743139564820)

In [0]:
df = ( df.withColumn('latitude', when(isnull(col('latitude')), 0).otherwise(col('latitude')))
       .withColumn('longitude', when(isnull(col('longitude')), 0).otherwise(col('longitude')))
       .withColumn('elevation', when(isnull(col('elevation')), 0).otherwise(col('elevation')))

)   
    

In [0]:
df = (
       df.withColumn('time', (col('time') / 1000).cast(TimestampType()))
          .withColumn('updated', (col('updated') / 1000).cast(TimestampType()))
    )



In [0]:
silver_output_path = f"{silver_adls}earthquake_events_silver/"

In [0]:
df.write.mode('append').parquet(silver_output_path)


In [0]:
dbutils.jobs.taskValues.set(key = "silver_output", value = silver_output_path)