## **Worldwide Earthquake Events API - Gold Layer Processing**

In [2]:
from pyspark.sql.functions import when, col, udf
from pyspark.sql.types import StringType

# Ensure the library below is installed in your Fabric environment
import reverse_geocoder as rg

StatementMeta(, 00204ca2-5130-420f-9c70-451a7baa5fed, 6, Finished, Available, Finished)

In [3]:
# Read the silver-layer table, keeping only events whose 'time' is later than
# start_date (start_date is not defined in this cell; presumably it is set by
# an earlier parameter cell -- confirm).
df = (
    spark.read
    .table("earthquake_events_silver")
    .filter(col('time') > start_date)
)

StatementMeta(, 00204ca2-5130-420f-9c70-451a7baa5fed, 7, Finished, Available, Finished)

In [4]:
def get_country_code(lat, lon):
    """
    Retrieve the country code for a given latitude and longitude.

    Parameters:
    lat (float or str): Latitude of the location.
    lon (float or str): Longitude of the location.

    Returns:
    str or None: The 'cc' field of the first match returned by the
    reverse_geocoder lookup, or None when either coordinate is None.

    Example:
    >> get_country_code(48.8588443, 2.2943506)
    'FR'
    """

    # Null column values reach a Python UDF as None; float(None) would raise
    # TypeError and fail the task, so propagate the null instead.
    if lat is None or lon is None:
        return None

    coordinates = (float(lat), float(lon))
    return rg.search(coordinates)[0].get('cc')

StatementMeta(, 00204ca2-5130-420f-9c70-451a7baa5fed, 8, Finished, Available, Finished)

In [5]:
# Wrap get_country_code as a Spark UDF returning a string so it can be applied
# to DataFrame columns.
get_country_code_udf = udf(f=get_country_code, returnType=StringType())

StatementMeta(, 00204ca2-5130-420f-9c70-451a7baa5fed, 9, Finished, Available, Finished)

In [6]:
# Add the country_code attribute, derived from each row's latitude/longitude
df_with_location = df.withColumn(
    "country_code",
    get_country_code_udf(col("latitude"), col("longitude")),
)

StatementMeta(, 00204ca2-5130-420f-9c70-451a7baa5fed, 10, Finished, Available, Finished)

In [7]:
# Add a significance classification column (sig_class) based on 'sig':
#   sig < 100          -> "Low"
#   100 <= sig < 500   -> "Moderate"
#   anything else      -> "High"
# Branches are evaluated in order, so the second branch is only reached when
# the first one did not match (i.e. sig is not below 100).
# NOTE(review): a NULL sig matches neither comparison and therefore lands in
# "High" -- confirm that is intended.
df_with_location_sig_class = df_with_location.withColumn(
    'sig_class',
    when(col("sig") < 100, "Low")
    .when(col("sig") < 500, "Moderate")
    .otherwise("High"),
)

StatementMeta(, 00204ca2-5130-420f-9c70-451a7baa5fed, 11, Finished, Available, Finished)

In [8]:
# Append the enriched rows to the gold-layer table
df_with_location_sig_class.write.saveAsTable(
    'earthquake_events_gold',
    mode='append',
)

StatementMeta(, 00204ca2-5130-420f-9c70-451a7baa5fed, 12, Finished, Available, Finished)