In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    avg,
    col,
    date_format,
    from_unixtime,
    lit,
    median,
    round,  # NOTE: shadows Python's built-in round() for the rest of this notebook
    to_timestamp,
    unix_timestamp,
    when,
)
from pyspark.sql.window import Window

# Create (or reuse) the Spark session for this job.
spark = SparkSession.builder.appName("Seismic_Data_Quality").getOrCreate()

# Raw column name -> snake_case name. Insertion order is the output column order.
RAW_TO_SNAKE = {
    "EventId": "event_id",
    "Time": "time",
    "Latitude": "latitude",
    "Longitude": "longitude",
    "Depth/km": "depth",
    "Author": "author",
    "Catalog": "catalog",
    "Contributor": "contributor",
    "ContributorID": "contributor_id",
    "MagType": "mag_type",
    "Magnitude": "magnitude",
    "MagAuthor": "mag_author",
    "EventLocationName": "event_location_name",
}

# Read the raw seismic events and keep only the renamed columns above.
df = (
    spark.read.format("delta")
    .table("tabular.dataexpert.iris_seismic_events_2018_2024")
    .select(*[col(raw_name).alias(snake_name) for raw_name, snake_name in RAW_TO_SNAKE.items()])
)

In [0]:
# Deduplicate events and resolve conflicting magnitudes.
#
# Coordinates are rounded to 2 decimal places (0.01 degree) FIRST, so the magnitude
# window and dropDuplicates group exactly the same rows. If rounding happened after
# the window, rows differing only below 0.01 degree would be collapsed by
# dropDuplicates without ever having shared a window, so a missing magnitude could
# not be filled from its duplicate.
df = (
    df.withColumn("latitude", round(col("latitude"), 2))
      .withColumn("longitude", round(col("longitude"), 2))
)

# Rows sharing all of these keys are treated as duplicate reports of one event.
DEDUP_KEYS = ["event_id", "time", "longitude", "latitude"]
window_spec = Window.partitionBy(*DEDUP_KEYS)

# Every row in a duplicate group gets the group's median magnitude. The aggregate
# ignores nulls, so the result is null only if no duplicate has a magnitude. Because
# the value is identical across the group, the magnitude no longer depends on which
# row dropDuplicates keeps. A non-duplicated row keeps its own magnitude (the median
# of a single value).
# NOTE(review): dropDuplicates still keeps an arbitrary row for the other columns
# (depth, author, mag_type, ...). Add an explicit ordering if that matters.
df = (
    df.withColumn("final_magnitude", median("magnitude").over(window_spec))
      .dropDuplicates(DEDUP_KEYS)
      .drop("magnitude")
      .withColumnRenamed("final_magnitude", "magnitude")
)


In [0]:
# Mean depth of all events sharing the same (rounded) latitude/longitude cell.
# Used below as a stand-in for events whose depth is missing.
depth_avg = (
    df.groupBy("latitude", "longitude")
      .agg(avg("depth").alias("regional_avg_depth"))
)

# Left-join the cell average back onto every event. Keep the reported depth when
# present, otherwise fall back to the cell average. Rows with a null latitude or
# longitude find no join match, so their depth stays as reported.
df = (
    df.join(depth_avg, ["latitude", "longitude"], "left")
      .withColumn(
          "depth",
          when(col("depth").isNotNull(), col("depth")).otherwise(col("regional_avg_depth")),
      )
      .drop("regional_avg_depth")
)

# Drop events that have no usable location.
# A missing latitude or longitude is not imputed. Filling it with 0 would invent
# events at (0, 0) that then look like valid coordinates to the range check.
# Rows with either coordinate missing are removed instead.
# NOTE(review): if imputation from the "closest known event" is actually wanted,
# it needs a defined notion of closeness (same event_id? nearest time?), which is
# not implemented here.
df = df.dropna(subset=["latitude", "longitude"])

In [0]:
# Keep only rows whose coordinates and magnitude fall in plausible ranges.
# between() is inclusive. On a null value it is not true, so rows with a null
# latitude, longitude or magnitude are dropped as well.
# NOTE(review): some magnitude scales allow values below 0 for very small events.
# Confirm that a lower bound of 0 suits this catalog.
valid_latitude = col("latitude").between(-90, 90)
valid_longitude = col("longitude").between(-180, 180)
valid_magnitude = col("magnitude").between(0, 10)

df = df.where(valid_latitude & valid_longitude & valid_magnitude)

In [0]:
# Parse the string event time into a TimestampType column.
# to_timestamp without a pattern follows Spark's string->timestamp cast rules, which
# accept ISO-8601 values such as "2018-01-01T00:00:00" as well as variants with
# fractional seconds or a zone suffix ("...:00.123", "...:00Z"). The fixed pattern
# "yyyy-MM-dd'T'HH:mm:ss" does not accept those variants.
# NOTE(review): values that still cannot be parsed become null. Confirm the raw Time
# format and check for nulls here if that matters.
df = df.withColumn("time", to_timestamp(col("time")))

In [0]:
# Derive the year/month partition columns from the parsed timestamp.
# date_format yields the same zero-padded strings ("2018", "01") that taking
# substrings of the timestamp's string form did. It does so explicitly, without
# relying on an implicit timestamp -> string cast.
df = (
    df.withColumn("year", date_format(col("time"), "yyyy"))
      .withColumn("month", date_format(col("time"), "MM"))
)

# Overwrite the silver Delta table, partitioned by year and month.
df.write.format("delta").mode("overwrite").partitionBy("year", "month").saveAsTable(
    "tabular.dataexpert.iris_seismic_events_silver"
)

print("Seismic Data Quality Checks Completed & Stored in Delta Table!")
