In [5]:

import sys
from enum import Enum, unique
from os import listdir
import logging
import json

from sklearn.metrics import classification_report, mean_squared_error, precision_score, confusion_matrix, accuracy_score
from abc import ABCMeta
from sklearn.neural_network import MLPClassifier


from sklearn import svm
from sklearn.model_selection import PredefinedSplit
from sklearn.linear_model import LogisticRegression as LogisticRegressionSKlearn

logger = logging.getLogger("classifiers")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)


import math
import pickle
from numbers import Number
from typing import Union
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from skimage.transform import rescale
from pathlib import Path
import os

from sklearn.base import is_classifier, clone
from sklearn.metrics import check_scoring
from sklearn.model_selection import learning_curve, check_cv
from sklearn.model_selection._validation import _fit_and_score
from sklearn.utils import indexable, Parallel, delayed


def read_file(path_file: str, shuffle=False) -> (np.ndarray, np.ndarray):
    """Function to read datafile and returns a tuple with the following format: (X, y). X represents all the features
        and y represents all the outputs for each data example
    """
    data = pd.read_csv(path_file, header=None, skiprows=1).values
    if shuffle:
        np.random.shuffle(data)

    return data[:, 0:], data[:, 0].reshape(data.shape[0], 1)


def represent_data_graphically(data: np.ndarray, file_save: str, rows: int = 10, cols: int = 10):
    data_image_size = int(math.sqrt(len(data[0, :])))
    data_len = len(data)

    fig, axis = plt.subplots(rows, cols, figsize=(data_image_size, data_image_size))
    for row in range(rows):
        for col in range(cols):
            example_id = np.random.randint(data_len)
            axis[row, col].imshow(data[example_id, :].reshape(data_image_size, data_image_size, order="F"))
    plt.savefig(file_save)


def sigmoid(z: Union[Number, np.ndarray]) -> np.ndarray:
    """return the sigmoid of z
    """
    return .5 * (1 + np.tanh(.5 * z))


# Computes the gradient of sigmoid function
def sigmoid_gradient(z):
    """computes the gradient of the sigmoid function
    """
    sigmoid_val = sigmoid(z)
    return sigmoid_val * (1 - sigmoid_val)


def rescale_image(data, factor):
    data_size = int(data.shape[0] ** 0.5)
    img = rescale(data.reshape(data_size, data_size), factor, mode='reflect')
    x = img.shape[0] ** 2
    return img.reshape(x, 1).ravel()


def rescale_dataset(dataset, factor=0.75):
    rescaled_data = []
    for img in dataset:
        rescaled_data.append(rescale_image(img, factor))
    return np.asarray(rescaled_data)


def plot_image(data):
    data_size = int(data.shape[0] ** 0.5)
    img = data.reshape(data_size, data_size)
    plt.imshow(img)
    plt.show()


def save_object(obj, file_name):
    base_file_name = os.path.basename(file_name)
    Path(file_name.replace(base_file_name, '')).mkdir(parents=True, exist_ok=True)
    with open(file_name, 'wb') as file:
        pickle.dump(obj, file)


def convert_image(data):
    k = np.where(data * 255 > 128, 1, 0)
    return k


def validation_curve(estimator, X, y, param_name, param_range, groups=None,
                     cv=None, scoring=None, n_jobs=None, pre_dispatch="all",
                     verbose=0, error_score=np.nan):
    X, y, groups = indexable(X, y, groups)

    cv = check_cv(cv, y, classifier=is_classifier(estimator))
    scorer = check_scoring(estimator, scoring=scoring)

    parallel = Parallel(n_jobs=n_jobs, pre_dispatch=pre_dispatch,
                        verbose=verbose)
    out = parallel(delayed(_fit_and_score)(
        clone(estimator), X, y, scorer, train, test, verbose,
        parameters={param_name: v}, fit_params=None, return_train_score=True,
        error_score=error_score, return_estimator=True, return_times=True)
                   # NOTE do not change order of iteration to allow one time cv splitters
                   for train, test in cv.split(X, y, groups) for v in param_range)

    out = np.asarray(out)
    estimators = out[:, 4]
    out_scores = np.asarray(out[:, :2])
    fit_time = out[:, 2]
    score_time = out[:, 3]
    n_params = len(param_range)
    n_cv_folds = out_scores.shape[0] // n_params
    out_scores = out_scores.reshape(n_cv_folds, n_params, 2).transpose((2, 1, 0))

    return estimators, np.float64(out_scores[0]), np.float64(out_scores[1]), np.float64(fit_time), \
           np.float64(score_time)


def plot_validation_curve(train_scores, test_scores, title, xlabel, ylabel, param_range):
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.ylim(0.0, 1.1)

    plt.semilogx(param_range, train_scores, label="Training Score", color="blue", marker="o")
    plt.semilogx(param_range, test_scores, label="Cross-validation score", color="orange", marker="o")

    plt.legend(loc="best")
    plt.show()


def plot_time_per_parameter(fit_times, score_times, title, xlabel, ylabel, param_range):
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)

    plt.semilogx(param_range, fit_times, label="Fitting times", color="blue", marker="o")
    plt.semilogx(param_range, score_times, label="Scoring times", color="orange", marker="o")

    plt.legend(loc="best")
    plt.show()


def plot_test_accuracy(x_data, y_data, title, xlabel, ylabel):
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    
    plt.semilogx(x_data, y_data, label="Test set accuracy", color="orange", marker="o")

    plt.legend(loc="best")
    plt.show()

def get_classifiers(path):
    folders = [f for f in listdir(path)]
    classifiers = {}

    for folder_name in folders:
        folder_path = f"{path}/{folder_name}"
        if folder_name == 'others':
            continue
        classifiers[folder_name] = []
        for file_name in [f for f in listdir(folder_path)]:
            file_path = f"{folder_path}/{file_name}"

            with open(file_path, 'rb') as output:
                classifier = pickle.load(output)
                classifiers[folder_name].append(classifier)

    return classifiers


def save_best_classifiers(classifiers_list, file_name='best_classifiers'):
    best = sorted(classifiers_list, reverse=True,
                  key=lambda c: (c.params[str(Label.CV)], c.params[str(Label.TRAIN)], c.params[str(Label.TEST)]))[0]

    save_object(best, f'{file_name}/{best.name}/{best.variation_param}')

    return best


@unique
class Label(Enum):
    TEST = "accuracy train set"
    CV = "accuracy cv set"
    TRAIN = "accuracy test set"

    def __str__(self):
        return self.value


class Classifier(metaclass=ABCMeta):
    def __init__(self, name, classifier, X: np.ndarray, y: np.ndarray, variation_param=None):
        self.name = name
        self.classifier = classifier
        self.params = {}
        self.variation_param = variation_param

        self.X: np.ndarray = X
        self.y: np.ndarray = y

        self.history = None

        self.train_scores = None
        self.valid_scores = None

    def __train_model(self, x, y):
        logger.info("Training model...")
        return self.classifier.fit(x, y)

    def predict(self, x):
        logger.info("Predicting...")
        return self.classifier.predict(x)

    def error(self, x, y):
        logger.info(f"Calculating error")
        return mean_squared_error(y, self.predict(x)) / 2

    def set_new_number_iter(self, iterations):
        self.classifier.max_iter = iterations

    def train(self, from_previous=False):
        if from_previous:
            self.classifier.warm_start = from_previous

        logger.info(f"Starting train: {self.name}")
        self.history = self.__train_model(self.X, self.y)

    def save_classifier(self, file_name="classifier"):
        save_object(self, file_name)

    def save_history(self, file_name="history"):
        save_object(self.history, file_name)

    def generate_report(self, X, y):
        return classification_report(y_true=y, y_pred=self.predict(X))

    def precision(self, X, y, average=None):
        return precision_score(y_true=y, y_pred=self.predict(X), average=average, zero_division=1)

    def accuracy(self, X, y, label='accuracy'):
        self.params[label] = accuracy_score(y_true=y, y_pred=self.predict(X))
        return self.params[label]

    def confusion_matrix(self, X, y, label='confusion_matrix'):
        self.params[label] = confusion_matrix(y_true=y, y_pred=self.predict(X))
        return self.params[label]

    def update_params(self, **kwargs):
        for key, value in kwargs.items():
            self.params[key] = value
        logger.info(f"Params updated with {kwargs}")
        return self.params

    def save_report(self, file_name="report.json"):
        with open(file_name, 'w') as file:
            file.write(json.dumps(self.generate_report()))
        logger.info(f"Report saved into file: {file_name}")
    
    def __repr__(self):
        return self.__str__()

    def __str__(self):
        return f"Classifier : {self.name} ->  {self.params} -> Best value for: {self.variation_param}\n"


class PolynomialSvm(Classifier):
    def __init__(self, classifier, X, y, variation_param):
        self.X = X
        self.y = y
        # self.C = C
        # self.degree = degree
        self.classifier = classifier
        self.variation_param = variation_param
        super().__init__(self.__class__.__name__, classifier, self.X, self.y, self.variation_param)

    def save_classifier(self, file_name=None):
        super().save_classifier(
            file_name if file_name is not None else f'classifiers/{self.name}_{self.variation_param}/'
                                                    f'{eval(f"self.classifier.{self.variation_param}")}.classifier')

    def __str__(self):
        return super().__str__()  # + f"C->{self.C}\tdegree->{self.degree}\n"


class NeuralNetwork(Classifier):
    def __init__(self, X, y, alpha, Lambda, hidden_layer_sizes, iterations, activation, batch_size, solver="sgd",
                 variation_param=None, verbose=False):
        self.alpha = alpha
        self.hidden_layer_sizes = hidden_layer_sizes
        self.max_iter = iterations
        self.variation_param = variation_param
        super().__init__(self.__class__.__name__,
                         MLPClassifier(alpha=Lambda, learning_rate_init=alpha, activation=activation,
                                       hidden_layer_sizes=self.hidden_layer_sizes, solver=solver,
                                       max_iter=iterations, verbose=verbose, n_iter_no_change=10,
                                       batch_size=batch_size),
                         X, y, self.variation_param)

    def save_classifier(self, file_name=None):
        super().save_classifier(
            file_name if file_name is not None else f'classifiers/{self.name}_alpha_{self.alpha}_'
                                                    f'hidden_size_{self.hidden_layer_sizes}_max_iter_{self.max_iter}')

    def __str__(self):
        return super().__str__() + f"alpha->{self.alpha}\thidden_layer_sizes->{self.hidden_layer_sizes}\tmax_iter->{self.max_iter}\n"


class LogisticRegression(Classifier):
    def __init__(self, classifier, X, y, variation_param):
        self.X = X
        self.y = y
        # self.C = C
        # self.max_iter = max_iter
        self.variation_param = variation_param
        # LogisticRegression_sklearn(C=C, verbose=verbose, max_iter=max_iter, n_jobs=-1),
        super().__init__(self.__class__.__name__, classifier, self.X, self.y, self.variation_param)

    def save_classifier(self, file_name=None):
        super().save_classifier(
            file_name if file_name is not None else f'classifiers/{self.name}_{self.variation_param}/'
                                                    f'{eval(f"self.classifier.{self.variation_param}")}.classifier')  # _C_{self.C}_max_iter_{self.max_iter}')

    def __str__(self):
        return super().__str__()  # + f"C->{self.C}\tmax_iter->{self.max_iter}\n"




def f(filename, x, y):
    pixel_size = x.shape[-1]
    headlines = ['label']
    for i in range(1, pixel_size + 1):
        headlines.append(f'pixel{i}')

    np.savetxt(filename, np.c_[y, x], delimiter=',', header=','.join(headlines))


def set_validation_score_and_curve(classifier, x_train, y_train, x_cv, y_cv, x_test, y_test, parameter,
                                   parameter_values, classifier_class):
    data_x, data_y = np.concatenate((x_train, x_cv)), np.concatenate((y_train, y_cv))

    train_indices = np.full((x_train.shape[0],), -1, dtype=int)
    cv_indices = np.full((x_cv.shape[0],), 0, dtype=int)
    ps = PredefinedSplit(np.append(train_indices, cv_indices))

    estimators_svm, train_scores_svm, valid_scores_svm, fit_times, score_times = validation_curve(
        classifier, data_x, data_y.ravel(), parameter, parameter_values, cv=ps, n_jobs=-1)

    for i in range(estimators_svm.shape[0]):
        classifier = estimators_svm[i]
        train_score = train_scores_svm[i]
        valid_score = valid_scores_svm[i]
        fit_time = fit_times[i]
        score_time = score_times[i]

        c: Classifier = eval(classifier_class)
        c.update_params(train_score=train_score, valid_score=valid_score,
                        fit_time=fit_time, score_time=score_time)

        c.accuracy(x_test, y_test, "Test set Accuracy")
        c.confusion_matrix(x_cv, y_cv, "CV confusion matrix")
        c.confusion_matrix(x_test, y_test, "Test confusion matrix")
        c.save_classifier()


def main():
    import google.colab
    C = (0.001, 0.002, 0.01, 0.02, 0.1, 0.2, 1, 5, 10, 50, 100, 500, 1000)
    degrees = [2, 3, 4, 5, 6, 7, 8, 9, 10]
    alphas = (0.0001, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10, 50, 100, 500, 1000)
    iterations = [200, 500, 1000, 2000]

    x_train, y_train = read_file('../dataset/merged_train_set.csv')
    x_cv, y_cv = read_file('../dataset/merged_cv_set.csv')
    x_test, y_test = read_file('../dataset/merged_test_set.csv')

    x_train = x_train / 255
    x_cv = x_cv / 255
    x_test = x_test / 255

    set_validation_score_and_curve(
        svm.SVC(kernel='poly', C=C[0], probability=True, degree=degrees[0], verbose=True),
        x_train[:100], y_train[:100], x_cv[:100], y_cv[:100], x_test[:100], y_test[:100], "C", C,
        "PolynomialSvm(classifier, x_train, y_train, parameter)")

    set_validation_score_and_curve(
        LogisticRegressionSKlearn(C=C[0], verbose=True, max_iter=1000, n_jobs=-1),
        x_train[:100], y_train[:100], x_cv[:100], y_cv[:100], x_test[:100], y_test[:100], "C", C,
        "LogisticRegression(classifier, x_train, y_train, parameter)")

    classifiers = get_classifiers("classifiers")
    for classifier_name, classifier_list in classifiers.items():

        train_scores = []
        valid_scores = []
        fit_times = []
        score_times = []
        tests_accuracy = []

        for classifier in classifier_list:
            train_scores.append(classifier.params['train_score'])
            valid_scores.append(classifier.params['valid_score'])
            fit_times.append(classifier.params['fit_time'])
            score_times.append(classifier.params['score_time'])
            tests_accuracy.append(classifier.params['Test set Accuracy'])

        train_scores = np.array(train_scores)
        valid_scores = np.array(valid_scores)
        fit_times = np.array(fit_times)
        score_times = np.array(score_times)
        tests_accuracy = np.array(tests_accuracy)

        plot_validation_curve(train_scores, valid_scores, f"Validation Curve with SVM Degree:{degrees[0]}",
                              "C", "Score", C)

        plot_time_per_parameter(fit_times, score_times, "Time of fitting and scoring proccesses", "C", "Time (s)", C)
        plot_test_accuracy(C, tests_accuracy, "Test set accuracy", "C", "Accuracy")


if __name__ == '__main__':
    main()
    


ModuleNotFoundError: No module named 'google'