# Worldwide Earthquake Events API - Gold Layer Processing

In [None]:
from pyspark.sql.functions import when, col, udf  # For conditional logic, column operations, and user-defined functions in Spark
from pyspark.sql.types import StringType         # For specifying return type of UDFs
# Ensure the reverse_geocoder library is installed in your Fabric environment; it is used for reverse geocoding.
import reverse_geocoder as rg

# ---------------------------
# Gold Layer: Enrichment and Business Logic for Earthquake Data
# ---------------------------

# Load the silver-layer output and keep only rows whose 'time' value is
# strictly greater than start_date.
# - 'earthquake_events_silver' is the table produced by the silver layer.
# - start_date is not defined in this cell; it must already be in scope.
df = (
    spark.read
    .table("earthquake_events_silver")
    .where(col("time") > start_date)
)

# Define a function to retrieve the country code for a given latitude and longitude.
# - Uses the reverse_geocoder library to map coordinates to country codes.
def get_country_code(lat, lon):
    """
    Retrieve the country code for a given latitude and longitude.

    The lookup is delegated to ``rg.search`` from the reverse_geocoder
    package; the 'cc' field of the first result is returned.

    Parameters:
    lat (float or str or None): Latitude of the location.
    lon (float or str or None): Longitude of the location.

    Returns:
    str or None: Country code of the location, or None when either
    coordinate is missing (None) or is not a finite number (NaN / infinity).

    Raises:
    ValueError: If a coordinate is a string that cannot be parsed as a float.

    Example:
    >>> get_country_code(48.8588443, 2.2943506)
    'FR'
    """
    # Local import so this function does not depend on the file's import block.
    import math

    # This function is applied row by row as a Spark UDF. A row with a missing
    # coordinate arrives here as None; float(None) would raise TypeError and
    # fail the whole job, so such rows get no country code instead.
    if lat is None or lon is None:
        return None

    coordinates = (float(lat), float(lon))

    # NaN / infinite coordinates have no meaningful nearest location; skip the
    # lookup rather than passing them to the geocoder.
    if not all(math.isfinite(value) for value in coordinates):
        return None

    return rg.search(coordinates)[0].get('cc')

# Wrap get_country_code as a Spark UDF with a string return type so it can be
# applied to DataFrame columns.
get_country_code_udf = udf(f=get_country_code, returnType=StringType())

# Derive the 'country_code' column by applying the UDF to the 'latitude' and
# 'longitude' columns of each row.
df_with_location = df.withColumn(
    "country_code",
    get_country_code_udf(col("latitude"), col("longitude")),
)

# Add a significance classification column ('sig_class') based on the 'sig' value.
# - sig < 100         -> 'Low'
# - 100 <= sig < 500  -> 'Moderate'
# - sig >= 500        -> 'High'
# - sig is NULL       -> NULL (left unclassified)
# Branches are evaluated in order, so the second branch only needs the upper
# bound. 'High' is an explicit condition rather than a catch-all .otherwise():
# a NULL 'sig' makes every comparison NULL, and a catch-all would mislabel
# those rows as high significance.
df_with_location_sig_class = df_with_location.withColumn(
    "sig_class",
    when(col("sig") < 100, "Low")
    .when(col("sig") < 500, "Moderate")
    .when(col("sig") >= 500, "High"),
)

# Persist the enriched, classified rows into the gold table.
# - mode='append' adds the new rows to the table instead of overwriting it.
# - 'earthquake_events_gold' is the gold layer of the medallion architecture.
df_with_location_sig_class.write.saveAsTable('earthquake_events_gold', mode='append')

# ---------------------------
# Additional Info:
# - The gold layer adds business logic and enrichment (e.g., country code, significance class) to the cleansed data.
# - This layer is optimized for analytics