# In [4]: (notebook cell marker left over from the Jupyter export)
import os

from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import cross_val_score
from csv import reader
from random import randrange
from math import sqrt

"""
  Modelagem e Avaliação

  Utilizar métricas de avaliação, como precisão, recall e F1-score, para
  identificar a melhor configuração.
"""


# Load a CSV file
def load_csv(filename):
    """Read a CSV file into a list of rows, skipping empty rows.

    Args:
        filename: Path of the CSV file to read.

    Returns:
        A list with one list of string fields per non-empty CSV row, in
        file order.

    Raises:
        OSError: If the file cannot be opened.
    """
    # newline="" hands line-ending handling to the csv module, as its
    # documentation requires; without it "\r\n" embedded in a quoted field
    # is silently rewritten to "\n" by universal-newline translation.
    with open(filename, "r", newline="") as file:
        # csv.reader yields an empty list for blank lines; drop those.
        return [row for row in reader(file) if row]


# Convert string column to float
def str_column_to_float(dataset, column):
    """Convert one column of every row from text to float, in place.

    Args:
        dataset: List of rows (mutable sequences); modified in place.
        column: Index of the column to convert.

    Raises:
        ValueError: If a value cannot be parsed as a float.
    """
    for record in dataset:
        text = record[column]
        # Surrounding whitespace is removed before parsing.
        record[column] = float(text.strip())


# Convert string column to integer
def str_column_to_int(dataset, column):
    """Replace the values of one column with integer codes, in place.

    Codes are assigned in order of first appearance in ``dataset`` (the
    first distinct value gets 0, the next 1, and so on), so the mapping is
    reproducible from run to run. Iterating a ``set`` of strings, by
    contrast, can yield a different order in every interpreter session.

    Args:
        dataset: List of rows (mutable sequences); modified in place.
        column: Index of the column to encode.

    Returns:
        A dict mapping each original value to its integer code.
    """
    lookup = {}
    # First pass: build the complete mapping before touching any row.
    for row in dataset:
        # setdefault only inserts when the value has not been seen yet, so
        # len(lookup) is always the next unused code.
        lookup.setdefault(row[column], len(lookup))
    # Second pass: rewrite the column using the finished mapping.
    for row in dataset:
        row[column] = lookup[row[column]]
    return lookup


# Split a dataset into k folds
def cross_validation_split(dataset, n_folds):
    """Randomly partition the rows of ``dataset`` into ``n_folds`` folds.

    Every fold holds ``int(len(dataset) / n_folds)`` rows drawn without
    replacement; rows left over when the size does not divide evenly are
    not placed in any fold. ``dataset`` itself is not modified.

    Args:
        dataset: List of rows to split.
        n_folds: Number of folds to produce.

    Returns:
        A list of ``n_folds`` lists of rows.
    """
    remaining = list(dataset)  # shallow copy that rows are popped from
    rows_per_fold = int(len(dataset) / n_folds)

    folds = []
    for _ in range(n_folds):
        folds.append(
            [remaining.pop(randrange(len(remaining))) for _ in range(rows_per_fold)]
        )
    return folds


# Calculate accuracy percentage
def accuracy_metric(actual, predicted):
    """Return the percentage of positions where the prediction is correct.

    Args:
        actual: Sequence of true labels.
        predicted: Sequence of predicted labels, indexed in parallel with
            ``actual``.

    Returns:
        The share of matching positions as a float between 0.0 and 100.0.

    Raises:
        ZeroDivisionError: If ``actual`` is empty.
    """
    total = len(actual)
    hits = sum(1 for idx in range(total) if actual[idx] == predicted[idx])
    return hits / float(total) * 100.0


# Evaluate an algorithm using a cross validation split
def _score_split(model, rows):
    """Predict every row with ``model`` and return its four metrics.

    The last element of each row is taken as the true label; the remaining
    elements are passed to ``predict`` as the features.
    """
    actual = [row[-1] for row in rows]
    predicted = [predict(model, row[:-1]) for row in rows]
    return {
        "accuracy": accuracy_metric(actual, predicted),
        "precision": precision_metric(actual, predicted),
        "recall": recall_metric(actual, predicted),
        "f1_score": f1_score_metric(actual, predicted),
    }


def evaluate_algorithm(dataset, algorithm, n_folds, *args):
    """Evaluate ``algorithm`` with k-fold cross validation.

    For each fold, the model is trained on all other folds; the held-out
    fold is cut in half, the first half serving as the validation set and
    the second half as the test set.

    Args:
        dataset: List of rows whose last element is the class label.
        algorithm: Callable invoked as
            ``algorithm(train_set, valid_set, *args)``; its return value is
            the model handed to ``predict``.
        n_folds: Number of cross-validation folds.
        *args: Extra positional arguments forwarded to ``algorithm``.

    Returns:
        A dict with the keys ``"accuracy"``, ``"precision"``, ``"recall"``
        and ``"f1_score"``. Each value is a list with one entry per fold,
        and each entry is a dict with the keys ``"train"``, ``"valid"`` and
        ``"test"``.
    """
    folds = cross_validation_split(dataset, n_folds)
    scores = {"accuracy": [], "precision": [], "recall": [], "f1_score": []}

    for i, held_out in enumerate(folds):
        # Flatten every other fold in a single pass. sum(lists, []) copies
        # the growing accumulator on each addition and is quadratic.
        train_set = [
            row for j, fold in enumerate(folds) if j != i for row in fold
        ]
        half = len(held_out) // 2
        valid_set = held_out[:half]
        test_set = held_out[half:]

        model = algorithm(train_set, valid_set, *args)

        split_scores = {
            "train": _score_split(model, train_set),
            "valid": _score_split(model, valid_set),
            "test": _score_split(model, test_set),
        }

        # Regroup from {split: {metric: value}} to the per-metric layout
        # that callers read.
        for metric, per_fold in scores.items():
            per_fold.append(
                {split: values[metric] for split, values in split_scores.items()}
            )

    return scores


# calculate the Euclidean distance between two vectors
def euclidean_distance(row1, row2):
    """Return the Euclidean distance between the features of two rows.

    The last element of ``row1`` is not a feature and is left out: only
    indices ``0 .. len(row1) - 2`` of both rows are compared.

    Args:
        row1: Row whose length fixes how many positions are compared.
        row2: Row with at least ``len(row1) - 1`` elements.

    Returns:
        The distance as a float.
    """
    n_features = len(row1) - 1
    squared_sum = 0.0
    for idx in range(n_features):
        delta = row1[idx] - row2[idx]
        squared_sum += delta ** 2
    return sqrt(squared_sum)


# Locate the best matching unit
def get_best_matching_unit(codebooks, test_row):
    """Return the codebook vector closest to ``test_row``.

    Closeness is measured with ``euclidean_distance``. When several
    codebooks are equally close, the first one in ``codebooks`` is returned.

    Args:
        codebooks: Iterable of codebook vectors.
        test_row: Row to match against the codebooks.

    Returns:
        The closest codebook vector (the same object that is stored in
        ``codebooks``, not a copy).

    Raises:
        IndexError: If ``codebooks`` is empty.
    """
    # A single min() pass finds the winner without building and sorting a
    # list of every distance. min() returns the first minimal item, which
    # matches the tie-breaking of a stable sort.
    no_match = object()
    best = min(
        codebooks,
        key=lambda codebook: euclidean_distance(codebook, test_row),
        default=no_match,
    )
    if best is no_match:
        raise IndexError("no codebooks to match against")
    return best


# Make a prediction with codebook vectors
def predict(codebooks, test_row):
    """Predict a class for ``test_row`` from a set of codebook vectors.

    Args:
        codebooks: Codebook vectors whose last element is a class label.
        test_row: Row to classify.

    Returns:
        The last element of the codebook that best matches ``test_row``.
    """
    return get_best_matching_unit(codebooks, test_row)[-1]


# Create a random codebook vector
def random_codebook(train):
    """Build a codebook vector from randomly sampled training values.

    Each position of the result is copied from the same position of a
    training row that is drawn at random independently for that position.

    Args:
        train: Non-empty list of training rows of equal length.

    Returns:
        A new list with as many elements as ``train[0]``.
    """
    codebook = []
    for position in range(len(train[0])):
        donor = train[randrange(len(train))]
        codebook.append(donor[position])
    return codebook


# Train a set of codebook vectors
def train_codebooks(train, n_codebooks, lrate, epochs):
    """Train a set of LVQ codebook vectors on ``train``.

    The learning rate decays linearly from ``lrate`` in the first epoch
    towards zero. For every training row, the best matching codebook is
    moved towards the row when their last elements (class labels) are
    equal, and away from it otherwise.

    Args:
        train: Training rows whose last element is the class label.
        n_codebooks: Number of codebook vectors to create.
        lrate: Initial learning rate.
        epochs: Number of passes over ``train``.

    Returns:
        The list of trained codebook vectors.
    """
    codebooks = []
    for _ in range(n_codebooks):
        codebooks.append(random_codebook(train))

    for epoch in range(epochs):
        rate = lrate * (1.0 - (epoch / float(epochs)))
        for sample in train:
            winner = get_best_matching_unit(codebooks, sample)
            # The last position holds the label and is never adjusted.
            for idx in range(len(sample) - 1):
                step = rate * (sample[idx] - winner[idx])
                if winner[-1] == sample[-1]:
                    winner[idx] += step  # same class: pull closer
                else:
                    winner[idx] -= step  # different class: push away
    return codebooks


# LVQ Algorithm
def learning_vector_quantization(train, test, n_codebooks, lrate, epochs):
    """Train an LVQ model and return its codebook vectors.

    Args:
        train: Training rows whose last element is the class label.
        test: Not used by this function; accepted so the signature matches
            the ``algorithm(train_set, valid_set, *args)`` call made by
            ``evaluate_algorithm``.
        n_codebooks: Number of codebook vectors to train.
        lrate: Initial learning rate.
        epochs: Number of training epochs.

    Returns:
        The trained codebook vectors.
    """
    return train_codebooks(train, n_codebooks, lrate, epochs)


# Weighted-average precision (scikit-learn)
def precision_metric(actual, predicted):
    """Return scikit-learn's precision score with ``average="weighted"``.

    Args:
        actual: True labels.
        predicted: Predicted labels, parallel to ``actual``.
    """
    weighted_precision = precision_score(
        y_true=actual, y_pred=predicted, average="weighted"
    )
    return weighted_precision


# Weighted-average recall (scikit-learn)
def recall_metric(actual, predicted):
    """Return scikit-learn's recall score with ``average="weighted"``.

    Args:
        actual: True labels.
        predicted: Predicted labels, parallel to ``actual``.
    """
    weighted_recall = recall_score(
        y_true=actual, y_pred=predicted, average="weighted"
    )
    return weighted_recall


# Weighted-average F1 score (scikit-learn)
def f1_score_metric(actual, predicted):
    """Return scikit-learn's F1 score with ``average="weighted"``.

    Args:
        actual: True labels.
        predicted: Predicted labels, parallel to ``actual``.
    """
    weighted_f1 = f1_score(y_true=actual, y_pred=predicted, average="weighted")
    return weighted_f1


# Load and prepare the data. The path is built from the current working
# directory, so the CSV is looked up at ../dataset/spotify_dataset.csv
# relative to wherever the script (or notebook kernel) was started.
current_dir = os.getcwd()
filename = os.path.join(current_dir, "../dataset/spotify_dataset.csv")
dataset = load_csv(filename)

# Convert every column except the last one to float. All rows are converted,
# including the first, so a textual header row in the CSV would raise
# ValueError here.
for i in range(len(dataset[0]) - 1):
    str_column_to_float(dataset, i)

# Convert the class column (the last one) to integer codes.
str_column_to_int(dataset, len(dataset[0]) - 1)

# Evaluation settings: number of cross-validation folds, initial LVQ learning
# rate, training epochs, and number of codebook vectors.
n_folds = 5
learn_rate = 0.3
n_epochs = 50
n_codebooks = 15

# The trailing arguments are forwarded by evaluate_algorithm to
# learning_vector_quantization as (n_codebooks, lrate, epochs).
scores = evaluate_algorithm(
    dataset, learning_vector_quantization, n_folds, n_codebooks, learn_rate, n_epochs
)

metrics = ["accuracy", "precision", "recall", "f1_score"]

# Report the mean of each metric over the folds for the train, validation and
# test splits. Accuracy is a percentage (accuracy_metric multiplies by 100);
# the other three are printed exactly as scikit-learn's scorers return them
# (a 0-1 scale), so the numbers are not directly comparable.
for metric in metrics:
    print(f"{metric.capitalize()} scores:")
    mean_train = sum(score["train"] for score in scores[metric]) / n_folds
    mean_valid = sum(score["valid"] for score in scores[metric]) / n_folds
    mean_test = sum(score["test"] for score in scores[metric]) / n_folds
    print(
        f"Mean Train: {mean_train:.3f}, Mean Valid: {mean_valid:.3f}, Mean Test: {mean_test:.3f}"
    )

# KeyboardInterrupt: (notebook output: the run above was interrupted manually)