If you are working on Visual Studio, it is recommended to generate a new dockerfile with the following code:  
  
  `FROM jupyter/pyspark-notebook:latest`  
  `RUN pip install pandas openpyxl geopy osmnx numpy pyspark geopandas`


In [None]:
# Import libraries
import pandas as pd
import geopandas as gpd
import numpy as np
import os
import osmnx as ox
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import log,  avg, when, col, udf, lit, split, ceil, upper, regexp_replace, row_number, countDistinct, count
from pyspark.sql.types import *
from geopy.distance import geodesic
from geopy.geocoders import Nominatim
from shapely.geometry import Point



In [None]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("Big Data") \
    .getOrCreate()

In [None]:
# Read the dataset
t3=pd.read_excel("/data/full/task_3_dataset_enlarged.xlsx", header=1)
t3.rename(columns={"Latitude *": "Latitude", "Longitude *": "Longitude", "Energy  Predicted range": "Energy Predicted range"}, inplace=True)

t3 = spark.createDataFrame(t3)
t3.show(5)

+----------+----------------+----------------+----------------------+-------------------------------+
|  Location|        Latitude|       Longitude|Energy Predicted range|Energy predicted Point Estimate|
+----------+----------------+----------------+----------------------+-------------------------------+
|Location 1|     45.09934203|     7.708654993|                  40k+|                      645649.82|
|Location 2|45.5584398861864|9.24029489212583|                  40k+|                      682948.72|
|Location 3|45.5686020311218|9.36304684610199|                  40k+|                      563589.74|
|Location 4|45.6277610515318|8.71249975733013|                  40k+|                      488764.71|
|Location 5|41.9240785964876| 12.516346882219|                  40k+|                      313372.09|
+----------+----------------+----------------+----------------------+-------------------------------+
only showing top 5 rows



The coordinates (Latitude and Longitude) have been corrected in the original file.  
The original values had an incorrect order of magnitude (e.g., 1e+15), likely due to data entry or conversion errors.  
The adjustment brought the values back within plausible ranges for Italy:
- Latitude between 35° and 47°
- Longitude between 6° and 20°  

Coordinates outside these limits were considered erroneous and corrected by reducing the order of magnitude or replacing invalid values with the mean of valid coordinates.  
Only a few entries contained erroneous values, which were manually corrected. 
Otherwise, this adjustment would have been implemented programmatically by manipulating the strings and converting them into numerical values.

In [42]:
t3.printSchema()

root
 |-- Location: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Energy Predicted range: string (nullable = true)
 |-- Energy predicted Point Estimate: double (nullable = true)



In [43]:
# "Energy Predicted range" column has been encoded for better manipulation
t3 = t3.withColumn(
    "Energy Predicted range",
    when(t3["Energy Predicted range"] == "0k-10k", 0)
    .when(t3["Energy Predicted range"] == "10k-40k", 1)
    .when(t3["Energy Predicted range"] == "40k+", 2))

In [None]:
# Function to retrieve the presence of a highway junction within a km radius
def highway_junction(lat, lon, radius_km=1):
    tags = {"highway": "motorway_junction"} 
    
    try:
        toll_booths = ox.features_from_point((lat, lon), tags=tags, dist=radius_km * 1000)
        
        if toll_booths.empty:
            return False
        
        for index, toll_booth in toll_booths.iterrows():
            if 'geometry' in toll_booth and toll_booth.geometry.is_valid:
                toll_coords = (toll_booth.geometry.y, toll_booth.geometry.x)
                distance = geodesic((lat, lon), toll_coords).km
                if distance <= radius_km:
                    return True  # Junction found

    except Exception as e:
        return False

    return False  # No Junction found

highway_junction_udf = udf(lambda lat, lon: highway_junction(lat, lon), BooleanType())
t3=t3.withColumn("Highway Junction", highway_junction_udf(col("Latitude"), col("Longitude")))

t3.show()

+-----------+----------------+----------------+----------------------+-------------------------------+----------------+
|   Location|        Latitude|       Longitude|Energy Predicted range|Energy predicted Point Estimate|Highway Junction|
+-----------+----------------+----------------+----------------------+-------------------------------+----------------+
| Location 1|     45.09934203|     7.708654993|                     2|                      645649.82|           false|
| Location 2|45.5584398861864|9.24029489212583|                     2|                      682948.72|            true|
| Location 3|45.5686020311218|9.36304684610199|                     2|                      563589.74|            true|
| Location 4|45.6277610515318|8.71249975733013|                     2|                      488764.71|            true|
| Location 5|41.9240785964876| 12.516346882219|                     2|                      313372.09|            true|
| Location 6|     43.68321857|     10.43

In [None]:
# Function to find the city of a given latitude and longitude using OSM

def trova_comune_osm(latitudine, longitudine):
    geolocator = Nominatim(user_agent="osm_geocoder")
    
    try:
        # Inverse geocoding to find the city
        location = geolocator.reverse((latitudine, longitudine), language="it")
        if location and location.raw.get('address'):
            address = location.raw['address']
            comune = address.get('city') or address.get('town') or address.get('village') or address.get('municipality')
            if comune:
                return comune
            else:
                return "City not specified in the data"
        else:
            return "Address not found"
    except Exception as e:
        return f"Error during geocoding: {e}"

trova_comune_udf = udf(lambda lat, lon: trova_comune_osm(lat, lon), StringType())

t3 = t3.withColumn("station_city", trova_comune_udf(col("Latitude"), col("Longitude")))

The following population dataset has been created by collecting the data from Istat website.

In [None]:
population=spark.createDataFrame(pd.read_excel("/data/full/Population Italy.xlsx", header=0))
population.createOrReplaceTempView("Population_Italy")

# Divide the dataset into two tables
pop_age = spark.sql("""
    SELECT Age, `Total Male`, `Total Female`, `Total`
    FROM Population_Italy 
    WHERE Age IS NOT NULL AND TRIM(`Age`) != 'NaN'
""")

pop_city = spark.sql("""
    SELECT `Region`, `Province`, `City Code`, `City Name`, `Total Male`, `Total Female`, `Total`
    FROM Population_Italy 
    WHERE `City Name` IS NOT NULL AND TRIM(`City Name`) != 'NaN'
""")

In [48]:
pop_age.show(5)
pop_city.show(5)

+---+----------+------------+------+
|Age|Total Male|Total Female| Total|
+---+----------+------------+------+
|  0|    196699|      185369|382068|
|  1|    204524|      191748|396272|
|  2|    208869|      197972|406841|
|  3|    213468|      200474|413942|
|  4|    221623|      209801|431424|
+---+----------+------------+------+
only showing top 5 rows

+-------+---------+---------+--------------------+----------+------------+-----+
| Region| Province|City Code|           City Name|Total Male|Total Female|Total|
+-------+---------+---------+--------------------+----------+------------+-----+
|Sicilia|Agrigento|  84001.0|           Agrigento|     27124|       28193|55317|
|Sicilia|Agrigento|  84002.0|Alessandria della...|      1159|        1223| 2382|
|Sicilia|Agrigento|  84003.0|             Aragona|      4243|        4482| 8725|
|Sicilia|Agrigento|  84004.0|              Bivona|      1511|        1618| 3129|
|Sicilia|Agrigento|  84005.0|              Burgio|      1198|        1282| 

In [49]:
# Calculation of the percentage of People in Driving Age
filtered_pop_age = pop_age.filter((col("Age") >= 18) & (col("Age") <= 85))
filtered_total = filtered_pop_age.agg({"Total": "sum"}).collect()[0][0]
total_sum = pop_age.agg({"Total": "sum"}).collect()[0][0]
percentage = (filtered_total / total_sum)

print(f"Percentage of People in Driving Age: {(percentage*100):.2f}%")


Percentage of People in Driving Age: 81.56%


In [None]:
# Calculation of the number of People in Driving Age per city
pop_city_pandas = pop_city.toPandas()
city_region_dict = pop_city_pandas.set_index('City Name')['Region'].to_dict()
city_population_dict = pop_city_pandas.set_index('City Name')['Total'].to_dict()

def map_region(city_name):
    return city_region_dict.get(city_name, "Unknown")
def map_population(city_name):
    return city_population_dict.get(city_name, 0)

map_region_udf = udf(map_region, StringType())
map_population_udf = udf(map_population, IntegerType())

t3 = t3.withColumn(
    "Region", 
    upper(map_region_udf(t3["station_city"]))
).withColumn(
    "People in Driving Age", 
    ceil(map_population_udf(t3["station_city"]) * percentage)
)

t3.show()


+-----------+----------------+----------------+----------------------+-------------------------------+----------------+-------------------+--------------+---------------------+
|   Location|        Latitude|       Longitude|Energy Predicted range|Energy predicted Point Estimate|Highway Junction|       station_city|        Region|People in Driving Age|
+-----------+----------------+----------------+----------------------+-------------------------------+----------------+-------------------+--------------+---------------------+
| Location 1|     45.09934203|     7.708654993|                     2|                      645649.82|           false|             Torino|      PIEMONTE|               690792|
| Location 2|45.5584398861864|9.24029489212583|                     2|                      682948.72|            true|  Cinisello Balsamo|     LOMBARDIA|                61130|
| Location 3|45.5686020311218|9.36304684610199|                     2|                      563589.74|            t

In [51]:
# Calculation of the number of Electric/Plug in Hybrid Vehicles per region 

electric_vehicles = spark.read.csv("/data/full/electric.csv", header=True)
electric_vehicles = electric_vehicles.withColumn("Totale", electric_vehicles["Totale"].cast(IntegerType())).withColumn("Year", electric_vehicles["Year"].cast(IntegerType()))
electric_vehicles = electric_vehicles.filter(electric_vehicles["Year"] == 2023) # We take the most recent data available

# Creation of a dictionary with the total number of electric vehicles per region
el_v_per_region = electric_vehicles.groupBy("Area geografica").sum("Totale")
el_v_dict = {row["Area geografica"]: row["sum(Totale)"] for row in el_v_per_region.collect()}
el_v_dict.setdefault("MOLISE", 0) # Adding Molise as it is not present in the original dataset

# Creation of a dictionary with the total number of residents per region
pop_region = pop_city.groupBy("Region").sum("Total")
pop_region_dict = {row["Region"].upper(): row["sum(Total)"] for row in pop_region.collect()}


# Creation of a dictionary with the number of electric vehicles per capita per region
ev_percapita = {region: el_v_dict[region] / pop_region_dict[region] for region in pop_region_dict}

ev_percapita

{'SICILIA': 0.0013225537864958936,
 'TOSCANA': 0.015410399154332654,
 'PIEMONTE': 0.0045666801260248234,
 'VENETO': 0.0049870444429605115,
 'LOMBARDIA': 0.007209300747425685,
 'PUGLIA': 0.000945861434874861,
 'MARCHE': 0.0018842287293346187,
 'CAMPANIA': 0.0011894292671512873,
 'ABRUZZO': 0.0012811396867467792,
 'CALABRIA': 0.000717569295215298,
 'TRENTINO ALTO ADIGE': 0.06360812692979693,
 'SARDEGNA': 0.0011430791898061946,
 'MOLISE': 0.0,
 'EMILIA ROMAGNA': 0.004795043315906554,
 'LAZIO': 0.005768082356922888,
 'LIGURIA': 0.0017132287103993977,
 'FRIULI VENEZIA GIULIA': 0.00212913282577572,
 'BASILICATA': 0.00022487238492155701,
 'UMBRIA': 0.001047545699912685,
 "VALLE D'AOSTA": 0.12403062966395162}

In [None]:
# Calculation of the number of Electric/Plug in Hybrid Vehicles per city 
def calculate_ev(region, people_in_driving_age):
    ev_ratio = ev_percapita.get(region, 0)
    return people_in_driving_age * ev_ratio if people_in_driving_age is not None else None

calculate_ev_udf = udf(calculate_ev, DoubleType())

t3 = t3.withColumn(
    "Estimated number of electric vehicles",
    calculate_ev_udf(col("Region"), col("People in Driving Age"))
)

t3.show(5)

+----------+----------------+----------------+----------------------+-------------------------------+----------------+-----------------+---------+---------------------+-------------------------------------+
|  Location|        Latitude|       Longitude|Energy Predicted range|Energy predicted Point Estimate|Highway Junction|     station_city|   Region|People in Driving Age|Estimated number of electric vehicles|
+----------+----------------+----------------+----------------------+-------------------------------+----------------+-----------------+---------+---------------------+-------------------------------------+
|Location 1|     45.09934203|     7.708654993|                     2|                      645649.82|           false|           Torino| PIEMONTE|               690792|                   3154.6260976169397|
|Location 2|45.5584398861864|9.24029489212583|                     2|                      682948.72|            true|Cinisello Balsamo|LOMBARDIA|                61130|    

In [None]:
amenities=['restaurant', 'fuel', 'parking', 'atm', 'bank', 'hospital', 'pub', 
    'fast_food', 'townhall', 'nightclub', 'car_wash', 'conference_centre', 
    'payment_terminal', 'training', 'school', 'bus_station', 'pharmacy', 
    'supermarket', 'post_office']

# Function to count amenities per tipology
def count_amenities(lat, lon, radius=1000):
    amenity_counts = {}

    for amenity in amenities:
        try:
            location_point = Point(lon, lat)
            gdf = ox.features_from_point((lat, lon), tags={'amenity': amenity}, dist=radius)
            amenity_counts[amenity] = len(gdf)
        except Exception:
            amenity_counts[amenity] = 0

    return amenity_counts

count_amenities_udf = udf(lambda lat, lon: count_amenities(lat, lon), MapType(StringType(), IntegerType()))

t3 = t3.withColumn("amenities", count_amenities_udf(col("Latitude"), col("Longitude")))

# Explode the dictionary into separate columns
for amenity in amenities:
    t3 = t3.withColumn(amenity, col("amenities").getItem(amenity))


t3 = t3.drop("amenities")

t3.show()

+-----------+----------------+----------------+----------------------+-------------------------------+----------------+-------------------+--------------+---------------------+-------------------------------------+----------+----+-------+---+----+--------+---+---------+--------+---------+--------+-----------------+----------------+--------+------+-----------+--------+-----------+-----------+
|   Location|        Latitude|       Longitude|Energy Predicted range|Energy predicted Point Estimate|Highway Junction|       station_city|        Region|People in Driving Age|Estimated number of electric vehicles|restaurant|fuel|parking|atm|bank|hospital|pub|fast_food|townhall|nightclub|car_wash|conference_centre|payment_terminal|training|school|bus_station|pharmacy|supermarket|post_office|
+-----------+----------------+----------------+----------------------+-------------------------------+----------------+-------------------+--------------+---------------------+----------------------------------

In [None]:
# Intermediate save
# t3.toPandas().to_csv("/data/full/t3_before_competitors.csv", index=False)

In [None]:
# Calculation of competitors within 1km

pun = spark.read.csv(
    "/data/full/pun_data_cleaned.csv",
    header=True,
    inferSchema=True,
    sep=","
)

pun = pun.withColumn(
    "Latitude", regexp_replace(col("Latitude"), ",", ".")
    ).withColumn(
    'Latitude', col('Latitude').cast(StringType()).cast(FloatType()))
pun = pun.withColumn(
    "Longitude", regexp_replace(col("Longitude"), ",", ".")
    ).withColumn(
    'Longitude', col('Longitude').cast(StringType()).cast(FloatType()))

pun.show(5)

+--------------------+--------------------+---------+---------+
|        businessName|             evse_id| Latitude|Longitude|
+--------------------+--------------------+---------+---------+
|    EMOBITALY S.r.l.|                  IT| 42.42088|12.236132|
|A2A E.MOBILITY S....|IT*A2A*E119480574...| 45.58421| 10.17963|
|A2A E.MOBILITY S....|IT*A2A*E119480575...|45.632458| 9.860249|
|A2A E.MOBILITY S....|IT*A2A*E119480575...|45.355762| 9.679891|
|A2A E.MOBILITY S....|IT*A2A*E119480575...|45.578396|10.009855|
+--------------------+--------------------+---------+---------+
only showing top 5 rows



In [None]:
# Function to calculate distance between two points
def calculate_distance(lat1, lon1, lat2, lon2):
    try:
        return geodesic((lat1, lon1), (lat2, lon2)).km
    except:
        return None
    
distance_udf = udf(
    lambda lat1, lon1, lat2, lon2: calculate_distance(lat1, lon1, lat2, lon2), 
    FloatType()
)

In [None]:
t3_alias = t3.alias("t3")
pun_alias = pun.alias("pun")

# Cross join to calculate distances between each point of t3 and all points of pun
cross_joined = t3_alias.crossJoin(pun_alias)

with_distances = cross_joined.withColumn(
    "distance_km",
    distance_udf(
        col("t3.Latitude"), col("t3.Longitude"),
        col("pun.Latitude"), col("pun.Longitude")
    )
)


In [None]:
# Filters competitor within 1 km
competitors_within_1km = with_distances.filter(col("distance_km") <= 1)

# Counts the competitors per location
competitors_count = (
    competitors_within_1km.groupBy("Location")
    .count()
    .rdd.map(lambda row: (row["Location"], row["count"]))
    .collectAsMap()
)

def map_competitors(location):
    return competitors_count.get(location, 0)

map_competitors_udf = udf(map_competitors, IntegerType())

t3 = t3.withColumn(
    "competitors_within_1km",
    map_competitors_udf(t3["Location"])
)

t3.show()

+-----------+----------------+----------------+----------------------+-------------------------------+----------------+--------------------+--------------+---------------------+-------------------------------------+----------+----+-------+---+----+--------+---+---------+--------+---------+--------+-----------------+----------------+--------+------+-----------+--------+-----------+-----------+----------------------+
|   Location|        Latitude|       Longitude|Energy Predicted range|Energy predicted Point Estimate|Highway Junction|        station_city|        Region|People in Driving Age|Estimated number of electric vehicles|restaurant|fuel|parking|atm|bank|hospital|pub|fast_food|townhall|nightclub|car_wash|conference_centre|payment_terminal|training|school|bus_station|pharmacy|supermarket|post_office|competitors_within_1km|
+-----------+----------------+----------------+----------------------+-------------------------------+----------------+--------------------+--------------+-------

In [None]:
# Intermediate save
# t3.toPandas().to_csv("/data/full/t3_with_competitors.csv", index=False)

Calculation of Diversity Index, already seen in task 2

In [None]:
# competitors_within_1km is needed for the calculation of Diversity Index
amenities.append('competitors_within_1km')

t3 = t3.fillna({col: 0 for col in amenities})

@udf(DoubleType())
def shannon_index_udf(*args):
    row = np.array(args, dtype=float)
    total = row.sum()
    if total == 0:
        return 0.0
    proportions = row / total
    proportions = proportions[proportions > 0]
    return float(-np.sum(proportions * np.log(proportions)))

@udf(IntegerType())
def richness_index_udf(*args):
    row = np.array(args, dtype=float)
    return int(np.sum(row > 0))

# Calculate Shannon Index & Richness Index
t3 = t3.withColumn("Shannon_Index", shannon_index_udf(*[col(c) for c in amenities]))
t3 = t3.withColumn("Richness_Index", richness_index_udf(*[col(c) for c in amenities]))

# Calculate the Diversity Index
alpha = 0.8
beta = 0.2
t3 = t3.withColumn(
    "Diversity_Index",
    col("Shannon_Index") + lit(alpha) * col("Richness_Index") - lit(beta) * col("competitors_within_1km")
)

t3.show(5)


+----------+----------------+----------------+----------------------+-------------------------------+----------------+-----------------+---------+---------------------+-------------------------------------+----------+----+-------+---+----+--------+---+---------+--------+---------+--------+-----------------+----------------+--------+------+-----------+--------+-----------+-----------+----------------------+------------------+--------------+------------------+
|  Location|        Latitude|       Longitude|Energy Predicted range|Energy predicted Point Estimate|Highway Junction|     station_city|   Region|People in Driving Age|Estimated number of electric vehicles|restaurant|fuel|parking|atm|bank|hospital|pub|fast_food|townhall|nightclub|car_wash|conference_centre|payment_terminal|training|school|bus_station|pharmacy|supermarket|post_office|competitors_within_1km|     Shannon_Index|Richness_Index|   Diversity_Index|
+----------+----------------+----------------+----------------------+-----

We use the task 2 output dataset to train, validate and test our models in order to finally use t3 to make new predictions.  
We need the two dataset matching the schema

In [166]:
kpi=spark.read.csv("/data/full/kpi.csv", header=True)

In [None]:
# Calculation of missing columns
kpi = kpi.withColumn(
    "Highway Junction", 
    highway_junction_udf(col("latitude"), col("longitude"))
).withColumn(
    "station_city", 
    trova_comune_udf(col("latitude"), col("longitude"))
).withColumn(
    "Region", 
    upper(map_region_udf(col("station_city")))
).withColumn(
    "People in Driving Age", 
    ceil(map_population_udf(col("station_city")) * percentage)
).withColumn(
    "Estimated number of electric vehicles",
    calculate_ev_udf(col("Region"), col("People in Driving Age"))
)

kpi.show(5)

+------------------+--------------------+-------------------+------------------+-------------+------------------+----------------------------+---------+---------------------+---------+---------+----------+----+-------+---+----+--------+---+---------+--------+---------+--------+-----------------+----------------+--------+------+-----------+--------+-----------+-----------+----------+------------------+--------------+-----------------+----------------------+--------------------+------------------+-------------------+--------------------------+-------------------------+-----------------------+--------------------+-------------------------+----------------------------+----------------+--------------------+---------+---------------------+-------------------------------------+
|           EVSE ID|  indirizzo_stazione|energia_erogata_kwh|     ricavi_totali|costo_stimato|Numero_di_sessioni|competitors_within_1km_count|Plug Type|connector_isSuspended| latitude|longitude|restaurant|fuel|parking|

In [None]:
# Column renaming
t3 = t3.withColumnRenamed("Energy predicted Point Estimate", "Energy kwh")

columns_models = {
    "energia_erogata_kwh": "Energy kwh",
    "Diversity_Index": "Diversity_Index",
    "Highway Junction": "Highway Junction",
    "Estimated number of electric vehicles": "Estimated number of electric vehicles"
}

kpi2 = kpi.select(
    [col(original).alias(new) for original, new in columns_models.items()]
)

# Convertion of boolean values to numeric
kpi2 = kpi.withColumn("Highway Junction", when(col("Highway Junction") == True, 1).otherwise(0))
t3 = t3.withColumn("Highway Junction", when(col("Highway Junction") == True, 1).otherwise(0))


### Models  
For our study we decided to develop three models:  
- **Linear Regression**: this model provides a simple, interpretable baseline. It assumes a linear relationship between the input features and the target variable, making it an ideal starting point to understand the fundamental trends and relationships within the dataset.  
- **Random Forest**: This ensemble learning method excels in capturing complex, non-linear relationships and interactions between variables. By averaging predictions from multiple decision trees, it also helps reduce overfitting and improves robustness, making it suitable for datasets with high variability or noise.  
- **Decision Tree**: This model is straightforward, easy to visualize, and interpretable. It allows us to understand the feature splits and importance in a hierarchical manner, serving as a complementary approach to the Random Forest for identifying key drivers of the outcome.



In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col, when
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import abs


In [None]:
# Convertion of string values to float
def convert_columns_to_float(df, columns):
    for column in columns:
        df = df.withColumn(column, col(column).cast("float"))
    return df

columns_to_convert = ["Diversity_Index", "Highway Junction", "Estimated number of electric vehicles", "Energy kwh"]

kpi_df = convert_columns_to_float(kpi2, columns_to_convert)
t3_df = convert_columns_to_float(t3, columns_to_convert)

# Fill missing values
kpi_df = kpi_df.na.fill(0)
t3_df = t3_df.na.fill(0)

In [None]:
# Specification of features and target
features = ["Diversity_Index", "Highway Junction", "Estimated number of electric vehicles"]
target = "Energy kwh"

assembler = VectorAssembler(inputCols=features, outputCol="features")
kpi_df = assembler.transform(kpi_df).select("features", target)

# Split training, validation & test set
train_data, val_data, test_data = kpi_df.randomSplit([0.6, 0.2, 0.2], seed=42)


In [None]:
# Define a function to train a model with early stopping
def train_model_with_early_stopping(model, train_data, val_data, test_data, max_epochs=100, tol=1e-4, patience=3):
    
    evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="r2")
    history = {"training_error": [], "validation_error": [], "accuracy": []}  # Initialize history
    best_accuracy = -float("inf")
    epochs_without_improvement = 0
    best_model = None # set initial best_model to None

    # Print table header
    print(f"{'Epoch':<10}{'Training Error':<20}{'Validation Error':<20}{'Accuracy':<20}")
    print("=" * 70)

    for epoch in range(1, max_epochs + 1):
        # Train the model
        trained_model = model.fit(train_data)

        # Evaluate performance on training and validation datasets
        train_predictions = trained_model.transform(train_data)
        val_predictions = trained_model.transform(val_data)

        train_error = evaluator.evaluate(train_predictions)
        val_error = evaluator.evaluate(val_predictions)
        accuracy = 1 - abs(val_error - train_error)  # Calculate accuracy

        # Log metrics for this epoch
        history["training_error"].append(train_error)
        history["validation_error"].append(val_error)
        history["accuracy"].append(accuracy)

        # Print metrics for the current epoch
        print(f"{epoch:<10}{train_error:<20.4f}{val_error:<20.4f}{accuracy:<20.4f}")

        # Check for improvement
        if accuracy > best_accuracy + tol:
            best_accuracy = accuracy
            best_model = trained_model
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1

        # Early stopping if there's no improvement
        if epochs_without_improvement >= patience:
            print("Early stopping triggered.")
            break

    print("=" * 70)

    # Final testing on the best model
    if best_model is not None:
        test_predictions = best_model.transform(test_data)
        test_error = evaluator.evaluate(test_predictions)
        print(f"Final Test Error: {test_error:.4f}")
    else:
        print("No improvement was achieved during training.")

    return best_model, history


In [None]:
from pyspark.ml.regression import LinearRegression

# Inizialize the model
lr_model = LinearRegression(featuresCol="features", labelCol=target)

# Train the model
best_lr_model, lr_history = train_model_with_early_stopping(lr_model, train_data, val_data, test_data)


In [None]:
from pyspark.ml.regression import DecisionTreeRegressor

dt_model = DecisionTreeRegressor(featuresCol="features", labelCol=target)
best_dt_model, dt_history = train_model_with_early_stopping(dt_model, train_data, val_data, test_data)


In [None]:
from pyspark.ml.regression import RandomForestRegressor

rf_model = RandomForestRegressor(featuresCol="features", labelCol=target, numTrees=50)
best_rf_model, rf_history = train_model_with_early_stopping(rf_model, train_data, val_data, test_data)


In [None]:
# Display metrics in a dataframe
metrics = pd.DataFrame({
    "Model": ["LinearRegression", "DecisionTree", "RandomForest"],
    "Best Accuracy": [max(lr_history["accuracy"]), max(dt_history["accuracy"]), max(rf_history["accuracy"])],
    "Final Training Error": [lr_history["training_error"][-1], dt_history["training_error"][-1], rf_history["training_error"][-1]],
    "Final Validation Error": [lr_history["validation_error"][-1], dt_history["validation_error"][-1], rf_history["validation_error"][-1]],
})
display(metrics)



Now, based on each model best trial, we can make predictions for the new dataset `t3`.

In [None]:
from pyspark.ml.feature import VectorAssembler

features = ["Diversity_Index", "Highway Junction", "Estimated number of electric vehicles"]
t3_df = t3_df.select(*[col(f).cast("float") for f in features])

assembler = VectorAssembler(inputCols=features, outputCol="features")
t3_df = assembler.transform(t3_df)

trained_models = {
    "LinearRegression": best_lr_model,
    "DecisionTree": best_dt_model,
    "RandomForest": best_rf_model,
}

for model_name, model in trained_models.items():
    predictions = model.transform(t3_df).select("prediction").withColumnRenamed("prediction", f"Energy_kwh_{model_name}")
    t3_df = t3_df.withColumn(f"Energy_kwh_{model_name}", predictions["prediction"])

t3_df.show()

In [None]:
# Final save
# t3_df.write.csv("/data/full/t3_final.csv", header=True, mode="overwrite")

The models used have exhibited unsatisfactory performance in the current analysis. This outcome can likely be attributed to a combination of factors intrinsic to the data and the models themselves. Below, we outline the primary possible causes:

1. **Data Quality**: The dataset might suffer from issues such as incompleteness, noise, or inconsistency. For example, missing values or unhandled outliers can significantly impact model accuracy.

2. **Lack of Relevant Data**: If the available data does not adequately represent the phenomenon being modeled, the models may fail to capture meaningful patterns, limiting predictive capabilities.

3. **Low Variability in Data**: Limited variability or high collinearity among features can negatively affect performance, particularly for models like linear regression.

4. **Feature Representation**: The selected features may not be optimal, with variables that are weakly correlated or not representative of the target variable.

5. **Model Limitations**: Linear Regression may not be suitable for non-linear or highly complex problems.
  Decision Trees are prone to overfitting if not properly tuned.
  Random Forest, while more robust, can still struggle with noisy data or weak patterns.

6. **Small Dataset Size**: Models, especially more complex ones like Random Forest, require a sufficient volume of data to generalize effectively.


We then calculate a performance estimation to assess the efficiency and effectiveness of a charging station. This ratio is influenced by three key factors:  
  
- Energy Estimation: The projected energy demand or consumption.
- Diversity Index: A measure accounting for variations in usage patterns.
- Number of EVs: The total number of electric vehicles utilizing the station.  


In [None]:
# Calculation of the estimated performance
t3 = t3.withColumn(
    "Estimated performance",
    (col("Diversity_Index") * log(col("Energy kwh") + 1)) / (col("Estimated number of electric vehicles") + 1)
)


In [None]:
sorted_diversity = t3.orderBy("Diversity_Index", ascending=False)
# Select top 10 locations
top_10_locations = sorted_diversity.select("Location").limit(10).collect()
for row in top_10_locations:
    print(row["Location"])

Location 13
Location 30
Location 49
Location 29
Location 7
Location 52
Location 11
Location 15
Location 53
Location 12


In [193]:
sorted_performance = t3.orderBy("Estimated performance", ascending=False)
# Select top 10 locations
top_10_locations = sorted_performance.select("Location").limit(10).collect()
for row in top_10_locations:
    print(row["Location"])

Location 50
Location 63
Location 82
Location 97
Location 45
Location 38
Location 48
Location 120
Location 124
Location 30


In [194]:
spark.stop()