# EV Adoption Modeling (Improved Spark Pipeline)

This notebook is an improved PySpark pipeline for predicting `ev_adoption_probability`
using **Spark-supported models only**.

It adds:

- Rich **feature engineering** (interaction + polynomial features)
- Proper **train/test split before target encoding**
- **Target encoding** for high-cardinality `city` and `state`
- A tuned **Gradient Boosted Trees (GBT)** model using `CrossValidator`
- Comparison with a tuned **Random Forest**
- Option to toggle **raw vs log-transformed label**

> Note: The R² you can reach (e.g., 0.85–0.90) depends on how much predictive signal your data contains.
> This notebook is structured to get the most out of Spark's built-in models.


In [None]:
# ==============================
# 0. Imports & Spark Session
# ==============================
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import NumericType, StringType

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import (
    RandomForestRegressor,
    GBTRegressor,
    LinearRegression,
    DecisionTreeRegressor,
    GeneralizedLinearRegression
)
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

print("Using PySpark - EV adoption modeling")

# Small fixed-size YARN deployment: 2 executors x 2 cores, no dynamic allocation.
# Shuffle/default parallelism match the 4 available cores. Adjust for your cluster.
# (dict insertion order keeps the settings applied in the listed order)
_SPARK_CONF = {
    "spark.submit.deployMode": "client",
    "spark.executor.instances": "2",
    "spark.executor.cores": "2",
    "spark.executor.memory": "1536m",
    "spark.executor.memoryOverhead": "512m",
    "spark.dynamicAllocation.enabled": "false",
    "spark.default.parallelism": "4",
    "spark.sql.shuffle.partitions": "4",
    "spark.hadoop.dfs.client.use.datanode.hostname": "true",
}

_builder = SparkSession.builder.appName("EVAdoptionImproved").master("yarn")
for _conf_key, _conf_value in _SPARK_CONF.items():
    _builder = _builder.config(_conf_key, _conf_value)
spark = _builder.getOrCreate()

spark


In [None]:
# ==============================
# 1. Load Dataset
# ==============================

data_path = "hdfs://namenode:9000/hadoop/ev_dataset_final.csv"  # <-- change if needed

# First line holds the column names; inferSchema costs one extra scan to type the columns.
df = spark.read.csv(data_path, header=True, inferSchema=True)

print(f"Loaded {df.count():,} rows, {len(df.columns)} columns")
df.printSchema()
df.show(5)


In [None]:
TARGET_ROWS = 50000
total_rows = df.count()

# The working frame is cached in both branches. Every later cell runs many actions
# (counts, quantiles, null checks, randomSplit, CV), and without a cache each one would
# re-read the CSV from HDFS. The cache also pins the rows chosen by limit(), which
# Spark does not guarantee to reproduce identically on recomputation.
if total_rows > TARGET_ROWS:
    frac = TARGET_ROWS / float(total_rows)
    # Bernoulli sampling returns only *approximately* frac * total_rows rows.
    # limit() trims an overshoot; an undershoot is left as-is.
    df_sampled = (
        df.sample(withReplacement=False, fraction=frac, seed=42)
          .limit(TARGET_ROWS)
          .cache()
    )
    # count() also materializes the cache
    print(f"Using sampled dataset: {df_sampled.count()} rows (target {TARGET_ROWS})")
else:
    df_sampled = df.cache()
    print(f"Dataset has only {total_rows} rows, using full data")

# then use df_sampled instead of df
df = df_sampled

In [None]:
# ==============================
# 2. Missing Value Handling
# ==============================

total_rows = df.count()
print("Total rows:", total_rows)

# Columns kept out of generic imputation:
#  - identity columns are parsed / target-encoded in later steps;
#  - the raw label must never be imputed. Rows with a missing target are dropped in
#    step 3 instead of being given a fabricated (median) label.
identity_cols = ["city", "date", "state"]
raw_label_col = "ev_adoption_probability"

numeric_cols = []
categorical_cols = []

for field in df.schema.fields:
    # isinstance(NumericType) covers Byte/Short/Integer/Long/Float/Double/Decimal.
    # Matching on the type's string name ("double"/"int") missed LongType, FloatType
    # and DecimalType, which inferSchema can produce.
    if isinstance(field.dataType, NumericType):
        numeric_cols.append(field.name)
    elif isinstance(field.dataType, StringType) and field.name not in identity_cols:
        categorical_cols.append(field.name)

print(f"Numeric columns: {len(numeric_cols)}")
print(f"Categorical columns: {len(categorical_cols)}")
print("Identity columns treated separately: city, date, state")

# Fill numeric nulls with the approximate median, computed for all columns in a single
# pass. approxQuantile ignores nulls and returns [] for an all-null column; such columns
# are skipped rather than raising IndexError.
# NOTE(review): medians are taken from the full dataset before the train/test split,
# which is a mild form of leakage. Compute them on the training split if that matters.
impute_cols = [c for c in numeric_cols if c != raw_label_col]
median_fills = {}
if impute_cols:
    quantiles = df.approxQuantile(impute_cols, [0.5], 0.01)
    median_fills = {c: q[0] for c, q in zip(impute_cols, quantiles) if q}
df_filled = df.fillna(median_fills) if median_fills else df

# Fill categorical nulls with the most frequent NON-null value. Ties are broken by the
# value itself so re-runs are deterministic. Counting the null group as well would skip
# the fill whenever null was the most common "value".
for col_name in categorical_cols:
    mode_row = (
        df_filled
        .filter(F.col(col_name).isNotNull())
        .groupBy(col_name).count()
        .orderBy(F.desc("count"), F.asc(col_name))
        .first()
    )
    if mode_row is not None:
        df_filled = df_filled.fillna({col_name: mode_row[0]})

# Count remaining nulls in one aggregation instead of one count() job per column.
# Nulls left in the label column are expected; step 3 filters those rows out.
null_counts = df_filled.select([
    F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df_filled.columns
]).first()

total_nulls = 0
for col_name in df_filled.columns:
    null_count = null_counts[col_name] or 0  # sum over zero rows is NULL
    total_nulls += null_count
    if null_count > 0:
        print(f"{col_name}: {null_count} nulls")

print("Total remaining nulls:", total_nulls)


In [None]:
# ==============================
# 3. Base Feature Engineering + Label
# ==============================

def _safe_ratio(numerator, denominator):
    """Return ``numerator / denominator`` as a Column, or 0 where the denominator is 0.

    Spark yields NULL for ``x / 0`` (or raises when ANSI mode is on), and a NULL in any
    assembled feature makes VectorAssembler fail. Every ratio feature below is
    therefore guarded the same way. A NULL denominator still propagates NULL.
    """
    den = F.col(denominator)
    return F.when(den == 0, 0).otherwise(F.col(numerator) / den)

# Convert date to year/month (the raw and parsed date columns are dropped)
df_model = (
    df_filled
      .withColumn("date_parsed", F.to_date(F.col("date"), "dd/MM/yy"))
      .withColumn("year", F.year("date_parsed"))
      .withColumn("month", F.month("date_parsed"))
      .drop("date", "date_parsed")
)

# Core engineered features
# NOTE(review): infra_confidence_score is the same product as range_x_stations in step 4,
# so the model receives that feature twice.
df_model = (
    df_model
      .withColumn("affordability_index", _safe_ratio("vehicle_price_ev", "income_level"))
      .withColumn("infra_confidence_score",
                  F.col("battery_range_km") * F.col("charging_stations_per_10km"))
      .withColumn("savings_ratio", _safe_ratio("fuel_price_per_litre", "electricity_cost_per_kwh"))
      .withColumn("resale_confidence", _safe_ratio("vehicle_resale_value_ev", "vehicle_resale_value_ice"))
      .withColumn("market_momentum", _safe_ratio("ev_sales_last_year", "ice_sales_last_year"))
)

# Toggle: use raw probability or log-transformed label
use_log_label = False  # set True to try log(label)

if use_log_label:
    # small epsilon keeps log() finite for a probability of exactly 0
    df_model = df_model.withColumn(
        "log_ev_adoption", F.log(F.col("ev_adoption_probability") + F.lit(1e-6))
    )
    label_col = "log_ev_adoption"
else:
    label_col = "ev_adoption_probability"

# Drop rows with null label (just in case)
df_model = df_model.filter(F.col(label_col).isNotNull())

print("df_model schema:")
df_model.printSchema()
df_model.select("city", "state", label_col, "affordability_index", "infra_confidence_score").show(5)


In [None]:
# ==============================
# 4. High-Impact Interaction & Polynomial Features
# ==============================

# new column -> (left factor, right factor). Dict order is insertion order, so the
# columns are appended in exactly the order listed here.
pairwise_products = {
    # Infrastructure interactions
    "range_x_stations":        ("battery_range_km", "charging_stations_per_10km"),
    "infra_growth_x_range":    ("charging_infra_growth_rate", "battery_range_km"),
    "gov_infra_x_stations":    ("gov_infra_investment_crores", "charging_stations_per_10km"),
    # Economic interactions
    "income_x_subsidy":        ("income_level", "ev_subsidy_amount"),
    "affordability_x_subsidy": ("affordability_index", "ev_subsidy_amount"),
    "ev_price_x_income":       ("vehicle_price_ev", "income_level"),
    # Environment & traffic interactions
    "aqi_x_population":        ("avg_city_aqi", "population_density"),
    "congestion_x_population": ("traffic_congestion_index", "population_density"),
    "aqi_x_congestion":        ("avg_city_aqi", "traffic_congestion_index"),
    # Behavioral interactions
    "awareness_x_anxiety":     ("ev_awareness_score", "consumer_range_anxiety_score"),
    "momentum_x_awareness":    ("market_momentum", "ev_awareness_score"),
    "resale_x_awareness":      ("resale_confidence", "ev_awareness_score"),
}
for product_name, (left_factor, right_factor) in pairwise_products.items():
    df_model = df_model.withColumn(product_name, F.col(left_factor) * F.col(right_factor))

# Polynomial features (degree 2): new column -> column being squared
squared_terms = {
    "range_sq":      "battery_range_km",
    "aqi_sq":        "avg_city_aqi",
    "population_sq": "population_density",
    "congestion_sq": "traffic_congestion_index",
}
for square_name, base_col in squared_terms.items():
    df_model = df_model.withColumn(square_name, F.col(base_col) ** 2)

print("Sample of engineered columns:")
preview_cols = [
    "battery_range_km", "charging_stations_per_10km",
    "range_x_stations", "infra_growth_x_range",
    "income_x_subsidy", "aqi_x_population",
    "awareness_x_anxiety",
]
df_model.select(*preview_cols).show(5)


In [None]:
# ==============================
# 5. Train/Test Split + Target Encoding
# ==============================

# Cache before splitting. randomSplit evaluates its parent once per output split, and
# the upstream sample()/limit() lineage is not guaranteed to produce identical rows on
# recomputation. Without the cache, train and test could overlap or lose rows.
df_model = df_model.cache()
train_df, test_df = df_model.randomSplit([0.8, 0.2], seed=42)

print(f"Train rows: {train_df.count():,}")
print(f"Test rows: {test_df.count():,}")

# Target encoding on TRAIN only, shrunk toward the global mean (m-estimate):
#     encoded = (n * group_mean + m * global_mean) / (n + m)
# A raw group mean for a city with a single training row equals that row's own label,
# which lets the trees memorize the target. TE_SMOOTHING = 0 restores the raw mean.
# NOTE(review): the encodings still use every training row, so CrossValidator's
# validation folds see encodings built from their own labels. CV scores are optimistic.
TE_SMOOTHING = 10.0

global_avg = train_df.select(F.avg(label_col)).first()[0]

def _smoothed_target_map(data, key, out_col, prior, weight):
    """Return one row per ``key`` holding the smoothed mean of ``label_col`` as ``out_col``.

    ``prior`` is the value small groups are pulled toward, and ``weight`` is the
    pseudo-count given to it.
    """
    stats = data.groupBy(key).agg(
        F.count(label_col).alias("_n"),
        F.avg(label_col).alias("_mean"),
    )
    shrunk = (
        (F.col("_n") * F.col("_mean") + F.lit(weight) * F.lit(prior))
        / (F.col("_n") + F.lit(weight))
    )
    return stats.select(key, shrunk.alias(out_col))

city_avg = _smoothed_target_map(train_df, "city", "city_encoded", global_avg, TE_SMOOTHING)
state_avg = _smoothed_target_map(train_df, "state", "state_encoded", global_avg, TE_SMOOTHING)
print("Global mean label:", global_avg)

def apply_target_encoding(data, city_map, state_map, global_mean):
    """Attach the city/state target encodings to ``data``.

    Left-joins ``city_map`` on ``city`` and then ``state_map`` on ``state``. Rows
    whose key is missing from a map (unseen during training, or null) get
    ``global_mean`` in ``city_encoded`` / ``state_encoded``.

    Args:
        data: DataFrame containing ``city`` and ``state`` columns.
        city_map: DataFrame of ``city`` -> ``city_encoded``.
        state_map: DataFrame of ``state`` -> ``state_encoded``.
        global_mean: fallback encoding for unmatched keys.

    Returns:
        ``data`` plus the two encoded columns, with no nulls in them.
    """
    with_city = data.join(city_map, on="city", how="left")
    with_both = with_city.join(state_map, on="state", how="left")
    fallback = {encoded: global_mean for encoded in ("city_encoded", "state_encoded")}
    return with_both.na.fill(fallback)

# Both splits are encoded with the TRAIN-derived maps, so no test label reaches the features.
train_ready, test_ready = (
    apply_target_encoding(split_frame, city_avg, state_avg, global_avg)
    for split_frame in (train_df, test_df)
)

train_ready.select("city", "state", "city_encoded", "state_encoded").show(5)


In [None]:
# ==============================
# 6. Assembler + Baseline
# ==============================

# Feature groups, concatenated in this order; the assembled vector's slot order follows it.
core_engineered = [
    "affordability_index",
    "infra_confidence_score",
    "savings_ratio",
    "city_encoded",
    "state_encoded",
    "resale_confidence",
    "market_momentum",
]
original_important = [
    "fuel_price_per_litre",
    "avg_city_aqi",
    "vehicle_price_ev",
    "income_level",
    "charging_stations_per_10km",
    "ev_subsidy_amount",
    "maintenance_cost_ev",
    "charging_time_minutes",
    "population_density",
    "public_transport_score",
    "traffic_congestion_index",
    "ev_awareness_score",
    "gov_infra_investment_crores",
    "charging_infra_growth_rate",
    "consumer_range_anxiety_score",
    "battery_range_km",
]
interaction_features = [
    "range_x_stations",
    "infra_growth_x_range",
    "gov_infra_x_stations",
    "income_x_subsidy",
    "affordability_x_subsidy",
    "ev_price_x_income",
    "aqi_x_population",
    "congestion_x_population",
    "aqi_x_congestion",
    "awareness_x_anxiety",
    "momentum_x_awareness",
    "resale_x_awareness",
]
polynomial_features = [
    "range_sq",
    "aqi_sq",
    "population_sq",
    "congestion_sq",
]
final_feature_cols = core_engineered + original_important + interaction_features + polynomial_features

assembler = VectorAssembler(inputCols=final_feature_cols, outputCol="features")

# Models only need the feature vector and the label
train_data, test_data = (
    assembler.transform(ready_frame).select("features", label_col)
    for ready_frame in (train_ready, test_ready)
)

print("Assembled training sample:")
train_data.show(3, truncate=False)

# Baseline model: always predict the training mean
train_mean = train_data.agg(F.avg(label_col)).first()[0]
baseline = test_data.withColumn("prediction", F.lit(train_mean))

evaluator_r2, evaluator_rmse, evaluator_mae = (
    RegressionEvaluator(labelCol=label_col, predictionCol="prediction", metricName=metric_name)
    for metric_name in ("r2", "rmse", "mae")
)

baseline_r2, baseline_rmse, baseline_mae = (
    metric_evaluator.evaluate(baseline)
    for metric_evaluator in (evaluator_r2, evaluator_rmse, evaluator_mae)
)

print(f"Baseline (mean-only) -> R²: {baseline_r2:.4f}, RMSE: {baseline_rmse:.4f}, MAE: {baseline_mae:.4f}")


In [None]:
# ==============================
# 7. Tuned Gradient Boosted Trees (GBT) via CrossValidator
# ==============================

# The exhaustive grid is 3*3*3*3*2 = 162 combinations x 3 folds = 486 GBT fits of up to
# 300 depth-9 trees each. That is impractical on the 4-core cluster configured in
# section 0 and makes Restart & Run All infeasible. The compact default grid keeps the
# most influential knobs (depth, iterations, learning rate): 8 combinations x 3 folds.
# Set GBT_FULL_GRID = True to run the original search.
GBT_FULL_GRID = False

gbt = GBTRegressor(featuresCol="features", labelCol=label_col, seed=42)

if GBT_FULL_GRID:
    gbt_grid_spec = [
        (gbt.maxDepth, [5, 7, 9]),
        (gbt.maxIter, [150, 250, 300]),
        (gbt.stepSize, [0.05, 0.1, 0.15]),
        (gbt.maxBins, [64, 128, 256]),
        (gbt.subsamplingRate, [0.8, 1.0]),
    ]
else:
    gbt_grid_spec = [
        (gbt.maxDepth, [5, 7]),
        (gbt.maxIter, [150, 250]),
        (gbt.stepSize, [0.05, 0.1]),
        (gbt.maxBins, [64]),
        (gbt.subsamplingRate, [0.8]),
    ]

grid_builder = ParamGridBuilder()
for grid_param, grid_values in gbt_grid_spec:
    grid_builder = grid_builder.addGrid(grid_param, grid_values)
paramGrid = grid_builder.build()

evaluator_cv = RegressionEvaluator(labelCol=label_col, predictionCol="prediction", metricName="r2")

cv = CrossValidator(
    estimator=gbt,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator_cv,
    numFolds=3,
    parallelism=4  # adjust based on cluster
)

print(f"Training tuned GBT over {len(paramGrid)} parameter combinations (this may take a while)...")
cv_model = cv.fit(train_data)

# avgMetrics[i] is the mean CV R² of paramGrid[i]; larger is better for R²
best_gbt_idx = max(range(len(cv_model.avgMetrics)), key=cv_model.avgMetrics.__getitem__)
best_gbt_params = {p.name: v for p, v in paramGrid[best_gbt_idx].items()}
print(f"Best GBT params (mean CV R² = {cv_model.avgMetrics[best_gbt_idx]:.4f}): {best_gbt_params}")

gbt_preds = cv_model.transform(test_data)

gbt_r2 = evaluator_r2.evaluate(gbt_preds)
gbt_rmse = evaluator_rmse.evaluate(gbt_preds)
gbt_mae = evaluator_mae.evaluate(gbt_preds)

print(f"Tuned GBT -> R²: {gbt_r2:.4f}, RMSE: {gbt_rmse:.4f}, MAE: {gbt_mae:.4f}")


In [None]:
# ==============================
# 8. Tuned Random Forest (Optional Comparison)
# ==============================

rf = RandomForestRegressor(featuresCol="features", labelCol=label_col, seed=42)

# 2 x 2 x 1 = 4 combinations, kept small for runtime
rf_paramGrid = (
    ParamGridBuilder()
      .addGrid(rf.numTrees, [100, 200])
      .addGrid(rf.maxDepth, [8, 12])
      .addGrid(rf.subsamplingRate, [0.8])
      .build()
)

evaluator_cv = RegressionEvaluator(labelCol=label_col, predictionCol="prediction", metricName="r2")

rf_cv = CrossValidator(
    estimator=rf,
    estimatorParamMaps=rf_paramGrid,
    evaluator=evaluator_cv,
    numFolds=2,          # 2 folds instead of 3
    parallelism=4
)

print("Training small-grid tuned Random Forest...")
rf_cv_model = rf_cv.fit(train_data)

# avgMetrics[i] is the mean CV R² of rf_paramGrid[i]; larger is better for R²
best_rf_idx = max(range(len(rf_cv_model.avgMetrics)), key=rf_cv_model.avgMetrics.__getitem__)
best_rf_params = {p.name: v for p, v in rf_paramGrid[best_rf_idx].items()}
print(f"Best RF params (mean CV R² = {rf_cv_model.avgMetrics[best_rf_idx]:.4f}): {best_rf_params}")

rf_preds = rf_cv_model.transform(test_data)

rf_r2 = evaluator_r2.evaluate(rf_preds)
rf_rmse = evaluator_rmse.evaluate(rf_preds)
rf_mae = evaluator_mae.evaluate(rf_preds)

print(f"Tuned RF (small grid) -> R²: {rf_r2:.4f}, RMSE: {rf_rmse:.4f}, MAE: {rf_mae:.4f}")

# Feature importances of the best forest, mapped back to column names (vector slots
# follow final_feature_cols order). These support the pruning tip in the summary.
rf_importances = rf_cv_model.bestModel.featureImportances.toArray()
rf_feature_ranking = sorted(
    zip(final_feature_cols, rf_importances), key=lambda pair: pair[1], reverse=True
)
print("\nTop 15 RF feature importances:")
for feature_name, importance in rf_feature_ranking[:15]:
    print(f"  {feature_name:<30} {importance:.4f}")


In [None]:
# ==============================
# 9. Summary of Results
# ==============================

# (padded label, R², RMSE, MAE): labels are pre-padded so the colons line up
comparison_rows = [
    ("Baseline (mean-only)  ", baseline_r2, baseline_rmse, baseline_mae),
    ("Tuned GBT             ", gbt_r2, gbt_rmse, gbt_mae),
    ("Tuned Random Forest   ", rf_r2, rf_rmse, rf_mae),
]

print("\n=== Final Model Comparison ===")
for row_label, row_r2, row_rmse, row_mae in comparison_rows:
    print(f"{row_label}: R² = {row_r2:.4f}, RMSE = {row_rmse:.4f}, MAE = {row_mae:.4f}")

improvement_tips = (
    "Change use_log_label between True/False and re-run the notebook.",
    "Inspect RF feature importances and drop very weak/noisy features.",
    "Adjust hyperparameter grids slightly around best values.",
)
print("\nTips if R² is below your target:")
for tip in improvement_tips:
    print(f"  - {tip}")


In [None]:
# Stop Spark session
# Releases the session created in section 0 (master "yarn"). After this, any cell that
# uses `spark` or one of its DataFrames needs section 0 re-run first.
spark.stop()
print("Spark session stopped successfully")