# Worldwide Earthquake Events API - Gold Layer Processing

In [5]:
from pyspark.sql.functions import when, col, udf
from pyspark.sql.types import StringType
# ensure the below library is installed on your fabric environment
import reverse_geocoder as rg

StatementMeta(, 754e2140-a38e-43be-a624-6260e9cd1cf8, 9, Finished, Available, Finished)

In [1]:
# load the silver-layer events and keep only those whose time is later than start_date
# NOTE(review): start_date is not defined in the visible cells — presumably supplied as a notebook/pipeline parameter; confirm
df = (
    spark.read
    .table("earthquake_events_silver")
    .where(col('time') > start_date)
)

StatementMeta(, 754e2140-a38e-43be-a624-6260e9cd1cf8, 5, Finished, Available, Finished)

In [3]:
def get_country_code(lat, lon):
    """
    Retrieve the country code for a given latitude and longitude.

    Parameters:
    lat (float or str): Latitude of the location.
    lon (float or str): Longitude of the location.

    Returns:
    str or None: The 'cc' (country code) field of the first result returned by
    the reverse_geocoder lookup, or None when either coordinate is missing.

    Example:
    >>> get_country_code(48.8588443, 2.2943506)
    'FR'
    """
    # A null latitude/longitude arrives here as None; float(None) would raise
    # TypeError and fail the whole Spark job, so yield a null country code instead.
    if lat is None or lon is None:
        return None
    coordinates = (float(lat), float(lon))
    return rg.search(coordinates)[0].get('cc')

StatementMeta(, 754e2140-a38e-43be-a624-6260e9cd1cf8, 7, Finished, Available, Finished)

In [6]:
# wrap get_country_code as a Spark UDF returning a string so it can be applied to dataframe columns
get_country_code_udf = udf(f=get_country_code, returnType=StringType())

StatementMeta(, 754e2140-a38e-43be-a624-6260e9cd1cf8, 10, Finished, Available, Finished)

In [7]:
# derive the country_code attribute from each event's latitude/longitude
df_with_location = df.withColumn(
    "country_code",
    get_country_code_udf(col("latitude"), col("longitude")),
)

StatementMeta(, 754e2140-a38e-43be-a624-6260e9cd1cf8, 11, Finished, Available, Finished)

In [9]:
# bucket every event by its significance score:
#   sig < 100          -> "Low"
#   100 <= sig < 500   -> "Moderate"
#   anything else      -> "High"
# (a null sig satisfies neither comparison, so it also ends up in "High")
df_with_location_sig_class = df_with_location.withColumn(
    'sig_class',
    when(col("sig") < 100, "Low")
    # branches are tried in order, so this one is only reached when sig < 100 did not match
    .when(col("sig") < 500, "Moderate")
    .otherwise("High"),
)

StatementMeta(, 754e2140-a38e-43be-a624-6260e9cd1cf8, 13, Finished, Available, Finished)

In [11]:
# persist the enriched events by appending them to the gold table
(
    df_with_location_sig_class.write
    .mode('append')
    .saveAsTable('earthquake_events_gold')
)

StatementMeta(, 754e2140-a38e-43be-a624-6260e9cd1cf8, 15, Finished, Available, Finished)