In [0]:
import json

dbutils.widgets.text('bronze_params', '')
dbutils.widgets.text('silver_params', '')

bronze_params = dbutils.widgets.get('bronze_params')
silver_params = dbutils.widgets.get('silver_params')

print(f'Raw Bronze param: {bronze_params}')
print(f'Raw Silver param: {silver_params}')

# Both widgets default to '' when the notebook runs without parameters. Fail here with a
# readable message instead of an opaque JSONDecodeError or a Spark error on an empty path.
if not bronze_params.strip():
    raise ValueError("Widget 'bronze_params' is empty; expected a JSON object.")
if not silver_params.strip():
    raise ValueError("Widget 'silver_params' is empty; expected the Silver parquet path.")

bronze_data = json.loads(bronze_params)
if not isinstance(bronze_data, dict):
    raise ValueError(
        f"'bronze_params' must decode to a JSON object, got {type(bronze_data).__name__}."
    )

# start_date feeds the Silver read filter and gold_adls builds the Gold output path in later
# cells. If either were missing, `col('time') > None` would silently drop every row and the
# output path would start with 'None', so require both up front.
missing_keys = [key for key in ('start_date', 'gold_adls') if not bronze_data.get(key)]
if missing_keys:
    raise ValueError(f"'bronze_params' is missing required keys: {missing_keys}")

start_date = bronze_data.get('start_date')
end_date = bronze_data.get('end_date')
silver_adls = bronze_data.get('silver_adls')
gold_adls = bronze_data.get('gold_adls')
# The Silver parameter is used as-is as the path of the Silver parquet data.
silver_data = silver_params

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import date, timedelta
import reverse_geocoder as rg

In [0]:
# Load the Silver-layer parquet data and keep only events strictly after start_date.
# NOTE(review): end_date is read in the parameters cell but no upper bound is applied here;
# confirm whether that is intended.
df = (
    spark.read
    .parquet(silver_data)
    .where(col('time') > start_date)
)

In [0]:
# Optional row cap for faster development runs: the country-code UDF runs row by row in
# Python, so large inputs are slow. Any non-None value means only that many (arbitrary) rows
# reach the Gold write at the end of the notebook. Set to None for a full production run.
ROW_LIMIT = 100
if ROW_LIMIT is not None:
    df = df.limit(ROW_LIMIT)

In [0]:
def get_country_code(lat, lon):
    """Retrieve the country code for a latitude/longitude pair, or None.

    Meant to run inside a Spark UDF, so it never raises. Null coordinates yield None,
    which becomes a null in the output column. Any conversion or lookup failure is
    printed and also yields None.

    Args:
        lat: Latitude; any value accepted by ``float()``, or None.
        lon: Longitude; any value accepted by ``float()``, or None.

    Returns:
        The 'cc' field of reverse_geocoder's result for the coordinate, or None.
    """
    # Spark passes SQL nulls to a Python UDF as None. Map null -> null instead of letting
    # float(None) raise a TypeError that would fail the whole Spark task.
    if lat is None or lon is None:
        return None

    try:
        # Conversion is inside the try so malformed values are handled like lookup errors.
        coordinates = (float(lat), float(lon))
        result = rg.search(coordinates)[0].get('cc')
        print(f'Processed coordinate {coordinates} -> {result}')
        return result
    except Exception as e:
        print(f'Error processing coordinate {lat}, {lon} -> {str(e)}')
        return None

In [0]:
# Expose the Python geocoding function to Spark as a UDF producing a string column
# (null whenever the function returns None).
get_country_code_udf = udf(get_country_code, returnType=StringType())

In [0]:
# Sanity-check the geocoder on a known point (Eiffel Tower, Paris) before applying it to the
# DataFrame. Assert the result rather than just displaying it, so a broken lookup stops the run.
eiffel_tower_country_code = get_country_code(48.8588443, 2.2943506)
assert eiffel_tower_country_code == 'FR', f'Unexpected country code: {eiffel_tower_country_code}'
eiffel_tower_country_code

Loading formatted geocoded file...
Processed coordinate (48.8588443, 2.2943506) -> FR


'FR'

In [0]:
# Add a 'country_code' column by reverse-geocoding each row's latitude/longitude via the UDF.
df_with_location = df.withColumn(
    'country_code',
    get_country_code_udf('latitude', 'longitude'),
)

In [0]:
# Peek at the first enriched row to spot-check the new country_code column.
df_with_location.first()

Row(id='nc75162167', longitude=-122.818168640137, latitude=38.8136672973633, elevation=1.41999995708466, title='M 0.8 - 7 km NW of The Geysers, CA', place_description='7 km NW of The Geysers, CA', mag=0.75, magType='md', time=datetime.datetime(2025, 4, 7, 23, 58, 49, 490000), updated=datetime.datetime(2025, 4, 8, 0, 0, 24, 268000), sig=9, country_code='US')

In [0]:
# Classify each event by its 'sig' value:
#   sig < 100        -> 'Low'
#   100 <= sig < 500 -> 'Moderate'
#   sig >= 500       -> 'High'
#   sig is null      -> null
# An explicit final .when (instead of .otherwise) matters for nulls. Every comparison against
# a null sig is null, so a catch-all .otherwise('High') would label missing values 'High'.
# Rows that match no condition get null.
SIG_MODERATE_MIN = 100
SIG_HIGH_MIN = 500

df_with_location_sig_class = df_with_location.withColumn(
    'sig_class',
    when(col('sig') < SIG_MODERATE_MIN, 'Low')
    .when(col('sig') < SIG_HIGH_MIN, 'Moderate')
    .when(col('sig') >= SIG_HIGH_MIN, 'High')
)

In [0]:
# Inspect the schema to confirm the enrichment columns (country_code, sig_class) are present.
# NOTE(review): the saved output of this cell is a NameError, meaning the cell defining
# df_with_location_sig_class had not been run. Re-run the notebook top to bottom to refresh it.
df_with_location_sig_class.printSchema()

[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-7355990501835673>, line 1[0m
[0;32m----> 1[0m df_with_location_sig_class[38;5;241m.[39mprintSchema()

[0;31mNameError[0m: name 'df_with_location_sig_class' is not defined

In [0]:
# Build the Gold output path. Stripping the trailing '/' and adding exactly one separator works
# whether or not gold_adls ends with a slash. Plain concatenation would glue the folder name
# onto the last path segment when the slash is missing.
gold_output_path = f"{gold_adls.rstrip('/')}/earthquake_events_data/"

# NOTE(review): mode('append') adds rows on every run, so re-running the notebook on the same
# input writes duplicate events. Confirm each date window is processed only once upstream.
df_with_location_sig_class.write.mode('append').parquet(gold_output_path)