In [None]:
# Libraries for data analysis
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf

# Linear Regression 

from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.linear_model import LinearRegression, Lasso
from sklearn.pipeline import Pipeline
from sklearn.metrics import r2_score
from sklearn.metrics import mean_absolute_error

# Logistic Regression

from sklearn.linear_model import LogisticRegression

# Neural Networks

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.initializers import GlorotUniform
from tensorflow.keras import regularizers
from itertools import product


---

<h1><center>Data Pre-Processing

---

In [None]:
# Download data
df = pd.read_csv('ev_battery_charging_data.csv')

In [None]:
# Drop rows with any NaN values
df_clean = df.dropna()

# Shape after cleaning
print("\nAfter removing rows with NaN:")
print(f"Rows: {df_clean.shape[0]}, Columns: {df_clean.shape[1]}")

In [None]:
# Drop inputs calculated from the column we are predicting
drop_cols = ['Degradation Rate (%)', 'Efficiency (%)', 'Optimal Charging Duration Class']
df_clean = df_clean.drop(columns=drop_cols)

In [None]:
# List the categorical columns and numerical columns for one-hot encoding later
cat_col = ['Charging Mode', 'Battery Type', 'EV Model']
num_col = ['SOC (%)', 'Voltage (V)', 'Current (A)', 'Battery Temp (°C)', 'Ambient Temp (°C)', 'Charging Cycles']
target = 'Charging Duration (min)'

In [None]:
# Move the column we are predicting to the end
df_clean[target] = df_clean.pop(target)

In [None]:
# Split the data into test and train
from sklearn.model_selection import train_test_split
X = df_clean.drop(columns=target)
y = df_clean[target]
Xtrain_valid, Xtest, ytrain_valid, ytest = train_test_split(X, y, test_size=0.2)

# Split the data into train, validate, test

Xtrain, Xvalid, ytrain, yvalid = train_test_split(Xtrain_valid, ytrain_valid, test_size=0.125)

In [None]:
df_clean.head()

In [None]:
df_clean.describe(include='all')

In [None]:
# df_clean[target].plot(title='Charge Duration', xlabel='Index', ylabel='Time (min)')


In [None]:
# Preprocessor for numerical columns
num_transformer = Pipeline(
    steps=[("scaler", StandardScaler())])

# Preprocessor for categorical columns
cat_transformer = Pipeline(
    steps=[("encoder", OneHotEncoder(drop='first'))])

# Combine preprocessors
preprocessor = ColumnTransformer(
    transformers=[
        ('num', num_transformer, num_col),
        ('cat', cat_transformer, cat_col)])

In [None]:
Xtrain_norm = preprocessor.fit_transform(Xtrain)
Xvalid_norm = preprocessor.transform(Xvalid)
Xtest_norm = preprocessor.transform(Xtest)
feature_names_x = preprocessor.get_feature_names_out()
Xtrain_norm_df = pd.DataFrame(Xtrain_norm, columns=feature_names_x, index=Xtrain.index)
Xvalid_norm_df = pd.DataFrame(Xvalid_norm, columns=feature_names_x, index=Xvalid.index)
Xtest_norm_df = pd.DataFrame(Xtest_norm, columns=feature_names_x, index=Xtest.index)

ytrain_df = pd.DataFrame(ytrain)
yvalid_df = pd.DataFrame(yvalid)
ytest_df = pd.DataFrame(ytest)

---

<h1><center>Linear Regression Models

---

In [None]:
# Linear regression with feature selection
linreg_pipe = Pipeline([('preprocessor', preprocessor), 
                     ('linreg', LinearRegression())])

# Fit the model
linreg_pipe.fit(Xtrain, ytrain)

In [None]:
# Linear Regression R2
pred = linreg_pipe.predict(Xtest)
r2_lasso = np.max(r2_score(ytest, pred))
print(r2_lasso)

In [None]:
# Lasso regression
alphas = np.logspace(-5, 5, 10)
lasso_models = []

for alpha in alphas:
    lasso_pipe = Pipeline([('preprocessor', preprocessor), 
                         ('lasso', Lasso(alpha=alpha))])
    lasso_pipe.fit(Xtrain, ytrain)
    lasso_models.append(lasso_pipe)

In [None]:
# Lasso Regression R2
pred_lasso = []
r2_lasso = []

for i in range(len(alphas)):
    pred_lasso.append(lasso_models[i].predict(Xtest))
    r2_lasso.append(r2_score(ytest, pred_lasso[i]))

r2_lasso_max = max(r2_lasso)
print(r2_lasso_max)
print(np.argmax(r2_lasso))

<h2><center>Logistic Regression Comparison

---

<h1><center>Neural Networks

---

Plan:
1. Build a basic NN to get results
    a. Build pipeline
    b. Build NN model
    c. Compile NN w/ pipeline
    d. Calculate R2 score and compare with linreg
2. Build function to build pipelines 

In [None]:
# Build an MLP

ki = GlorotUniform(seed=2434)

model_mlp = Sequential([
    Dense(64, input_shape=(Xtrain_norm_df.shape[1],), activation='relu', kernel_initializer=ki),
    Dense(32, activation='relu', kernel_initializer=ki),
    Dense(16, activation='relu', kernel_initializer=ki),
    Dense(1, kernel_initializer=ki)
])

model_mlp.compile( optimizer="rmsprop", loss="mse", metrics=["r2_score"] )

In [None]:
# Train the MLP

history_mlp = model_mlp.fit(x=Xtrain_norm_df, y=ytrain_df, epochs=50, validation_data=(Xvalid_norm_df,yvalid_df))

In [None]:
mlp_perf = r2_score(ytrain_df, model_mlp.predict(Xtrain_norm_df))
mlp_perf_valid = r2_score(yvalid_df, model_mlp.predict(Xvalid_norm_df))
print(mlp_perf)
print(mlp_perf_valid)

In [None]:
def build_model(num_layers, lr, x_data, last_reg_layer):

    num_units = np.zeros(num_layers, dtype=int)
    num_units[-1] = 1
    if num_layers > 1:
        for i in range(num_layers-1):
            num_units[i] = 2**(num_layers+1-i)

    layers = []
    ki = GlorotUniform(seed=2434)
    layers.append(Dense(num_units[0], input_shape=(x_data.shape[1],), activation='relu', kernel_initializer=ki))
    for i in range(1, num_layers-1):
        layers.append(Dense(num_units[i], activation='relu', kernel_initializer=ki))
    layers.append(Dense(1, kernel_initializer=ki, kernel_regularizer=last_reg_layer))


    model = Sequential(layers)

    optimizer = tf.keras.optimizers.RMSprop(learning_rate=lr)

    model.compile( optimizer=optimizer, loss="mse", metrics=["r2_score"] )

    return model

In [None]:
param_grid = {
    'num_layers': [2, 3, 4, 5],
    'lr': [1e-3, 1e-2, 1e-1],
    'last_reg_layer': [None, regularizers.l1(1e-2),
                       regularizers.l1(1e-1),
                       regularizers.l2(1e-2), regularizers.l2(1e-1)],
    'epochs': [10, 25, 50]
}


In [None]:
all_combos = list(product(
    param_grid['num_layers'],
    param_grid['lr'],
    param_grid['last_reg_layer'],
    param_grid['epochs']   
))

In [None]:
def reg_to_str(reg):
    if reg is None:
        return "None"
    elif hasattr(reg, 'l1'):
        return f"L1({reg.l1:.3f})"   # rounds to 3 decimals
    elif hasattr(reg, 'l2'):
        return f"L2({reg.l2:.3f})"
    else:
        return str(reg)


In [None]:
results = []

for num_layers, lr, last_reg_layer, epochs in all_combos:

    model = build_model(num_layers, lr, Xtrain_norm_df, last_reg_layer)

    history = model.fit(
        x = Xtrain_norm_df,
        y = ytrain_df,
        epochs=epochs,
        validation_data = (Xvalid_norm_df, yvalid_df),
        verbose=0
    )

    y_hat_valid = model.predict(Xvalid_norm_df)
    perf_valid = r2_score(yvalid_df, y_hat_valid)

    y_hat_train = model.predict(Xtrain_norm_df)
    perf_train = r2_score(ytrain_df, y_hat_train)

    results.append({
    'num_layers': num_layers,
    'lr': lr,
    'last_reg_layer': reg_to_str(last_reg_layer),
    'epochs': epochs,
    'r2_train': perf_train,
    'r2_valid': perf_valid,
    'model': model,
    })

results_df = pd.DataFrame(results)

print(results_df.head())


In [None]:
results_df['last_reg_layer'] = results_df['last_reg_layer'].fillna("None")
results_df

In [None]:
best_model_NN_idx = results_df['r2_valid'].idxmax()
best_model_NN = results_df.loc[best_model_NN_idx]
best_model_NN

In [None]:
best_model_NN_idx_train = results_df['r2_train'].idxmax()
worst_model_NN_idx = results_df['r2_valid'].idxmin()
worst_model_NN_idx_train = results_df['r2_train'].idxmin()

best_model_NN_train = results_df.loc[best_model_NN_idx_train]
worst_model_NN = results_df.loc[worst_model_NN_idx]
worst_model_NN_train = results_df.loc[worst_model_NN_idx_train]

best_model_NN_train

In [None]:
worst_model_NN

In [None]:
worst_model_NN_train

In [None]:
param_names = list(param_grid.keys())
sweeps = []

for i in param_names:
    list_param_values = sorted(results_df[i].unique())
    param_names_copy = [p for p in param_names if p != i]

    sweep = results_df[
        (results_df[param_names_copy[0]] == best_model_NN[param_names_copy[0]]) &
        (results_df[param_names_copy[1]] == best_model_NN[param_names_copy[1]]) &
        (results_df[param_names_copy[2]] == best_model_NN[param_names_copy[2]]) 
    ]
    sweeps.append(sweep)


In [None]:
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
axes = axes.flatten()
bar_width = 0.5
names = ['# of Layers', 'Learning Rate', 'Linear Actication Layer', '# of Epochs']

for idx, sweep in enumerate(sweeps):
    x_vals = sweep[param_names[idx]]
    y_vals = sweep['r2_valid']
    positions = np.arange(len(x_vals))
    
    axes[idx].bar(positions, y_vals, width=bar_width, color='red')
    axes[idx].set_xticks(positions)
    axes[idx].set_xticklabels(x_vals, rotation=45)
    axes[idx].set_title(names[idx])
    axes[idx].set_ylabel("R²")
    axes[idx].set_xlabel(param_names[idx])

plt.subplots_adjust(hspace=0.35)



---

<h1><center>Random Forest

---

In [None]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
import time

#5 Random Forest models:
#RF_1, RF_2, RF_3: increasing n_estimators (50, 100, 200)
#RF_4: regularized trees (shallower trees, min leaf = 5)
#RF_5: strongly regularized trees (more trees, deeper but min leaf = 10)

rf_configs = {
    'RF_1 (50 trees)': RandomForestRegressor(
        n_estimators=50,
        max_depth=None,
        min_samples_split=2,
        min_samples_leaf=1,
        random_state=42,
        n_jobs=-1
    ),
    'RF_2 (100 trees)': RandomForestRegressor(
        n_estimators=100,
        max_depth=None,
        min_samples_split=2,
        min_samples_leaf=1,
        random_state=42,
        n_jobs=-1
    ),
    'RF_3 (200 trees)': RandomForestRegressor(
        n_estimators=200,
        max_depth=None,
        min_samples_split=2,
        min_samples_leaf=1,
        random_state=42,
        n_jobs=-1
    ),
    'RF_4 (100 trees, depth=10, leaf=5)': RandomForestRegressor(
        n_estimators=100,
        max_depth=10,
        min_samples_split=2,
        min_samples_leaf=5,
        random_state=42,
        n_jobs=-1
    ),
    'RF_5 (300 trees, depth=15, leaf=10)': RandomForestRegressor(
        n_estimators=300,
        max_depth=15,
        min_samples_split=2,
        min_samples_leaf=10,
        random_state=42,
        n_jobs=-1
    ),
}

In [None]:
rf_results = []
rf_fitted_models = {}

for name, model in rf_configs.items():
    #Training the model and recording training time
    start = time.time()
    model.fit(Xtrain_norm_df, ytrain_df.values.ravel())
    train_time = time.time() - start
    
    #Saving the fitted model
    rf_fitted_models[name] = model

    #Get predictions on train and validation sets
    ytrain_pred = model.predict(Xtrain_norm_df)
    yvalid_pred = model.predict(Xvalid_norm_df)

    #Computing performance metrics
    train_r2 = r2_score(ytrain_df, ytrain_pred)
    valid_r2 = r2_score(yvalid_df, yvalid_pred)
    valid_mae = mean_absolute_error(yvalid_df, yvalid_pred)
    valid_rmse = np.sqrt(mean_squared_error(yvalid_df, yvalid_pred))

    #Storing the results
    rf_results.append({
        'Model': name,
        'n_estimators': model.n_estimators,
        'max_depth': model.max_depth,
        'min_samples_leaf': model.min_samples_leaf,
        'Train R²': train_r2,
        'Valid R²': valid_r2,
        'Valid MAE': valid_mae,
        'Valid RMSE': valid_rmse,
        'Train Time (s)': train_time
    })

rf_results_df = pd.DataFrame(rf_results)
rf_results_df.head()

In [None]:
#Plotting train_r2, valid_r2
rf_results_df.plot(
    x='Model',
    y=['Train R²', 'Valid R²'],
    marker='o',
    figsize=(8, 4),
    title='Random Forest Performance (Train vs Valid R²)'
)
plt.ylabel("R²")
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
#Selecting model with highest validation R²
best_rf_row = rf_results_df.loc[rf_results_df['Valid R²'].idxmax()]
best_rf_name = best_rf_row['Model']
best_rf = rf_fitted_models[best_rf_name]

print("Best RF model (based on validation R²):")
display(best_rf_row[['Model', 'n_estimators', 'max_depth', 'min_samples_leaf', 
                     'Train R²', 'Valid R²', 'Valid MAE', 'Valid RMSE', 'Train Time (s)']])

#Evaluating the best RF model on the test set
ytest_pred = best_rf.predict(Xtest_norm_df)
test_r2 = r2_score(ytest_df, ytest_pred)
test_mae = mean_absolute_error(ytest_df, ytest_pred)
test_rmse = np.sqrt(mean_squared_error(ytest_df, ytest_pred))

print("\nTest performance of best RF model:")
print(f"Test R²:   {test_r2:.3f}")
print(f"Test MAE:  {test_mae:.3f}")
print(f"Test RMSE: {test_rmse:.3f}")

In [None]:
#Computing feature importances for all engineered features
importances = best_rf.feature_importances_

feat_imp_df = pd.DataFrame({
    'Feature': feature_names_x,
    'Importance': importances
}).sort_values('Importance', ascending=False)

feat_imp_df

In [None]:
#Plotting feature importances for ALL engineered features
feat_imp_df.plot(
    x='Feature',
    y='Importance',
    kind='bar',
    figsize=(10, 4),
    legend=False
)
plt.ylabel("Importance")
plt.title(f"Feature Importances for All Engineered Features ({best_rf_name})")
plt.tight_layout()
plt.show()

---

<h1><center>Extra Workspace

---