In [0]:
# Local-run defaults kept inside a bare string literal, so executing this cell
# assigns nothing; start_date, bronze_adls and silver_adls are instead read
# from the "bronze_params" widget in the next cell.
'''
# Remove this before running Data Factory Pipeline

from datetime import date, timedelta

start_date = date.today() - timedelta(1)

bronze_adls = "abfss://bronze@dataomar123.dfs.core.windows.net/"
silver_adls = "abfss://silver@dataomar123.dfs.core.windows.net/"

'''

In [0]:
import json

# The pipeline parameters arrive as a single JSON string in the
# "bronze_params" widget.
bronze_params = dbutils.widgets.get("bronze_params")
print(f"Raw bronze_params: {bronze_params}")

# Decode the JSON payload into a dict of settings.
output_data = json.loads(bronze_params)

# Fail fast when a value this notebook depends on is absent or empty.
# Falling back to "" would build a read path with no storage prefix / date,
# and a write path of just "earthquake_events_silver/" with no container.
required_keys = ("start_date", "bronze_adls", "silver_adls")
missing_keys = [key for key in required_keys if not output_data.get(key)]
if missing_keys:
    raise ValueError(
        f"bronze_params is missing required value(s): {', '.join(missing_keys)}"
    )

# Required settings (validated above).
start_date = output_data["start_date"]
bronze_adls = output_data["bronze_adls"]
silver_adls = output_data["silver_adls"]

# Optional settings: not validated here, so they keep an empty-string
# default when absent.
end_date = output_data.get("end_date", "")
gold_adls = output_data.get("gold_adls", "")

print(f"Start Date: {start_date}, Bronze ADLS: {bronze_adls}")

In [0]:
from pyspark.sql.functions import col, isnull, when
from pyspark.sql.types import TimestampType
from datetime import date, timedelta

In [0]:
# Read the bronze file named after start_date into a Spark DataFrame.
# The "multiline" option is enabled so JSON records may span several lines.
bronze_file_path = f"{bronze_adls}{start_date}_earthquake_data.json"
df = (
    spark.read
    .option("multiline", "true")
    .json(bronze_file_path)
)

In [0]:
# Peek at the first row to confirm the bronze file loaded.
df.head()

Row(geometry=Row(coordinates=[-92.9183333333333, 35.873, 2.76], type='Point'), id='nm60503563', properties=Row(alert=None, cdi=2.2, code='60503563', detail='https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=nm60503563&format=geojson', dmin=0.4851, felt=1, gap=94.0, ids=',nm60503563,', mag=2.38, magType='md', mmi=None, net='nm', nst=13, place='14 km NE of Pelsor, Arkansas', rms=0.26, sig=87, sources=',nm,', status='reviewed', time=1746316297500, title='M 2.4 - 14 km NE of Pelsor, Arkansas', tsunami=0, type='earthquake', types=',dyfi,origin,phase-data,', tz=None, updated=1746373147630, url='https://earthquake.usgs.gov/earthquakes/eventpage/nm60503563'), type='Feature')

In [0]:
# Show the DataFrame's column names and types.
df

DataFrame[geometry: struct<coordinates:array<double>,type:string>, id: string, properties: struct<alert:string,cdi:double,code:string,detail:string,dmin:double,felt:bigint,gap:double,ids:string,mag:double,magType:string,mmi:double,net:string,nst:bigint,place:string,rms:double,sig:bigint,sources:string,status:string,time:bigint,title:string,tsunami:bigint,type:string,types:string,tz:string,updated:bigint,url:string>, type: string]

In [0]:
# Render the raw rows as a table; geometry and properties are still nested here.
df.display()

geometry,id,properties,type
"List(List(-92.9183333333333, 35.873, 2.76), Point)",nm60503563,"List(null, 2.2, 60503563, https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=nm60503563&format=geojson, 0.4851, 1, 94.0, ,nm60503563,, 2.38, md, null, nm, 13, 14 km NE of Pelsor, Arkansas, 0.26, 87, ,nm,, reviewed, 1746316297500, M 2.4 - 14 km NE of Pelsor, Arkansas, 0, earthquake, ,dyfi,origin,phase-data,, null, 1746373147630, https://earthquake.usgs.gov/earthquakes/eventpage/nm60503563)",Feature
"List(List(-119.3563, 39.6467, 12.1), Point)",nn00897188,"List(null, null, 00897188, https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=nn00897188&format=geojson, 0.348, null, 130.84999999999997, ,nn00897188,, 1.4, ml, null, nn, 16, 6 km WNW of Wadsworth, Nevada, 0.1902, 30, ,nn,, automatic, 1746315476027, M 1.4 - 6 km WNW of Wadsworth, Nevada, 0, earthquake, ,origin,phase-data,, null, 1746315629162, https://earthquake.usgs.gov/earthquakes/eventpage/nn00897188)",Feature
"List(List(-121.63200378418, 36.8481674194336, 0.689999997615814), Point)",nc75176126,"List(null, null, 75176126, https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=nc75176126&format=geojson, 0.0484, null, 169.0, ,nc75176126,, 1.42, md, null, nc, 10, 5 km S of Aromas, CA, 0.16, 31, ,nc,, automatic, 1746314360660, M 1.4 - 5 km S of Aromas, CA, 0, earthquake, ,nearby-cities,origin,phase-data,scitech-link,, null, 1746316039185, https://earthquake.usgs.gov/earthquakes/eventpage/nc75176126)",Feature
"List(List(-147.4666, 64.9681, 0.0), Point)",ak0255nsv4h2,"List(null, null, 0255nsv4h2, https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=ak0255nsv4h2&format=geojson, null, null, null, ,ak0255nsv4h2,, 1.1, ml, null, ak, null, 7 km E of Fox, Alaska, 0.63, 19, ,ak,, automatic, 1746313911102, M 1.1 - 7 km E of Fox, Alaska, 0, earthquake, ,origin,phase-data,, null, 1746314018235, https://earthquake.usgs.gov/earthquakes/eventpage/ak0255nsv4h2)",Feature
"List(List(-151.8208, 64.4694, 2.3), Point)",ak0255nstyyw,"List(null, null, 0255nstyyw, https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=ak0255nstyyw&format=geojson, null, null, null, ,ak0255nstyyw,, 1.1, ml, null, ak, null, 69 km NNE of Lake Minchumina, Alaska, 0.82, 19, ,ak,, automatic, 1746313573207, M 1.1 - 69 km NNE of Lake Minchumina, Alaska, 0, earthquake, ,origin,phase-data,, null, 1746313713885, https://earthquake.usgs.gov/earthquakes/eventpage/ak0255nstyyw)",Feature
"List(List(-148.3672, 61.5458, 20.9), Point)",ak0255nstyqr,"List(null, null, 0255nstyqr, https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=ak0255nstyqr&format=geojson, null, null, null, ,ak0255nstyqr,, 1.2, ml, null, ak, null, 28 km S of Chickaloon, Alaska, 0.25, 22, ,ak,, automatic, 1746313570274, M 1.2 - 28 km S of Chickaloon, Alaska, 0, earthquake, ,origin,phase-data,, null, 1746313643518, https://earthquake.usgs.gov/earthquakes/eventpage/ak0255nstyqr)",Feature
"List(List(-156.8836, 56.015, 2.4), Point)",ak0255nssoyd,"List(null, null, 0255nssoyd, https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=ak0255nssoyd&format=geojson, null, null, null, ,ak0255nssoyd,, 2.3, ml, null, ak, null, 99 km ESE of Chignik, Alaska, 0.54, 81, ,ak,, automatic, 1746313216851, M 2.3 - 99 km ESE of Chignik, Alaska, 0, earthquake, ,origin,phase-data,, null, 1746313356258, https://earthquake.usgs.gov/earthquakes/eventpage/ak0255nssoyd)",Feature
"List(List(-148.183, 65.4472, 3.6), Point)",ak0255nsim6z,"List(null, null, 0255nsim6z, https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=ak0255nsim6z&format=geojson, null, null, null, ,ak0255nsim6z,, 2.5, ml, null, ak, null, 18 km ESE of Livengood, Alaska, 0.63, 96, ,ak,, automatic, 1746312795479, M 2.5 - 18 km ESE of Livengood, Alaska, 0, earthquake, ,origin,phase-data,, null, 1746312905336, https://earthquake.usgs.gov/earthquakes/eventpage/ak0255nsim6z)",Feature
"List(List(-149.5192, 63.6266, 57.4), Point)",ak0255nsesao,"List(null, null, 0255nsesao, https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=ak0255nsesao&format=geojson, null, null, null, ,ak0255nsesao,, 1.5, ml, null, ak, null, 32 km WSW of Denali Park, Alaska, 1.41, 35, ,ak,, automatic, 1746311728322, M 1.5 - 32 km WSW of Denali Park, Alaska, 0, earthquake, ,origin,phase-data,, null, 1746311848679, https://earthquake.usgs.gov/earthquakes/eventpage/ak0255nsesao)",Feature
"List(List(-122.77116394043, 38.787166595459, 1.23000001907349), Point)",nc75176121,"List(null, null, 75176121, https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=nc75176121&format=geojson, 0.01615, null, 92.0, ,nc75176121,, 0.74, md, null, nc, 9, 2 km NW of The Geysers, CA, 0.01, 8, ,nc,, automatic, 1746311338900, M 0.7 - 2 km NW of The Geysers, CA, 0, earthquake, ,nearby-cities,origin,phase-data,scitech-link,, null, 1746314240990, https://earthquake.usgs.gov/earthquakes/eventpage/nc75176121)",Feature


In [0]:

# Flatten the nested records: lift the three coordinate items and the selected
# `properties` fields into top-level columns, keeping `id` as-is.
# NOTE(review): the USGS GeoJSON format appears to document the third
# coordinate as depth — confirm that 'elevation' is the intended column name.
coordinates = col('geometry.coordinates')

# (nested source field, output column name), in output order.
property_columns = [
    ('properties.title', 'title'),
    ('properties.place', 'place_description'),
    ('properties.sig', 'sig'),
    ('properties.mag', 'mag'),
    ('properties.magType', 'magType'),
    ('properties.time', 'time'),
    ('properties.updated', 'updated'),
]

df = df.select(
    'id',
    coordinates.getItem(0).alias('longitude'),
    coordinates.getItem(1).alias('latitude'),
    coordinates.getItem(2).alias('elevation'),
    *[col(source).alias(output_name) for source, output_name in property_columns],
)
    

In [0]:
# First row after flattening: the nested fields are now top-level columns.
df.head()

Row(id='nm60503563', longitude=-92.9183333333333, latitude=35.873, elevation=2.76, title='M 2.4 - 14 km NE of Pelsor, Arkansas', place_description='14 km NE of Pelsor, Arkansas', sig=87, mag=2.38, magType='md', time=1746316297500, updated=1746373147630)

In [0]:
# Tabular view of the flattened columns.
df.display()

id,longitude,latitude,elevation,title,place_description,sig,mag,magType,time,updated
nm60503563,-92.9183333333333,35.873,2.76,"M 2.4 - 14 km NE of Pelsor, Arkansas","14 km NE of Pelsor, Arkansas",87,2.38,md,1746316297500,1746373147630
nn00897188,-119.3563,39.6467,12.1,"M 1.4 - 6 km WNW of Wadsworth, Nevada","6 km WNW of Wadsworth, Nevada",30,1.4,ml,1746315476027,1746315629162
nc75176126,-121.63200378418,36.8481674194336,0.689999997615814,"M 1.4 - 5 km S of Aromas, CA","5 km S of Aromas, CA",31,1.42,md,1746314360660,1746316039185
ak0255nsv4h2,-147.4666,64.9681,0.0,"M 1.1 - 7 km E of Fox, Alaska","7 km E of Fox, Alaska",19,1.1,ml,1746313911102,1746314018235
ak0255nstyyw,-151.8208,64.4694,2.3,"M 1.1 - 69 km NNE of Lake Minchumina, Alaska","69 km NNE of Lake Minchumina, Alaska",19,1.1,ml,1746313573207,1746313713885
ak0255nstyqr,-148.3672,61.5458,20.9,"M 1.2 - 28 km S of Chickaloon, Alaska","28 km S of Chickaloon, Alaska",22,1.2,ml,1746313570274,1746313643518
ak0255nssoyd,-156.8836,56.015,2.4,"M 2.3 - 99 km ESE of Chignik, Alaska","99 km ESE of Chignik, Alaska",81,2.3,ml,1746313216851,1746313356258
ak0255nsim6z,-148.183,65.4472,3.6,"M 2.5 - 18 km ESE of Livengood, Alaska","18 km ESE of Livengood, Alaska",96,2.5,ml,1746312795479,1746312905336
ak0255nsesao,-149.5192,63.6266,57.4,"M 1.5 - 32 km WSW of Denali Park, Alaska","32 km WSW of Denali Park, Alaska",35,1.5,ml,1746311728322,1746311848679
nc75176121,-122.77116394043,38.787166595459,1.23000001907349,"M 0.7 - 2 km NW of The Geysers, CA","2 km NW of The Geysers, CA",8,0.74,md,1746311338900,1746314240990


In [0]:

# Clean the frame: wherever longitude, latitude or time is null, substitute 0
# so these three columns contain no missing values.
for column_name in ('longitude', 'latitude', 'time'):
    current = col(column_name)
    df = df.withColumn(column_name, when(current.isNull(), 0).otherwise(current))

In [0]:
# First row after the null-replacement step.
df.head()

Row(id='nm60503563', longitude=-92.9183333333333, latitude=35.873, elevation=2.76, title='M 2.4 - 14 km NE of Pelsor, Arkansas', place_description='14 km NE of Pelsor, Arkansas', sig=87, mag=2.38, magType='md', time=1746316297500, updated=1746373147630)

In [0]:
# 'time' and 'updated' hold Unix time in milliseconds: divide by 1000 to get
# seconds, then cast the result to a timestamp.
for column_name in ('time', 'updated'):
    epoch_seconds = col(column_name) / 1000
    df = df.withColumn(column_name, epoch_seconds.cast(TimestampType()))
     

In [0]:
# First row after conversion: 'time' and 'updated' are now datetime values.
df.head()

Row(id='nm60503563', longitude=-92.9183333333333, latitude=35.873, elevation=2.76, title='M 2.4 - 14 km NE of Pelsor, Arkansas', place_description='14 km NE of Pelsor, Arkansas', sig=87, mag=2.38, magType='md', time=datetime.datetime(2025, 5, 3, 23, 51, 37, 500000), updated=datetime.datetime(2025, 5, 4, 15, 39, 7, 630000))

In [0]:
# Build the destination folder path under silver_adls for the transformed
# DataFrame. Nothing is written in this cell; the write happens in the next one.
silver_output_path = f"{silver_adls}earthquake_events_silver/"

In [0]:
# Write the DataFrame as Parquet under silver_output_path, adding to whatever
# is already stored there rather than replacing it.
# NOTE(review): with append mode, re-running for the same start_date would
# presumably store the same events again — confirm this is acceptable.
df.write.parquet(silver_output_path, mode='append')

In [0]:
# End the notebook run and hand silver_output_path back as its exit value,
# so a calling notebook or pipeline activity (e.g., in Azure Data Factory) can read it.

dbutils.notebook.exit(silver_output_path)