In [None]:
%reset

In [None]:
import torch
import torch.optim as optim
import optuna
import pandas as pd
import numpy as np
import optuna

from torch import nn
from torch.utils.data import DataLoader, Dataset
from optuna.trial import TrialState
from sklearn.model_selection import train_test_split
from sklearn.metrics import  mean_squared_error, mean_absolute_percentage_error, r2_score
from optuna.trial import TrialState
from scipy import stats

In [None]:
#Set this value to true if hyperparameter tuning is complete and the test set should be loaded and predicted on
# While this is False, the guard cell placed before the test-set section raises a ValueError,
# so the cells that read the test CSVs are not reached during a top-to-bottom run.
OUTPUT_TEST = False

In [None]:
#Other parameters for NN training
EPOCHS = 10  # number of epochs used both inside each Optuna trial and for the final fit
BATCH_SIZE = 64  # batch size passed to every DataLoader in this notebook
LOSS_FN = nn.MSELoss()  # NOTE: reassigned to weighted_loss in a later cell, so this default is replaced before training

In [None]:
#Load the training and validation datasets
# Features and labels live in separate CSV files; the paths are relative to the working directory.
X_train = pd.read_csv("../data/cleaned/training.csv")
y_train = pd.read_csv("../data/cleaned/training_labels.csv")
X_val = pd.read_csv("../data/cleaned/validation.csv")
y_val = pd.read_csv("../data/cleaned/validation_labels.csv")

In [None]:
# Some column headers contain '[' or ']', which are not compatible with sklearn.
# They are changed to '(' and ')' respectively, in both the training and validation frames.
bracket_renames = {
    name: name.replace('[', '(').replace(']', ')')
    for name in X_train.columns
    if '[' in name or ']' in name
}
X_train = X_train.rename(columns=bracket_renames)
X_val = X_val.rename(columns=bracket_renames)

In [None]:
#Split the training set into a training and a verification set with a 90/10 split. The verification set is used for Optuna hyperparameter tuning.
# NOTE: X_train / y_train are overwritten by the split, so re-running only this cell shrinks the training set again.
X_train, X_verif, y_train, y_verif = train_test_split(X_train, y_train, test_size=0.1, random_state=42)

In [None]:
#Reset the indices after splitting the dataset so every frame has a clean 0..n-1 index
X_train = X_train.reset_index(drop=True)
y_train = y_train.reset_index(drop=True)
X_verif = X_verif.reset_index(drop=True)
y_verif = y_verif.reset_index(drop=True)
X_val = X_val.reset_index(drop=True)
# Fixed: the result used to be assigned to y_test, which left y_val untouched and
# created a premature y_test variable that actually held the validation labels.
y_val = y_val.reset_index(drop=True)

In [None]:
# Sum of all training targets, computed with Python's built-in sum over the squeezed label array,
# and printed for inspection.
# NOTE(review): Y_SUM is not referenced by any other cell visible in this notebook.
Y_SUM = sum(y_train.to_numpy().squeeze())
print(Y_SUM)

In [None]:
def weighted_loss(pred, true):
    """Target-weighted squared-error loss.

    Each sample's squared error is weighted by that sample's share of the
    summed true values in the batch, so larger targets contribute more.

    The parameter order follows the PyTorch ``loss(input, target)`` convention
    that every call site in this notebook uses (``LOSS_FN(pred, y)`` /
    ``loss_fn(pred, y)``). The signature used to be ``(true, pred)``, so those
    positional calls computed the weights from the predictions instead of the
    labels and back-propagated through them. The parameter names are unchanged,
    so keyword calls (``true=..., pred=...``) behave exactly as before.

    Args:
        pred: Model predictions, same shape as ``true``.
        true: Ground-truth targets.

    Returns:
        Scalar tensor equal to ``sum(weights * (true - pred)**2) / len(true)``
        with ``weights = true / sum(true)``. The result is not finite when the
        targets in the batch sum to zero.
    """
    n = len(true)
    # Weights come from the labels only, so no gradient flows through them.
    weights = true / torch.sum(true)
    squared = (true - pred) ** 2
    return torch.sum(squared * weights) / n

In [None]:
# Use the custom weighted loss from here on; this replaces the nn.MSELoss() value assigned to LOSS_FN earlier.
LOSS_FN = weighted_loss

In [None]:
#Custom Dataset wrapper for the perovskite solar cell database
class CustomDataset(Dataset):
    """Serve rows of a feature DataFrame and a target DataFrame as float32 tensors."""

    def __init__(self, features_dataframe, target_dataframe):
        # Both frames are stored as given and read by position in __getitem__.
        self.features, self.target = features_dataframe, target_dataframe

    def __len__(self):
        # The dataset length is the number of feature rows.
        return len(self.features)

    def __getitem__(self, idx):
        """Return the ``(features, target)`` tensor pair for positional row ``idx``."""
        feature_values = self.features.iloc[idx].values
        features = torch.tensor(feature_values, dtype=torch.float32)
        target_values = self.target.iloc[idx].values
        target = torch.tensor(target_values, dtype=torch.float32)
        return features, target

In [None]:
# Wrap each (features, labels) pair in a CustomDataset: training, verification (Optuna tuning) and validation.
train_dataset = CustomDataset(features_dataframe=X_train, target_dataframe=y_train)
verif_dataset = CustomDataset(features_dataframe=X_verif, target_dataframe=y_verif)
val_dataset = CustomDataset(features_dataframe=X_val, target_dataframe=y_val)

In [None]:
# Create data loaders.
# No shuffle argument is passed, so the DataLoader default applies and batches follow dataset row order.
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE)
verif_dataloader = DataLoader(verif_dataset, batch_size=BATCH_SIZE)
val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE)

In [None]:
#Pick the training hardware: CUDA when it is available locally, otherwise the CPU
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

In [None]:
def define_model(trial, n_features=None):
    """Build a feed-forward regressor whose architecture is sampled by Optuna.

    The trial chooses the number of hidden layers (4 to 6) and, for each hidden
    layer, its width and dropout probability. Every hidden layer is
    ``Linear -> ReLU -> Dropout``; a final ``Linear`` maps to a single output.

    Args:
        trial: Object exposing ``suggest_int`` and ``suggest_float``
            (an Optuna trial).
        n_features: Number of input features. Defaults to the number of
            columns of the notebook-level ``X_train``.

    Returns:
        nn.Sequential: the assembled model (not yet moved to any device).
    """
    if n_features is None:
        n_features = len(X_train.columns)

    # suggest_int needs low <= high; with fewer than 20 features, n_features // 2
    # would fall below the lower bound of 10, so clamp the upper bound.
    max_units = max(10, n_features // 2)

    n_layers = trial.suggest_int("n_layers", 4, 6)
    layers = []

    in_features = n_features
    for i in range(n_layers):
        out_features = trial.suggest_int("n_units_l{}".format(i), 10, max_units)
        layers.append(nn.Linear(in_features, out_features))
        layers.append(nn.ReLU())
        p = trial.suggest_float("dropout_l{}".format(i), 0.2, 0.5)
        layers.append(nn.Dropout(p))

        in_features = out_features
    layers.append(nn.Linear(in_features, 1))

    return nn.Sequential(*layers)

In [None]:
def objective(trial):
    """Optuna objective: train a sampled model and score it on the verification set.

    A model from ``define_model`` is trained for ``EPOCHS`` epochs with an
    optimizer and learning rate chosen by the trial. After every epoch the
    model is evaluated on ``verif_dataloader`` and the score is reported to
    Optuna so the pruner can stop unpromising trials.

    All metrics are computed on CPU copies or with torch operations, so the
    evaluation also works when ``device`` is "cuda". Non-finite predictions are
    detected explicitly; previously a bare ``except:`` turned *any* error in the
    metric code (including unrelated ones) into a silent prune.

    Args:
        trial: The Optuna trial supplying hyperparameters.

    Returns:
        float: verification MAPE (in percent) plus verification RMSE after the
        last epoch. Lower is better.

    Raises:
        optuna.exceptions.TrialPruned: if predictions contain NaN/inf values or
            the pruner decides to stop the trial.
    """
    # Generate the model.
    model = define_model(trial).to(device)
    print(model)

    # Generate the optimizers.
    optimizer_name = trial.suggest_categorical("optimizer", ["Adam", "RMSprop", "SGD"])
    lr = trial.suggest_float("lr", 1e-5, 1e-1, log=True)
    optimizer = getattr(optim, optimizer_name)(model.parameters(), lr=lr)

    num_batches = len(verif_dataloader)
    verif_size = len(verif_dataset)

    # Training of the model.
    for epoch in range(EPOCHS):
        print(f"Epoch {epoch+1}\n-------------------------------")
        model.train()
        for X, y in train_dataloader:
            X, y = X.to(device), y.to(device)

            pred = model(X)
            loss = LOSS_FN(pred, y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Validation of the model.
        model.eval()
        test_loss = 0.0          # mean LOSS_FN over verification batches (progress output only)
        avg_error = 0.0          # mean per-batch RMSE (progress output only)
        sum_squared_error = 0.0  # squared error summed over the whole verification set
        verif_loss = 0.0         # accumulates per-batch MAPE
        with torch.no_grad():
            for X, y in verif_dataloader:
                X, y = X.to(device), y.to(device)
                pred = model(X)

                # Diverged weights produce NaN/inf predictions; such a trial
                # cannot be scored and is pruned.
                if not torch.isfinite(pred).all():
                    print("WARNING: Unstable MSE")
                    num_nan_entries = torch.isnan(pred).sum().item()
                    print("Prediction contains {num} NaN entries".format(num=num_nan_entries))
                    print("Pruning Trial")
                    raise optuna.exceptions.TrialPruned()

                test_loss += LOSS_FN(pred, y).item()

                squared_error = (pred - y) ** 2
                # Per-output RMSE averaged over outputs (what
                # mean_squared_error(..., squared=False) used to return).
                avg_error += torch.sqrt(squared_error.mean(dim=0)).mean().item()
                sum_squared_error += squared_error.sum().item()
                verif_loss += mean_absolute_percentage_error(
                    y.cpu().numpy(), pred.cpu().numpy()
                )

        test_loss /= num_batches #Output metric to gauge how model is doing as training happens
        avg_error /= num_batches #Output metric to gauge how model is doing as training happens

        verif_loss /= num_batches
        verif_loss *= 100
        verif_rmse = float(np.sqrt(sum_squared_error / verif_size))

        # Score reported to Optuna: MAPE (%) + RMSE, kept as a plain float.
        accuracy = float(verif_loss + verif_rmse)

        print(f"Test Error: \nAvg RMSE: {avg_error}, Avg loss: {test_loss:>8f}")
        trial.report(accuracy, epoch)
        print(f"Optuna accuracy: {accuracy}\n")
        # Handle pruning based on the intermediate value.
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

    return accuracy

In [None]:
# Run the hyperparameter search: 50 trials of `objective`, pruned with successive halving.
# No `direction` argument is passed to create_study, so Optuna's default direction is used.
study = optuna.create_study(pruner=optuna.pruners.SuccessiveHalvingPruner())
study.optimize(objective, n_trials=50)

# Split the finished trials by final state for the summary below.
pruned_trials = study.get_trials(deepcopy=False, states=[TrialState.PRUNED])
complete_trials = study.get_trials(deepcopy=False, states=[TrialState.COMPLETE])

print("Study statistics: ")
print("  Number of finished trials: ", len(study.trials))
print("  Number of pruned trials: ", len(pruned_trials))
print("  Number of complete trials: ", len(complete_trials))

print("Best trial:")
# NOTE: `trial` stays in the notebook namespace and is read again by a later cell.
trial = study.best_trial

print("  Value: ", trial.value)

print("  Params: ")
for key, value in trial.params.items():
    print("    {}: {}".format(key, value))

In [None]:
class NeuralNetwork(nn.Module):
    """Feed-forward regressor rebuilt from an Optuna-style parameter dict.

    Each hidden layer is ``Linear -> ReLU -> Dropout``, matching the
    architecture produced by ``define_model``; a final ``Linear`` maps to one
    output value.

    Args:
        params: Mapping with ``'n_layers'`` plus ``'n_units_l{i}'`` and
            ``'dropout_l{i}'`` for every ``i`` in ``range(n_layers)``.
        n_features: Number of input features. Defaults to the number of
            columns of the notebook-level ``X_train``.
    """

    def __init__(self, params, n_features=None):
        super().__init__()

        if n_features is None:
            n_features = len(X_train.columns)

        n_layers = params['n_layers']
        layer_units = [params[f'n_units_l{i}'] for i in range(n_layers)]
        layer_dropouts = [params[f'dropout_l{i}'] for i in range(n_layers)]

        # Hidden layers are stored as alternating Linear / Dropout modules.
        self.layers = nn.ModuleList()

        in_features = n_features
        for out_features, dropout_p in zip(layer_units, layer_dropouts):
            self.layers.append(nn.Linear(in_features=in_features, out_features=out_features))
            self.layers.append(nn.Dropout(dropout_p))
            in_features = out_features

        # Output layer. `in_features` is the last hidden width, or the input
        # width when there are no hidden layers (previously an IndexError).
        self.output_layer = nn.Linear(in_features=in_features, out_features=1)

    def forward(self, x):
        """Apply the hidden stack and the output layer to a batch ``x``."""
        for layer in self.layers:
            x = layer(x)
            # ReLU only after Linear layers. Applying it again after Dropout
            # was a no-op, because Dropout's input is already non-negative.
            if isinstance(layer, nn.Linear):
                x = torch.relu(x)

        return self.output_layer(x)

In [None]:
# Hyperparameters for the final model.
# The tuned values used to be read from `trial.params` and then immediately
# overwritten by the hard-coded dictionary, so the first assignment was dead code
# that still forced the study cell to be run. The choice is now explicit:
# leave USE_TUNED_PARAMS False to keep the fixed values below (the previous
# behavior), or set it to True to use the best Optuna trial.
USE_TUNED_PARAMS = False
FIXED_PARAMS = {'n_layers': 1, 'n_units_l0': 4147, 'dropout_l0': 0.36, 'optimizer': 'RMSprop', 'lr': 0.0002615}
params = dict(trial.params) if USE_TUNED_PARAMS else FIXED_PARAMS

# Move the model to the selected device; train() and test() move every batch
# there, so a CPU-resident model would fail with a device mismatch on CUDA.
model = NeuralNetwork(params=params).to(device)
print(model)

In [None]:
#Build the optimizer named in the optuna results
op_name = params['optimizer']
optimizer_classes = {
    'Adam': torch.optim.Adam,
    'RMSprop': torch.optim.RMSprop,
    'SGD': torch.optim.SGD,
}
# Only the optimizers listed above are supported; anything else is rejected.
if op_name not in optimizer_classes:
    raise ValueError("Optimizer name not found. Ensure it is added to the list above.")
optimizer = optimizer_classes[op_name](model.parameters(), lr=params['lr'])

In [None]:
# Build the optimizer from the selected hyperparameters instead of hard-coding
# RMSprop, which silently overrode the optimizer chosen in the previous cell.
optimizer = getattr(torch.optim, params['optimizer'])(model.parameters(), lr=params['lr'])

In [None]:
def train(dataloader, model, loss_fn, optimizer):
    """Run one training epoch over ``dataloader``.

    Every batch is moved to ``device``, the loss ``loss_fn(pred, y)`` is
    back-propagated and ``optimizer`` takes one step. The batch loss and the
    running sample count are printed on every 9th batch.
    """
    total_samples = len(dataloader.dataset)
    model.train()
    for step, (inputs, targets) in enumerate(dataloader):
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Forward pass and loss for this batch
        batch_loss = loss_fn(model(inputs), targets)

        # Backward pass and parameter update
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        # Progress is only reported on every 9th batch
        if step % 9 != 0:
            continue
        seen = (step + 1) * len(inputs)
        print(f"loss: {batch_loss.item():>7f}  [{seen:>5d}/{total_samples:>5d}]")

In [None]:
def test(dataloader, model, loss_fn):
    """Evaluate ``model`` on ``dataloader`` and collect predictions and targets.

    Prints the RMSE and ``loss_fn`` value averaged over batches.

    Args:
        dataloader: Yields ``(X, y)`` batches.
        model: The network to evaluate; it is switched to eval mode.
        loss_fn: Callable invoked as ``loss_fn(pred, y)``.

    Returns:
        tuple: ``(preds, true)``, each a list with one entry per batch; every
        entry is a list of per-sample numpy arrays.
    """
    num_batches = len(dataloader)
    preds = []
    true = []
    model.eval()
    test_loss, error = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            # .cpu() is required before .numpy() when the batch lives on a GPU.
            preds.append(list(pred.cpu().numpy()))
            true.append(list(y.cpu().numpy()))
            test_loss += loss_fn(pred, y).item()
            # Per-output RMSE averaged over outputs, computed with torch. This
            # replaces mean_squared_error(..., squared=False): that keyword is
            # no longer accepted by recent scikit-learn releases and sklearn
            # cannot consume CUDA tensors.
            error += torch.sqrt(((pred - y) ** 2).mean(dim=0)).mean().item()
    test_loss /= num_batches
    error /= num_batches
    print(f"Test Error: \nAvg RMSE: {error}, Avg loss: {test_loss:>8f} \n")
    return preds, true

In [None]:
# Final fit: train for EPOCHS epochs and evaluate on the validation loader after each epoch.
for t in range(EPOCHS):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, LOSS_FN, optimizer)
    test(val_dataloader, model, LOSS_FN)
print("Done!")

In [None]:
# Evaluate the fitted model on the training loader to collect per-batch predictions and targets.
train_preds, train_true = test(train_dataloader, model, LOSS_FN)

In [None]:
# Join the per-batch lists and flatten them into 1-D arrays.
train_preds = np.concatenate(train_preds).ravel()
train_true = np.concatenate(train_true).ravel()

In [None]:
# Collect per-batch predictions and targets for the validation set.
val_preds, val_true = test(val_dataloader, model, LOSS_FN)

In [None]:
# Join the per-batch lists and flatten them into 1-D arrays.
val_preds = np.concatenate(val_preds).ravel()
val_true = np.concatenate(val_true).ravel()

In [None]:
# RMSE and R^2 on the validation set.
# The RMSE is taken as sqrt(MSE): the `squared=False` keyword of
# mean_squared_error is no longer accepted by recent scikit-learn releases.
val_rmse = np.sqrt(mean_squared_error(val_true, val_preds))
val_r = r2_score(val_true, val_preds)

In [None]:
# Validation RMSE followed by the validation r2_score.
print(val_rmse)
print(val_r)

In [None]:
# Deliberate stop: unless OUTPUT_TEST is True, raise here so a top-to-bottom run does not reach the test-set cells below.
if not OUTPUT_TEST:
    raise ValueError("OUTPUT_TEST set to False. If you would like to output final test values set to True and continue running from here")

In [None]:
# Load the held-out test features and labels.
X_test = pd.read_csv("../data/cleaned/test.csv")
y_test = pd.read_csv("../data/cleaned/test_labels.csv")

In [None]:
# Apply the same header clean-up to the test features: '[' becomes '(' and ']' becomes ')'.
test_bracket_renames = {
    name: name.replace('[', '(').replace(']', ')')
    for name in X_test.columns
    if '[' in name or ']' in name
}
X_test = X_test.rename(columns=test_bracket_renames)

In [None]:
# Wrap the test set in a dataset and loader, using the same batch size as the other loaders.
test_dataset = CustomDataset(features_dataframe=X_test, target_dataframe=y_test)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

In [None]:
# Collect per-batch predictions and targets for the test set.
test_pred, test_true = test(test_dataloader, model, LOSS_FN)

In [None]:
# Join the per-batch lists and flatten them into 1-D arrays.
test_pred = np.concatenate(test_pred).ravel()
test_true = np.concatenate(test_true).ravel()

In [None]:
# Test-set RMSE, taken as sqrt(MSE): the `squared=False` keyword of
# mean_squared_error is no longer accepted by recent scikit-learn releases.
error = np.sqrt(mean_squared_error(test_true, test_pred))
print("RMSE:", error)

In [None]:
#Save true values and predictions for the test and training sets to csv.
# All four files are written to the same lower-case directory that the read-back
# cell loads from. train_pred_nn.csv used to be written under
# '../Data/Predictions/NN/', which is a different location on case-sensitive
# filesystems, so the later read could miss it or pick up a stale file.
PRED_DIR = '../data/predictions/NN'
prediction_outputs = [
    (test_pred, 'test_pred_nn.csv'),
    (test_true, 'test_true_nn.csv'),
    (train_preds, 'train_pred_nn.csv'),
    (train_true, 'train_true_nn.csv'),
]
for values, filename in prediction_outputs:
    # One value per row, without index or header, so np.genfromtxt can read it back.
    pd.DataFrame(values).to_csv(f'{PRED_DIR}/{filename}', index=False, header=False)


In [None]:
#Save inputs to csv
# The training and test feature frames are written without index or header row.

pred_data = pd.DataFrame(X_train)
pred_filepath = '../data/predictions/NN/train_input_nn.csv'
pred_data.to_csv(pred_filepath, index=False, header=False)
true_data = pd.DataFrame(X_test)
true_filepath = '../data/predictions/NN/test_input_nn.csv'
true_data.to_csv(true_filepath, index=False, header=False)

In [None]:
#Read in values from csv and calculate RMSE and r values

test_pred_data = np.genfromtxt('../data/predictions/NN/test_pred_nn.csv', delimiter=',', filling_values=np.nan)
test_true_data = np.genfromtxt('../data/predictions/NN/test_true_nn.csv', delimiter=',', filling_values=np.nan)
train_pred_data = np.genfromtxt('../data/predictions/NN/train_pred_nn.csv', delimiter=',', filling_values=np.nan)
train_true_data = np.genfromtxt('../data/predictions/NN/train_true_nn.csv', delimiter=',', filling_values=np.nan)

# RMSE is taken as sqrt(MSE): the `squared=False` keyword of mean_squared_error
# is no longer accepted by recent scikit-learn releases.
test_rmse = np.sqrt(mean_squared_error(test_true_data, test_pred_data))
test_r = r2_score(test_true_data, test_pred_data)

train_rmse = np.sqrt(mean_squared_error(train_true_data, train_pred_data))
train_r = r2_score(train_true_data, train_pred_data)

print("Train:")
print(train_rmse)
# train_r was computed but never shown; print it so both splits report RMSE and r2.
print(train_r)
print('Test:')
print(test_rmse)
print(test_r)

In [None]:
# Mean absolute percentage error on the reloaded test values, scaled by 100 to a percentage.
print("percent Error:", mean_absolute_percentage_error(test_true_data, test_pred_data)*100)