In [None]:
import requests
import json
import math
from datetime import datetime, timedelta
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, to_timestamp, year, month as pyspark_month, hour, dayofweek,
    when, expr, lit, udf
)
from pyspark.sql.types import (
    StructType, StructField, DoubleType, StringType,
    TimestampType, IntegerType, ArrayType
)

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer
from pyspark.ml.regression import RandomForestRegressionModel

from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import to_timestamp, col

from pyspark.sql.functions import when

In [None]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Earthquake Analysis") \
    .getOrCreate()


# **USGS DATA ML MODEL**

## **Data Preprocessing** - **Historical data**

### Defining functions to get data from USGS API

In [None]:
# Generating year-month combinations
def get_year_month_pairs(start_year, end_year):
    pairs = []
    for year in range(start_year, end_year + 1):
        for month in range(1, 13):
            pairs.append((year, month))
    return pairs

# Building the API URL for the specific month
def build_url(year, month):
    start = f"{year}-{month:02d}-01"
    if month == 12:
        end = f"{year+1}-01-01"
    else:
        end = f"{year}-{month+1:02d}-01"
    return (
        f"https://earthquake.usgs.gov/fdsnws/event/1/query?"
        f"format=geojson&starttime={start}&endtime={end}&minmagnitude=1"
    )

### Fetches the data month wise from 2022 to 2024

In [None]:
# Collecting data into a Pandas DataFrame for easier type conversions
records = []
for (year, month) in get_year_month_pairs(2022, 2024):
    url = build_url(year, month)
    print(f"Fetching {year}-{month:02d}...")
    try:
        response = requests.get(url)
        response.raise_for_status()
        data = response.json()

        for f in data['features']:
            props = f['properties']
            coords = f['geometry']['coordinates']

            try:
                time_ms = props['time']
                time_val = datetime.fromtimestamp(time_ms / 1000) if time_ms else None

                record = {
                    'mag': float(props.get('mag')) if props.get('mag') is not None else None,
                    'time': time_val,
                    'tz': int(props.get('tz')) if props.get('tz') is not None else None,
                    'felt': int(props.get('felt')) if props.get('felt') is not None else None,
                    'cdi': float(props.get('cdi')) if props.get('cdi') is not None else None,
                    'mmi': float(props.get('mmi')) if props.get('mmi') is not None else None,
                    'alert': props.get('alert'),
                    'status': props.get('status'),
                    'tsunami': int(props.get('tsunami')) if props.get('tsunami') is not None else None,
                    'sig': int(props.get('sig')) if props.get('sig') is not None else None,
                    'nst': int(props.get('nst')) if props.get('nst') is not None else None,
                    'dmin': float(props.get('dmin')) if props.get('dmin') is not None else None,
                    'rms': float(props.get('rms')) if props.get('rms') is not None else None,
                    'gap': float(props.get('gap')) if props.get('gap') is not None else None,
                    'magType': props.get('magType'),
                    'type': props.get('type'),
                    'longitude': float(coords[0]) if coords[0] is not None else None,
                    'latitude': float(coords[1]) if coords[1] is not None else None,
                    'depth': float(coords[2]) if coords[2] is not None else None,
                }
                records.append(record)
            except Exception as inner_err:
                print(f"Skipping a record due to error: {inner_err}")
    except Exception as e:
        print(f"Failed for {year}-{month:02d}: {e}")
print("Done!")

Fetching 2022-01...
Fetching 2022-02...
Fetching 2022-03...
Fetching 2022-04...
Fetching 2022-05...
Fetching 2022-06...
Fetching 2022-07...
Fetching 2022-08...
Fetching 2022-09...
Fetching 2022-10...
Fetching 2022-11...
Fetching 2022-12...
Fetching 2023-01...
Fetching 2023-02...
Fetching 2023-03...
Fetching 2023-04...
Fetching 2023-05...
Fetching 2023-06...
Fetching 2023-07...
Fetching 2023-08...
Fetching 2023-09...
Fetching 2023-10...
Fetching 2023-11...
Fetching 2023-12...
Fetching 2024-01...
Fetching 2024-02...
Fetching 2024-03...
Fetching 2024-04...
Fetching 2024-05...
Fetching 2024-06...
Fetching 2024-07...
Fetching 2024-08...
Fetching 2024-09...
Fetching 2024-10...
Fetching 2024-11...
Fetching 2024-12...
Done!


### Defining the structure types for each feature

In [None]:

# Step 1: Create pandas DataFrame from records
pandas_df = pd.DataFrame(records)
pandas_df = pandas_df.where(pd.notnull(pandas_df), None)

# Step 2: Replace NaNs with None for integer columns
def safe_int(x):
    try:
        return int(x) if not pd.isna(x) and not math.isnan(x) else None
    except:
        return None

def safe_float(x):
    try:
        return float(x) if not pd.isna(x) and not math.isnan(x) else None
    except:
        return None

# Fix int-like columns
int_columns = ['tsunami', 'sig']
for col in int_columns:
    pandas_df[col] = pandas_df[col].apply(safe_int)

# Fix float columns
float_columns = ['mag', 'dmin', 'rms', 'gap', 'longitude', 'latitude', 'depth']
for col in float_columns:
    pandas_df[col] = pandas_df[col].apply(safe_float)

# Convert time to string (for later Spark to_timestamp conversion)
if 'time' in pandas_df.columns:
    pandas_df['time'] = pandas_df['time'].apply(lambda x: x.isoformat() if isinstance(x, datetime) else x if x is not None else None)

schema = StructType([
    StructField("mag", DoubleType(), True),
    StructField("time", StringType(), True),  # will cast to timestamp later
    StructField("status", StringType(), True),
    StructField("tsunami", IntegerType(), True),
    StructField("sig", IntegerType(), True),
    StructField("dmin", DoubleType(), True),
    StructField("rms", DoubleType(), True),
    StructField("gap", DoubleType(), True),
    StructField("magType", StringType(), True),
    StructField("type", StringType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("depth", DoubleType(), True),
])

### Creating spark dataframe

In [None]:
records = pandas_df.to_dict(orient="records")

df = spark.createDataFrame(records, schema=schema)

from pyspark.sql.functions import to_timestamp, col
df = df.withColumn("time", to_timestamp(col("time")))

df.printSchema()
df.show(5)

root
 |-- mag: double (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- status: string (nullable = true)
 |-- tsunami: integer (nullable = true)
 |-- sig: integer (nullable = true)
 |-- dmin: double (nullable = true)
 |-- rms: double (nullable = true)
 |-- gap: double (nullable = true)
 |-- magType: string (nullable = true)
 |-- type: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- depth: double (nullable = true)

+----+--------------------+--------+-------+---+-------+------+------+-------+----------+------------+----------+-----+
| mag|                time|  status|tsunami|sig|   dmin|   rms|   gap|magType|      type|   longitude|  latitude|depth|
+----+--------------------+--------+-------+---+-------+------+------+-------+----------+------------+----------+-----+
| 2.6|2022-01-31 23:54:...|reviewed|      0|104|  0.781|  0.33| 335.0|     ml|earthquake|    177.2704|   52.1081|13.14|
| 1.3|2022-01-31 23:49:...|

### Feature engineering

In [None]:
# Step 2: Extract time features
df = df.withColumn("hour", hour(col("time"))) \
       .withColumn("dayofweek", dayofweek(col("time"))) \
       .withColumn("month", pyspark_month(col("time"))) \

# Step 3: Drop nulls for magnitude (our target variable)
df = df.filter(col("mag").isNotNull())

# Step 4: Drop sparse columns (similar to pandas version)
df = df.drop("cdi", "mmi", "felt")

# Step 5: Handle categorical variables
categorical_cols = [c for c in ["alert", "status", "magType", "type"] if c in df.columns]

# Update indexed_cols and encoder_output_cols based on available columns
indexed_cols = [f"{c}_indexed" for c in categorical_cols]
encoder_output_cols = [f"{c}_encoded" for c in categorical_cols]

# Create stages for the pipeline
stages = []

# Indexing for categorical columns
for categorical_col, indexed_col in zip(categorical_cols, indexed_cols):
    indexer = StringIndexer(
        inputCol=categorical_col,
        outputCol=indexed_col,
        handleInvalid="keep"
    )
    stages.append(indexer)

# One-hot encoding for indexed columns
encoder = OneHotEncoder(
    inputCols=indexed_cols,
    outputCols=encoder_output_cols
)
stages.append(encoder)

# Collect all numeric columns (excluding the target 'mag')
numeric_cols = [col_name for col_name, data_type in df.dtypes
               if data_type in ("int", "double") and col_name != "mag"]

# Add the encoded categorical columns
feature_cols = numeric_cols + encoder_output_cols

imputer = Imputer(
    inputCols=numeric_cols,
    outputCols=numeric_cols
).setStrategy("mean")

# Add the Imputer to the pipeline stages before the VectorAssembler
stages.insert(0, imputer)

# Vector assembler to combine all features
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
    handleInvalid="keep"
)
stages.append(assembler)

# Step 6: Create and apply the pipeline
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(df)
transformed_df_hist = pipeline_model.transform(df)

# Save pipeline to use later on real-time data
pipeline_model.write().overwrite().save("models/pipeline_model")


## **Data Preprocessing** - **Real time data**

### Defining function to get real time data from USGS API

In [None]:
def build_url_last_hours(hours=2):
    end = datetime.utcnow()
    start = end - timedelta(hours=hours)
    return (
        f"https://earthquake.usgs.gov/fdsnws/event/1/query?"
        f"format=geojson&starttime={start.strftime('%Y-%m-%dT%H:%M:%S')}"
        f"&endtime={end.strftime('%Y-%m-%dT%H:%M:%S')}&minmagnitude=3"
    )

### Fetches the data for last 2 hours

In [None]:
records_real = []
url = build_url_last_hours()
print("Fetching today's seismic data...")

try:
    response = requests.get(url)
    response.raise_for_status()
    data = response.json()

    for f in data['features']:
        props = f['properties']
        coords = f['geometry']['coordinates']

        try:
            # Convert timestamp from milliseconds
            time_ms = props['time']
            time_val = datetime.fromtimestamp(time_ms / 1000) if time_ms else None

            record = {
                'mag': float(props.get('mag')) if props.get('mag') is not None else None,
                'time': time_val,
                'tz': int(props.get('tz')) if props.get('tz') is not None else None,
                'felt': int(props.get('felt')) if props.get('felt') is not None else None,
                'cdi': float(props.get('cdi')) if props.get('cdi') is not None else None,
                'mmi': float(props.get('mmi')) if props.get('mmi') is not None else None,
                'alert': props.get('alert'),
                'status': props.get('status'),
                'tsunami': int(props.get('tsunami')) if props.get('tsunami') is not None else None,
                'sig': int(props.get('sig')) if props.get('sig') is not None else None,
                'nst': int(props.get('nst')) if props.get('nst') is not None else None,
                'dmin': float(props.get('dmin')) if props.get('dmin') is not None else None,
                'rms': float(props.get('rms')) if props.get('rms') is not None else None,
                'gap': float(props.get('gap')) if props.get('gap') is not None else None,
                'magType': props.get('magType'),
                'type': props.get('type'),
                'longitude': float(coords[0]) if coords[0] is not None else None,
                'latitude': float(coords[1]) if coords[1] is not None else None,
                'depth': float(coords[2]) if coords[2] is not None else None,
            }
            records_real.append(record)
        except Exception as inner_err:
            print(f"Skipping a record due to error: {inner_err}")
except Exception as e:
    print(f"Failed to fetch today's data: {e}")
print("Done!")

Fetching today's seismic data...
Done!


In [None]:
import os, json, uuid

stream_dir = "stream_input"
os.makedirs(stream_dir, exist_ok=True)

if records_real:
    file_path = f"{stream_dir}/quake_{uuid.uuid4().hex}.json"
    with open(file_path, "w") as f:
        for record in records_real:
            # Convert datetime to string
            if isinstance(record.get("time"), datetime):
                record["time"] = record["time"].isoformat()
            json.dump(record, f)
            f.write("\n")
    print(f"Wrote {len(records_real)} records to {file_path}")
else:
    print("No new records found in this interval.")

Wrote 1 records to stream_input/quake_84e8defcca4b456ba2960e6236fedd6b.json


### Defining structure type for each feature

In [None]:
# Step 4: Define PySpark schema
from pyspark.sql.types import *

schema = StructType([
    StructField("mag", DoubleType(), True),
    StructField("time", StringType(), True),
    StructField("status", StringType(), True),
    StructField("tsunami", IntegerType(), True),
    StructField("sig", IntegerType(), True),
    StructField("dmin", DoubleType(), True),
    StructField("rms", DoubleType(), True),
    StructField("gap", DoubleType(), True),
    StructField("magType", StringType(), True),
    StructField("type", StringType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("depth", DoubleType(), True),
])


### Creating spark dataframe

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp

spark = SparkSession.builder.getOrCreate()

stream_df = spark.readStream \
    .schema(schema) \
    .json("stream_input")

# Fix timestamp
stream_df = stream_df.withColumn("time", to_timestamp(col("time")))

# Optional: filter, enrich, predict, etc.
stream_df = stream_df.filter(col("mag").isNotNull())

# Output
query = stream_df.writeStream \
    .format("console") \
    .option("truncate", False) \
    .outputMode("append") \
    .start()

query.awaitTermination(timeout=10)


False

In [None]:
from pyspark.sql.functions import to_timestamp, col
stream_df = stream_df.withColumn("time", to_timestamp(col("time")))

stream_df.printSchema()
# stream_df.show(5)

df_real = stream_df

root
 |-- mag: double (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- status: string (nullable = true)
 |-- tsunami: integer (nullable = true)
 |-- sig: integer (nullable = true)
 |-- dmin: double (nullable = true)
 |-- rms: double (nullable = true)
 |-- gap: double (nullable = true)
 |-- magType: string (nullable = true)
 |-- type: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- depth: double (nullable = true)



### Feature engineering

In [None]:
# Step 2: Extract time features
df_real = df_real.withColumn("hour", hour(col("time"))) \
       .withColumn("dayofweek", dayofweek(col("time"))) \
       .withColumn("month", pyspark_month(col("time"))) \

# Step 3: Drop nulls for magnitude (our target variable)
df_real = df_real.filter(col("mag").isNotNull())

# Step 4: Drop sparse columns (similar to pandas version)
df_real = df_real.drop("cdi", "mmi", "felt")

# Check which categorical columns are actually present in the DataFrame
categorical_cols = [c for c in ["alert", "status", "magType", "type"] if c in df_real.columns]

# Update indexed_cols and encoder_output_cols based on available columns
indexed_cols = [f"{c}_indexed" for c in categorical_cols]
encoder_output_cols = [f"{c}_encoded" for c in categorical_cols]


# Create stages for the pipeline
stages = []

# Indexing for categorical columns
for categorical_col, indexed_col in zip(categorical_cols, indexed_cols):
    indexer = StringIndexer(
        inputCol=categorical_col,
        outputCol=indexed_col,
        handleInvalid="keep"
    )
    stages.append(indexer)

# One-hot encoding for indexed columns
encoder = OneHotEncoder(
    inputCols=indexed_cols,
    outputCols=encoder_output_cols
)
stages.append(encoder)

# Collect all numeric columns (excluding the target 'mag')
numeric_cols = [col_name for col_name, data_type in df_real.dtypes
               if data_type in ("int", "double") and col_name != "mag"]

# Add the encoded categorical columns
feature_cols = numeric_cols + encoder_output_cols

imputer = Imputer(
    inputCols=numeric_cols,
    outputCols=numeric_cols
).setStrategy("mean")

# Add the Imputer to the pipeline stages before the VectorAssembler
stages.insert(0, imputer)

# Vector assembler to combine all features
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
    handleInvalid="keep"
)
stages.append(assembler)


from pyspark.ml import PipelineModel
pipeline_model = PipelineModel.load("models/pipeline_model")

# Transform the real-time data using the trained pipeline
transformed_df_real = pipeline_model.transform(df_real)

## **Training random forest model using historical data**

In [None]:
# Step 8: Train Random Forest model
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="mag",
    numTrees=100
)
rf_model = rf.fit(transformed_df_hist)

rf_model.write().overwrite().save("models/rf_model")


In [None]:
predictions_train = rf_model.transform(transformed_df_hist)
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="mag", predictionCol="prediction")

rmse_train = evaluator.setMetricName("rmse").evaluate(predictions_train)
mae_train = evaluator.setMetricName("mae").evaluate(predictions_train)
r2_train = evaluator.setMetricName("r2").evaluate(predictions_train)

print(f"Training RMSE: {rmse_train}")
print(f"Training MAE: {mae_train}")
print(f"Training R²: {r2_train}")

Training RMSE: 0.16507720437170917
Training MAE: 0.11903806494432777
Training R²: 0.9784576938348237


## **Predicting using real time data**

In [None]:
# Step 9: Make predictions
rf_model = RandomForestRegressionModel.load("models/rf_model")

# Predict on real-time transformed data
pred_results = rf_model.transform(transformed_df_real)

In [None]:
pred_results = pred_results.select(
    "*",
    col("mag").alias("actual_mag"),
    col("prediction").alias("predicted_mag")
)

### Saving the predictions to predicted_earthquakes.csv file


In [None]:
clean_df = pred_results.drop("status_encoded", "features", "magType_encoded", "type_encoded")

query = clean_df.writeStream \
    .format("csv") \
    .outputMode("append") \
    .option("path", "predicted_earthquakes") \
    .option("checkpointLocation", "predicted_earthquakes_checkpoint") \
    .option("header", True) \
    .start()

query.awaitTermination(timeout=20)


False

## **Defining magnitude buckets to create different impact levels**

In [None]:
  # Make sure to import col

bucketed = pred_results.withColumn(
    "impact_level",
    when(col("predicted_mag") < 4.0, "Minor")
    .when(col("predicted_mag") < 5.0, "Light")
    .when(col("predicted_mag") < 6.0, "Moderate")
    .when(col("predicted_mag") < 7.0, "Strong")
    .when(col("predicted_mag") < 8.0, "Major")
    .otherwise("Great")
)

### Storing the results to predicted_earthquakes_with_impact.csv file

In [None]:
bucketed = bucketed.drop("status_encoded", "magType_encoded", "type_encoded", "features") # Drop the 'features' column

query_bucket = bucketed.writeStream \
    .format("csv") \
    .outputMode("append") \
    .option("path", "predicted_earthquakes_with_impact") \
    .option("checkpointLocation", "predicted_earthquakes_with_impact_checkpoint") \
    .option("header", True) \
    .start()

query_bucket.awaitTermination(timeout=20)

False

In [None]:
import os, shutil
for file in os.listdir("predicted_earthquakes_with_impact"):
    if file.startswith("part-") and file.endswith(".csv"):
        shutil.move(f"predicted_earthquakes_with_impact/{file}", "predicted_earthquakes_with_impact.csv")
        break

In [None]:
bucket_counts = bucketed.groupBy("impact_level").count().orderBy("impact_level")

 # **SENTIMENT ANALYSIS ON NEWS DATA**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, when, concat_ws
from pyspark.sql.types import StringType, ArrayType, FloatType
import nltk
import re

# Start Spark session
spark = SparkSession.builder.appName("EarthquakeImpactAnalysis").getOrCreate()

# Download required VADER lexicon
nltk.download("vader_lexicon")

# Load CSV file
df = spark.read.option("header", True).csv("news_earthquake.csv")

# Fill missing descriptions
df = df.withColumn("Description", when(col("Description").isNull(), "").otherwise(col("Description")))

# ✅ Fixed UDF: Get compound sentiment score
def get_compound_score(text):
    from nltk.sentiment.vader import SentimentIntensityAnalyzer
    sid = SentimentIntensityAnalyzer()
    return float(sid.polarity_scores(text)['compound'])

sentiment_udf = udf(get_compound_score, FloatType())
df = df.withColumn("Compound", sentiment_udf(col("Description")))

# UDF: Get sentiment label
def get_sentiment_label(compound):
    if compound >= 0.05:
        return "Positive"
    elif compound <= -0.05:
        return "Negative"
    else:
        return "Neutral"

sentiment_label_udf = udf(get_sentiment_label, StringType())
df = df.withColumn("Sentiment_Label", sentiment_label_udf(col("Compound")))

# Impact keywords list
impact_keywords = [
    "death", "deaths", "dead", "fatalities", "killed", "casualties",
    "injured", "hurt", "wounded", "critical condition", "trauma",
    "missing", "unaccounted", "trapped", "buried", "suffocated",
    "collapsed", "destroyed", "damaged", "ruined", "wreckage",
    "cracks", "debris", "rubble", "fractured", "sinkhole", "tilted",
    "building collapse", "infrastructure failure", "landslide", "roadblock",
    "rescue", "rescued", "search operations", "emergency", "firefighters",
    "evacuated", "evacuation", "relocated", "displacement", "cleared",
    "first responders", "military assistance", "national guard",
    "earthquake", "aftershocks", "tsunami", "tremors", "shockwave", "seismic wave",
    "ground shaking", "earth ruptured", "surface crack", "liquefaction",
    "aid", "relief", "donations", "shelter", "tents", "food supply",
    "water shortage", "NGO", "Red Cross", "emergency services", "crisis response",
    "humanitarian",
    "panic", "fear", "chaos", "devastation", "mental health", "helpline",
    "disruption", "unrest", "protest", "looting", "violence"
]

# UDF: Normalize suffixes
def normalize(word):
    for suffix in ["ing", "ed", "es", "s", "ly", "ment"]:
        if word.endswith(suffix) and len(word) > len(suffix) + 2:
            return word[:-len(suffix)]
    return word

# UDF: Extract impact keywords
def extract_keywords(text):
    text = text.lower()
    found = []
    for kw in impact_keywords:
        if normalize(kw.lower()) in text:
            found.append(kw)
    return list(set(found))

keyword_udf = udf(extract_keywords, ArrayType(StringType()))
df = df.withColumn("Impact_Keywords", keyword_udf(col("Description")))

# UDF: Extract magnitude
def extract_magnitude(text):
    text = text.lower()
    patterns = [
        r'magnitude[\s:]*([\d]{1,2}(\.\d{1,2})?)',
        r'([\d]{1,2}(\.\d{1,2})?)\s*magnitude',
        r'mw[\s:]*([\d]{1,2}(\.\d{1,2})?)',
        r'([\d]{1,2}(\.\d{1,2})?)-magnitude'
    ]
    for pattern in patterns:
        match = re.search(pattern, text)
        if match:
            return match.group(1)
    return None

magnitude_udf = udf(extract_magnitude, StringType())
df = df.withColumn("Extracted_Magnitude", magnitude_udf(col("Description")))

# UDF: Assign impact severity bucket
def assign_bucket(compound, magnitude, keywords):
    try:
        magnitude = float(magnitude) if magnitude else None
    except:
        magnitude = None

    keyword_count = len(keywords) if keywords else 0

    if magnitude:
        if magnitude >= 7.0 and compound <= -0.3 and keyword_count >= 4:
            return "Major"
        elif 6.0 <= magnitude < 7.0 and compound <= -0.2 and keyword_count >= 3:
            return "Strong"
        elif 5.0 <= magnitude < 6.0 and keyword_count >= 2:
            return "Moderate"
        elif 4.0 <= magnitude < 5.0 and keyword_count >= 1:
            return "Light"
        else:
            return "Minor"

    if compound <= -0.4 and keyword_count >= 4:
        return "Major"
    elif compound <= -0.3 and keyword_count >= 2:
        return "Strong"
    elif compound <= -0.2 and keyword_count >= 2:
        return "Moderate"
    elif keyword_count >= 1:
        return "Light"
    else:
        return "Minor"

bucket_udf = udf(assign_bucket, StringType())
df = df.withColumn("Impact_Severity_Bucket", bucket_udf(col("Compound"), col("Extracted_Magnitude"), col("Impact_Keywords")))

# Convert Impact_Keywords array to string (CSV-safe)
df = df.withColumn("Impact_Keywords_Str", concat_ws(", ", col("Impact_Keywords")))
df_final = df.drop("Impact_Keywords")


df_final.coalesce(1).write.mode("overwrite").option("header", True).csv("earthquake_news_enriched_output")

print("Final file saved to: 'earthquake_news_enriched_output/'")

[nltk_data] Downloading package vader_lexicon to /root/nltk_data...


Final file saved to: 'earthquake_news_enriched_output/'


In [None]:
import os, shutil
for file in os.listdir("earthquake_news_enriched_output"):
    if file.startswith("part-") and file.endswith(".csv"):
        shutil.move(f"earthquake_news_enriched_output/{file}", "earthquake_news_enriched.csv")
        break

print("Saved as: earthquake_news_enriched.csv")

Saved as: earthquake_news_enriched.csv


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, split, explode, trim, count, collect_list, expr
from collections import Counter

# Step 1: Start Spark session
spark = SparkSession.builder.getOrCreate()

# Step 2: Load CSV with flexible options to handle bad rows
df2 = spark.read.option("header", "true") \
    .option("mode", "DROPMALFORMED") \
    .csv("earthquake_news_enriched.csv")

# Step 3: Filter rows with non-null Impact_Severity_Bucket and Impact_Keywords_Str
df2_cleaned = df2.filter(
    col("Impact_Severity_Bucket").isNotNull() &
    col("Impact_Keywords_Str").isNotNull()
)

# Step 4: Normalize and explode keywords
df2_keywords = df2_cleaned.withColumn(
    "keyword", explode(split(lower(col("Impact_Keywords_Str")), ","))
).withColumn("keyword", trim(col("keyword")))

# Step 5: Group by severity and keyword, then count
keyword_counts = df2_keywords.groupBy("Impact_Severity_Bucket", "keyword") \
    .agg(count("*").alias("keyword_count"))

# Step 6: Get Top 10 keywords per Impact_Severity_Bucket
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec = Window.partitionBy("Impact_Severity_Bucket").orderBy(col("keyword_count").desc())

top_keywords = keyword_counts.withColumn("rank", row_number().over(window_spec)) \
    .filter(col("rank") <= 10) \
    .groupBy("Impact_Severity_Bucket") \
    .agg(collect_list("keyword").alias("Top_Keywords"))

# Step 7: Sentiment distribution per Impact_Severity_Bucket
sentiment_dist = df2_cleaned.groupBy("Impact_Severity_Bucket", "Sentiment_Label") \
    .agg(count("*").alias("count"))

# Total articles per severity for calculating sentiment %
total_per_bucket = df2_cleaned.groupBy("Impact_Severity_Bucket") \
    .agg(count("*").alias("total_articles"))

sentiment_dist_pct = sentiment_dist.join(total_per_bucket, on="Impact_Severity_Bucket") \
    .withColumn("sentiment_pct", expr("round(count / total_articles, 3)")) \
    .groupBy("Impact_Severity_Bucket") \
    .agg(
        collect_list(
            expr("concat(Sentiment_Label, ': ', sentiment_pct)")
        ).alias("Sentiment_Distribution")
    )

# Step 8: Join keywords and sentiment into final reference table
impact_summary_top10_df = top_keywords.join(sentiment_dist_pct, on="Impact_Severity_Bucket") \
    .join(total_per_bucket, on="Impact_Severity_Bucket")

# Step 9: Show result
impact_summary_top10_df.show(truncate=False)

+----------------------+---------------------------------------------------------------------------------------------+--------------------------------------------------+--------------+
|Impact_Severity_Bucket|Top_Keywords                                                                                 |Sentiment_Distribution                            |total_articles|
+----------------------+---------------------------------------------------------------------------------------------+--------------------------------------------------+--------------+
|Minor                 |[earthquake, aid, killed, tsunami, damaged, panic, tremors, injured, casualties, dead]       |[Positive: 0.152, Neutral: 0.156, Negative: 0.692]|653           |
|Light                 |[earthquake, aid, damaged, tremors, rescue, rescued, injured, rubble, tsunami, casualties]   |[Neutral: 0.272, Negative: 0.401, Positive: 0.327]|624           |
|Strong                |[earthquake, damaged, injured, aid, killed, tsunami

In [None]:
from pyspark.sql.functions import concat_ws, col

impact_summary_top10_flat = impact_summary_top10_df \
    .withColumn("Top_Keywords", concat_ws(", ", col("Top_Keywords"))) \
    .withColumn("Sentiment_Distribution", concat_ws(", ", col("Sentiment_Distribution")))

impact_summary_top10_flat.coalesce(1).write.mode("overwrite").option("header", True).csv("impact_summary_top10_csv")


In [None]:
import os, shutil

for f in os.listdir("impact_summary_top10_csv"):
    if f.startswith("part-") and f.endswith(".csv"):
        shutil.move(os.path.join("impact_summary_top10_csv", f), "impact_summary_top10.csv")
        break

shutil.rmtree("impact_summary_top10_csv")
print("Saved as impact_summary_top10.csv")


Saved as impact_summary_top10.csv


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc
from geopy.geocoders import Nominatim
from geopy.exc import GeocoderTimedOut
import textwrap
import smtplib
from email.message import EmailMessage

# Step 0: Start Spark session
spark = SparkSession.builder.getOrCreate()

# Step 1: Load real-time seismic predictions
df1_spark = spark.read.option("header", True).csv("predicted_earthquakes_with_impact.csv")

# Step 2: Get latest event by time
latest_event_row = df1_spark.orderBy(desc("time")).limit(1).collect()[0]

# Step 3: Extract values
predicted_mag = float(latest_event_row["predicted_mag"])
impact_level = latest_event_row["impact_level"]
latitude = float(latest_event_row["latitude"])
longitude = float(latest_event_row["longitude"])

# Step 4: Convert reference table to dictionary
reference_pd = impact_summary_top10_df.toPandas()
reference_dict = reference_pd.set_index("Impact_Severity_Bucket").T.to_dict()

# Step 5: Reverse geocoding with fallback
def reverse_geocode(lat, lon):
    geolocator = Nominatim(user_agent="earthquake-alert-system")
    try:
        location = geolocator.reverse((lat, lon), timeout=10)
        if location and location.address:
            return location.address
        else:
            return "Open ocean region (no nearby land)"
    except GeocoderTimedOut:
        return "Geocoding timed out"

location_name = reverse_geocode(latitude, longitude)

# Step 6: Generate alert message
def generate_earthquake_alert(impact_level, predicted_mag, latitude, longitude, location_name, ref_dict):
    if impact_level not in ref_dict:
        return f"No reference data available for impact level: {impact_level}"

    row = ref_dict[impact_level]
    keywords = row["Top_Keywords"]
    sentiments = row["Sentiment_Distribution"]
    article_count = row["total_articles"]

    # Sentiment parsing
    if isinstance(sentiments, dict):
        sentiment_scores = sentiments
    else:
        sentiment_scores = {s.split(":")[0]: float(s.split(":")[1]) for s in sentiments}

    neutral = sentiment_scores.get("Neutral", 0)
    negative = sentiment_scores.get("Negative", 0)
    positive = sentiment_scores.get("Positive", 0)

    neutral_pct = round(neutral * 100, 1)
    negative_pct = round(negative * 100, 1)
    positive_pct = round(positive * 100, 1)

    sentiment_str = f"Neutral: {neutral_pct}%, Negative: {negative_pct}%, Positive: {positive_pct}%"
    keyword_str = ", ".join(keywords[:10])
    key1, key2, key3 = keywords[0], keywords[1], keywords[2]

    if negative_pct > 60:
        sentiment_msg = f"Around {negative_pct}% of the sentiment in related articles showed negative emotion, indicating serious concern in past similar events."
    elif negative_pct > 40:
        sentiment_msg = f"Approximately {negative_pct}% of articles reflected negative sentiment — a moderate level of public concern."
    elif negative_pct > 25:
        sentiment_msg = f"Only about {negative_pct}% of the historical sentiment was negative. This is generally not a cause for major alarm."
    else:
        sentiment_msg = f"Less than {negative_pct}% of the sentiment was negative, suggesting that public response was calm in similar cases."

    # Ocean detection
    location_lower = location_name.lower()
    ocean_keywords = ["ocean", "sea", "trench", "open water", "open ocean", "gulf"]
    is_oceanic = any(word in location_lower for word in ocean_keywords)

    # Advisory logic
    if is_oceanic:
        advisory = (
            f"This earthquake occurred in the ocean region ({location_name}). "
            f"While similar earthquakes on land have shown sentiment patterns like: {sentiment_msg.lower()} "
            f"and often involved impacts like {key1}, {key2}, and {key3}, there is currently no land nearby. "
            f"As a result, this event is not considered a cause for public concern."
        )
    else:
        level = impact_level.lower()
        if level == "minor":
            advisory = (
                f"{sentiment_msg} Minor tremors like these were typically linked to terms like {key1} and {key2}. "
                f"No action is required — just stay informed."
            )
        elif level == "light":
            advisory = (
                f"{sentiment_msg} Light earthquakes have historically been associated with {key1} and {key2}. "
                f"No structural damage expected — stay aware."
            )
        elif level == "moderate":
            advisory = (
                f"{sentiment_msg} Events of moderate severity often involve {key1}, {key2}, or {key3}. "
                f"Monitor updates and ensure surroundings are safe."
            )
        elif level == "strong":
            advisory = (
                f"{sentiment_msg} Strong earthquakes like this have previously included impacts such as {key1}, {key2}, and {key3}. "
                f"Take shelter and prepare for aftershocks."
            )
        elif level == "major":
            advisory = (
                f"{sentiment_msg} Major earthquakes are commonly associated with {key1}, {key2}, and {key3}. "
                f"Follow emergency procedures and official alerts without delay."
            )
        else:
            advisory = f"{sentiment_msg} Stay alert and follow emergency protocols if applicable."

    # Final alert message
    alert_msg = (
        f" Earthquake Alert: **{impact_level} Severity** (Predicted Magnitude: {predicted_mag})\n"
        f" Location: Latitude {latitude}, Longitude {longitude} ({location_name})\n"
        f" Based on {article_count} historical news articles, such earthquakes were often associated with:\n"
        f" Common impacts: {keyword_str}\n"
        f" Sentiment response: {sentiment_str}\n"
        f"{advisory}"
    )
    return alert_msg

alert_message = generate_earthquake_alert(
    impact_level, predicted_mag, latitude, longitude, location_name, reference_dict
)

# Step 7: Print message in terminal with wrapping
print("\n EARTHQUAKE ALERT MESSAGE:\n")
for paragraph in alert_message.split("\n"):
    print(textwrap.fill(paragraph, width=100))

# Step 8: Email utility
def send_email_alert(subject, body, to_email, from_email, app_password):
    msg = EmailMessage()
    msg['Subject'] = subject
    msg['From'] = from_email
    msg['To'] = to_email
    msg.set_content(body)

    try:
        with smtplib.SMTP_SSL('smtp.gmail.com', 465) as smtp:
            smtp.login(from_email, app_password)
            smtp.send_message(msg)
        print("Alert email sent successfully.")
    except Exception as e:
        print(f"Failed to send email: {e}")


wrapped_alert = "\n".join([textwrap.fill(p, width=100) for p in alert_message.split("\n")])


subject = f"Earthquake Alert: {impact_level} Severity"
to_email = "guannan.liu@sjsu.edu"
from_email = "chetana.muralidharan.24@gmail.com"
app_password = "gsdokgwxycdrfsfd"

send_email_alert(subject, wrapped_alert, to_email, from_email, app_password)



 EARTHQUAKE ALERT MESSAGE:

 Earthquake Alert: **Moderate Severity** (Predicted Magnitude: 5.469764185157726)
 Location: Latitude -7.1234, Longitude 129.138 (Maluku, Indonesia)
 Based on 126 historical news articles, such earthquakes were often associated with:
 Common impacts: earthquake, damaged, aid, tremors, injured, casualties, aftershocks, deaths, death,
tsunami
 Sentiment response: Neutral: 8.7%, Negative: 89.7%, Positive: 1.6%
Around 89.7% of the sentiment in related articles showed negative emotion, indicating serious
concern in past similar events. Events of moderate severity often involve earthquake, damaged, or
aid. Monitor updates and ensure surroundings are safe.
Alert email sent successfully.
