In [1]:
 !pip install  pyspark



In [4]:
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from multiprocessing import Pool
import time
import warnings

# PySpark libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor as SparkRF

warnings.filterwarnings('ignore')


# 1. Synthetic dataset
# Every column is drawn independently from its own distribution, so the target
# ('burn_area') has no built-in relationship to the four feature columns.
N_SAMPLES = 100000  # number of rows generated for every column

# Seed NumPy's global RNG so the draws below are repeatable on a fresh kernel.
# The columns are generated in dict order; reordering them changes the values.
np.random.seed(42)
data = pd.DataFrame({
    'temperature': np.random.normal(30, 5, N_SAMPLES),       # normal, mean 30, sd 5
    'humidity': np.random.uniform(20, 90, N_SAMPLES),        # uniform on [20, 90)
    'wind_speed': np.random.gamma(2, 3, N_SAMPLES),          # gamma, shape 2, scale 3
    'vegetation_index': np.random.beta(2, 5, N_SAMPLES),     # beta(2, 5), values in (0, 1)
    'burn_area': np.random.exponential(10, N_SAMPLES)        # exponential, scale 10
})


# 2. Parallel Preprocessing Function
def process_column(df, col):
    """Return column ``col`` of ``df`` standardized to zero mean and unit std.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing the column.
    col : hashable
        Label of the column to process.

    Returns
    -------
    pandas.Series
        For a numeric column: ``(x - mean) / std`` (pandas' default sample
        std), named ``col``. If the std is zero or undefined (constant column
        or a single row) the centred values are returned unscaled, instead of
        the all-NaN result a division by 0/NaN would give.
        For a non-numeric column: the column unchanged.
    """
    column = df[col]
    if not pd.api.types.is_numeric_dtype(column):
        return column

    centered = column - column.mean()
    spread = column.std()
    if pd.isna(spread) or spread == 0:
        # No usable scale: dividing would turn every value into NaN (or inf),
        # which would then break model fitting downstream.
        return pd.Series(centered, name=col)
    return pd.Series(centered / spread, name=col)

def parallel_preprocess(df, n_workers=4):
    """Apply ``process_column`` to every column of ``df`` in worker processes.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose columns are processed independently.
    n_workers : int, default 4
        Upper bound on the number of worker processes; never more processes
        are started than there are columns.

    Returns
    -------
    pandas.DataFrame
        The processed columns concatenated side by side in their original
        order. A frame with no columns is returned as a copy.
    """
    columns = list(df.columns)
    if not columns:
        # Nothing to dispatch, and pd.concat raises on an empty list.
        return df.copy()

    # Each task's arguments are pickled and sent to a worker. Passing a
    # single-column frame keeps that payload to one column per task instead of
    # the entire DataFrame once per column.
    args = [(df[[col]], col) for col in columns]
    with Pool(min(n_workers, len(columns))) as pool:
        processed = pool.starmap(process_column, args)
    return pd.concat(processed, axis=1)


# 3. Preprocessing: Parallel vs Serial Timing
# Intervals are measured with time.perf_counter(), a monotonic clock meant for
# timing, rather than the wall clock returned by time.time().

# Parallel: one task per column, dispatched to a process pool.
start = time.perf_counter()
data_parallel = parallel_preprocess(data)
parallel_time = time.perf_counter() - start

# Serial: the same per-column routine, run in this process, so both timings
# measure the same work and differ only in how it is dispatched.
start = time.perf_counter()
data_serial = data.copy()
for col in data.columns:
    data_serial[col] = process_column(data, col)
serial_time = time.perf_counter() - start

# Sanity check (outside the timed regions): both paths must agree.
pd.testing.assert_frame_equal(data_parallel, data_serial)

print(f"✅ Parallel preprocessing time: {parallel_time:.2f}s")
print(f"✅ Serial preprocessing time: {serial_time:.2f}s")


# 4. Train/test split
# The target comes from the preprocessed frame, so 'burn_area' is standardized
# too and every RMSE reported later is in standardized units.
y = data_parallel['burn_area']
X = data_parallel.drop(columns=['burn_area'])

# Hold out 20% of the rows for evaluation; the fixed random_state keeps the
# partition identical across runs.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)




# 5. scikit-learn models
def _fit_and_score(model):
    """Fit ``model`` on the training split and score it on the test split.

    Returns a tuple ``(fitted_model, fit_seconds, test_rmse)``. Only the call
    to ``fit`` is timed; prediction and scoring happen after the clock stops.
    """
    started = time.perf_counter()
    model.fit(X_train, y_train)
    fit_seconds = time.perf_counter() - started
    test_rmse = np.sqrt(mean_squared_error(y_test, model.predict(X_test)))
    return model, fit_seconds, test_rmse


# Baseline linear model.
lr, lr_time, lr_rmse = _fit_and_score(LinearRegression())

# -------------------------------------------
# Both forests share one random_state so they differ only in n_jobs; the
# serial-vs-parallel comparison then times the same model.
rf_serial, rf_serial_time, rf_serial_rmse = _fit_and_score(
    RandomForestRegressor(n_estimators=100, n_jobs=1, random_state=42)
)

# n_jobs=-1 asks scikit-learn to use all available cores.
rf_parallel, rf_parallel_time, rf_parallel_rmse = _fit_and_score(
    RandomForestRegressor(n_estimators=100, n_jobs=-1, random_state=42)
)



# 6. Spark random forest
spark = SparkSession.builder.appName("WildfirePrediction").getOrCreate()
spark_df = spark.createDataFrame(data_parallel)

# Pack every non-target column into the single vector column Spark ML expects.
feature_cols = [c for c in data_parallel.columns if c != 'burn_area']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
spark_df = assembler.transform(spark_df).select("features", "burn_area")

# Seeded so the split can be repeated. This is Spark's own 80/20 partition,
# not the scikit-learn one above, so the test rows differ between the two.
train, test = spark_df.randomSplit([0.8, 0.2], seed=42)

start = time.perf_counter()
# NOTE(review): apart from numTrees and seed, this forest uses Spark's default
# hyperparameters, which may not match scikit-learn's defaults (e.g. tree depth
# limits). Confirm before reading the timing comparison as like-for-like.
spark_rf = SparkRF(featuresCol="features", labelCol="burn_area", numTrees=100, seed=42)
spark_model = spark_rf.fit(train)
spark_time = time.perf_counter() - start

# RMSE computed inside Spark: sqrt(mean((label - prediction)^2)) over the test split.
spark_preds = spark_model.transform(test)
spark_rmse = spark_preds.selectExpr("sqrt(avg((burn_area - prediction)*(burn_area - prediction))) as rmse").collect()[0]['rmse']


# 7. Results summary
# Both speedups are measured against the single-process scikit-learn forest.
rf_parallel_speedup = rf_serial_time / rf_parallel_time
spark_speedup = rf_serial_time / spark_time

approach_labels = ['Linear Regression', 'RF (Serial)', 'RF (Parallel)', 'Spark RF']
training_times = [lr_time, rf_serial_time, rf_parallel_time, spark_time]
rmse_values = [lr_rmse, rf_serial_rmse, rf_parallel_rmse, spark_rmse]
# '-' marks rows with no speedup figure (the baseline models).
speedup_labels = ['-', '-', f"{rf_parallel_speedup:.1f}x", f"{spark_speedup:.1f}x"]

results = pd.DataFrame({
    'Approach': approach_labels,
    'Training Time (s)': training_times,
    'RMSE': rmse_values,
    'Speedup': speedup_labels,
})

print("\n📊 === Model Training Results ===")
print(results.to_markdown(index=False))

# Shut down the Spark session now that all Spark results have been collected.
spark.stop()

✅ Parallel preprocessing time: 0.18s
✅ Serial preprocessing time: 0.02s

📊 === Model Training Results ===
| Approach          |   Training Time (s) |     RMSE | Speedup   |
|:------------------|--------------------:|---------:|:----------|
| Linear Regression |           0.0374215 | 0.995494 | -         |
| RF (Serial)       |         158.574     | 1.01995  | -         |
| RF (Parallel)     |         131.571     | 1.01971  | 1.2x      |
| Spark RF          |          40.2768    | 0.998765 | 3.9x      |
