In [372]:
from functools import cmp_to_key

from sklearn.metrics import mean_absolute_error
from sklearn.metrics import precision_score, recall_score, accuracy_score, confusion_matrix
from sklearn.model_selection import learning_curve
from sklearn.model_selection import cross_validate
from sklearn.model_selection import LearningCurveDisplay

from sklearn.tree import DecisionTreeClassifier
# set matplotlib backend to inline
%matplotlib inline

# import modules
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import VotingClassifier
from sklearn.preprocessing import StandardScaler

from sklearn.metrics import f1_score
from sklearn.feature_selection import SelectKBest, chi2
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.utils.multiclass import unique_labels
from numpy import int64
from sklearn.utils import check_X_y, check_array
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.utils.estimator_checks import check_estimator
from sklearn.feature_selection import f_classif


In [373]:
def read_nhl_data(csv_path='moneypack/all_teams.csv'):
    """Load per-game team rows and aggregate them to one row per team and season.

    Rows containing any missing value are discarded, the listed non-feature
    columns are removed, and every remaining column is averaged per
    ``(team, season)`` pair. ``playoffGame`` is then replaced by a binary
    ``playoff_qualified`` label: 1 when the averaged ``playoffGame`` value is
    greater than zero, 0 otherwise.

    Parameters
    ----------
    csv_path : str or path-like, default 'moneypack/all_teams.csv'
        CSV file to read. The default keeps the original zero-argument call working.

    Returns
    -------
    pandas.DataFrame
        One row per ``(team, season)``; ``team`` and ``season`` are regular
        columns, followed by the averaged columns and ``playoff_qualified``.
    """
    columns_to_drop = [
        'name', 'gameId', 'playerTeam',
        'opposingTeam', 'home_or_away',
        'gameDate', 'position',
        'situation', 'iceTime',
    ]

    # dropna() runs before the column drop, so a missing value in ANY column
    # (including the ones removed below) discards the whole row.
    games = pd.read_csv(csv_path).dropna()

    per_season = (
        games
        .sort_values(by=['team', 'season'])
        .drop(columns=columns_to_drop)
        .groupby(['team', 'season'], as_index=False)
        .mean()
    )

    # Vectorised equivalent of the per-row "1 if val > 0 else 0" rule.
    per_season['playoff_qualified'] = (per_season['playoffGame'] > 0).astype('int64')
    return per_season.drop(columns=['playoffGame'])


def read_nhl_data_2023(csv_path='moneypack/teams_2023.csv'):
    """Load the 2023 team rows and average them per team and season.

    Rows containing any missing value are discarded, the listed non-feature
    columns are removed, and the remaining columns are averaged per
    ``(team, season)`` pair.

    Parameters
    ----------
    csv_path : str or path-like, default 'moneypack/teams_2023.csv'
        CSV file to read. The default keeps the original zero-argument call working.

    Returns
    -------
    pandas.DataFrame
        Averaged columns indexed by a ``(team, season)`` MultiIndex. Unlike
        ``read_nhl_data`` the grouping keys are NOT kept as regular columns.
    """
    columns_to_drop_from_2023 = ['name', 'position', 'situation', 'iceTime', 'team.1', 'games_played']

    # dropna() runs before the column drop, so a missing value in any column
    # (including the ones removed below) discards the whole row.
    rows_2023 = pd.read_csv(csv_path).dropna()

    return (
        rows_2023
        .drop(columns=columns_to_drop_from_2023)
        .sort_values(by=['team', 'season'])
        .groupby(['team', 'season'])
        .mean()
    )


def get_team_names(_team_data):
    """Return the distinct values of the ``team`` column, sorted ascending.

    Parameters
    ----------
    _team_data : pandas.DataFrame
        Any frame exposing a ``team`` column.

    Returns
    -------
    The ``.values`` of the de-duplicated, sorted ``team`` column.
    """
    # First occurrence of each team is kept, then the survivors are ordered by name.
    unique_teams = pd.DataFrame(_team_data['team']).drop_duplicates()
    return unique_teams.sort_values(by=['team'])['team'].values

In [None]:
# Training set: every aggregated team-season except 2022 and 2023.
by_team_season_all = read_nhl_data()
by_team_season_all = by_team_season_all[~by_team_season_all['season'].isin([2022, 2023])]

# Labels first, then the feature matrix with the label and identifier columns removed.
seasons_target_all = by_team_season_all['playoff_qualified'].values
seasons_data_all = by_team_season_all.drop(columns=['playoff_qualified', 'season', 'team'])

In [None]:
# Hold-out set: only the 2022 season rows of the aggregated data.
# NOTE: this rebinds `by_team_season_all`, which the previous cell used for the training rows.
by_team_season_all = read_nhl_data().loc[lambda frame: frame['season'] == 2022]

seasons_teams_2022 = get_team_names(by_team_season_all)
seasons_target_2022 = by_team_season_all['playoff_qualified'].values
seasons_data_2022 = by_team_season_all.drop(columns=['playoff_qualified', 'season', 'team'])

In [None]:
seasons_data_2023 = read_nhl_data_2023()

# Take the team names from the aggregated frame's own index instead of reading
# the CSV a second time: the names are then guaranteed to line up row-for-row
# with the feature rows, even if dropna() inside read_nhl_data_2023 removed rows.
seasons_teams_2023 = seasons_data_2023.index.get_level_values('team').to_numpy()

# Hand-entered labels, one per row of seasons_data_2023 (rows are ordered by team name).
# NOTE(review): the original carried the remark "<40" next to this list; its meaning
# is not evident from the notebook - confirm how these labels were derived.
seasons_target_2023 = [0, 0, 1, 0, 1, 0, 0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 1, 0, 1, 1, 0, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1,
                       0]

# Fail here, with a clear message, rather than later when the prediction table is built.
assert len(seasons_target_2023) == len(seasons_data_2023), (
    f'{len(seasons_target_2023)} labels for {len(seasons_data_2023)} team rows'
)

In [None]:

# Score every candidate column against the playoff label with f_classif.
best_features = SelectKBest(score_func=f_classif, k=5)
best_features.fit(seasons_data_all.values, seasons_target_all)
selected_features = seasons_data_all.columns[best_features.get_support()]

# Rank all columns from highest to lowest score.
sorted_index = np.flip(np.argsort(best_features.scores_))
sorted_scores = best_features.scores_[sorted_index]
sorted_feature_names = seasons_data_all.columns.to_numpy()[sorted_index]
sorted_features_all = pd.DataFrame({"feature": sorted_feature_names, "score": sorted_scores})

# Horizontal bar chart of the ranked scores.
plt.figure(figsize=(20, 20))
sns.barplot(x=sorted_scores, y=sorted_feature_names)
plt.gca().set(xlabel='Scores', ylabel='Features', title='Feature Importance using SelectKBest')
plt.show()


In [None]:
# Keep only the features whose selection score exceeds 100.
sorted_features_all = sorted_features_all[sorted_features_all['score'] > 100]
sorted_features_all_names = sorted_features_all['feature'].values

# Learn the scaling statistics from the training seasons ONLY, then apply that
# same transformation to the 2022 and 2023 data. Re-fitting the scaler on each
# evaluation set would standardise it with its own mean/variance, so the models
# would be scored on features that live on a different scale than the one they
# were trained on.
scaler = StandardScaler()
seasons_data_all_features = scaler.fit_transform(seasons_data_all[sorted_features_all_names])
seasons_data_2022_features = scaler.transform(seasons_data_2022[sorted_features_all_names])
seasons_data_2023_features = scaler.transform(seasons_data_2023[sorted_features_all_names])


In [None]:
class MyKNearestNeighborsClassifier(ClassifierMixin, BaseEstimator):
    """Brute-force k-nearest-neighbours classifier using Euclidean distance.

    Parameters
    ----------
    k_neighbors : int
        Requested number of neighbours; must be a positive integer. The
        classifier actually votes over ``k_neighbors + 1`` neighbours (see
        ``__validate_k``).

    Attributes (set by ``fit``)
    ---------------------------
    classes_ : ndarray
        Sorted distinct labels seen during ``fit``.

    Notes
    -----
    * ``__init__`` stores only the constructor parameter. ``get_params`` and
      ``set_params`` are inherited from ``BaseEstimator``, which derives them
      from the ``__init__`` signature, and no fitted-looking attribute exists
      before ``fit`` is called.
    * The mixin is listed before ``BaseEstimator``, as the scikit-learn
      developer guide asks for.
    * Labels are encoded as positions in ``classes_`` before voting, so labels
      do not have to be non-negative integers.
    """

    def __init__(self, k_neighbors):
        self.k_neighbors = k_neighbors

    def fit(self, x, y):
        """Validate and memorise the training data.

        Parameters
        ----------
        x : array-like of shape (n_samples, n_features)
        y : array-like of shape (n_samples,)

        Returns
        -------
        self

        Raises
        ------
        TypeError
            If ``k_neighbors`` is not an integer.
        ValueError
            If ``k_neighbors`` is not positive.
        """
        x, y = check_X_y(x, y)
        self.classes_ = unique_labels(y)
        self._k = self.__validate_k(self.k_neighbors)
        self._x = x
        self._y = y
        # Position of every training label inside the sorted classes_ array;
        # voting happens on these positions rather than on the raw labels.
        self._y_index = np.searchsorted(self.classes_, y)
        return self

    def predict(self, x_test):
        """Predict a label for every row of ``x_test``.

        Raises
        ------
        sklearn.exceptions.NotFittedError
            If ``fit`` has not been called.
        ValueError
            If ``x_test`` has a different number of columns than the training data.
        """
        if getattr(self, "_x", None) is None:
            from sklearn.exceptions import NotFittedError
            raise NotFittedError(
                f"This {type(self).__name__} instance is not fitted yet. Call 'fit' first."
            )

        x_test = check_array(x_test)
        if x_test.shape[1] != self._x.shape[1]:
            raise ValueError(
                f"x_test has {x_test.shape[1]} features, "
                f"but the classifier was fitted with {self._x.shape[1]}"
            )
        return np.array([self.__predict_class(test_x_point) for test_x_point in x_test])

    def __predict_class(self, x_point):
        """Return the majority label among the nearest training points of ``x_point``."""
        distances = self.__euclidean(self._x, x_point)
        # A stable sort resolves equal distances by training order.
        nearest_indexes = np.argsort(distances, kind="stable")[:self._k]
        votes = np.bincount(self._y_index[nearest_indexes], minlength=len(self.classes_))
        # argmax returns the first maximum, i.e. a tied vote goes to the smallest label.
        return self.classes_[votes.argmax()]

    def __euclidean(self, point1, point2):
        """Euclidean distance along the last axis; ``point1`` may be one point or a matrix of points."""
        return np.sqrt(np.sum(np.square(point1 - point2), axis=-1))

    def __validate_k(self, k):
        """Check ``k`` and return the neighbour count actually used, ``k + 1``.

        NOTE(review): the original rationale was "ties can be broken consistently
        by expanding K by 1". That behaviour is preserved, but for two classes an
        odd ``k_neighbors`` becomes an even neighbour count, which makes tied
        votes possible - confirm this is intended.
        """
        if isinstance(k, bool) or not isinstance(k, (int, np.integer)):
            raise TypeError(f"K must be an integer, got {type(k).__name__}")
        if k <= 0:
            raise ValueError("K cannot be less or equal to zero")
        return k + 1

# check_estimator(MyKNearestNeighborsClassifier(3))

In [None]:
def plot_learning_curve(estimator, x_data, y_data):
    """Plot training and test scores of ``estimator`` against training-set size.

    Scores are computed at five training sizes (10% to 100% of each training
    split) over 30 stratified shuffle splits that hold out 20% of the samples.
    No ``scoring`` is passed, so the estimator's default score is used; the
    axis is labelled "Accuracy".

    Parameters
    ----------
    estimator : scikit-learn estimator
    x_data : array-like, feature matrix
    y_data : array-like, labels

    Returns
    -------
    None. The figure is drawn on a newly created matplotlib axes.
    """
    fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(10, 6), sharey=True)
    splitter = StratifiedShuffleSplit(n_splits=30, test_size=0.2, random_state=0)

    LearningCurveDisplay.from_estimator(
        estimator,
        X=x_data,
        y=y_data,
        train_sizes=np.linspace(0.1, 1.0, 5),
        cv=splitter,
        score_type="both",
        n_jobs=4,
        line_kw={"marker": "o"},
        std_display_style="fill_between",
        score_name="Accuracy",
        ax=ax,
    )

    # Relabel the first two legend entries; the label texts returned here are not needed.
    legend_handles, _ = ax.get_legend_handles_labels()
    ax.legend(legend_handles[:2], ["Training Score", "Test Score"])
    ax.set_title(f"Learning Curve for {estimator.__class__.__name__}")


def run_cross_validation(classifier_instance, _data, _target, _cv, scoring='neg_root_mean_squared_error'):
    """Cross-validate ``classifier_instance`` and keep the fitted estimator of every split.

    Parameters
    ----------
    classifier_instance : scikit-learn estimator
    _data : array-like, feature matrix
    _target : array-like, labels
    _cv : cross-validation splitter (or anything ``cross_validate`` accepts as ``cv``)
    scoring : str, default 'neg_root_mean_squared_error'
        Scorer name handed to ``cross_validate``.

    Returns
    -------
    dict
        The ``cross_validate`` result (including ``"estimator"``) plus a
        ``"test_error"`` entry holding the sign-flipped ``"test_score"`` values.
        The sign flip yields an error only for negated-error scorers such as the default.
    """
    results = cross_validate(
        classifier_instance,
        _data,
        _target,
        scoring=scoring,
        cv=_cv,
        return_estimator=True,
    )
    results["test_error"] = -results["test_score"]
    return results


def run_prediction(_cv_result, _class, _data, _target, _data_for_prediction, _expected_result=None):
    """Predict with a fitted classifier and, when the truth is known, print its metrics.

    Parameters
    ----------
    _cv_result, _data, _target
        Not used by this function; kept so existing calls keep working.
    _class : fitted classifier exposing ``predict``
    _data_for_prediction : array-like, rows to predict
    _expected_result : array-like or None
        True labels. When given, a two-line table (header + one row) with
        F1, accuracy, per-class precision, per-class recall and mean absolute
        error is printed.

    Returns
    -------
    The predictions returned by ``_class.predict``.
    """
    _y_pred = _class.predict(_data_for_prediction)

    if _expected_result is not None:
        _precision = precision_score(_expected_result, _y_pred, average=None)
        _recall = recall_score(_expected_result, _y_pred, average=None)
        _accuracy = accuracy_score(_expected_result, _y_pred)
        _f1_score = f1_score(_expected_result, _y_pred)
        _testing_error = mean_absolute_error(_expected_result, _y_pred)

        # The classifier column is truncated AND padded to the header's width (35)
        # so the column separators of the two lines line up.
        _classifier_label = str(_class)[:35]
        print(
            f'|{"Classifier":>35}|{"f1":>20}|{"accuracy":>20}|{"precision":>24}|{"recall":>23}|{"test error":>20}|')
        print(
            f'|{_classifier_label:>35}|{_f1_score:>20}|{_accuracy:>20}|{str(_precision):>24}|{str(_recall):>23}|{_testing_error:>20}|')
    return _y_pred


def print_prediction(_teams, _pred, _season, _expected=None):
    """Print a per-team table of predictions for one season.

    Parameters
    ----------
    _teams : sequence of team names
    _pred : sequence of predicted labels, one per team
    _season : value shown in the heading line
    _expected : sequence or None
        When given, it is added as an ``expected`` column.

    Returns
    -------
    None. The table is written to standard output.
    """
    table = pd.DataFrame({"team": _teams, "playoff qualified": _pred})
    if _expected is None:
        rendered = table
    else:
        rendered = table.assign(expected=_expected)
    print(f'\nPrediction for season {_season}\n{rendered}\n')



In [None]:
class ModelMetrics:
    """Evaluation metrics of one fitted classifier on one labelled data set.

    The constructor computes per-class precision and recall, accuracy, the
    confusion matrix, the F1 score and the mean absolute error of ``_y_pred``
    against ``_expected_result`` and stores them next to the classifier, its
    cross-validation results and an optional display prefix.
    """

    def __init__(self, _classifier, _cv_results, _y_pred, _expected_result, _classifier_prefix=''):
        y_true, y_hat = _expected_result, _y_pred

        self.classifier_instance = _classifier
        self.classifier_prefix = _classifier_prefix
        self.cv_results = _cv_results
        self.pred_y = y_hat

        self.f1_score = f1_score(y_true, y_hat)
        self.accuracy = accuracy_score(y_true, y_hat)
        self.precision = precision_score(y_true, y_hat, average=None)
        self.recall = recall_score(y_true, y_hat, average=None)
        self.confusion_matrix = confusion_matrix(y_true, y_hat)
        self.testing_error = mean_absolute_error(y_true, y_hat)

    # Plain two-argument function (no ``self``): it is meant to be looked up on
    # the class and wrapped with functools.cmp_to_key.
    def comparator(a, b):
        """Old-style comparison: the difference of the two F1 scores (``a`` minus ``b``)."""
        return a.f1_score - b.f1_score

    def get_classifier_instance(self):
        """Return the stored classifier."""
        return self.classifier_instance

    def get_cv_results(self):
        """Return the stored cross-validation results."""
        return self.cv_results

    def __str__(self):
        """Render one right-aligned, pipe-delimited table row."""
        # Prefix plus the first 20 characters of the classifier's string form.
        label = self.classifier_prefix + str(self.classifier_instance)[:20]
        cells = (
            f'{label:>25}',
            f'{self.f1_score:>20}',
            f'{self.accuracy:>20}',
            f'{str(self.precision):>24}',
            f'{str(self.recall):>23}',
            f'{self.testing_error:>20}',
        )
        return '|' + '|'.join(cells) + '|'


class ModelChooser:
    """Collects ``ModelMetrics`` entries and ranks them by F1 score."""

    def __init__(self):
        # ModelMetrics entries in insertion order.
        self.data = []

    def push(self, _classifier, _cv_results, _y_pred, _expected_result, _classifier_prefix=''):
        """Compute the metrics for one classifier and store them."""
        entry = ModelMetrics(_classifier, _cv_results, _y_pred, _expected_result, _classifier_prefix)
        self.data.append(entry)

    def get_k_best(self, k):
        """Return the ``k`` stored entries with the highest F1 score, best first."""
        ranked = list(self.data)
        ranked.sort(key=cmp_to_key(ModelMetrics.comparator), reverse=True)
        return ranked[:k]

    def print_k_best(self, k):
        """Print a header line followed by one numbered row per top-``k`` entry."""
        header = (
            f'|{"Classifier":>31}|{"f1":>20}|{"accuracy":>20}|{"precision":>24}|{"recall":>23}|{"test error":>20}|')
        print(header)
        for rank, model in enumerate(self.get_k_best(k), start=1):
            print(f'|{rank:>4} {model}')

In [None]:

cv = StratifiedShuffleSplit(n_splits=30, test_size=0.2, random_state=0)
data = seasons_data_all_features
target = seasons_target_all

n_neighbors = 5
max_depth = 5

classifier_knn = KNeighborsClassifier(n_neighbors=n_neighbors, metric='euclidean')
classifier_knn_my = MyKNearestNeighborsClassifier(n_neighbors)
# A fixed random_state makes the tree (and the voting ensembles built on it)
# reproducible across kernel restarts.
classifier_dec_tree = DecisionTreeClassifier(max_depth=max_depth, random_state=0)
voting_clf_hard = VotingClassifier(estimators=[('knn', classifier_knn), ('decision_tree', classifier_dec_tree)],
                                   voting='hard')
voting_clf_soft = VotingClassifier(estimators=[('knn', classifier_knn), ('decision_tree', classifier_dec_tree)],
                                   voting='soft')


def _best_fold_estimator(_cv_results):
    """Return the fitted estimator of the split with the highest test score.

    Taking ``_cv_results["estimator"][0]`` would return whichever model was
    fitted on the first split, regardless of how well it scored. NaN scores
    (failed fits) are ignored.
    """
    return _cv_results["estimator"][int(np.nanargmax(_cv_results["test_score"]))]


cv_results_knn = run_cross_validation(classifier_knn, data, target, cv)
best_knn = _best_fold_estimator(cv_results_knn)

cv_results_knn_my = run_cross_validation(classifier_knn_my, data, target, cv)
best_knn_my = _best_fold_estimator(cv_results_knn_my)

cv_results_dec_tree = run_cross_validation(classifier_dec_tree, data, target, cv)
best_dec_tree = _best_fold_estimator(cv_results_dec_tree)

cv_results_hard = run_cross_validation(voting_clf_hard, data, target, cv)
best_hard = _best_fold_estimator(cv_results_hard)

cv_results_soft = run_cross_validation(voting_clf_soft, data, target, cv)
best_soft = _best_fold_estimator(cv_results_soft)

In [None]:
# Score every candidate on the 2022 hold-out season and rank the candidates by F1.
chooser = ModelChooser()
for _estimator, _results, _prefix in (
        (best_knn, cv_results_knn, ''),
        (best_knn_my, cv_results_knn_my, ''),
        (best_dec_tree, cv_results_dec_tree, ''),
        (best_hard, cv_results_hard, 'hard'),
        (best_soft, cv_results_soft, 'soft'),
):
    chooser.push(_estimator, _results, _estimator.predict(seasons_data_2022_features), seasons_target_2022, _prefix)
chooser.print_k_best(5)

# The top-ranked entry supplies the classifier used in the cells below.
metrics = chooser.get_k_best(1)[0]
best_classifier = metrics.get_classifier_instance()


In [None]:
# NOTE(review): the curve is computed on the 2022 hold-out rows only, so the
# smallest training subsets contain very few samples - confirm that the
# training data (`data`, `target`) was not intended here.
plot_learning_curve(
    estimator=best_classifier,
    x_data=seasons_data_2022_features,
    y_data=seasons_target_2022,
)

In [None]:
# Predicted vs. actual playoff qualification for every team of the 2022 season.
_columns_2022 = {
    "team": seasons_teams_2022,
    "playoff qualified": best_classifier.predict(seasons_data_2022_features),
    "expected": seasons_target_2022,
}
prediction_2022 = pd.DataFrame(_columns_2022)
print(f'Prediction for season 2022-2023\n{prediction_2022}')

In [None]:
# Predicted vs. expected playoff qualification for the 2023 season.
# The team labels must come from the 2023 data: the table previously reused the
# 2022 team list, which mislabels rows whenever the two seasons' team lists differ.
prediction_2023 = pd.DataFrame(
    {"team": seasons_teams_2023,
     "playoff qualified": best_classifier.predict(seasons_data_2023_features),
     "expected": seasons_target_2023}
)
print(f'Prediction for season 2023-2024\n{prediction_2023}')