In [0]:
# Retrieve the task value from the previous task (bronze)
bronze_output = dbutils.jobs.taskValues.get(taskKey="Bronze", key="bronze_output")

# Access individual variables
start_date = bronze_output.get("start_date", "")
bronze_adls = bronze_output.get("bronze_adls", "")
silver_adls = bronze_output.get("silver_adls", "")

print(f"Start Date: {start_date}, Bronze ADLS: {bronze_adls}")

Start Date: , Bronze ADLS: 


In [0]:
from pyspark.sql.functions import col, isnull, when
from pyspark.sql.types import TimestampType
from datetime import date,timedelta

In [0]:
# Load the JSON data into a Spark DataFrame
df = spark.read.option("multiline","true").json(f"{bronze_adls}{start_date}_earthquake.json")

In [0]:
df.head()

Row(geometry=Row(coordinates=[-122.80933380127, 38.8373336791992, 1.98000001907349], type='Point'), id='nc75164891', properties=Row(alert=None, cdi=None, code='75164891', detail='https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=nc75164891&format=geojson', dmin=0.01393, felt=None, gap=57.0, ids=',nc75164891,', mag=0.73, magType='md', mmi=None, net='nc', nst=14, place='8 km WNW of Cobb, CA', rms=0.01, sig=8, sources=',nc,', status='automatic', time=1744588063600, title='M 0.7 - 8 km WNW of Cobb, CA', tsunami=0, type='earthquake', types=',nearby-cities,origin,phase-data,scitech-link,', tz=None, updated=1744590443372, url='https://earthquake.usgs.gov/earthquakes/eventpage/nc75164891'), type='Feature')

In [0]:
# Reshape earthquake data
df = (
    df
    .select(
        'id',
        col('geometry.coordinates').getItem(0).alias('longitude'),
        col('geometry.coordinates').getItem(1).alias('latitude'),
        col('geometry.coordinates').getItem(2).alias('elevation'),
        col('properties.title').alias('title'),
        col('properties.place').alias('place_description'),
        col('properties.sig').alias('sig'),
        col('properties.mag').alias('mag'),
        col('properties.magType').alias('magType'),
        col('properties.time').alias('time'),
        col('properties.updated').alias('updated')
    )
)
  

In [0]:
df

DataFrame[id: string, longitude: double, latitude: double, elevation: double, title: string, place_description: string, sig: bigint, mag: double, magType: string, time: bigint, updated: bigint]

In [0]:
df = (
    df
    .withColumn('longitude', when(isnull(col('longitude')), 0).otherwise(col('longitude')))
    .withColumn('latitude', when(isnull(col('latitude')), 0).otherwise(col('latitude')))
    .withColumn('time', when(isnull(col('time')), 0).otherwise(col('time')))
)

In [0]:
df = (
    df
    .withColumn('time',(col('time') / 1000).cast(TimestampType()))
    .withColumn('updated',(col('updated') / 1000).cast(TimestampType()))
)

In [0]:
df.head()

Row(id='nc75164891', longitude=-122.80933380127, latitude=38.8373336791992, elevation=1.98000001907349, title='M 0.7 - 8 km WNW of Cobb, CA', place_description='8 km WNW of Cobb, CA', sig=8, mag=0.73, magType='md', time=datetime.datetime(2025, 4, 13, 23, 47, 43, 600000), updated=datetime.datetime(2025, 4, 14, 0, 27, 23, 372000))

In [0]:
# Save the transformed DataFrame to the Silver Container
silver_output_path = f"{silver_adls}earthquake_events_silver/"

In [0]:
# Append DataFrame to Silver container in Parquet format
df.write.mode("append").parquet(silver_output_path)

In [0]:
dbutils.jobs.taskValues.set(key = "silver_output", value = silver_output_path)