<a href="https://colab.research.google.com/github/martadftese/hello-world/blob/master/code_first_assignment_MC886.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# ================================== TestLinearRegression.py ======================================

import unittest
from sklearn import datasets, linear_model
from sklearn.metrics import mean_squared_error, r2_score
from LinearRegression import LinearRegressionLearner
from matplotlib import pyplot
import numpy as np
import Utils
import math

class LinearRegressionLearnerTestCase(unittest.TestCase):
    """Smoke test comparing LinearRegressionLearner with scikit-learn's SGDRegressor."""

    def load_diabetes_data(self):
        """Load scikit-learn's diabetes dataset, holding out the last 20 rows for testing."""
        diabetes = datasets.load_diabetes()
        features, targets = diabetes.data, diabetes.target

        # All rows but the final 20 are used for training; the final 20 for testing.
        self.X_Train, self.X_Test = features[:-20], features[-20:]
        self.Y_Train, self.Y_Test = targets[:-20], targets[-20:]

    def load_metro_traffic_data(self):
        """Load the metro traffic CSV through Utils.load_and_hot_encode and keep its four splits."""
        splits = Utils.load_and_hot_encode(
            'Metro_Interstate_Traffic_Volume.csv',
            'Metro_Interstate_Traffic_Volume_formatted.csv',
            'Metro_Interstate_Traffic_Volume_formatted_headers.csv')
        self.X_Train, self.Y_Train, self.X_Test, self.Y_Test = splits

    def test_learning_dataset(self):
        """Fit both models on the same data, print their errors and plot the learner's costs.

        The only assertion always passes, so this test fails solely when one of
        the steps raises.
        """
        # Swap in self.load_diabetes_data() here to run against the diabetes dataset instead.
        self.load_metro_traffic_data()

        number_of_iterations = 1000
        learning_rate = 0.1
        # The mini-batch size is a quarter of the number of test rows.
        batch_size = int(len(self.X_Test)/4)

        print('X_Train shape: ', np.shape(self.X_Train))
        print('Y_Train shape: ', np.shape(self.Y_Train))
        print('X_Test shape: ', np.shape(self.X_Test))
        print('Y_Test shape: ', np.shape(self.Y_Test))

        # Reference model from scikit-learn.
        sgd_regressor = linear_model.SGDRegressor(max_iter=number_of_iterations)
        sgd_regressor.fit(self.X_Train, self.Y_Train)
        sgd_predictions = sgd_regressor.predict(self.X_Test)

        print("=============================== LIB =================================")
        print('Coefficients: \n', sgd_regressor.coef_)
        sgd_rmse = math.sqrt(mean_squared_error(self.Y_Test, sgd_predictions))
        print("Root Mean squared error: %.2f" % sgd_rmse)
        sgd_r2 = r2_score(self.Y_Test, sgd_predictions)
        print('Variance score: %.2f' % sgd_r2)

        # Model under test, trained without feature normalization.
        learner = LinearRegressionLearner()
        learner.set_normalize_data(False)
        costs = learner.fit(self.X_Train, self.Y_Train, number_of_iterations, learning_rate, batch_size)
        test_predictions = [learner.predict(example) for example in self.X_Test]
        train_predictions = [learner.predict(example) for example in self.X_Train]

        # Only the first quarter of the recorded costs is plotted.
        plotted_points = int(len(costs)/4)
        pyplot.plot(range(plotted_points), costs[:plotted_points])
        pyplot.xlabel("Number of iterations")
        pyplot.ylabel("Cost")
        pyplot.show()

        print("=============================== MY ===================================")
        print('Coefficients: %s' % learner.parameters)

        # Metrics on the held-out test split.
        test_rmse = math.sqrt(mean_squared_error(self.Y_Test, test_predictions))
        print("Test Root Mean squared error: %.2f" % test_rmse)
        test_r2 = r2_score(self.Y_Test, test_predictions)
        print('Test Variance score: %.2f' % test_r2)

        # Metrics on the training split.
        train_rmse = math.sqrt(mean_squared_error(self.Y_Train, train_predictions))
        print("Train Root Mean squared error: %.2f" % train_rmse)
        train_r2 = r2_score(self.Y_Train, train_predictions)
        print('Train Variance score: %.2f' % train_r2)

        self.assertTrue(True)

# Run the unittest test runner only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    unittest.main()

# ==================================== LinearRegression.py ======================================

import numpy as np


class LinearRegressionLearner:
    """Linear regression trained with (mini-batch) gradient descent.

    After ``fit``, ``parameters`` holds the bias term at index 0 followed by
    one weight per input feature.
    """

    def __init__(self):
        self.learning_rate = 0
        self.number_of_parameters = 0
        self.parameters = np.array([])
        self.number_of_features = 0
        self.normalization_means = np.array([])
        self.normalization_deviations = np.array([])
        self.normalize_data = False
        self.number_of_examples = 0
        self.batch_size = 0
        self.inputs = np.array([])
        self.number_of_inputs = 0

    def normalize(self, data):
        """Return ``data`` normalized as ``(data - mean) / std_deviation``.

        ``data`` may be a single row vector or a matrix of rows; the stored
        per-feature means and deviations are broadcast over it.
        """
        return (data - self.normalization_means) / self.normalization_deviations

    def set_normalize_data(self, should_normalize):
        """Enable or disable feature normalization in ``fit`` and ``predict``."""
        self.normalize_data = should_normalize

    def find_normalization_params(self, inputs):
        """Store the per-feature mean and standard deviation of ``inputs``."""
        inputs = np.asarray(inputs, dtype=float)
        self.normalization_means = np.mean(inputs, axis=0)
        deviations = np.std(inputs, axis=0)
        # A constant feature has zero deviation; dividing by it would produce
        # NaN. Using 1 instead makes such a feature normalize to 0.
        self.normalization_deviations = np.where(deviations == 0, 1.0, deviations)

    def adapt_inputs(self):
        """Normalize the stored inputs if enabled, then prepend the bias column of ones."""
        if self.normalize_data:
            self.find_normalization_params(self.inputs)
            self.inputs = self.normalize(self.inputs)
        self.inputs = np.c_[np.ones((self.number_of_examples, 1)), self.inputs]

    def save_algorithm_params(self, inputs, batch_size, learning_rate):
        """Record the training matrix and hyper-parameters and draw random initial parameters.

        ``batch_size == -1`` selects the whole training set. A batch size larger
        than the training set is clamped to the training set size.

        Raises:
            ValueError: if there are no training examples, or ``batch_size`` is
                neither -1 nor a positive number.
        """
        # Convert once so object-dtype or list inputs are handled as plain floats.
        self.inputs = np.asarray(inputs, dtype=float)
        self.number_of_examples, self.number_of_features = np.shape(self.inputs)
        if self.number_of_examples == 0:
            raise ValueError("cannot fit on an empty training set")
        if batch_size == -1:
            batch_size = self.number_of_examples
        elif batch_size < 1:
            raise ValueError("batch_size must be -1 or a positive integer, got %r" % (batch_size,))
        self.batch_size = min(batch_size, self.number_of_examples)
        self.learning_rate = learning_rate
        self.number_of_parameters = self.number_of_features + 1
        self.parameters = np.random.rand(self.number_of_parameters, )

    def fit(self, inputs, outputs, number_of_iterations, learning_rate, batch_size=-1):
        """Train the model and return the cost recorded after every iteration.

        Args:
            inputs: 2-D array-like with one example per row.
            outputs: target value for each example; flattened to one dimension.
            number_of_iterations: number of gradient steps to take.
            learning_rate: step size applied to the gradient.
            batch_size: examples per step; -1 (default) uses every example.

        Returns:
            A list with one entry per iteration: the cost of the batch used in
            that iteration, evaluated after the parameter update.
        """
        self.save_algorithm_params(inputs, batch_size, learning_rate)
        self.adapt_inputs()
        outputs = np.asarray(outputs, dtype=float).ravel()
        cost_per_iteration = []
        for _ in range(number_of_iterations):
            # Sample the batch from the WHOLE training set; permuting only
            # range(batch_size) would reuse the same first rows every step.
            batch_indices = np.random.permutation(self.number_of_examples)[:self.batch_size]
            input_batch = self.inputs[batch_indices]
            output_batch = outputs[batch_indices]
            residuals = input_batch.dot(self.parameters) - output_batch
            gradient = input_batch.T.dot(residuals) / self.batch_size
            self.parameters = self.parameters - self.learning_rate * gradient
            cost_per_iteration.append(self.current_cost(input_batch, output_batch))
        return cost_per_iteration

    def current_cost(self, inputs, outputs):
        """Return the halved mean squared error of the current parameters.

        ``inputs`` must already contain the leading bias column.
        """
        residuals = inputs.dot(self.parameters) - outputs
        # Square each residual before summing; squaring the sum would let
        # positive and negative errors cancel each other out.
        return (residuals ** 2).sum() / (2 * len(inputs))

    def predict(self, input):
        """Return the prediction for one example (1-D) or one per row (2-D).

        ``input`` holds raw feature values without the bias column; it is
        normalized first when normalization is enabled.
        """
        input = np.asarray(input, dtype=float)
        if self.normalize_data:
            input = self.normalize(input)
        return self.parameters[0] + input.dot(self.parameters[1:])

# ========================================= Utils.py =============================================

import numpy as np
import re
import pandas as pd
import imageio
import os


def apply_hot_encoding(data, headers):
    """One-hot encode the string columns of ``data`` and pass other columns through.

    Each column is classified by its first value:
      * not a string: the column is copied unchanged under its original header;
      * a string matching ``YYYY-MM-DD <rest>``: the column is parsed as
        timestamps and expanded into 7 day-of-week columns followed by 24
        hour-of-day columns;
      * any other string: one indicator column per unique value.

    Returns a tuple ``(encoded, new_headers)`` where ``encoded`` has one row
    per row of ``data`` and ``new_headers`` names each encoded column.
    """
    timestamp_pattern = '([0-9]{4})-([0-9]{2})-([0-9]{2}) (.*)'

    # Row 0 is an uninitialized placeholder so vstack has something to extend;
    # it is dropped again before returning.
    encoded_data = np.empty((1, len(data)))
    new_headers = np.array([])

    for column, header in zip(data.T, headers):
        first_value = column[0]

        # Columns whose first value is not a string are kept as they are.
        if not isinstance(first_value, str):
            encoded_data = np.vstack((encoded_data, column))
            new_headers = np.append(new_headers, header)
            continue

        # Plain categorical column: one indicator per distinct value.
        if not re.match(timestamp_pattern, first_value):
            encoded_data, new_headers = encode_categories(
                encoded_data, new_headers, np.unique(column), column)
            continue

        # Timestamp column: encode the week day first, then the hour of day.
        dates = np.array([pd.Timestamp(date_str) for date_str in column])

        days_of_week = np.array(['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'])
        day_names = np.array([date.day_name() for date in dates])
        encoded_data, new_headers = encode_categories(
            encoded_data, new_headers, days_of_week, day_names)

        hours_of_day = np.array(['%dth' % hour for hour in range(24)])
        hour_labels = np.array([hour_of_day(date) for date in dates])
        encoded_data, new_headers = encode_categories(
            encoded_data, new_headers, hours_of_day, hour_labels)

    return encoded_data[1:].T, new_headers


def encode_categories(encoded_data, new_headers, categories, data):
    """Append one 0/1 indicator row per category to ``encoded_data``.

    For every entry of ``categories`` a row is stacked under ``encoded_data``
    holding 1 where the corresponding value of ``data`` equals that category
    and 0 elsewhere, and the category is appended to ``new_headers``.

    Returns the extended ``(encoded_data, new_headers)`` pair.
    """
    stacked, labels = encoded_data, new_headers
    for label in categories:
        indicator_row = np.array([int(label == entry) for entry in data])
        stacked = np.vstack((stacked, indicator_row))
        labels = np.append(labels, label)
    return stacked, labels


def part_of_day(date):
    """Return the name of the part of the day that ``date.hour`` falls in.

    The day is divided into eight contiguous ranges so every hour from 0 to 23
    maps to a label. An hour outside that range returns ``None``.

    NOTE(review): the previous ranges left hours 2 and 9 unassigned (they
    returned ``None``). They are assigned here to 'Late Dawn' and
    'Late Morning' by keeping each range's upper bound and starting the next
    range right there - confirm this matches the intended split.
    """
    # (exclusive upper hour bound, label), in ascending order.
    upper_bounds = (
        (2, 'Early Dawn'),
        (5, 'Late Dawn'),
        (9, 'Early Morning'),
        (12, 'Late Morning'),
        (15, 'Early Afternoon'),
        (19, 'Late Afternoon'),
        (22, 'Early Night'),
        (24, 'Late Night'),
    )
    hour = date.hour
    if hour < 0:
        return None
    for upper_bound, label in upper_bounds:
        if hour < upper_bound:
            return label
    return None

def hour_of_day(date):
    """Return the hour of ``date`` as a label such as ``'5th'``."""
    hour = date.hour
    return '%dth' % hour


def generate_gif_for_path(path=None, name='movie', duration=0.04):
    """Build ``<name>.gif`` from the image files found in ``path``.

    Args:
        path: folder containing the frames. Defaults to the current working
            directory, resolved when the function is called (a default of
            ``os.getcwd()`` in the signature would be frozen at import time).
        name: base name of the output file; ``.gif`` is appended and the file
            is written relative to the current working directory.
        duration: passed through to ``imageio.mimsave``.

    Files ending in ``.jpeg``, ``.png`` or ``.gif`` are used as frames, in
    sorted filename order. Note that this includes any ``.gif`` already present
    in ``path``, such as the output of an earlier run.
    """
    if path is None:
        path = os.getcwd()
    image_folder = os.fsdecode(path)

    # os.listdir returns entries in arbitrary order, so sort the frames.
    filenames = sorted(
        filename for filename in os.listdir(image_folder)
        if filename.endswith(('.jpeg', '.png', '.gif'))
    )

    # Join with the folder so frames load correctly even when ``path`` is not
    # the current working directory.
    images = [imageio.imread(os.path.join(image_folder, filename)) for filename in filenames]
    imageio.mimsave('%s.gif' % name, images, duration=duration)  # modify duration as needed

def isWeekDay(data):
    """Return True when ``isWeekendDay`` does not flag the row ``data``."""
    if isWeekendDay(data):
        return False
    return True

def isWeekendDay(data):
    """Return True when the row ``data`` is a holiday or falls on a weekend.

    ``data[0]`` is the holiday field: the string ``'None'`` or a missing value
    (NaN) both mean "no holiday". ``data[-2]`` is the timestamp of the row.

    The missing-value case matters because ``pandas.read_csv`` interprets the
    text ``None`` as NaN by default; comparing only against the string
    ``'None'`` would then classify every row as a holiday.
    """
    date = pd.Timestamp(data[-2])
    holiday = data[0]
    is_holiday = not pd.isna(holiday) and holiday != 'None'
    return is_holiday or date.day_name() in ('Saturday', 'Sunday')

def load_and_hot_encode(file_path, output_path, headers_path):
    """Read a CSV, encode its features and split it into training and test sets.

    Steps:
      1. Read ``file_path`` and keep only the rows accepted by ``isWeekDay``.
      2. Shuffle the rows in place.
      3. Encode every column except the last with ``apply_hot_encoding``; the
         last column is used as the target.
      4. Write the encoded features to ``output_path`` and their headers to
         ``headers_path``.
      5. Use the first 90% of the rows for training and the rest for testing,
         writing them to ``Training_X.csv``, ``Training_Y.csv``,
         ``Test_X.csv`` and ``Test_Y.csv``.

    Returns ``(x_train, y_train, x_test, y_test)``.
    """
    data = pd.read_csv(file_path)
    rows = data.to_numpy()
    rows = np.array([row for row in rows if isWeekDay(row)])
    np.random.shuffle(rows)

    data_rows, data_columns = np.shape(rows)
    target_column = data_columns - 1

    formatted_data, headers = apply_hot_encoding(rows[:, :target_column], data.head())
    np.savetxt(output_path, formatted_data, delimiter=',', fmt='%1.2f')
    np.savetxt(headers_path, headers, delimiter=',', fmt='%s')

    # Encoding must not add or drop rows.
    formatted_data_rows, _ = np.shape(formatted_data)
    assert (formatted_data_rows == data_rows)

    # Index of the first test row: 90% of the rows go to training.
    percentage_of_training_data = 0.90
    split_index = int(data_rows * percentage_of_training_data)

    # Training portion.
    x_train = formatted_data[:split_index]
    y_train = np.ravel(rows[:split_index, target_column:])
    np.savetxt("Training_X.csv", x_train, delimiter=',', fmt='%1.2f')
    np.savetxt("Training_Y.csv", y_train, delimiter=',', fmt='%1.2f')

    # Test portion.
    x_test = formatted_data[split_index:]
    y_test = np.ravel(rows[split_index:, target_column:])
    np.savetxt("Test_X.csv", x_test, delimiter=',', fmt='%1.2f')
    np.savetxt("Test_Y.csv", y_test, delimiter=',', fmt='%1.2f')

    return (x_train, y_train, x_test, y_test)


ModuleNotFoundError: ignored