In [0]:
import json

# Notebook parameters
# -------------------
# This notebook is driven by two widgets, normally populated by the calling
# pipeline:
#   bronze_params : JSON object string with the keys
#                   "start_date", "end_date", "silver_adls", "gold_adls"
#   silver_params : passed through unchanged as `silver_data`
#
# Example values that were previously hard-coded here for interactive runs
# (reference only, not executed):
#   start_date  = date.today() - timedelta(1)
#   silver_adls = "abfss://silver@earthquakeadlstorage.dfs.core.windows.net/"
#   gold_adls   = "abfss://gold@earthquakeadlstorage.dfs.core.windows.net/"
#   silver_data = f"{silver_adls}earthquake_events_silver/"

# Get base parameters
dbutils.widgets.text("bronze_params", "")
dbutils.widgets.text("silver_params", "")

bronze_params = dbutils.widgets.get("bronze_params")
silver_params = dbutils.widgets.get("silver_params")

# Debug: Print the raw input values for troubleshooting
print(f"Raw bronze_params: {bronze_params}")
print(f"Raw silver_params: {silver_params}")

# Parse the JSON string. Fail fast with an actionable message instead of the
# bare JSONDecodeError / AttributeError the widget's empty default (or a
# non-object payload) would otherwise produce further down.
if not bronze_params.strip():
    raise ValueError(
        "bronze_params is empty; expected a JSON object with the keys "
        "start_date, end_date, silver_adls and gold_adls"
    )

try:
    bronze_data = json.loads(bronze_params)
except json.JSONDecodeError as exc:
    # JSONDecodeError is itself a ValueError, so existing handlers still match.
    raise ValueError(f"bronze_params is not valid JSON: {exc}") from exc

if not isinstance(bronze_data, dict):
    raise ValueError(
        f"bronze_params must decode to a JSON object, got {type(bronze_data).__name__}"
    )

# Access individual variables (missing keys default to an empty string)
start_date = bronze_data.get("start_date", "")
end_date = bronze_data.get("end_date", "")
silver_adls = bronze_data.get("silver_adls", "")
gold_adls = bronze_data.get("gold_adls", "")
silver_data = silver_params

# Debug: Print the extracted values for verification
print(f"Start Date: {start_date}, End Date: {end_date}")
print(f"Silver ADLS Path: {silver_adls}, Gold ADLS Path: {gold_adls}")
     

In [0]:
from datetime import date, timedelta
from pyspark.sql.functions import col, when, udf, unix_timestamp, lit
from pyspark.sql.types import StringType
import reverse_geocoder as rg

In [0]:
# Location of the silver-layer parquet dataset, derived from the silver
# container root supplied in bronze_params.
silver_path = f"{silver_adls}earthquake_events_silver/"

# Read silver. The start_date filter is applied in the next cell, once the
# BIGINT `time` column has been converted to a proper timestamp; filtering the
# raw BIGINT against a date string here would not be a meaningful comparison.
df = spark.read.parquet(silver_path)

In [0]:
# Convert BIGINT time -> timestamp and filter
# (`time` is divided by 1000 before the cast, so it is presumably epoch
# milliseconds -- TODO confirm against the silver schema.)
event_ts = (col("time") / 1000).cast("timestamp")
df = df.withColumn("time_ts", event_ts).filter(col("time_ts") > start_date)

In [0]:
# Optional row cap for interactive testing only.
# Leave as None for pipeline runs so every filtered event reaches the gold
# layer; set to a small integer (e.g. 100) while debugging the slow
# reverse-geocoding step.
TEST_ROW_LIMIT = None

if TEST_ROW_LIMIT is not None:
    df = df.limit(TEST_ROW_LIMIT)

In [0]:
@udf(StringType())
def get_country_code(lat, lon):
    """Return the country code ('cc') of the place nearest to a coordinate.

    Args:
        lat: Latitude; anything ``float()`` accepts, or None.
        lon: Longitude; anything ``float()`` accepts, or None.

    Returns:
        The 'cc' field of the first reverse_geocoder match, or None when
        either coordinate is missing or the lookup fails. The lookup is
        best-effort: failures never abort the Spark job.
    """
    # Null coordinates cannot be geocoded; skip the lookup entirely.
    if lat is None or lon is None:
        return None
    try:
        # mode=1 selects reverse_geocoder's single-process search; the default
        # multi-process mode is wasteful for one point per call inside a UDF.
        matches = rg.search((float(lat), float(lon)), mode=1)
        return matches[0].get('cc')
    except Exception:
        # Deliberately best-effort, but no longer a bare `except:` so that
        # SystemExit / KeyboardInterrupt are not swallowed.
        return None

In [0]:
# Add country: look up a country code for each row from its coordinates
country_code_col = get_country_code(col("latitude"), col("longitude"))
df = df.withColumn("country_code", country_code_col)


In [0]:
# Add significance class:
#   sig < 100          -> "Low"
#   100 <= sig < 500   -> "Moderate"
#   anything else      -> "High"
# NOTE(review): rows with a null `sig` match neither condition and therefore
# land in "High" -- confirm that is intended.
sig_value = col("sig")
sig_class_col = (
    when(sig_value < 100, "Low")
    .when((sig_value >= 100) & (sig_value < 500), "Moderate")
    .otherwise("High")
)
df = df.withColumn('sig_class', sig_class_col)

In [0]:

# Sanity check: print the schema after adding time_ts, country_code and sig_class.
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- title: string (nullable = true)
 |-- place_description: string (nullable = true)
 |-- sig: long (nullable = true)
 |-- mag: double (nullable = true)
 |-- magType: string (nullable = true)
 |-- time: long (nullable = true)
 |-- updated: long (nullable = true)
 |-- time_ts: timestamp (nullable = true)
 |-- country_code: string (nullable = true)
 |-- sig_class: string (nullable = false)



In [0]:
# `df` already carries the `sig_class` column (added in the "Add significance
# class" cell) as well as `country_code`, so the classification is not
# recomputed here; keeping the thresholds in one place stops the two copies
# from drifting apart. This name is what the gold write below consumes.
df_with_location_sig_class = df
     

In [0]:
# Sanity check: print the schema of the DataFrame that will be written out.
df_with_location_sig_class.printSchema()
     

root
 |-- id: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- elevation: double (nullable = true)
 |-- title: string (nullable = true)
 |-- place_description: string (nullable = true)
 |-- sig: long (nullable = true)
 |-- mag: double (nullable = true)
 |-- magType: string (nullable = true)
 |-- time: long (nullable = true)
 |-- updated: long (nullable = true)
 |-- time_ts: timestamp (nullable = true)
 |-- country_code: string (nullable = true)
 |-- sig_class: string (nullable = false)



In [0]:
# Destination for the transformed DataFrame: the earthquake_events_gold/
# folder under the Gold container root (gold_adls).
gold_output_path = f"{gold_adls}earthquake_events_gold/"

In [0]:

# Append the DataFrame to the Gold container in Parquet format.
# NOTE(review): mode('append') adds to whatever is already at the path, so
# re-running the notebook for the same start_date presumably duplicates rows --
# confirm the pipeline guarantees a single run per window.
df_with_location_sig_class.write.mode('append').parquet(gold_output_path)