In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, when, year, month, avg, count, max, min, 
    sum as spark_sum, stddev, round as spark_round,
    desc, asc, sqrt, pow as spark_pow, lit, 
    udf, array, struct, explode, window, to_timestamp,
    date_format, dayofweek, quarter, weekofyear,
    coalesce, isnan, isnull, abs as spark_abs
)
from pyspark.sql.types import (
    StructType, StructField, FloatType, IntegerType, 
    DoubleType, StringType, BooleanType
)
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator, 
    MulticlassClassificationEvaluator,
    RegressionEvaluator
)
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import time
import matplotlib.pyplot as plt
import seaborn as sns

In [0]:
earthquake_schema = StructType([
    StructField("magnitude", FloatType(), True),
    StructField("cdi", FloatType(), True),
    StructField("mmi", FloatType(), True),
    StructField("sig", IntegerType(), True),
    StructField("nst", IntegerType(), True),
    StructField("dmin", FloatType(), True),
    StructField("gap", FloatType(), True),
    StructField("depth", FloatType(), True),
    StructField("latitude", FloatType(), True),
    StructField("longitude", FloatType(), True),
    StructField("Year", IntegerType(), True),
    StructField("Month", IntegerType(), True),
    StructField("tsunami", IntegerType(), True)
])

In [0]:
df_raw = spark.read \
    .option("header", "true") \
    .schema(earthquake_schema) \
    .csv("/Volumes/workspace/default/data/earthquake_data_tsunami.csv") 

In [0]:
df_raw.printSchema()
df_raw.show(5, truncate=False)

root
 |-- magnitude: float (nullable = true)
 |-- cdi: float (nullable = true)
 |-- mmi: float (nullable = true)
 |-- sig: integer (nullable = true)
 |-- nst: integer (nullable = true)
 |-- dmin: float (nullable = true)
 |-- gap: float (nullable = true)
 |-- depth: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- tsunami: integer (nullable = true)

+---------+---+---+---+---+-----+----+-------+--------+---------+----+-----+-------+
|magnitude|cdi|mmi|sig|nst|dmin |gap |depth  |latitude|longitude|Year|Month|tsunami|
+---------+---+---+---+---+-----+----+-------+--------+---------+----+-----+-------+
|7.0      |8.0|7.0|768|117|0.509|17.0|14.0   |-9.7963 |159.596  |2022|11   |1      |
|6.9      |4.0|4.0|735|99 |2.229|34.0|25.0   |-4.9559 |100.738  |2022|11   |0      |
|7.0      |3.0|3.0|755|147|3.125|18.0|579.0  |-20.0508|-178.346 |2022|11   |1      |
|7.3 

In [0]:
# Transformation 1: calculated columns


df_transformed = df_raw \
    .withColumn("magnitude_category", 
                when(col("magnitude") < 5.0, "Minor")
                .when((col("magnitude") >= 5.0) & (col("magnitude") < 6.0), "Light")
                .when((col("magnitude") >= 6.0) & (col("magnitude") < 7.0), "Moderate")
                .when((col("magnitude") >= 7.0) & (col("magnitude") < 8.0), "Major")
                .otherwise("Great")) \
    .withColumn("depth_category",
                when(col("depth") < 70, "Shallow")
                .when((col("depth") >= 70) & (col("depth") < 300), "Intermediate")
                .otherwise("Deep")) \
    .withColumn("distance_from_equator", 
                sqrt(spark_pow(col("latitude"), 2))) \
    .withColumn("tsunami_risk_score",
                when(col("tsunami") == 1, col("magnitude") * 10 + col("depth") / 10)
                .otherwise(0)) \
    .withColumn("seismic_intensity",
                col("magnitude") * col("sig") / 100) \
    .withColumn("quarter", quarter(lit(f"{df_raw.select('Year').first()[0]}-01-01"))) \
    .withColumn("hemisphere",
                when(col("latitude") >= 0, "Northern").otherwise("Southern")) \
    .withColumn("ocean_region",
                when((col("longitude") >= -180) & (col("longitude") < -30), "Atlantic")
                .when((col("longitude") >= -30) & (col("longitude") < 75), "Indian")
                .when((col("longitude") >= 75) & (col("longitude") <= 180), "Pacific")
                .otherwise("Unknown"))

In [0]:
# Filter 1: major earthquakes (magnitude >= 6.5)
df_major = df_transformed.filter(col("magnitude") >= 6.5)
print(f"Major earthquakes (mag >= 6.5): {df_major.count()}")


Major earthquakes (mag >= 6.5): 782


In [0]:
# Filter 2: tsunami-generating earthquakes
df_tsunami = df_transformed.filter(col("tsunami") == 1)
print(f"Tsunami-generating earthquakes: {df_tsunami.count()}")



Tsunami-generating earthquakes: 304


In [0]:
# Filter 3: Filter recent earthquakes (2021-2022)
df_recent = df_transformed.filter(col("Year").isin(2021, 2022))
print(f"Recent earthquakes (2021-2022): {df_recent.count()}")



Recent earthquakes (2021-2022): 82


In [0]:
# GroupBy Statistics by magnitude category
magnitude_stats = df_transformed \
    .groupBy("magnitude_category") \
    .agg(
        count("*").alias("earthquake_count"),
        spark_round(avg("magnitude"), 2).alias("avg_magnitude"),
        spark_round(max("magnitude"), 2).alias("max_magnitude"),
        spark_round(min("magnitude"), 2).alias("min_magnitude"),
        spark_round(avg("depth"), 2).alias("avg_depth"),
        spark_sum(col("tsunami")).alias("tsunami_events"),
        spark_round(stddev("magnitude"), 3).alias("magnitude_std")
    ) \
    .orderBy("earthquake_count", ascending=False)

magnitude_stats.show()

+------------------+----------------+-------------+-------------+-------------+---------+--------------+-------------+
|magnitude_category|earthquake_count|avg_magnitude|max_magnitude|min_magnitude|avg_depth|tsunami_events|magnitude_std|
+------------------+----------------+-------------+-------------+-------------+---------+--------------+-------------+
|          Moderate|             499|         6.67|          6.9|          6.5|    72.88|           194|         0.14|
|             Major|             255|         7.32|          7.9|          7.0|    82.05|           100|        0.273|
|             Great|              28|         8.28|          9.1|          8.0|    73.23|            10|        0.304|
+------------------+----------------+-------------+-------------+-------------+---------+--------------+-------------+



In [0]:
 #GroupBy THE Yearly tsunami statistics
yearly_tsunami_stats = df_transformed \
    .groupBy("Year") \
    .agg(
        count("*").alias("total_earthquakes"),
        spark_sum(col("tsunami")).alias("tsunami_count"),
        spark_round(avg(when(col("tsunami") == 1, col("magnitude"))), 2).alias("avg_tsunami_magnitude"),
        spark_round(avg(when(col("tsunami") == 1, col("depth"))), 2).alias("avg_tsunami_depth")
    ) \
    .withColumn("tsunami_percentage", 
                spark_round(col("tsunami_count") * 100.0 / col("total_earthquakes"), 2)) \
    .orderBy("Year", ascending=False)

yearly_tsunami_stats.show()

+----+-----------------+-------------+---------------------+-----------------+------------------+
|Year|total_earthquakes|tsunami_count|avg_tsunami_magnitude|avg_tsunami_depth|tsunami_percentage|
+----+-----------------+-------------+---------------------+-----------------+------------------+
|2022|               40|           32|                 6.86|           133.52|              80.0|
|2021|               42|           33|                 7.09|            65.54|             78.57|
|2020|               27|           15|                 7.09|             54.0|             55.56|
|2019|               33|           26|                 6.88|            84.94|             78.79|
|2018|               43|           33|                 6.99|            114.0|             76.74|
|2017|               36|           27|                 6.85|             76.0|              75.0|
|2016|               43|           31|                 7.02|            84.94|             72.09|
|2015|              

In [0]:
# GroupBy Regional analysis
regional_analysis = df_transformed \
    .groupBy("ocean_region", "depth_category") \
    .agg(
        count("*").alias("event_count"),
        spark_round(avg("magnitude"), 2).alias("avg_magnitude"),
        spark_sum("tsunami").alias("tsunami_events"),
        spark_round(avg("seismic_intensity"), 2).alias("avg_intensity")
    ) \
    .orderBy(["ocean_region", "event_count"], ascending=[True, False])

regional_analysis.show()

+------------+--------------+-----------+-------------+--------------+-------------+
|ocean_region|depth_category|event_count|avg_magnitude|tsunami_events|avg_intensity|
+------------+--------------+-----------+-------------+--------------+-------------+
|    Atlantic|       Shallow|        191|         6.96|            97|        69.31|
|    Atlantic|  Intermediate|         33|         6.98|            17|        66.55|
|    Atlantic|          Deep|         22|          6.9|            12|        51.64|
|      Indian|       Shallow|         48|         6.85|            12|         66.1|
|      Indian|  Intermediate|          9|         7.02|             2|         64.8|
|     Pacific|       Shallow|        380|         6.94|           123|        57.94|
|     Pacific|  Intermediate|         69|         6.91|            28|        55.26|
|     Pacific|          Deep|         30|         6.99|            13|        54.99|
+------------+--------------+-----------+-------------+----------

In [0]:
df_main = df_transformed.alias("main")


In [0]:
df_after = df_transformed.alias("after")


In [0]:
related_earthquakes = df_main.join(
    df_after,
    (df_main["Year"] == df_after["Year"]) &
    (spark_abs(df_main["Month"] - df_after["Month"]) <= 1) &
    (spark_abs(df_main["latitude"] - df_after["latitude"]) < 5) &
    (spark_abs(df_main["longitude"] - df_after["longitude"]) < 5) &
    (df_main["magnitude"] > df_after["magnitude"]) &
    (df_main["magnitude"] >= 6.5),
    "inner"
) \
.select(
    df_main["magnitude"].alias("main_magnitude"),
    df_main["latitude"].alias("main_lat"),
    df_main["longitude"].alias("main_lon"),
    df_main["Year"].alias("year"),
    df_main["Month"].alias("main_month"),
    df_after["magnitude"].alias("related_magnitude"),
    df_after["Month"].alias("related_month"),
    df_after["tsunami"].alias("related_tsunami")
) \
.withColumn("magnitude_difference", 
            col("main_magnitude") - col("related_magnitude"))

print("Potential related earthquakes (mainshock-aftershock pairs):")
related_earthquakes.show(10)

Potential related earthquakes (mainshock-aftershock pairs):
+--------------+--------+--------+----+----------+-----------------+-------------+---------------+--------------------+
|main_magnitude|main_lat|main_lon|year|main_month|related_magnitude|related_month|related_tsunami|magnitude_difference|
+--------------+--------+--------+----+----------+-----------------+-------------+---------------+--------------------+
|           7.2| -38.355| -73.326|2011|         1|              6.7|            2|              0|                 0.5|
|           7.2| -38.355| -73.326|2011|         1|              6.9|            2|              0|           0.2999997|
|           7.7|  -0.414| 132.885|2009|         1|              7.4|            1|              0|           0.2999997|
|           7.2|   -17.6| 167.856|2002|         1|              6.6|            1|              0|           0.5999999|
|           7.5|   6.898| 126.579|2001|         1|              6.8|            1|              0|  

In [0]:
df_transformed.createOrReplaceTempView("earthquakes")


In [0]:
sql_query1 = """
    SELECT 
        magnitude,
        depth,
        latitude,
        longitude,
        Year,
        Month,
        sig as significance,
        tsunami_risk_score,
        magnitude_category,
        ocean_region
    FROM earthquakes
    WHERE tsunami = 1
    ORDER BY magnitude DESC, sig DESC
    LIMIT 10
"""

In [0]:
print("SQL Query 1: Top 10 tsunami-generating earthquakes by magnitude")
top_tsunamis = spark.sql(sql_query1)
top_tsunamis.show()

SQL Query 1: Top 10 tsunami-generating earthquakes by magnitude
+---------+------+--------+---------+----+-----+------------+------------------+------------------+------------+
|magnitude| depth|latitude|longitude|Year|Month|significance|tsunami_risk_score|magnitude_category|ocean_region|
+---------+------+--------+---------+----+-----+------------+------------------+------------------+------------+
|      8.3| 22.44|-31.5729| -71.6744|2015|    9|        1960| 85.24400196075439|             Great|    Atlantic|
|      8.3| 598.1|  54.892|  153.221|2013|    5|        1115|142.80999946594238|             Great|     Pacific|
|      8.2| 47.39| 15.0222| -93.8993|2017|    9|        2910|  86.7389980316162|             Great|    Atlantic|
|      8.2|  25.0|-19.6097| -70.7691|2014|    4|        1332| 84.49999809265137|             Great|    Atlantic|
|      8.2|  35.0| 55.3154| -157.829|2021|    7|        1252| 85.49999809265137|             Great|    Atlantic|
|      8.2| 46.66| 55.4742| -157

In [0]:
# SQL Query 2: Monthly statistics with window functions
sql_query2 = """
    WITH monthly_stats AS (
        SELECT 
            Year,
            Month,
            COUNT(*) as monthly_count,
            AVG(magnitude) as avg_mag,
            MAX(magnitude) as max_mag,
            SUM(tsunami) as tsunami_count,
            AVG(depth) as avg_depth
        FROM earthquakes
        GROUP BY Year, Month
    )
    SELECT 
        Year,
        Month,
        monthly_count,
        ROUND(avg_mag, 2) as avg_magnitude,
        max_mag as max_magnitude,
        tsunami_count,
        ROUND(avg_depth, 1) as avg_depth,
        SUM(monthly_count) OVER (PARTITION BY Year ORDER BY Month) as cumulative_yearly_count,
        RANK() OVER (PARTITION BY Year ORDER BY max_mag DESC) as magnitude_rank_in_year
    FROM monthly_stats
    ORDER BY Year DESC, Month DESC
    LIMIT 20
"""

In [0]:
print("\nSQL Query 2: Monthly statistics with window functions")
monthly_analysis = spark.sql(sql_query2)
monthly_analysis.show()


SQL Query 2: Monthly statistics with window functions
+----+-----+-------------+-------------+-------------+-------------+---------+-----------------------+----------------------+
|Year|Month|monthly_count|avg_magnitude|max_magnitude|tsunami_count|avg_depth|cumulative_yearly_count|magnitude_rank_in_year|
+----+-----+-------------+-------------+-------------+-------------+---------+-----------------------+----------------------+
|2022|   11|            7|         6.94|          7.3|            6|    367.1|                     40|                     2|
|2022|   10|            1|          6.7|          6.7|            1|     20.0|                     33|                     8|
|2022|    9|            7|          7.0|          7.6|            6|     47.4|                     32|                     1|
|2022|    8|            1|          6.6|          6.6|            1|     30.0|                     25|                     9|
|2022|    7|            1|          7.0|          7.0|         

In [0]:
# SQL Query 3: Complex analysis with CTEs
sql_query3 = """
    WITH depth_categories AS (
        SELECT 
            CASE 
                WHEN depth < 70 THEN 'Shallow'
                WHEN depth < 300 THEN 'Intermediate'
                ELSE 'Deep'
            END as depth_cat,
            magnitude,
            tsunami,
            sig
        FROM earthquakes
    ),
    category_stats AS (
        SELECT 
            depth_cat,
            COUNT(*) as count,
            AVG(magnitude) as avg_mag,
            STDDEV(magnitude) as std_mag,
            SUM(tsunami) as tsunami_total,
            AVG(sig) as avg_significance
        FROM depth_categories
        GROUP BY depth_cat
    )
    SELECT 
        depth_cat as depth_category,
        count as earthquake_count,
        ROUND(avg_mag, 3) as average_magnitude,
        ROUND(std_mag, 3) as magnitude_std_dev,
        tsunami_total,
        ROUND(100.0 * tsunami_total / count, 2) as tsunami_percentage,
        ROUND(avg_significance, 1) as avg_significance_score
    FROM category_stats
    ORDER BY earthquake_count DESC
"""

In [0]:
print("\nSQL Query 3: Statistical analysis by depth category")
depth_analysis = spark.sql(sql_query3)
depth_analysis.show()


SQL Query 3: Statistical analysis by depth category
+--------------+----------------+-----------------+-----------------+-------------+------------------+----------------------+
|depth_category|earthquake_count|average_magnitude|magnitude_std_dev|tsunami_total|tsunami_percentage|avg_significance_score|
+--------------+----------------+-----------------+-----------------+-------------+------------------+----------------------+
|       Shallow|             619|             6.94|            0.455|          232|             37.48|                 883.2|
|  Intermediate|             111|             6.94|            0.386|           47|             42.34|                 847.2|
|          Deep|              52|            6.956|            0.452|           25|             48.08|                 763.1|
+--------------+----------------+-----------------+-----------------+-------------+------------------+----------------------+



In [0]:
# Demonstrate impact of caching
start = time.time()
count1 = df_transformed.filter(col("magnitude") > 6.0).count()
time1 = time.time() - start
print(f"First action (count): {count1} rows in {time1:.3f} seconds")

start = time.time()
avg1 = df_transformed.filter(col("magnitude") > 6.0).agg(avg("depth")).collect()[0][0]
time2 = time.time() - start
print(f"Second action (avg): {avg1:.2f} depth in {time2:.3f} seconds")

First action (count): 782 rows in 0.389 seconds
Second action (avg): 75.88 depth in 0.321 seconds


In [0]:

# Inefficient query (filter after join)
print("Inefficient approach - filter after operations:")
start = time.time()
inefficient = df_transformed \
    .select("*") \
    .withColumn("complex_calc", col("magnitude") * col("sig") * col("depth")) \
    .filter(col("magnitude") > 7.0) \
    .count()
inefficient_time = time.time() - start
print(f"Time: {inefficient_time:.3f} seconds")


Inefficient approach - filter after operations:
Time: 0.460 seconds


In [0]:
# transformations 
trans_start = time.time()
lazy_df1 = df_raw.filter(col("magnitude") > 5.0)
print(f"Transformation 1 (filter): {time.time() - trans_start:.4f} seconds")

Transformation 1 (filter): 0.0003 seconds


In [0]:
trans_start = time.time()
lazy_df2 = lazy_df1.select("magnitude", "depth", "tsunami")
print(f"Transformation 2 (select): {time.time() - trans_start:.4f} seconds")

trans_start = time.time()
lazy_df3 = lazy_df2.withColumn("magnitude_squared", col("magnitude") ** 2)
print(f"Transformation 3 (withColumn): {time.time() - trans_start:.4f} seconds")

trans_start = time.time()
lazy_df4 = lazy_df3.groupBy("tsunami").agg(avg("magnitude_squared"))
print(f"Transformation 4 (groupBy + agg): {time.time() - trans_start:.4f} seconds")

Transformation 2 (select): 0.0003 seconds
Transformation 3 (withColumn): 0.0002 seconds
Transformation 4 (groupBy + agg): 0.0002 seconds


In [0]:
# Machine Learning
ml_data = df_transformed.select(
    "magnitude", "depth", "cdi", "mmi", "sig", "nst", 
    "dmin", "gap", "latitude", "longitude", "tsunami"
).na.drop()  # Remove any null values

print(f"ML Dataset size: {ml_data.count()} records")

ML Dataset size: 782 records


In [0]:
# These are transformations - (Lazy - No Execution)
trans_start = time.time()
lazy_df1 = df_raw.filter(col("magnitude") > 5.0)
print(f"Transformation 1 (filter): {time.time() - trans_start:.4f} seconds")

trans_start = time.time()
lazy_df2 = lazy_df1.select("magnitude", "depth", "tsunami")
print(f"Transformation 2 (select): {time.time() - trans_start:.4f} seconds")

trans_start = time.time()
lazy_df3 = lazy_df2.withColumn("magnitude_squared", col("magnitude") ** 2)
print(f"Transformation 3 (withColumn): {time.time() - trans_start:.4f} seconds")

trans_start = time.time()
lazy_df4 = lazy_df3.groupBy("tsunami").agg(avg("magnitude_squared"))
print(f"Transformation 4 (groupBy + agg): {time.time() - trans_start:.4f} seconds")

print("\nNo actual computation performed yet - just building execution plan!")



Transformation 1 (filter): 0.0003 seconds
Transformation 2 (select): 0.0005 seconds
Transformation 3 (withColumn): 0.0002 seconds
Transformation 4 (groupBy + agg): 0.0002 seconds

No actual computation performed yet - just building execution plan!


In [0]:
assembler = VectorAssembler(
    inputCols=["magnitude", "depth", "sig", "latitude", "longitude", "gap"],
    outputCol="features"
)
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withStd=True,
    withMean=True
)

train_data, test_data = ml_data.randomSplit([0.8, 0.2], seed=42)
print(f"Training set: {train_data.count()} records")
print(f"Test set: {test_data.count()} records")

print("\nClassification Model: Tsunami Prediction")

rf_classifier = RandomForestClassifier(
    featuresCol="scaled_features",
    labelCol="tsunami",
    numTrees=100,
    maxDepth=5,
    seed=42
)

#  pipeline
classification_pipeline = Pipeline(stages=[assembler, scaler, rf_classifier])

# Train model
print("Training Random Forest model...")
start = time.time()
rf_model = classification_pipeline.fit(train_data)
print(f"Training completed in {time.time() - start:.2f} seconds")

# Make predictions
predictions = rf_model.transform(test_data)

# Evaluate
evaluator = BinaryClassificationEvaluator(
    labelCol="tsunami",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

auc = evaluator.evaluate(predictions)
print(f"Model AUC-ROC: {auc:.4f}")

Training set: 633 records
Test set: 149 records

Classification Model: Tsunami Prediction
Training Random Forest model...
Training completed in 3.80 seconds
Model AUC-ROC: 0.7246
