In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, round, lit, year, month, dayofmonth, count, sum as spark_sum, max as spark_max, avg
from pyspark.sql import types as T
from pyspark.sql.functions import broadcast

spark = SparkSession.builder \
    .appName("ForestFire_Project") \
    .config("spark.master", "spark://spark-master:7077") \
    .config("spark.driver.memory", "2g") \
    .config("spark.mongodb.output.uri", "mongodb://mongodb:27017/bigdata.batch_results") \
    .getOrCreate()

# paths
weather_path = "hdfs://namenode:9000/raw_data/turkey_weather/era5_daily.csv"
fire_path = "hdfs://namenode:9000/raw_data/turkey/*.csv"

# 1) read
df_weather = spark.read.csv(weather_path, header=True, inferSchema=True)
df_fire = spark.read.option("mergeSchema","true").csv(fire_path, header=True, inferSchema=True)

print("Reading Done.")

# 2) CLEAN fire dates (remove invalid dates)
df_fire = df_fire.withColumn("date_join", to_date(col("acq_date")))
null_dates = df_fire.filter(col("date_join").isNull()).count()
print("NULL date rows:", null_dates)

df_fire = df_fire.filter(col("date_join").isNotNull())

# 3) create grid keys (0.1 degree grid)
grid_step = 0.1
df_fire = df_fire.withColumn("lat_grid", (round(col("latitude")/grid_step)*grid_step).cast(T.DoubleType())) \
                 .withColumn("lon_grid", (round(col("longitude")/grid_step)*grid_step).cast(T.DoubleType()))

# 4) aggregate fire records per cell-day
fire_agg = df_fire.groupBy("lat_grid","lon_grid","date_join").agg(
    count("*").alias("n_fires"),
    spark_sum("frp").alias("sum_frp"),
    spark_max("brightness").alias("max_brightness"),
    avg("confidence").alias("mean_confidence"),
    count( (col("daynight") == "D").cast("int") ).alias("count_day"),
    count( (col("daynight") == "N").cast("int") ).alias("count_night")
)

print("Fire aggregation completed.")

# 5) prepare weather data
df_weather_clean = df_weather.withColumnRenamed("latitude","lat_grid") \
                             .withColumnRenamed("longitude","lon_grid") \
                             .withColumn("date_join", to_date(col("time"))) \
                             .withColumn("lat_grid", col("lat_grid").cast(T.DoubleType())) \
                             .withColumn("lon_grid", col("lon_grid").cast(T.DoubleType()))

print("Distinct weather grid cells:",
      df_weather_clean.select("lat_grid","lon_grid").distinct().count())

# 6) join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 20 * 1024 * 1024)

df_integrated = df_weather_clean.join(
    broadcast(fire_agg),
    on=["lat_grid","lon_grid","date_join"],
    how="left"
)

print("JOIN Completed.")

# 7) fill missing values + add date features
df_final = df_integrated.fillna({
    'n_fires':0,
    'sum_frp':0.0,
    'max_brightness':0.0,
    'mean_confidence':0.0,
    'count_day':0,
    'count_night':0
}).withColumn("is_fire", (col("n_fires") > 0).cast("int")) \
  .withColumn("year", year(col("date_join"))) \
  .withColumn("month", month(col("date_join"))) \
  .withColumn("day", dayofmonth(col("date_join")))

# 8) Show sample
print("\n=== SAMPLE ROWS ===")
df_final.select("date_join","lat_grid","lon_grid","temp_max","n_fires","sum_frp","is_fire").show(20)

# 9) Show fire distribution
print("\n=== FIRE DISTRIBUTION ===")
df_final.groupBy("is_fire").count().show()

print("\nPipeline completed. No file saved.")


Reading Done.
NULL date rows: 4698
Fire aggregation completed.
Distinct weather grid cells: 585
JOIN Completed.

=== SAMPLE ROWS ===
+----------+--------+--------+----------+-------+-------+-------+
| date_join|lat_grid|lon_grid|  temp_max|n_fires|sum_frp|is_fire|
+----------+--------+--------+----------+-------+-------+-------+
|2018-01-01|    42.0|    26.0| 12.369537|      0|    0.0|      0|
|2018-01-02|    42.0|    26.0|11.1371155|      0|    0.0|      0|
|2018-01-03|    42.0|    26.0|  8.972076|      0|    0.0|      0|
|2018-01-04|    42.0|    26.0|  5.990387|      0|    0.0|      0|
|2018-01-05|    42.0|    26.0| 10.610748|      0|    0.0|      0|
|2018-01-06|    42.0|    26.0|12.7430725|      0|    0.0|      0|
|2018-01-07|    42.0|    26.0| 11.988434|      0|    0.0|      0|
|2018-01-08|    42.0|    26.0|  9.124176|      0|    0.0|      0|
|2018-01-09|    42.0|    26.0| 6.8346252|      0|    0.0|      0|
|2018-01-10|    42.0|    26.0|  7.244293|      0|    0.0|      0|
|2018-01-

In [4]:
# Print total number of rows for debugging (optional)
print(f"Total Rows: {df_integrated.count()}")

Total Rows: 6554130


In [None]:
# ---------- 1) SAVE FULL checkpoint ----------
print(f"Saving FULL dataset to {full_output_path} ...")
df_final.write.mode("overwrite").parquet(full_output_path)
print("Full dataset saved successfully.")

In [6]:
from pyspark.sql.functions import col, rand
from pyspark.storagelevel import StorageLevel

# ---------- 0) params ----------
full_output_path = "hdfs://namenode:9000/processed_data/turkey_integrated_full"
train_output_path = "hdfs://namenode:9000/processed_data/turkey_training_ready"
neg_ratio = 3            # desired ratio: negatives : positives = 3 : 1 (i.e., non-fire = 3 * fire)
rnd_seed = 42
shuffle_partitions = 200  # adjust according to your cluster



# ---------- 2) cache before expensive counts ----------
# Repartition to distribute data, then persist to avoid repeated computation for counts and sampling
df_final = df_final.repartition(shuffle_partitions).persist(StorageLevel.MEMORY_AND_DISK)
pos_count = df_final.filter(col("is_fire") == 1).count()
neg_count = df_final.filter(col("is_fire") == 0).count()
print(f"Counts -> pos: {pos_count}, neg: {neg_count}")

# ---------- 3) determine sampling fraction safely ----------
target_neg_count = pos_count * neg_ratio
if neg_count == 0:
    raise ValueError("No negative rows found â€” check df_final!")
fraction = target_neg_count / float(neg_count)
fraction = min(1.0, fraction)  # ensure <= 1.0 for sampling without replacement
print(f"Sampling fraction for negative class (capped to 1.0): {fraction:.6f}")

# ---------- 4) undersample negatives ----------
df_pos = df_final.filter(col("is_fire") == 1)
df_neg = df_final.filter(col("is_fire") == 0)
df_neg_sampled = df_neg.sample(withReplacement=False, fraction=fraction, seed=rnd_seed)

# ---------- 5) union (balanced) ----------
df_balanced = df_pos.unionByName(df_neg_sampled)

# ---------- 6) optional: light shuffle (avoid heavy global orderBy) ----------
# Approach: add a random column, repartition by it, then sort within partitions.
# This is cheaper than a full global orderBy(rand()) while providing a good shuffle.
from pyspark.sql.functions import rand as spark_rand
df_balanced = df_balanced.withColumn("_rand", spark_rand(seed=rnd_seed)) \
                         .repartition(shuffle_partitions, "_rand") \
                         .sortWithinPartitions("_rand") \
                         .drop("_rand")

# ---------- 7) quick checks ----------
print("\n=== BALANCED DISTRIBUTION ===")
df_balanced.groupBy("is_fire").count().show()

print("\nSample rows:")
df_balanced.select("date_join","lat_grid","lon_grid","temp_max","n_fires","sum_frp","is_fire").show(10, truncate=False)

# ---------- 8) Save READY-TO-TRAIN (only if you confirm) ----------
# Uncomment below to persist the balanced dataset to HDFS
# df_balanced.write.mode("overwrite").parquet(train_output_path)
# print("Saved balanced training dataset to:", train_output_path)

Counts -> pos: 5230, neg: 6406370
Sampling fraction for negative class (capped to 1.0): 0.002449

=== BALANCED DISTRIBUTION ===
+-------+-----+
|is_fire|count|
+-------+-----+
|      1| 5230|
|      0|15563|
+-------+-----+


Sample rows:
+----------+--------+--------+---------+-------+------------------+-------+
|date_join |lat_grid|lon_grid|temp_max |n_fires|sum_frp           |is_fire|
+----------+--------+--------+---------+-------+------------------+-------+
|2018-10-16|38.0    |41.5    |28.173492|2      |40.7              |1      |
|2020-03-12|42.0    |29.0    |10.052887|0      |0.0               |0      |
|2019-02-21|37.0    |34.5    |11.490631|0      |0.0               |0      |
|2020-06-21|36.0    |44.5    |31.717194|0      |0.0               |0      |
|2020-09-03|41.0    |45.0    |20.95108 |0      |0.0               |0      |
|2020-11-10|38.0    |33.5    |16.048492|2      |14.899999999999999|1      |
|2018-07-15|42.0    |44.0    |21.86734 |0      |0.0               |0      |
|

In [7]:
# ---------- 8) Save READY-TO-TRAIN ----------
df_balanced.write.mode("overwrite").parquet(train_output_path)
print("Saved balanced training dataset to:", train_output_path)

Saved balanced training dataset to: hdfs://namenode:9000/processed_data/turkey_training_ready


In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# 1. Initialize Spark
spark = SparkSession.builder \
    .appName("ForestFire_ML_Training") \
    .config("spark.master", "spark://spark-master:7077") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

# 2. Read the prepared dataset (Balanced Data)
train_path = "hdfs://namenode:9000/processed_data/turkey_training_ready"
df = spark.read.parquet(train_path)

print(f"Total Data Loaded: {df.count()}")

# 3. Temporal Split
# Train: before 2020
# Test: 2020 and after
train_data = df.filter(col("year") < 2020)
test_data = df.filter(col("year") >= 2020)

print(f"Training Set (2018-2019): {train_data.count()}")
print(f"Test Set (2020): {test_data.count()}")

# ... (same initialization and loading code)

# ---------- features (very important modification) ----------
# We removed n_fires, frp, brightness, confidence because they cause data leakage
feature_cols = [
    "lat_grid", "lon_grid", "month", "day",  # location and time
    "temp_mean", "temp_max", "temp_min",     # temperature
    "humidity_mean", "humidity_min",         # humidity (very important)
    "wind_speed_mean", "wind_speed_max"      # wind
]

# Ensure the columns exist
present_features = [c for c in feature_cols if c in df_balanced.columns]
print("Using features (SAFE LIST):", present_features)

assembler = VectorAssembler(inputCols=present_features, outputCol="features", handleInvalid="skip")  # skip is better than keep to avoid invalid values
df_ready = assembler.transform(df_balanced).select("features", "is_fire", "date_join", "year")  # ensure 'year' exists

# ---------- split strategy (date correction) ----------
# Train: 2018 + 2019
# Test: 2020 (because you don't have 2021)
train = df_ready.filter(col("year") < 2020)
test  = df_ready.filter(col("year") == 2020)

print("Sizes: train=", train.count(), "test=", test.count())

if test.count() == 0:
    raise RuntimeError("Test set is empty! Check your year column or dataset.")

# ---------- training ----------
# Increased number of trees slightly to improve accuracy
rf = RandomForestClassifier(labelCol="is_fire", featuresCol="features", numTrees=100, maxDepth=10, seed=42)
print("Training Model...")
model = rf.fit(train)

# ---------- evaluation ----------
print("Predicting...")
preds = model.transform(test)

# accuracy & f1
mcc = MulticlassClassificationEvaluator(labelCol="is_fire", predictionCol="prediction", metricName="accuracy")
acc = mcc.evaluate(preds)
mcc_f1 = MulticlassClassificationEvaluator(labelCol="is_fire", predictionCol="prediction", metricName="f1")
f1 = mcc_f1.evaluate(preds)

# ROC & PR AUC
bce_roc = BinaryClassificationEvaluator(labelCol="is_fire", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
roc_auc = bce_roc.evaluate(preds)

print(f"\n=== FINAL RESULTS (Test 2020) ===")
print(f"Accuracy: {acc:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"ROC AUC:  {roc_auc:.4f}")

# Feature Importance (to see what actually influences fires)
rfModel = model
print("\nFeature Importances:")
importances = rfModel.featureImportances
for i, imp in enumerate(importances):
    if i < len(present_features):
        print(f" - {present_features[i]}: {imp:.4f}")

Total Data Loaded: 20793
Training Set (2018-2019): 13500
Test Set (2020): 7293
Using features (SAFE LIST): ['lat_grid', 'lon_grid', 'month', 'day', 'temp_mean', 'temp_max', 'temp_min', 'humidity_mean', 'humidity_min', 'wind_speed_mean', 'wind_speed_max']
Sizes: train= 13500 test= 7293
Training Model...
Predicting...

=== FINAL RESULTS (Test 2020) ===
Accuracy: 0.8553
F1 Score: 0.8451
ROC AUC:  0.9285

Feature Importances:
 - lat_grid: 0.2240
 - lon_grid: 0.1368
 - month: 0.1059
 - day: 0.0416
 - temp_mean: 0.0520
 - temp_max: 0.0597
 - temp_min: 0.0393
 - humidity_mean: 0.0917
 - humidity_min: 0.1805
 - wind_speed_mean: 0.0396
 - wind_speed_max: 0.0290


In [8]:
model_save_path = "hdfs://namenode:9000/models/turkey_fire_model_v1"

model.write().overwrite().save(model_save_path)

print("Model saved successfully at:", model_save_path)


Model saved successfully at: hdfs://namenode:9000/models/turkey_fire_model_v1
