In [0]:
# Retrieve the task value from the previous task (bronze & silver)
bronze_output = dbutils.jobs.taskValues.get(taskKey="Bronze", key="bronze_output")
silver_data = dbutils.jobs.taskValues.get(taskKey="Silver", key="silver_output")

# Access individual variables
start_date = bronze_output.get("start_date", "")
silver_adls = bronze_output.get("silver_adls", "")
gold_adls = bronze_output.get("gold_adls", "")

print(f"Start Date: {start_date}, Gold ADLS: {gold_adls}")

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-3957261520692013>, line 2[0m
[1;32m      1[0m [38;5;66;03m# Retrieve the task value from the previous task (bronze & silver)[39;00m
[0;32m----> 2[0m bronze_output [38;5;241m=[39m dbutils[38;5;241m.[39mjobs[38;5;241m.[39mtaskValues[38;5;241m.[39mget(taskKey[38;5;241m=[39m[38;5;124m"[39m[38;5;124mbronze[39m[38;5;124m"[39m, key[38;5;241m=[39m[38;5;124m"[39m[38;5;124mbronze_output[39m[38;5;124m"[39m)
[1;32m      3[0m silver_data [38;5;241m=[39m dbutils[38;5;241m.[39mjobs[38;5;241m.[39mtaskValues[38;5;241m.[39mget(taskKey[38;5;241m=[39m[38;5;124m"[39m[38;5;124msilver[39m[38;5;124m"[39m, key[38;5;241m=[39m[38;5;124m"[39m[38;5;124msilver_output[39m[38;5;124m"[39m)
[1;32m      5[0m [38;5;66;03m# Access individual variables[39;00m

File [0;32

In [0]:
from pyspark.sql.functions import when, col, udf
from pyspark.sql.types import StringType
# Ensure the below library is installed on your cluster
import reverse_geocoder as rg
from datetime import date, timedelta

In [0]:
df = spark.read.parquet(silver_data).filter(col('time') > start_date)

In [0]:
df = df.limit(10) # added to speed up processings as during testing it was proving a bottleneck
# The problem is caused by the Python UDF (reverse_geocoder) being a bottleneck due to its non-parallel nature and high computational cost per task

In [0]:
def get_country_code(lat, lon):
    """
    Retrieve the country code for a given latitude and longitude.

    Parameters:
    lat (float or str): Latitude of the location.
    lon (float or str): Longitude of the location.

    Returns:
    str: Country code of the location, retrieved using the reverse geocoding API.

    Example:
    >>> get_country_details(48.8588443, 2.2943506)
    'FR'
    """
    try:
        coordinates = (float(lat), float(lon))
        result = rg.search(coordinates)[0].get('cc')
        print(f"Processed coordinates: {coordinates} -> {result}")
        return result
    except Exception as e:
        print(f"Error processing coordinates: {lat}, {lon} -> {str(e)}")
        return None

In [0]:
# registering the udfs so they can be used on spark dataframes
get_country_code_udf = udf(get_country_code, StringType())

In [0]:
get_country_code(48.8588443, 2.2943506)

Loading formatted geocoded file...
Processed coordinates: (48.8588443, 2.2943506) -> FR


'FR'

In [0]:
# adding country_code and city attributes
df_with_location = \
                df.\
                    withColumn("country_code", get_country_code_udf(col("latitude"), col("longitude")))

In [0]:
# adding significance classification
df_with_location_sig_class = \
                            df_with_location.\
                                withColumn('sig_class', 
                                            when(col("sig") < 100, "Low").\
                                            when((col("sig") >= 100) & (col("sig") < 500), "Moderate").\
                                            otherwise("High")
                                            )

In [0]:
df_with_location_sig_class.head()

Row(id='us6000pihl', longitude=39.9762, latitude=9.0693, elevation=9.256, title='M 4.5 - 19 km NNE of Metahāra, Ethiopia', place_description='19 km NNE of Metahāra, Ethiopia', sig=312, mag=4.5, magType='mb', time=datetime.datetime(2025, 1, 7, 23, 26, 46, 702000), updated=datetime.datetime(2025, 1, 7, 23, 58, 20, 40000), country_code='ET', sig_class='Moderate')

In [0]:
# Save the transformed DataFrame to the Silver container
gold_output_path = f"{gold_adls}earthquake_events_gold/"

In [0]:
# Append DataFrame to Silver container in Parquet format
df_with_location_sig_class.write.mode('append').parquet(gold_output_path)