In [12]:
import pyro
import pyro.optim as optim
import torch
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm_notebook as tqdm
from torch.distributions import constraints
from sklearn.model_selection import StratifiedKFold
from sklearn import metrics, preprocessing

In [13]:
pyro.set_rng_seed(1)
pyro.enable_validation(True)
pyro.clear_param_store()

In [14]:
# wilt_atrr = ['GLCM_pan', 'Mean_Green', 'Mean_Red', 'Mean_NIR', 'SD_pan']

# loading datasets
df = pd.read_csv('wine.data', header=None)
# X = torch.from_numpy(wine.iloc[:,1:-1].values)
# y = torch.from_numpy(wine.iloc[:,0].astype(int).values)
X = df.iloc[:,1:-1]
y = df.iloc[:,0].astype(int)
y = y - 1

In [28]:
softplus = torch.nn.Softplus() # takie ReLU tylko ciągłe

def model(X):
    # rejestrujemy zmienną do przestrzeni optymalizacji (store pyro)
    mu_param = pyro.param("mu", torch.zeros_like(naive_bayes.current_class_probs))
    # ograniczamy wartości do nieujemnych
    sigma_param = pyro.param("sigma", torch.ones_like(naive_bayes.current_class_probs), constraint=constraints.positive)
    params = pyro.distributions.Normal(loc=mu_param, scale=sigma_param).to_event(1)
    with pyro.plate("map", len(X)):
        pyro.sample("probs", params, obs=X)

def guide(X):
    # rejestrujemy zmienną do przestrzeni optymalizacji (store pyro)
    mu_param = pyro.param("mu", torch.zeros_like(naive_bayes.current_class_probs))
    # ograniczamy wartości do nieujemnych
    sigma_param = pyro.param("sigma", torch.ones_like(naive_bayes.current_class_probs), constraint=constraints.positive)
    probs_prior = pyro.distributions.Normal(loc=mu_param, scale=sigma_param).to_event(1)
    return pyro.sample("probs", probs_prior, infer={'is_auxiliary': True})

def train(X):
    pyro.clear_param_store()
    num_iterations=5000
    optim = pyro.optim.Adam({"lr": 0.01})
    svi = pyro.infer.SVI(model, guide, optim, loss=pyro.infer.Trace_ELBO(), num_samples=len(X))
    losses = list()
    t=tqdm(range(num_iterations))
    for j in t:
        loss = svi.step(X)
        losses.append(loss)
        t.set_postfix(loss=loss)
    return (svi, losses)

In [29]:
class NaiveBayesClassifier:
    def __init__(self):
        pass
    
    def fit(self, X, y):
        self.X = X
        self.y = y
        
        self.available_y = np.unique(y)
        self.num_features = X.shape[1]
        self._count_y_prob()
        self.params_for_probs = list()
        
        for target in self.available_y:
            self.X_current_class = torch.from_numpy(X[y==target])
            self.current_class_probs = torch.from_numpy(np.random.randn(self.num_features))
            train(self.X_current_class)
            mu = pyro.param("mu")
            sigma = pyro.param("sigma")
            self.params_for_probs.append(torch.stack([mu, sigma], dim=0))
            
        for i in range(len(self.params_for_probs)):
            self.params_for_probs[i] = self.params_for_probs[i].detach().numpy()
            
    def _count_y_prob(self):
        total_quantity = len(self.y)
        self.p_y = [np.count_nonzero(self.y == i) / total_quantity for i in self.available_y]
    
    def predict(self, X):
        predicted = list()
        for i in range(len(X)):
            predicted.append(self._predict_one_example(i, X[i, :]))
        return np.asarray(predicted)

    def _predict_one_example(self, i: int, x: np.ndarray):
        certainity_for_ys = list()
        for y in self.available_y:  # for every class
            certainity_for_ys.append(self.p_y[y])
            for i in range(len(x)):  # for every feature
                certainity_for_ys[-1] *= self._p_xi_on_condition_y(i, y, x[i])
        return self.available_y[certainity_for_ys.index(max(certainity_for_ys))]
    
    def _p_xi_on_condition_y(self, feature_index, y_index, x_i):
#         print(y_index)
#         print(feature_index)
#         print(self.params_for_probs)
        multiplier = 1 / np.sqrt(2 * np.pi * self.params_for_probs[y_index][0, feature_index])
        exp = - (x_i - self.params_for_probs[y_index][1, feature_index]) ** 2 / (2 * self.params_for_probs[y_index][0, feature_index])
        return multiplier * np.power(np.e, exp)

In [30]:
naive_bayes = NaiveBayesClassifier()
# naive_bayes.fit(X, y)

In [31]:
def crossval_research(data, target):
    splitter = StratifiedKFold(n_splits=7, shuffle=True)
    split_set_generator = splitter.split(data, target)

    # trainning and testing
    y_pred = list()
    y_true = list()

    train_indices, test_indices = next(split_set_generator)
#     print(type(train_indices))
#     print(type(data))
#     print(data)
    X_train = data[train_indices]
    Y_train = target[train_indices]
    naive_bayes.fit(X_train, Y_train)
    y_pred.extend(naive_bayes.predict(data[test_indices]))
    y_true.extend(target[test_indices])

    confusion = metrics.confusion_matrix(y_true, y_pred)
    accuracy = metrics.accuracy_score(y_true, y_pred)
    precision = metrics.precision_score(y_true, y_pred, average=None)
    recall = metrics.recall_score(y_true, y_pred, average=None)
    f1_score = metrics.f1_score(y_true, y_pred, average=None)

    return {"confusion": confusion, "accuracy": accuracy, "precision": precision,
            "recall": recall, "f1_score": f1_score}

In [32]:
class_metrics = crossval_research(X.values, y.values)

HBox(children=(IntProgress(value=0, max=5000), HTML(value='')))

HBox(children=(IntProgress(value=0, max=5000), HTML(value='')))

HBox(children=(IntProgress(value=0, max=5000), HTML(value='')))

  'precision', 'predicted', average, warn_for)
  'precision', 'predicted', average, warn_for)


In [33]:
param_store = pyro.get_param_store()
for key in param_store.keys():
    print(key)
    
print(pyro.param("mu"))
print(pyro.param("sigma"))

mu
sigma
tensor([13.0939,  3.3176,  2.4561,  5.9368,  3.9697,  1.7212,  0.8012,  0.4476,
         1.1849,  7.4583,  0.6851,  1.6778], dtype=torch.float64,
       requires_grad=True)
tensor([ 1.3414,  1.1298,  0.1809, 16.7009, 44.4260,  0.3561,  0.2915,  0.1227,
         0.4022,  2.3089,  0.1171,  0.2617], dtype=torch.float64,
       grad_fn=<AddBackward0>)


In [34]:
print(class_metrics)

{'confusion': array([[9, 0, 0],
       [9, 0, 2],
       [0, 0, 7]]), 'accuracy': 0.5925925925925926, 'precision': array([0.5       , 0.        , 0.77777778]), 'recall': array([1., 0., 1.]), 'f1_score': array([0.66666667, 0.        , 0.875     ])}


In [35]:
print(naive_bayes.p_y)

[0.33112582781456956, 0.3973509933774834, 0.271523178807947]
