## Estructura del código

## Modelo

In [1]:
import numpy as np
from numpy.linalg import det, inv

In [2]:
class ClassEncoder:
  def fit(self, y):
    self.names = np.unique(y)
    self.name_to_class = {name:idx for idx, name in enumerate(self.names)}
    self.fmt = y.dtype
    # Q1: por que no hace falta definir un class_to_name para el mapeo inverso?

  def _map_reshape(self, f, arr):
    return np.array([f(elem) for elem in arr.flatten()]).reshape(arr.shape)
    # Q2: por que hace falta un reshape?

  def transform(self, y):
    return self._map_reshape(lambda name: self.name_to_class[name], y)

  def fit_transform(self, y):
    self.fit(y)
    return self.transform(y)

  def detransform(self, y_hat):
    return self._map_reshape(lambda idx: self.names[idx], y_hat)

In [3]:
class BaseBayesianClassifier:
  def __init__(self):
    self.encoder = ClassEncoder()

  def _estimate_a_priori(self, y):
    a_priori = np.bincount(y.flatten().astype(int)) / y.size
    # Q3: para que sirve bincount?
    return np.log(a_priori)

  def _fit_params(self, X, y):
    # estimate all needed parameters for given model
    raise NotImplementedError()

  def _predict_log_conditional(self, x, class_idx):
    # predict the log(P(x|G=class_idx)), the log of the conditional probability of x given the class
    # this should depend on the model used
    raise NotImplementedError()

  def fit(self, X, y, a_priori=None):
    # first encode the classes
    y = self.encoder.fit_transform(y)

    # if it's needed, estimate a priori probabilities
    self.log_a_priori = self._estimate_a_priori(y) if a_priori is None else np.log(a_priori)

    # check that a_priori has the correct number of classes
    assert len(self.log_a_priori) == len(self.encoder.names), "A priori probabilities do not match number of classes"

    # now that everything else is in place, estimate all needed parameters for given model
    self._fit_params(X, y)
    # Q4: por que el _fit_params va al final? no se puede mover a, por ejemplo, antes de la priori?

  def predict(self, X):
    # this is actually an individual prediction encased in a for-loop
    m_obs = X.shape[1]
    y_hat = np.empty(m_obs, dtype=self.encoder.fmt)

    for i in range(m_obs):
      encoded_y_hat_i = self._predict_one(X[:,i].reshape(-1,1))
      y_hat[i] = self.encoder.names[encoded_y_hat_i]

    # return prediction as a row vector (matching y)
    return y_hat.reshape(1,-1)

  def _predict_one(self, x):
    # calculate all log posteriori probabilities (actually, +C)
    log_posteriori = [ log_a_priori_i + self._predict_log_conditional(x, idx) for idx, log_a_priori_i
                  in enumerate(self.log_a_priori) ]

    # return the class that has maximum a posteriori probability
    return np.argmax(log_posteriori)

In [4]:
class QDA(BaseBayesianClassifier):

  def _fit_params(self, X, y):
    # estimate each covariance matrix
    self.inv_covs = [inv(np.cov(X[:,y.flatten()==idx], bias=True))
                      for idx in range(len(self.log_a_priori))]
    # Q5: por que hace falta el flatten y no se puede directamente X[:,y==idx]?
    # Q6: por que se usa bias=True en vez del default bias=False?
    self.means = [X[:,y.flatten()==idx].mean(axis=1, keepdims=True)
                  for idx in range(len(self.log_a_priori))]
    # Q7: que hace axis=1? por que no axis=0?

  def _predict_log_conditional(self, x, class_idx):
    # predict the log(P(x|G=class_idx)), the log of the conditional probability of x given the class
    # this should depend on the model used
    inv_cov = self.inv_covs[class_idx]
    unbiased_x =  x - self.means[class_idx]
    return 0.5*np.log(det(inv_cov)) -0.5 * unbiased_x.T @ inv_cov @ unbiased_x

In [5]:
class TensorizedQDA(QDA):

    def _fit_params(self, X, y):
        # ask plain QDA to fit params
        super()._fit_params(X,y)

        # stack onto new dimension
        self.tensor_inv_cov = np.stack(self.inv_covs)
        self.tensor_means = np.stack(self.means)

    def _predict_log_conditionals(self,x):
        unbiased_x = x - self.tensor_means
        inner_prod = unbiased_x.transpose(0,2,1) @ self.tensor_inv_cov @ unbiased_x

        return 0.5*np.log(det(self.tensor_inv_cov)) - 0.5 * inner_prod.flatten()

    def _predict_one(self, x):
        # return the class that has maximum a posteriori probability
        return np.argmax(self.log_a_priori + self._predict_log_conditionals(x))

## Código para pruebas

Seteamos los datos

In [6]:
# hiperparámetros
rng_seed = 6543

In [7]:
from sklearn.datasets import load_iris, fetch_openml

def get_iris_dataset():
  data = load_iris()
  X_full = data.data
  y_full = np.array([data.target_names[y] for y in data.target.reshape(-1,1)])
  return X_full, y_full

def get_penguins():
    # get data
    df, tgt = fetch_openml(name="penguins", return_X_y=True, as_frame=True, parser='auto')

    # drop non-numeric columns
    df.drop(columns=["island","sex"], inplace=True)

    # drop rows with missing values
    mask = df.isna().sum(axis=1) == 0
    df = df[mask]
    tgt = tgt[mask]

    return df.values, tgt.to_numpy().reshape(-1,1)

# showing for iris
X_full, y_full = get_iris_dataset()

print(f"X: {X_full.shape}, Y:{y_full.shape}")

X: (150, 4), Y:(150, 1)


In [8]:
# peek data matrix
X_full[:5]

array([[5.1, 3.5, 1.4, 0.2],
       [4.9, 3. , 1.4, 0.2],
       [4.7, 3.2, 1.3, 0.2],
       [4.6, 3.1, 1.5, 0.2],
       [5. , 3.6, 1.4, 0.2]])

In [9]:
# peek target vector
y_full[:5]

array([['setosa'],
       ['setosa'],
       ['setosa'],
       ['setosa'],
       ['setosa']], dtype='<U10')

Separamos el dataset en train y test para medir performance

In [10]:
# preparing data, train - test validation
# 70-30 split
from sklearn.model_selection import train_test_split

def split_transpose(X, y, test_sz, random_state):
    # split
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.4, random_state=random_state)

    # transpose so observations are column vectors
    return X_train.T, y_train.T, X_test.T, y_test.T

def accuracy(y_true, y_pred):
  return (y_true == y_pred).mean()

train_x, train_y, test_x, test_y = split_transpose(X_full, y_full, 0.4, rng_seed)

print(train_x.shape, train_y.shape, test_x.shape, test_y.shape)

(4, 90) (1, 90) (4, 60) (1, 60)


Entrenamos un QDA y medimos su accuracy

In [11]:
qda = QDA()

qda.fit(train_x, train_y)

In [12]:
train_acc = accuracy(train_y, qda.predict(train_x))
test_acc = accuracy(test_y, qda.predict(test_x))
print(f"Train (apparent) error is {1-train_acc:.4f} while test error is {1-test_acc:.4f}")

Train (apparent) error is 0.0111 while test error is 0.0167


Con el magic %%timeit podemos estimar el tiempo que tarda en correr una celda en base a varias ejecuciones. Por poner un ejemplo, acá vamos a estimar lo que tarda un ciclo completo de QDA y también su inferencia (predicción).

Ojo! a veces [puede ser necesario ejecutarlo varias veces](https://stackoverflow.com/questions/10994405/python-timeit-results-cached-instead-of-calculated) para obtener resultados consistentes.

Si quieren explorar otros métodos de medición también es válido!

In [13]:
%%timeit

qda.predict(test_x)

13.8 ms ± 3.77 ms per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


In [14]:
%%timeit

model = QDA()
model.fit(train_x, train_y)
model.predict(test_x)

20.1 ms ± 4.16 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)


# Implementación base

## Parte 1

### Distribucion uniforme

In [46]:
# Cargo el dataset iris
X_iris, y_iris = get_iris_dataset()

#Separo en test de entrenamiento y test de evaluacio
train_x_iris, train_y_iris, test_x_iris, test_y_iris = split_transpose(X_iris, y_iris, 0.4, rng_seed)

In [47]:
#Creo el modelo
qda_uniform = QDA()

#La probabilidad a priori es un array de largo 3 porque CLass encoder toma los y unicos, 
#en este caso son 3
qda_uniform.fit(train_x_iris, train_y_iris, a_priori = [1/3, 1/3, 1/3])

In [48]:
#Mido prescicion del modelo
train_acc_qda_uniform = accuracy(train_y_iris, qda_uniform.predict(train_x_iris))
test_acc_qda_uniform = accuracy(test_y_iris, qda_uniform.predict(test_x_iris))
print(f"Train (apparent) error is {1-train_acc_qda_uniform:.4f} while test error is {1-test_acc_qda_uniform:.4f}")

Train (apparent) error is 0.0222 while test error is 0.0167


### Probabilidad 0.9, 0.05, 0.05

In [57]:
#Creo nuevo modelo
qda_prob = QDA()

qda_prob.fit(train_x_iris, train_y_iris, a_priori = [0.9, 0.05, 0.05])

In [58]:
#Mido prescicion del modelo
train_acc_qda_prob = accuracy(train_y_iris, qda_prob.predict(train_x_iris))
test_acc_qda_prob = accuracy(test_y_iris, qda_prob.predict(test_x_iris))
print(f"Train (apparent) error is {1-train_acc_qda_prob:.4f} while test error is {1-test_acc_qda_prob:.4f}")

Train (apparent) error is 0.0222 while test error is 0.0167


In [59]:
#Cambio de orden las probabilidades
qda_prob.fit(train_x_iris, train_y_iris, a_priori = [0.05, 0.9, 0.05])

In [60]:
#Mido prescicion del modelo
train_acc_qda_prob = accuracy(train_y_iris, qda_prob.predict(train_x_iris))
test_acc_qda_prob = accuracy(test_y_iris, qda_prob.predict(test_x_iris))
print(f"Train (apparent) error is {1-train_acc_qda_prob:.4f} while test error is {1-test_acc_qda_prob:.4f}")

Train (apparent) error is 0.0333 while test error is 0.0000


In [61]:
#Cambio de orden las probabilidades
qda_prob.fit(train_x_iris, train_y_iris, a_priori = [0.05, 0.05, 0.9])

In [62]:
#Mido prescicion del modelo
train_acc_qda_prob = accuracy(train_y_iris, qda_prob.predict(train_x_iris))
test_acc_qda_prob = accuracy(test_y_iris, qda_prob.predict(test_x_iris))
print(f"Train (apparent) error is {1-train_acc_qda_prob:.4f} while test error is {1-test_acc_qda_prob:.4f}")

Train (apparent) error is 0.0333 while test error is 0.0500


Ver como responder la pregunta. En base a que quiere coomparar si es entre estas dos distribuciones o con la que el hace antes