In [0]:
# Retrieve the values published by the upstream 'bronze' and 'silver' job tasks.
bronze_output = dbutils.jobs.taskValues.get(taskKey= 'bronze', key= 'bronze_output')
silver_path = dbutils.jobs.taskValues.get(taskKey= 'silver', key= 'silver_output')
# Retrieve the individual parameters from the bronze payload.
start_date = bronze_output.get('start_date','')
silver_adls = bronze_output.get('silver_adls','')
gold_adls = bronze_output.get('gold_adls','')
# Fail fast: start_date and gold_adls are used to build the output path of the
# final write. If either one silently fell back to '', the overwrite would
# target a path such as "/_earthquake_events" instead of the gold container.
missing_params = [
    name
    for name, value in (('start_date', start_date), ('gold_adls', gold_adls))
    if not value
]
if missing_params:
    raise ValueError(
        f"bronze_output is missing required parameter(s): {', '.join(missing_params)}"
    )
# Display the parameters.
print(f"Start Date: {start_date}")
print(f"Silver Path: {silver_path}")
print(f"Gold ADLS: {gold_adls}")

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import date, timedelta
import reverse_geocoder as rg

In [0]:
# Load the parquet data found at silver_path into a DataFrame.
silver_reader = spark.read.format('parquet')
df = silver_reader.load(silver_path)

In [0]:
# python function to get the country code from the coordinates
def get_country_code(lat, lon):
    """
    Retrieve the country code for a given latitude and longitude.

    Parameters:
    lat (float or str): Latitude of the location.
    lon (float or str): Longitude of the location.

    Returns:
    str or None: The 'cc' field of the first reverse_geocoder match, or None
    when the inputs cannot be converted to float or the lookup fails.

    Example:
    >>> get_country_code(48.8588443, 2.2943506)
    'FR'
    """
    try:
        coordinates = (float(lat), float(lon))
        # No per-row success logging: this function is applied to every row
        # through a Spark UDF, so a print here would flood the executor logs.
        return rg.search(coordinates)[0].get('cc')
    except Exception as e:
        # Best-effort lookup: a bad or missing coordinate yields a None
        # country code for that row instead of failing the whole job.
        print(f"Error processing coordinates: {lat}, {lon} -> {str(e)}")
        return None

In [0]:
# Wrap get_country_code as a Spark UDF whose result is a string column.
get_country_code_udf = udf(
    get_country_code,
    returnType=StringType(),
)

In [0]:
# Derive 'country_code' by applying the UDF to each row's latitude/longitude.
country_code_col = get_country_code_udf(col('latitude'), col('longitude'))
df = df.withColumn('country_code', country_code_col)

In [0]:
# Print the DataFrame schema to inspect the column names and types.
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- title: string (nullable = true)
 |-- place_description: string (nullable = true)
 |-- sig: long (nullable = true)
 |-- mag: double (nullable = true)
 |-- magType: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- updated: timestamp (nullable = true)
 |-- country_code: string (nullable = true)



In [0]:
# Preview the first five rows of the DataFrame.
country_preview = df.limit(5)
country_preview.display()

id,longitude,latitude,elevation,title,place_description,sig,mag,magType,time,updated,country_code
ci41143408,-116.4486667,34.31,8.73,"M 1.2 - 22 km N of Yucca Valley, CA","22 km N of Yucca Valley, CA",20,1.15,ml,2025-05-02T23:43:19.52Z,2025-05-03T00:01:19.489Z,US
ak0255m5hvxv,-152.9035,60.3,120.4,"M 1.6 - 73 km WNW of Ninilchik, Alaska","73 km WNW of Ninilchik, Alaska",39,1.6,ml,2025-05-02T23:40:01.794Z,2025-05-02T23:42:43.527Z,US
ak0255m5cfgq,-150.5436,60.5389,13.8,"M 1.6 - 12 km E of Sterling, Alaska","12 km E of Sterling, Alaska",39,1.6,ml,2025-05-02T23:14:55.463Z,2025-05-02T23:16:59.634Z,US
hv74664712,-155.298833333333,19.9691666666667,31.0,"M 3.6 - 6 km WSW of Laupāhoehoe, Hawaii","6 km WSW of Laupāhoehoe, Hawaii",223,3.58,ml,2025-05-02T22:59:40.4Z,2025-05-03T00:52:51.825Z,US
ak0255m4xbr6,-150.5616,60.546,8.2,"M 1.7 - 11 km E of Sterling, Alaska","11 km E of Sterling, Alaska",44,1.7,ml,2025-05-02T22:44:08.984Z,2025-05-02T22:46:04.534Z,US


In [0]:
# Convert 'sig' into a categorical column.
# Every bucket is matched explicitly and there is no otherwise(): ending the
# chain with otherwise('High') would also label rows whose 'sig' is NULL as
# 'High', because a NULL comparison never satisfies a when() condition.
# Without otherwise(), rows that match no bucket get a NULL category.
df = df.withColumn(
    'significance_category',
    when(col('sig') < 100, lit('Low'))
    .when((col('sig') >= 100) & (col('sig') < 500), lit('Moderate'))
    .when(col('sig') >= 500, lit('High'))
)

In [0]:
# Preview the first five rows of the DataFrame.
category_preview = df.limit(5)
category_preview.display()

id,longitude,latitude,elevation,title,place_description,sig,mag,magType,time,updated,country_code,significance_category
ci41143408,-116.4486667,34.31,8.73,"M 1.2 - 22 km N of Yucca Valley, CA","22 km N of Yucca Valley, CA",20,1.15,ml,2025-05-02T23:43:19.52Z,2025-05-03T00:01:19.489Z,US,Low
ak0255m5hvxv,-152.9035,60.3,120.4,"M 1.6 - 73 km WNW of Ninilchik, Alaska","73 km WNW of Ninilchik, Alaska",39,1.6,ml,2025-05-02T23:40:01.794Z,2025-05-02T23:42:43.527Z,US,Low
ak0255m5cfgq,-150.5436,60.5389,13.8,"M 1.6 - 12 km E of Sterling, Alaska","12 km E of Sterling, Alaska",39,1.6,ml,2025-05-02T23:14:55.463Z,2025-05-02T23:16:59.634Z,US,Low
hv74664712,-155.298833333333,19.9691666666667,31.0,"M 3.6 - 6 km WSW of Laupāhoehoe, Hawaii","6 km WSW of Laupāhoehoe, Hawaii",223,3.58,ml,2025-05-02T22:59:40.4Z,2025-05-03T00:52:51.825Z,US,Moderate
ak0255m4xbr6,-150.5616,60.546,8.2,"M 1.7 - 11 km E of Sterling, Alaska","11 km E of Sterling, Alaska",44,1.7,ml,2025-05-02T22:44:08.984Z,2025-05-02T22:46:04.534Z,US,Low


In [0]:
# Write the DataFrame as parquet under the gold ADLS location, in a folder
# named after the start date; 'overwrite' replaces any data already there.
write_path = f"{gold_adls}/{start_date}_earthquake_events"
(
    df.write
    .format('parquet')
    .mode('overwrite')
    .save(write_path)
)