# Наивный классификатор Байеса

## 0 Импорт необходимых библиотек

In [1]:
from typing import Iterable, Dict, Union, List
from collections import Counter

import pandas as pd
import numpy as np

from sklearn import metrics
from sklearn.model_selection import train_test_split

## 1 Naive Bayes with distributions

Разбить данные на тренировочную и тестовую выборки.
На основе обучающей создать модель наивного байеса, используя данное соотношение:

$P(class \mid x_1, x_2, \dots, x_n) \propto P(x_1 \mid class) \cdot P(x_2 \mid class) \cdot \ldots \cdot P(x_n \mid class) \cdot P(class)$

На тестовой выборке посчитать accuracy, precision и recall.

Если в данных содержатся числовые признаки, то предполагать для них нормальное распределение: 

$\mu = \frac{1}{n}\sum\limits_{i=1}^{n} x_i$ – среднее

$\sigma = \left[ \frac{1}{n-1} \sum\limits_{i=1}^{n} (x_i - \mu)^2 \right]^{0.5}$ – стандартное отклонение

$f(x) = \frac{1}{\sigma\sqrt{2\pi}}\exp\left(-\frac{(x-\mu)^2}{2\sigma^2}\right)$ – функция плотности для нормального распределения

### 1.1 Naive Bayes from Scratch (normal distribution incl.)

In [61]:
# noinspection PyMethodMayBeStatic, PyPep8Naming, PyShadowingNames
class NaiveBayesFromScratch:
    """Naive Bayes classifier for categorical and numeric features.

    Numeric columns (listed in ``num_features_cols``) are handled according
    to ``num_strategy``:

    * ``"split"`` - every numeric column is replaced by four binary
      "falls into this quartile" columns, which are then treated as
      ordinary categorical features;
    * ``"dist"`` - every numeric column is modelled with a per-class normal
      distribution (mean and sample standard deviation).

    All remaining columns are treated as categorical and scored with their
    relative frequency inside each class.
    """

    def __init__(self, num_features_cols: List[int], num_strategy="split"):
        """Store the configuration; nothing is learnt until ``fit``.

        :param num_features_cols: indices of the numeric columns of ``X``.
        :param num_strategy: ``"split"`` (quartile binning) or ``"dist"``
            (normal distribution) - see the class docstring.
        """
        self._classes_counts: Dict[Union[str, int], int] = dict()
        self._classes_probas: Dict[Union[str, int], float] = dict()
        # label -> feature index -> feature value -> P(value | label)
        self._features_probas_if_class: Dict[Union[str, int], Dict[int, Dict[Union[str, int], float]]] = {}
        # Quartile boundaries, one column per numeric feature ("split" only)
        self._features_bins: np.ndarray = np.array([])
        self.num_features_cols: List[int] = num_features_cols
        self.num_strategy: str = num_strategy
        # Per-class means & standard deviations of the numeric features
        # (positions match self.num_features_cols); filled by the "dist"
        # strategy, always defined so the attributes exist in either mode.
        self._means: Dict[Union[str, int], np.ndarray] = {}
        self._stds: Dict[Union[str, int], np.ndarray] = {}

    def fit(self, X_train: np.ndarray, y_train: np.ndarray) -> None:
        """Learn class priors and per-feature likelihoods from training data."""
        # Forget everything learnt by a previous fit so that refitting does
        # not reuse stale quartile boundaries or probabilities.
        self._classes_probas = {}
        self._features_probas_if_class = {}
        self._features_bins = np.array([])
        self._means, self._stds = {}, {}

        self._count_classes_probas(y_train)
        if self.num_strategy == "split":
            X_train = self._split_numeric_features(X_train)
        else:
            self._means, self._stds = self._count_distribution_params(X_train, y_train)
        self._count_features_if_class_probas(X_train, y_train)

    def _count_classes_probas(self, y_train: np.ndarray) -> None:
        """Compute the prior P(class) as the relative class frequency."""
        self._classes_counts = Counter(y_train)
        total = len(y_train)
        self._classes_probas = {label: count / total
                                for label, count in self._classes_counts.items()}

    def _split_numeric_features(self, X: np.ndarray) -> np.ndarray:
        """Replace every numeric column by four binary quartile indicators.

        The boundaries (0%, 25%, 50%, 75%, 100%) are computed from the first
        array passed after a (re)fit and reused afterwards, so test data is
        binned with the training boundaries. The new columns are appended to
        the right and the original numeric columns are removed.
        """
        if self._features_bins.size == 0:
            self._features_bins = np.percentile(X[:, self.num_features_cols],
                                                q=(0, 25, 50, 75, 100),
                                                axis=0)

        columns = [X]
        for col_idx, bins in enumerate(self._features_bins.transpose()):
            values = X[:, self.num_features_cols[col_idx]]
            last_bin = bins.size - 2
            for i in range(bins.size - 1):
                above_lower = values >= bins[i]
                if i == last_bin:
                    # The last interval is closed on the right, otherwise the
                    # column maximum would not belong to any bin.
                    below_upper = values <= bins[i + 1]
                else:
                    below_upper = values < bins[i + 1]
                columns.append(above_lower & below_upper)

        # Stack once instead of re-allocating X for every new column
        return np.delete(np.column_stack(columns), self.num_features_cols, axis=1)

    def _count_distribution_params(self,
                                   X: np.ndarray,
                                   y: np.ndarray) -> Iterable[Dict[Union[str, int], np.ndarray]]:
        """Return per-class means and standard deviations of numeric columns.

        :return: ``(means, stds)`` - two dicts keyed by class label; each
            value is an array aligned with ``self.num_features_cols``.
        """
        y = np.asarray(y)
        numeric = X[:, self.num_features_cols]
        labels = np.unique(y)
        means = {label: np.mean(numeric[y == label], axis=0) for label in labels}
        # ddof=1: sample standard deviation, i.e. the 1 / (n - 1) estimator
        stds = {label: np.std(numeric[y == label], axis=0, ddof=1) for label in labels}
        return means, stds

    def _count_features_if_class_probas(self,
                                        X_train: np.ndarray,
                                        y_train: np.ndarray) -> None:
        """Estimate P(feature value | class) for every categorical feature.

        Columns modelled by a normal distribution ("dist" strategy) are
        skipped. A value that occurs in the training data but never together
        with a given class is stored with probability 0 for that class.
        """
        for label, class_count in self._classes_counts.items():
            in_class = y_train == label
            per_feature = self._features_probas_if_class.setdefault(label, dict())
            for feature_idx, feature in enumerate(X_train.transpose()):
                if self.num_strategy == "dist" and feature_idx in self.num_features_cols:
                    continue
                per_value = per_feature.setdefault(feature_idx, dict())
                for value in np.unique(feature):
                    per_value[value] = np.sum((feature == value) & in_class) / class_count

    def predict(self, X_test: np.ndarray) -> Iterable[np.ndarray]:
        """Predict a label for every row of ``X_test``.

        :return: ``(labels, dist)`` - ``labels`` holds one predicted label
            per row; ``dist`` stacks, row after row, the ``(label, score)``
            pairs of every class for every instance. Both are empty arrays
            when ``X_test`` has no rows.
        """
        if self.num_strategy == "split":
            X_test = self._split_numeric_features(X_test)

        labels, dists = [], []
        for instance in X_test:
            label, inst_dist = self._predict_single(instance)
            labels.append(label)
            dists.append(inst_dist)

        if not labels:
            return np.array([]), np.array([])
        # Build the results once instead of re-stacking on every iteration
        return np.array(labels), np.vstack(dists)

    def _predict_single(self, instance: np.ndarray) -> Iterable[np.ndarray]:
        """Score one instance against every class.

        :return: ``(best_label, labels_dist)`` where ``labels_dist`` is an
            array of ``(label, unnormalised posterior)`` rows.
        """
        labels_dist = []
        for label, label_proba in self._classes_probas.items():
            predicted_proba = label_proba
            for idx, value in enumerate(instance):
                if self.num_strategy == "dist" and idx in self.num_features_cols:
                    real_idx = self.num_features_cols.index(idx)
                    mean, std = self._means[label][real_idx], self._stds[label][real_idx]
                    # Normal density: exp(-(x - mu)^2 / (2 sigma^2)) / (sigma sqrt(2 pi))
                    value_proba = np.exp(-((value - mean) ** 2) / (2 * std ** 2)) / (std * np.sqrt(2 * np.pi))
                else:
                    # Values never seen during training get a small constant
                    value_proba = self._features_probas_if_class[label][idx].get(value, 10 ** (-5))
                predicted_proba *= value_proba
            labels_dist.append((label, predicted_proba))
        best_label = max(labels_dist, key=lambda pair: pair[1])[0]
        return np.array(best_label), np.array(labels_dist)

    def accuracy_score(self, y_true: np.ndarray, y_pred: np.ndarray) -> float:
        """Return ``sklearn.metrics.accuracy_score`` for the given labels."""
        return metrics.accuracy_score(y_true, y_pred)

    def precision_score(self, y_true: np.ndarray, y_pred: np.ndarray) -> float:
        """Return ``sklearn.metrics.precision_score`` (default settings)."""
        return metrics.precision_score(y_true, y_pred)

    def recall_score(self, y_true: np.ndarray, y_pred: np.ndarray) -> float:
        """Return ``sklearn.metrics.recall_score`` (default settings)."""
        return metrics.recall_score(y_true, y_pred)

    def all_scores(self, y_true: np.ndarray, y_pred: np.ndarray) -> Iterable[float]:
        """Return accuracy, precision and recall scores on given labels"""
        return self.accuracy_score(y_true, y_pred), self.precision_score(y_true, y_pred), \
               self.recall_score(y_true, y_pred)

### 1.2 Подготовка данных

In [35]:
# Download the preprocessed occupancy-detection dataset, drop the timestamp
# column and shuffle the rows.
url = 'https://raw.githubusercontent.com/otverskoj/First-steps-in-Data-Analysis/main/datasets/classification/occupancy_detection_preprocessed.csv'
names = ['date', 'temperature', 'humidity', 'light', 'co2', 'humidity_ratio', 'occupancy']
df = pd.read_csv(url, names=names, skiprows=1)
df = df.drop(['date'], axis=1)
df = df.sample(frac=1).reset_index(drop=True)

# Feature matrix / target vector, then a stratified 75/25 train-test split.
X = df.drop(["occupancy"], axis=1).values
y = df["occupancy"].values
X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, test_size=0.25)

### 1.3 Апробация на своих данных

In [62]:
# Every column of this dataset is numeric, so all of them are modelled
# with per-class normal distributions.
num_cols = list(range(X.shape[1]))
bayes = NaiveBayesFromScratch(num_cols, num_strategy="dist")
bayes.fit(X_train, y_train)
y_pred, _ = bayes.predict(X_test)

# Report the three quality metrics on the held-out test set.
for caption, scorer in (("Accuracy: ", bayes.accuracy_score),
                        ("Precision: ", bayes.precision_score),
                        ("Recall: ", bayes.recall_score)):
    print(caption, scorer(y_test, y_pred))

Accuracy:  0.9686770428015564
Precision:  0.8817843866171003
Recall:  0.9983164983164983
