In [0]:
# Pull the values published by the upstream Bronze and Silver job tasks
bronze_output = dbutils.jobs.taskValues.get(
    taskKey="Bronze_task",
    key="bronze_output",
)
silver_data = dbutils.jobs.taskValues.get(
    taskKey="Silver_task",
    key="silver_output",
)

# Unpack the settings carried in the bronze payload; a missing key falls back to ""
start_date, silver_adls, gold_adls = (
    bronze_output.get(setting, "")
    for setting in ("start_date", "silver_adls", "gold_adls")
)


In [0]:
from pyspark.sql.functions import col, when, udf
from pyspark.sql.types import StringType

import reverse_geocoder as rg
from datetime import date, timedelta


In [0]:
# Load the silver-layer parquet data and keep only rows whose "time" is later than start_date
df = (
    spark.read
    .parquet(silver_data)
    .where(col("time") > start_date)
)

In [0]:
# Limit data being processed to allow for faster testing and to keep cost low on Azure.
# Set TEST_ROW_LIMIT to None to process every row that was read from the silver layer.
TEST_ROW_LIMIT = 10

if TEST_ROW_LIMIT is not None:
    df = df.limit(TEST_ROW_LIMIT)

In [0]:
def get_country_code(lat, lon):
    """
    Retrieve the country code for a given latitude and longitude.

    Parameters
    lat (float or str): Latitude of the location
    lon (float or str): Longitude of the location

    Returns
    str or None: The 'cc' field of the first match returned by the
        reverse_geocoder lookup, or None when either coordinate is missing
        or the lookup fails for any reason.
    """
    # A missing coordinate is an ordinary data condition, not an error:
    # skip the lookup quietly instead of raising inside float() and logging it.
    if lat is None or lon is None:
        return None

    try:
        coordinates = (float(lat), float(lon))
        # mode=1 selects reverse_geocoder's single-process lookup. The default
        # multiprocess mode is meant for large batches and only adds overhead
        # when a single point is resolved per call, as happens inside a UDF.
        result = rg.search(coordinates, mode=1)[0].get('cc')
        print(f"Processed coordinates {coordinates} -> {result}")
        return result
    except Exception as e:
        # Best-effort enrichment: one bad row must not fail the whole job,
        # so report the problem and leave the country code empty.
        print(f"Error processing coodinates {lat},{lon} --> {str(e)}")
        return None

In [0]:
# Wrap the Python lookup as a Spark UDF returning a string so it can be applied to dataframe columns
get_country_code_udf = udf(
    f=get_country_code,
    returnType=StringType(),
)

In [0]:
# Derive a country_code column from each row's latitude/longitude
df_with_location = df.withColumn(
    "country_code",
    get_country_code_udf(col("latitude"), col("longitude")),
)

In [0]:
# Adding significance classification:
#   sig < 100          -> "Low"
#   100 <= sig < 500   -> "Moderate"
#   anything else      -> "High"
# NOTE(review): a null sig satisfies neither condition, so such rows are also
# labelled "High" - confirm that this is the intended treatment.
df_with_location_sig_class = df_with_location.withColumn(
    "sig_class",
    when(col("sig") < 100, "Low")
    .when((col("sig") >= 100) & (col("sig") < 500), "Moderate")
    .otherwise("High"),
)

In [0]:
# Save the transformed dataframe to the gold storage container
# Set gold output path variable
# NOTE(review): "earthquke" looks like a typo for "earthquake". The directory name is
# left unchanged here because it is a storage location - confirm before renaming.
gold_output_path = f"{gold_adls}earthquke_events_gold/"

# Write the enriched dataframe (with country_code and sig_class) as parquet to the gold
# container. Writing `df` here would persist the silver rows without the new columns.
df_with_location_sig_class.write.mode('append').parquet(gold_output_path)