In [21]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import KFold
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
# import matplotlib.pyplot as plt

### Choose device and import dataset

In [22]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Load data
csv_path = "D:/year 3/hk2/Machine Learning/testcode/cleaned_data.csv"
df = pd.read_csv(csv_path)

Using device: cuda


### selecting features and target(label) columns

In [23]:
features = ['store_sales(in millions)', 'store_cost(in millions)', 'total_children', 
            'avg_cars_at home(approx)', 'num_children_at_home', 'gross_weight', 
            'net_weight', 'store_sqft', 'grocery_sqft', 'units_per_case', 'SRP']
target = 'cost'

data = df[features].values
data_output = df[target].values

### Scaling the data to for better trainning

In [24]:
scaler_X = StandardScaler()
scaler_y = StandardScaler()
data = scaler_X.fit_transform(data)
data_output = scaler_y.fit_transform(data_output.reshape(-1, 1)) 

In [25]:
data = torch.tensor(data, dtype=torch.float32).to(device)
data_output = torch.tensor(data_output, dtype=torch.float32).to(device).view(-1, 1)

### Implement the neural network model

In [26]:
# class ImprovedModel(nn.Module):
#     def __init__(self, input_dim):
#         super(ImprovedModel, self).__init__()
#         # self.fc1 = nn.Linear(input_dim, 128)
#         # self.bn1 = nn.BatchNorm1d(128)
#         # self.fc2 = nn.Linear(128, 64)
#         # self.bn2 = nn.BatchNorm1d(64)
#         # self.fc3 = nn.Linear(64, 32)
#         # self.bn3 = nn.BatchNorm1d(32)
#         # self.fc4 = nn.Linear(32, 16)
#         # self.fc5 = nn.Linear(16, 1)
#         self.fc1 = nn.Linear(input_dim, 256)
#         self.fc2 = nn.Linear(256, 128)
#         self.fc3 = nn.Linear(128, 64)
#         self.fc4 = nn.Linear(64, 32)
#         self.fc5 = nn.Linear(32, 16)
#         self.fc6 = nn.Linear(16, 1)
#         self.dropout = nn.Dropout(0.2)
#         self.relu = nn.LeakyReLU(0.01)
    
#     def forward(self, x):
#         # x = self.relu(self.bn1(self.fc1(x)))
#         # # x = self.dropout(x)
#         # x = self.relu(self.bn2(self.fc2(x)))
#         # # x = self.dropout(x)
#         # x = self.relu(self.bn3(self.fc3(x)))
#         # # x = self.dropout(x)
#         # x = self.relu(self.fc4(x))
#         x = self.relu(self.fc1(x))
#         x = self.relu(self.fc2(x))
#         x = self.relu(self.fc3(x))
#         x = self.relu(self.fc4(x))
#         x = self.relu(self.fc5(x))
#         x = self.fc6(x)0
#         return x

In [27]:
class NeuralNetworkModel(nn.Module):
    def __init__(self, input_dim):
        super(NeuralNetworkModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.bn1 = nn.BatchNorm1d(128)
        self.fc2 = nn.Linear(128, 64)
        self.bn2 = nn.BatchNorm1d(64)
        self.fc3 = nn.Linear(64, 32)
        self.bn3 = nn.BatchNorm1d(32)
        self.fc4 = nn.Linear(32, 16)
        self.fc5 = nn.Linear(16, 1)
        self.dropout = nn.Dropout(0.2)
        self.relu = nn.Softmax()
    
    def forward(self, x):
        x = self.relu(self.bn1(self.fc1(x))) #self.bn1(
        x = self.dropout(x)
        x = self.relu(self.bn2(self.fc2(x)))
        x = self.dropout(x)
        x = self.relu(self.bn3(self.fc3(x)))
        x = self.dropout(x)
        x = self.relu(self.fc4(x))
        x = self.fc5(x)
        return x

### training and testing model using batches

In [28]:
x_train, x_test, y_train, y_test = train_test_split(data,data_output,test_size=0.2,random_state=42)
batch_size = 64
train_dataset = TensorDataset(x_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
learning_rate=0.001
model = NeuralNetworkModel(x_train.shape[1]).to(device)
loss=nn.MSELoss()
optium=torch.optim.AdamW(model.parameters(),learning_rate)
for epoch in range(100):
    model.train()
    l=0.0
    for batch_X, batch_y in train_loader:
        optium.zero_grad()
        y_predicted=model(batch_X)
        los=loss(y_predicted,batch_y)
        los.backward()
        optium.step()
        l+=los.item()
    if (epoch+1)%10==0:
        print(f"epoch {epoch+1}: the loss = {l/len(train_loader):.5f}")
with torch.no_grad():
    y_predicted=model(x_test)
    y_predicted_rounded=y_predicted.round()
    accuracy=y_predicted_rounded.eq(y_test.round()).sum()/y_test.shape[0]#float(y_test.shape[0])
    print(f"the acccuracy of the model is: {accuracy:.5f}")
    # for predict, real in zip(y_predicted[:10], y_test[:10]):
    #     print (f"output: {predict.item()}; predicted: {real.item()}")

  return self._call_impl(*args, **kwargs)


epoch 10: the loss = 0.98020
epoch 20: the loss = 0.97849
epoch 30: the loss = 0.97785
epoch 40: the loss = 0.97717
epoch 50: the loss = 0.97655
epoch 60: the loss = 0.97573
epoch 70: the loss = 0.97599
epoch 80: the loss = 0.97558
epoch 90: the loss = 0.97630
epoch 100: the loss = 0.97458
the acccuracy of the model is: 0.28330


### Attemping Cross validation using K Fold

In [29]:
k_folds = 5
kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)

In [30]:
results = []
for fold, (train_ids, val_ids) in enumerate(kfold.split(data)):
    print(f"Fold {fold + 1}/{k_folds}")
    
    # Split data into training and validation sets
    x_train, y_train = data[train_ids], data_output[train_ids]
    x_val, y_val = data[val_ids], data_output[val_ids]
    
    # Create DataLoader for training
    # batch_size = 64
    # train_dataset = TensorDataset(x_train, y_train)
    # train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    
    # Initialize model, loss, and optimizer
    model = NeuralNetworkModel(x_train.shape[1]).to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=0.001,weight_decay=0)
    
    # Training loop
    num_epochs = 50
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for batch_X, batch_y in train_loader:
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
        if (epoch + 1) % 10 == 0:
            print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.5f}")

            
    model.eval()
    with torch.no_grad():
        y_pred = model(x_val)
        y_pred = torch.tensor(scaler_y.inverse_transform(y_pred.cpu().numpy()), dtype=torch.float32).to(device)  # Inverse transform predictions
        y_val = torch.tensor(scaler_y.inverse_transform(y_val.cpu().numpy()), dtype=torch.float32).to(device)  # Inverse transform validation data
        accuracy=y_pred.eq(y_val.round()).sum()/y_val.shape[0]#float(y_test.shape[0])
        print(f"the acccuracy of the model is: {accuracy:.5f}")
        mse = criterion(y_pred, y_val)
        print(f"Validation MSE for Fold {fold + 1}: {mse.item():.5f}")
        
        # Calculate R² score
        ss_total = torch.sum((y_val - torch.mean(y_val))**2)
        ss_residual = torch.sum((y_val - y_pred)**2)
        r2 = 1 - (ss_residual / ss_total)
        print(f"R² Score for Fold {fold + 1}: {r2.item():.5f}")
        
        results.append({
            "fold": fold + 1,
            "mse": mse.item(),
            "r2": r2.item()
        })
 

Fold 1/5
Epoch [10/50], Loss: 0.98055
Epoch [20/50], Loss: 0.97884
Epoch [30/50], Loss: 0.97738
Epoch [40/50], Loss: 0.97592
Epoch [50/50], Loss: 0.97529
the acccuracy of the model is: 0.00000
Validation MSE for Fold 1: 878.98645
R² Score for Fold 1: 0.02716
Fold 2/5


  return self._call_impl(*args, **kwargs)


Epoch [10/50], Loss: 0.97916
Epoch [20/50], Loss: 0.97720
Epoch [30/50], Loss: 0.97645
Epoch [40/50], Loss: 0.97649
Epoch [50/50], Loss: 0.97340
the acccuracy of the model is: 0.00000
Validation MSE for Fold 2: 877.48810
R² Score for Fold 2: 0.03593
Fold 3/5


  return self._call_impl(*args, **kwargs)


Epoch [10/50], Loss: 0.98042
Epoch [20/50], Loss: 0.97806
Epoch [30/50], Loss: 0.97697
Epoch [40/50], Loss: 0.97600
Epoch [50/50], Loss: 0.97505
the acccuracy of the model is: 0.00000
Validation MSE for Fold 3: 860.96033
R² Score for Fold 3: 0.03087
Fold 4/5


  return self._call_impl(*args, **kwargs)


Epoch [10/50], Loss: 0.98050
Epoch [20/50], Loss: 0.97753
Epoch [30/50], Loss: 0.97686
Epoch [40/50], Loss: 0.97520
Epoch [50/50], Loss: 0.97446
the acccuracy of the model is: 0.00000
Validation MSE for Fold 4: 867.69952
R² Score for Fold 4: 0.03174
Fold 5/5


  return self._call_impl(*args, **kwargs)


Epoch [10/50], Loss: 0.98034
Epoch [20/50], Loss: 0.97896
Epoch [30/50], Loss: 0.97724
Epoch [40/50], Loss: 0.97560
Epoch [50/50], Loss: 0.97481
the acccuracy of the model is: 0.00000
Validation MSE for Fold 5: 878.45184
R² Score for Fold 5: 0.02931


### Evaluate the cross validation result

In [31]:
print("\nCross-Validation Results:")
for result in results:
    print(f"Fold {result['fold']}: MSE = {result['mse']:.5f}, R² = {result['r2']:.5f}")

# Calculate average performance across folds
avg_mse = np.mean([result['mse'] for result in results])
avg_r2 = np.mean([result['r2'] for result in results])
print(f"\nAverage MSE across folds: {avg_mse:.5f}")
print(f"Average R² across folds: {avg_r2:.5f}")


Cross-Validation Results:
Fold 1: MSE = 878.98645, R² = 0.02716
Fold 2: MSE = 877.48810, R² = 0.03593
Fold 3: MSE = 860.96033, R² = 0.03087
Fold 4: MSE = 867.69952, R² = 0.03174
Fold 5: MSE = 878.45184, R² = 0.02931

Average MSE across folds: 872.71725
Average R² across folds: 0.03100
