In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [14]:
import numpy as np  # for handling multi-dimensional array operation
import pandas as pd  # for reading data from csv 
import statsmodels.api as sm  # for finding the p-value
from sklearn.preprocessing import MinMaxScaler  # for normalization
from sklearn.model_selection import train_test_split as tts
from sklearn.metrics import accuracy_score 
from sklearn.utils import shuffle

# >> FEATURE SELECTION << #
def remove_correlated_features(X):
    corr_threshold = 0.9
    corr = X.corr()
    drop_columns = np.full(corr.shape[0], False, dtype=bool)
    for i in range(corr.shape[0]):
        for j in range(i + 1, corr.shape[0]):
            if corr.iloc[i, j] >= corr_threshold:
                drop_columns[j] = True
    columns_dropped = X.columns[drop_columns]
    X.drop(columns_dropped, axis=1, inplace=True)
    return columns_dropped
def remove_less_significant_features(X, Y):
    sl = 0.05
    regression_ols = None
    columns_dropped = np.array([])
    for itr in range(0, len(X.columns)):
        regression_ols = sm.OLS(Y, X).fit()
        max_col = regression_ols.pvalues.idxmax()
        max_val = regression_ols.pvalues.max()
        if max_val > sl:
            X.drop(max_col, axis='columns', inplace=True)
            columns_dropped = np.append(columns_dropped, [max_col])
        else:
            break
    regression_ols.summary()
    return columns_dropped

# >> MODEL TRAINING << #
def compute_cost(W, X, Y):
    # calculate hinge loss
    N = X.shape[0]
    distances = 1 - Y * (np.dot(X, W))
    distances[distances < 0] = 0  # equivalent to max(0, distance)
    hinge_loss = reg_strength * (np.sum(distances) / N)
    
    # calculate cost
    cost = 1 / 2 * np.dot(W, W) + hinge_loss
    return cost

# same function should work for vanilla and mini-batch gradient descent as well
def calculate_cost_gradient(W, X_batch, Y_batch):
    # if only one example is passed (eg. in case of SGD)
    #print(type(Y_batch))
    #print(X_batch)
    if type(Y_batch) == np.int or type(Y_batch) == np.int64 or type(Y_batch) == np.float64 or type(Y_batch) == np.float :
        Y_batch = np.array([Y_batch])
        X_batch = np.array([X_batch])  # gives multidimensional array
    
    distance = 1 - (Y_batch * np.dot(X_batch, W))
    dw = np.zeros(len(W))

    for ind, d in enumerate(distance):
        if max(0, d) == 0:
            di = W
        else:
            #print(Y_batch)
            di = W - (reg_strength * Y_batch[ind] * X_batch[ind])
        dw += di

    dw = dw/len(Y_batch)  # average
    return dw

def sgd_1(features, outputs):
    # running the loop 5000 times
    max_epochs = 5000
    weights = np.zeros(features.shape[1])
    # stochastic gradient descent
    for epoch in range(1, max_epochs): 
        # shuffle to prevent repeating update cycles
        X, Y = shuffle(features, outputs)
        for ind, x in enumerate(X):
            ascent = calculate_cost_gradient(weights, x, Y[ind])
            weights = weights - (learning_rate * ascent)
            
    return weights

#SGD with stoppage criterian
def sgd(features, outputs):
    #print("Features : ",features.shape)#620,5
    #print("\nOutputs : ",outputs.shape)#620,
    max_epochs = 5000
    weights = np.zeros(features.shape[1])
    nth = 0
    prev_cost = float("inf")
    cost_threshold = 0.01  # in percent
    # stochastic gradient descent
    for epoch in range(1, max_epochs):
        # shuffle to prevent repeating update cycles
        X, Y = shuffle(features, outputs)
        #print([ind,x] for ind,x in enumerate(X))
        for ind, x in enumerate(X):
            #print("###",str(x))
            ascent = calculate_cost_gradient(weights, x, Y[ind])
            weights = weights - (learning_rate * ascent)
        # convergence check on 2^nth epoch
        if epoch == 2 ** nth or epoch == max_epochs - 1:
            cost = compute_cost(weights, features, outputs)
            print("Epoch is:{} and Cost is: {}".format(epoch, cost))
            # stoppage criterion
            if abs(prev_cost - cost) < cost_threshold * prev_cost:
                return weights
            prev_cost = cost
            nth += 1
    return weights

def init():
    df=pd.read_csv("drive/My Drive/Final Year Research/Dataset/Oneplus 5T/data_files/rgbarray.csv")
    labels = np.load("drive/My Drive/Final Year Research/Dataset/Oneplus 5T/data_files/labeldata.npy")
    metadata=pd.read_csv('drive/My Drive/Final Year Research/Dataset/Oneplus 5T/data_files/metadata.csv')
    df['shutter_speed']=metadata['shutter_speed']
    df['brightness']=metadata['brightness']
    X = df.iloc[:, 0:]  # all rows of column 0 and ahead (features)
    Y = pd.DataFrame(labels.astype(np.float64)).loc[:, 0]  # all rows of labels

    # normalize the features using MinMaxScalar from
    # sklearn.preprocessing
    X_normalized = MinMaxScaler().fit_transform(X.values)
    X = pd.DataFrame(X_normalized)

    # first insert 1 in every row for intercept b
    X.insert(loc=len(X.columns), column='intercept', value=1)

    # test_size is the portion of data that will go into test set
    # random_state is the seed used by the random number generator
    print("splitting dataset into train and test sets...")
    X_train, X_test, y_train, y_test = tts(X, Y, test_size=0.2, random_state=42)

    # train the model
    print("training started...")
    W = sgd(X_train.to_numpy(), y_train.to_numpy())
    print("training finished.")
    print("weights are: {}".format(W))

    #After training the model using SGD we finally got the optimal weights w* which defines the best possible hyperplane separating two classes.

    # testing the model on test set
    y_test_predicted = np.array([])
    #print("W : ",X_test.values[0])
    for i in range(X_test.shape[0]):
        print(i," : ",np.dot(W, X_test.values[i]))
        yp = np.sign(np.dot(W, X_test.values[i])) #model
        y_test_predicted = np.append(y_test_predicted, yp)
    #print("accuracy on test dataset: {}".format(accuracy_score(y_test.values, y_test_predicted)))
    #print("recall on test dataset: {}".format(recall_score(y_test.values, y_test_predicted)))
    #print("precision on test dataset: {}".format(recall_score(y_test.values, y_test_predicted)))
    #print(y_test.values)

# set hyper-parameters and call init
# hyper-parameters are normally tuned using cross-validation
# but following work good enough
reg_strength = 10000 # regularization strength
learning_rate = 0.000001
init()


splitting dataset into train and test sets...
training started...
Epoch is:1 and Cost is: 0.07090397151665402
Epoch is:2 and Cost is: 0.07081610503651643
training finished.
weights are: [0.15502772 0.16383188 0.09237367 0.12552984 0.11702145 0.22972392]
0  :  0.6422899609874867
1  :  0.5631766924815695
2  :  0.6087721288384316
3  :  0.6162188859185245
4  :  0.6211655177197735
5  :  0.6209665983868273
6  :  0.5780275236783456
7  :  0.5715971428738058
8  :  0.4367751406143976
9  :  0.6540138293642745
10  :  0.6082806774951564
11  :  0.3888610380963472
12  :  0.47917719025590516
13  :  0.47039047170495396
14  :  0.6435288429685142
15  :  0.5737980532973136
16  :  0.7324747714284967
17  :  0.42595559112916076
18  :  0.5672312489830216
19  :  0.43015287596853596
20  :  0.4421191015245101
21  :  0.7029420201934785
22  :  0.4438973663810287
23  :  0.5303149414694134
24  :  0.4490787028850008
25  :  0.596459757144898
26  :  0.39311808875974297
27  :  0.4230979850010213
28  :  0.744405609518270