## Programming and Maths for AI: Task 1

Let's load our dataset

In [40]:
import pandas as pd
df = pd.read_csv('winequalityN.csv')
df

Unnamed: 0,type,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,white,7.0,0.270,0.36,20.7,0.045,45.0,170.0,1.00100,3.00,0.45,8.8,6
1,white,6.3,0.300,0.34,1.6,0.049,14.0,132.0,0.99400,3.30,0.49,9.5,6
2,white,8.1,0.280,0.40,6.9,0.050,30.0,97.0,0.99510,3.26,0.44,10.1,6
3,white,7.2,0.230,0.32,8.5,0.058,47.0,186.0,0.99560,3.19,0.40,9.9,6
4,white,7.2,0.230,0.32,8.5,0.058,47.0,186.0,0.99560,3.19,0.40,9.9,6
...,...,...,...,...,...,...,...,...,...,...,...,...,...
6492,red,6.2,0.600,0.08,2.0,0.090,32.0,44.0,0.99490,3.45,0.58,10.5,5
6493,red,5.9,0.550,0.10,2.2,0.062,39.0,51.0,0.99512,3.52,,11.2,6
6494,red,6.3,0.510,0.13,2.3,0.076,29.0,40.0,0.99574,3.42,0.75,11.0,6
6495,red,5.9,0.645,0.12,2.0,0.075,32.0,44.0,0.99547,3.57,0.71,10.2,5


First, let's count the null values and drop all rows that have null values

In [41]:
df.isna().sum()


type                     0
fixed acidity           10
volatile acidity         8
citric acid              3
residual sugar           2
chlorides                2
free sulfur dioxide      0
total sulfur dioxide     0
density                  0
pH                       9
sulphates                4
alcohol                  0
quality                  0
dtype: int64

In [42]:
df.dropna(inplace=True)
df

Unnamed: 0,type,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,white,7.0,0.270,0.36,20.7,0.045,45.0,170.0,1.00100,3.00,0.45,8.8,6
1,white,6.3,0.300,0.34,1.6,0.049,14.0,132.0,0.99400,3.30,0.49,9.5,6
2,white,8.1,0.280,0.40,6.9,0.050,30.0,97.0,0.99510,3.26,0.44,10.1,6
3,white,7.2,0.230,0.32,8.5,0.058,47.0,186.0,0.99560,3.19,0.40,9.9,6
4,white,7.2,0.230,0.32,8.5,0.058,47.0,186.0,0.99560,3.19,0.40,9.9,6
...,...,...,...,...,...,...,...,...,...,...,...,...,...
6491,red,6.8,0.620,0.08,1.9,0.068,28.0,38.0,0.99651,3.42,0.82,9.5,6
6492,red,6.2,0.600,0.08,2.0,0.090,32.0,44.0,0.99490,3.45,0.58,10.5,5
6494,red,6.3,0.510,0.13,2.3,0.076,29.0,40.0,0.99574,3.42,0.75,11.0,6
6495,red,5.9,0.645,0.12,2.0,0.075,32.0,44.0,0.99547,3.57,0.71,10.2,5


Now, we want to check whether the dataset is balanced or not

In [43]:
df['quality'].value_counts()

quality
6    2820
5    2128
7    1074
4     214
8     192
3      30
9       5
Name: count, dtype: int64

We notice that this dataset is highly imbalanced and this will result to issues in results. To resolve this issue, we will work only with the 3 highest classes (5,6,7) and we will take 1,000 samples of each to have a balanced dataset

In [44]:
samples = []

# Loop over each target quality
for quality in [5, 6, 7]:
    # Take 1000 random samples for this quality
    sampled_rows = df[df['quality'] == quality].sample(n=1000, random_state=42)
    samples.append(sampled_rows)

# Combine all sampled rows into one DataFrame
sampled_df = pd.concat(samples, ignore_index=True)
sampled_df

Unnamed: 0,type,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,white,6.3,0.270,0.23,2.90,0.047,13.0,100.0,0.99360,3.28,0.43,9.8,5
1,red,7.5,0.610,0.26,1.90,0.073,24.0,88.0,0.99612,3.30,0.53,9.8,5
2,red,15.5,0.645,0.49,4.20,0.095,10.0,23.0,1.00315,2.92,0.74,11.1,5
3,white,6.7,0.240,0.41,2.90,0.039,48.0,122.0,0.99052,3.25,0.43,12.0,5
4,red,7.2,0.580,0.03,2.30,0.077,7.0,28.0,0.99568,3.35,0.52,10.0,5
...,...,...,...,...,...,...,...,...,...,...,...,...,...
2995,white,6.7,0.340,0.26,1.90,0.038,58.0,138.0,0.98930,3.00,0.47,12.2,7
2996,white,6.0,0.260,0.32,3.50,0.028,29.0,113.0,0.99120,3.40,0.71,12.3,7
2997,red,8.1,0.380,0.28,2.10,0.066,13.0,30.0,0.99680,3.23,0.73,9.7,7
2998,white,6.8,0.240,0.35,6.40,0.048,44.0,172.0,0.99440,3.29,0.55,10.5,7


Now we need to convert text data to numerical ones. To do so, we will convert white to 0 and red to 1 values

In [45]:
# Map 'type' to numbers
sampled_df['type'] = sampled_df['type'].map({'white': 0, 'red': 1})
sampled_df.head()

Unnamed: 0,type,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,0,6.3,0.27,0.23,2.9,0.047,13.0,100.0,0.9936,3.28,0.43,9.8,5
1,1,7.5,0.61,0.26,1.9,0.073,24.0,88.0,0.99612,3.3,0.53,9.8,5
2,1,15.5,0.645,0.49,4.2,0.095,10.0,23.0,1.00315,2.92,0.74,11.1,5
3,0,6.7,0.24,0.41,2.9,0.039,48.0,122.0,0.99052,3.25,0.43,12.0,5
4,1,7.2,0.58,0.03,2.3,0.077,7.0,28.0,0.99568,3.35,0.52,10.0,5


In [46]:
# Features
X = sampled_df.drop('quality', axis=1)

# Target
y = sampled_df['quality']

As we have lots of features with different scales each, that can affect performance, we will apply Standardization to our data, in order to get a common scale ranging from 0 to 1.

In [52]:
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
y = y.to_numpy()             # Convert target to NumPy array


### Train test splitting

Stratify- Ensures that the proportion of a target variable's classes is the same in training, testing and testing sets as it was in the original
The dataset has been split into the following manner:
1. Firstly the dataset is split into train and test which 80% for training and 20% for testing maintaining the distribution of the class in each folder using Stratify.
2. Secondly the test data is further split into 10% tetsing and 10% validation dataset again maintaing the distribution of the classes in the dataset .
Overall, the entire dataset is split in the ratio 80:10:10 (80% training, 10% testing and 10% validation)

In [53]:
#splitting the dataset into train and testing which is 80% training and 20% testing
from sklearn.model_selection import train_test_split


X_train, X_temp, y_train, y_temp = train_test_split(
    X_scaled, y, test_size=0.20, random_state=42, shuffle=True, stratify=y
)

# Now splitting the dataset into test and validation where 20% breaks down to 10% test and 10% validation
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=42, shuffle=True, stratify=y_temp
)

print(f"Training data: {X_train.shape}, Validation data: {X_val.shape}, Test data: {X_test.shape}")

Training data: (2400, 12), Validation data: (300, 12), Test data: (300, 12)


#### Activation functions: sigmoid and ReLU

Sigmoid and its derivative

In [49]:
import numpy as np

def sigmoid(x):

    s = 1/(1 + np.exp(-x))

    return s

def derivative_sigmoid(z):
    s = 1 / (1 + np.exp(-z))
    ds = s*(1-s)

    return ds


ReLU and its derivative

In [50]:
def relu(x):
    return x * (x > 0)

def derivative_relu(x):
    return 1 * (x>0)

Now let's implement the softmax function

In [51]:
#def softmax(vector):
#    e = np.exp(vector)
#    return e / e.sum()

# if z is very large, exp(z) can overflow
# solution: subtract by max
def softmax(z): #stable, so we dont have exploding issues due to exp
    z = z - np.max(z, axis=1, keepdims=True)
    exp_z = np.exp(z)
    return exp_z / np.sum(exp_z, axis=1, keepdims=True)


### Implement a fully parametrizable neural network class

## Create a NN class

In [54]:
class NeuralNetwork:

    #hidden layer (sigmoid, relu)
    #output layer (softmax)

    def __init__(self, activation_function, no_of_input_nodes, no_of_hidden_nodes, no_of_output_nodes, n_epochs,
                 lambda1,lambda2,lr,dropout, optimizer, momentum,beta1=0.9,beta2=0.999,eps=1e-8):
        self.hidden_layers = len(no_of_hidden_nodes)
        self.activation_function = activation_function
        self.no_of_input_nodes = no_of_input_nodes # as many as the dataset's features
        self.no_of_hidden_nodes = no_of_hidden_nodes # no fixed number, needs tuning
        self.no_of_output_nodes = no_of_output_nodes # as many as the output classes
        self.n_epochs = n_epochs #no. of epochs to run the NN
        self.lambda1 = lambda1 #lambda variable for L1 regularisation
        self.lambda2 = lambda2 #lambda variable for L2 regularisation
        self.learning_rate = lr #how much to update the weights
        self.z_values = [] # need to store for backprop
        self.a_values = []   # need to store for backprop, first value is the actual data
        self.dropout_masks = [] # save the masks for backprop
        self.dropout = dropout #probability for dropout
        self.weights, self.biases = self.weights_and_bias()
        
        #optimizer settings

        #---momentum SGD---
        self.optimizer = optimizer # the optimizer that will be used
        self.momentum = momentum
        self.vW = [np.zeros_like(W) for W in self.weights]  # velocity for momentum
        self.vb = [np.zeros_like(b) for b in self.biases]

        #---adam---
        self.beta1 = beta1
        self.beta2 = beta2
        self.eps = eps
        self.t = 0  # timestep

        self.moment1_W = [np.zeros_like(W) for W in self.weights] # moment1 weight initialization
        self.moment1_b = [np.zeros_like(b) for b in self.biases] # moment1 bias initialization

        self.moment2_W = [np.zeros_like(W) for W in self.weights] # moment2 weight initialization
        self.moment2_b = [np.zeros_like(b) for b in self.biases] # moment2 bias initialization



        

    def weights_and_bias(self):
        layers = [self.no_of_input_nodes] + self.no_of_hidden_nodes + [self.no_of_output_nodes] #e.g. [2,3,5,2]
        weights = [] #TODO: weight and bias proper initialization (xavier)
        biases = []
        for i in range(len((layers))-1):

            n_in = layers[i]
            n_out = layers[i+1]

            weights.append(2*np.random.random((n_in,n_out)) - 1)
            biases.append(np.zeros((1, n_out)))
        return weights, biases
    
    def inverted_dropout(self, x, p):
        # p = dropout probability
        mask = (np.random.rand(*x.shape) > p).astype(float)
        self.dropout_masks.append(mask)
        x_dropped = (x * mask)/(1 - p) # the actual dropout
        
        return x_dropped
    
    def forward_pass(self, X, training=True): #add training flag for dropout

        # Clear previous batch values
        self.a_values = []
        self.z_values = []
        self.dropout_masks = []

        #first hidden layer needs to have the actual data
        #all other hidden layers take the result from the previous layer
        a = X
        self.a_values.append(a) # save for backprop
        
        for layer in range(self.hidden_layers):

            W = self.weights[layer]
            b = self.biases[layer]
            z = np.dot(a, W) + b
            self.z_values.append(z) # save for backprop

            if self.activation_function =='sigmoid':
                a = sigmoid(z)
            elif self.activation_function =='relu':
                a = relu(z)
            #print(f'Shape of layer: {a.shape}')

            #implementing inverted dropout
            if training: # In testing, dropout is not applied
                a = self.inverted_dropout(a, self.dropout) 

            self.a_values.append(a) # save for backprop

        #---Output Layer---
        W = self.weights[-1]
        b = self.biases[-1]
        z = np.dot(a, W) + b
        self.z_values.append(z) # save for backprop
        a = softmax(z)
        self.a_values.append(a) # save for backprop
        #print(f'Shape of output layer: {a.shape}')
        return a
        

    def backward_pass(self, X_train, y_true):
        
        N = y_true.shape[0]

        # We convert labels to one-hot so that they match the shape of y_true
        y_one_hot = np.zeros((N, self.no_of_output_nodes))
        y_one_hot[np.arange(N), y_true] = 1

        # make empty lists for gradients
        dW = [None] * len(self.weights)
        db = [None] * len(self.biases)

        # --- Output layer ---
        delta = self.a_values[-1] - y_one_hot  # Derivative of loss: softmax + CE derivative (a(L) - y)
        
        dW[-1] = self.a_values[-2].T.dot(delta) / N

        dW[-1] += self.lambda1 * np.sign(self.weights[-1])   # L1 gradient
        dW[-1] += 2 * self.lambda2 * self.weights[-1]       # L2 gradient


        db[-1] = np.mean(delta, axis=0, keepdims=True) #gradient of bias

        #self.weights[-1] -= self.learning_rate * dW[-1] # update the weight
        #self.biases[-1] -= self.learning_rate * db[-1] # update the bias

        # --- Hidden layers ---
        for layer in range(self.hidden_layers - 1, -1, -1):

            # Backprop error
            delta = delta.dot(self.weights[layer + 1].T)

            #use dropout
            delta *= self.dropout_masks[layer]

            if self.activation_function =='sigmoid':
                delta *= derivative_sigmoid(self.z_values[layer])
            elif self.activation_function =='relu':
                delta *= derivative_relu(self.z_values[layer])
            
            a_prev = X_train if layer == 0 else self.a_values[layer]
            dW[layer] = a_prev.T.dot(delta) / N 
            db[layer] = np.mean(delta, axis=0, keepdims=True) #gradient of bias

            dW[layer] += self.lambda1 * np.sign(self.weights[layer])   # L1 gradient
            dW[layer] += 2 * self.lambda2 * self.weights[layer]       # L2 gradient

            
            #self.weights[layer] -= self.learning_rate * dW[layer] # update the weight
            #self.biases[layer] -= self.learning_rate * db[layer] # update the bias

        self.choose_optimizer(dW, db)




    def compute_loss(self, y_pred, y_true):
        '''
        Cross entropy loss for multi-class classification with L1 and L2 regularization
        '''

        # number of samples
        N = y_true.shape[0]
        
        correct_probs = y_pred[np.arange(N), y_true] #TODO: change to binary formula if dataset is binary
    
        # loss = average of -log(p) whre p is the predicted probability of the correct class
        loss = -np.sum(np.log(correct_probs)) / N

        l1_loss = self.lambda1 * sum(np.sum(np.abs(W)) for W in self.weights)
        l2_loss = self.lambda2 * sum(np.sum(W**2) for W in self.weights)


        return loss + l1_loss + l2_loss
    
    def choose_optimizer(self, dW, db): # function to call the respective optimizer method to be used
        if self.optimizer == "momentum":
            self.momentum_update(dW, db)
        elif self.optimizer == "adam":
            self.adam_update(dW, db)
        else:
            raise ValueError("Unsupported optimizer")
        
    def momentum_update(self,dW,db):
        for i in range(len(self.weights)):
            self.vW[i] = self.momentum * self.vW[i] + (1 - self.momentum) * dW[i]
            self.vb[i] = self.momentum * self.vb[i] + (1 - self.momentum) * db[i]
            self.weights[i] -= self.learning_rate * self.vW[i]
            self.biases[i]  -= self.learning_rate * self.vb[i]

    def adam_update(self,dW,db):
        self.t += 1
        for i in range(len(self.weights)):
            self.moment1_W[i] = self.beta1 * self.moment1_W[i] + (1 - self.beta1) * dW[i]
            self.moment1_b[i] = self.beta1 * self.moment1_b[i] + (1 - self.beta1) * db[i]
            self.moment2_W[i] = self.beta2 * self.moment2_W[i] + (1 - self.beta2) * (dW[i] ** 2)
            self.moment2_b[i] = self.beta2 * self.moment2_b[i] + (1 - self.beta2) * (db[i] ** 2)
            
            mW_hat = self.moment1_W[i] / (1 - self.beta1 ** self.t)
            vW_hat = self.moment2_W[i] / (1 - self.beta2 ** self.t)
            mb_hat = self.moment1_b[i] / (1 - self.beta1 ** self.t)
            vb_hat = self.moment2_b[i] / (1 - self.beta2 ** self.t)
            
            self.weights[i] -= self.learning_rate * mW_hat / (np.sqrt(vW_hat) + self.eps)
            self.biases[i]  -= self.learning_rate * mb_hat / (np.sqrt(vb_hat) + self.eps)


    




In [58]:
#Param tuning:
#lambda1 and lambda 2 can be from 0 to 1

nn = NeuralNetwork(
    activation_function='sigmoid',
    no_of_input_nodes=X_train.shape[1], # input features
    no_of_hidden_nodes=[32,64,64],
    no_of_output_nodes=len(np.unique(y_train)),
    n_epochs=100,
    lambda1=0.1,
    lambda2=0.1,
    lr=0.01,
    dropout=0.5,
    optimizer="adam",
    momentum=0.9
    )

for epoch in range(nn.n_epochs):
    # Forward pass
    y_pred = nn.forward_pass(X_train, training=True)

    # Loss function
    loss = nn.compute_loss(y_pred, y_train)

    # Backward pass
    nn.backward_pass(X_train, y_train)

    if epoch % 5 == 0:
        y_pred_labels = np.argmax(y_pred, axis=1)
        accuracy = np.mean(y_pred_labels == y_train)
        print(f"Epoch {epoch}, Loss: {loss:.4f}, Accuracy: {accuracy:.4f}")




IndexError: index 7 is out of bounds for axis 1 with size 3

| Parameter       | Description                                        | Where it is used                                                     |
| --------------- | -------------------------------------------------- | -------------------------------------------------------------------- |
| `learning_rate` | Step size for gradient descent                     | Used in weight update: `W -= learning_rate * grad_W`                 |
| `update_rule`   | Optimization method (`sgd`, `momentum`, `adam`)    | Determines how gradients are applied to weights                      |
| `decay`         | Learning rate decay per epoch                      | `learning_rate *= (1 / (1 + decay * epoch))`                         |
| `epochs`        | Number of times to iterate over the entire dataset | Controls the main training loop                                      |
| `batch_size`    | Number of samples per mini-batch                   | Used to split data into mini-batches for stochastic gradient descent |
| `momentum`      | Momentum coefficient (if using momentum optimizer) | Used in update: `v = beta*v + (1-beta)*grad`                         |
