# MLE Predictions based on Mutlivariate Gauassian
Notebook to create and evaluate baseline model predictions

In [None]:
from utils.data import create_dataloaders
from utils.evaluation import visualise_batch_predictions
import torch

from config import config

DATA_PATH = config.PATH_TO_DATA

### Loading and Preparing Data

In [None]:
bands = [0, 1, 2, 3,]
train_loader, val_loader, test_loader = create_dataloaders(DATA_PATH, bands=bands, batch_size=2000, transforms=False) # get full train and val dataset

In [None]:
def prepare_data(X_in, y_in):
    X_out = torch.permute(X_in, (1, 0, 2, 3)).flatten(start_dim=1).T
    y_out = y_in.flatten(start_dim=0)

    assert X_out.shape[0] == len(y_out)
    print("Dimensions of transformed output: ")
    print("Data: {}".format(X_out.shape))
    print("Labels: {}".format(y_out.shape))
    return X_out, y_out

def pull_train_loader(loader, n_pulls):
    image_batches = []
    mask_batches = []
    for _ in range(n_pulls):
        batch_x, batch_y = next(iter(loader))
        image_batches.append(batch_x)
        mask_batches.append(batch_y)

    X = torch.cat(image_batches, 0)
    y = torch.cat(mask_batches, 0)
    return X, y

In [None]:
# gather all data from image loaders
X_train, y_train = pull_train_loader(train_loader, 1)
X_test, y_test = next(iter(test_loader))
X_val, y_val = next(iter(val_loader))

# print dimensions
print(X_train.shape)
print(X_test.shape)
print(X_val.shape)
print(y_test.shape)


# flatten all data
X_train, y_train = prepare_data(X_train, y_train)
X_test, y_test = prepare_data(X_test, y_test)
X_val, y_val = prepare_data(X_val, y_val)

### Define MLE Model

In [None]:
import numpy as np

class MLE_model():
    def __init__(self, ):
        self.pos_means = None
        self.neg_means = None
        self.pos_cov = None
        self.neg_cov = None

    def fit(self, X_train, y_train):
        
        # seperate data into both classes
        pos = (y_train==1)
        neg = (y_train==0)

        X_train_pos = X_train[pos, :]
        X_train_neg = X_train[neg, :]

        # fit multivariate gaussian distributions for both classes
        self.pos_means = torch.mean(X_train_pos, dim=0).numpy()
        self.pos_cov = np.cov(X_train_pos.numpy().T)

        self.neg_means = torch.mean(X_train_neg, dim=0).numpy()
        self.neg_cov = np.cov(X_train_neg.numpy().T)

        # validate fitting process
        print("Model Fitted with means {} and {}".format(self.pos_means, self.neg_means))
        print("Covariance matrix are \n {} \n and \n {}".format(self.pos_cov, self.neg_cov))
        return

    def predict(self, X_test):

        # convert to numpy
        X_test = X_test.numpy()

        # Run MLE estimator
        # calculate log odds for both classes
        log_ps = [self.compute_log_p_solution(X_test, m, s) for m, s in zip([self.neg_means, self.pos_means], [self.neg_cov, self.pos_cov])]
        
        # take argmax
        assignments = np.argmax(log_ps, axis=0)
        return log_ps, assignments
    
    def compute_log_p_solution(self, X, mean, sigma):
        # Solution inspired by CS-433 Machine Learning exercises
        d = X.shape[1]
        c = -np.log(2 * np.pi) * (d / 2) - 0.5 * np.log(np.linalg.det(sigma))
        A = X - mean
        invSigma = np.linalg.inv(sigma)

        return -0.5 * np.sum(A * (A.dot(invSigma)), axis=1) + c

In [None]:
# fit MLE model
model = MLE_model()
model.fit(X_train, y_train)

In [None]:
def evaluate_MLE(y_test, y_pred):
    true_pos = 0
    true_neg = 0
    false_neg = 0
    false_pos = 0

    target = torch.tensor(y_test, dtype=torch.int64)
    pred = torch.tensor(y_pred, dtype=torch.int64)

    true_pos += ((target == 1) & (pred == 1)).sum().item()
    true_neg += ((target == 0) & (pred == 0)).sum().item()
    false_neg += ((target == 1) & (pred == 0)).sum().item()
    false_pos += ((target == 0) & (pred == 1)).sum().item()

    # Calculate metrics (on GPU if needed)
    accuracy = (true_pos + true_neg) / (true_pos + true_neg + false_pos + false_neg)
    precision = true_pos / (true_pos + false_pos) if (true_pos + false_pos) > 0 else 0
    recall = true_pos / (true_pos + false_neg) if (true_pos + false_pos) > 0 else 0
    f1 = (2 * precision * recall) / (precision + recall) if (precision + recall) else 0
    jaccard = true_pos / (true_pos + false_neg + false_pos) if (true_pos + false_neg + false_pos) else 0

    print("Accuracy: {:.4}".format(accuracy))
    print("F1: {:.4}".format(f1))
    print("Jaccard: {:.4}".format(jaccard))
    print("Precision: {:.4}".format(precision))
    print("Recall: {:.4}".format(recall))

In [None]:
# predict and evaluate on the test set
log_ps, pred_tot = model.predict(X_test)
evaluate_MLE(y_test, pred_tot)

In [None]:
# predict and evaluate on the val set
log_ps, pred_tot = model.predict(X_val)
evaluate_MLE(y_val, pred_tot)

### Visualise Predictions
visually, the MLE predictions are convincing too

In [None]:
# reload right sized image
_, _, test_loader = create_dataloaders(DATA_PATH, bands=bands, batch_size=8, transforms=False) # get full train and val dataset
loader = iter(test_loader)

In [None]:
def predict_from_batch(model, batch_sample):
    input = torch.permute(batch_sample, (1, 0, 2, 3)).flatten(start_dim=1).T
    _, batch_output = model.predict(input)
    batch_predictions = torch.tensor(batch_output).reshape(batch_sample.shape[0], 1, 128, 128)
    return batch_predictions

batch_sample, batch_mask = next(loader)
batch_predictions = predict_from_batch(model, batch_sample)
visualise_batch_predictions(batch_sample, batch_mask.unsqueeze(1), batch_predictions, rescale=True, bands=bands)

## Testing unsupervised methods for good measure
This seems to work equally well - we can approximate the two mutlivariate gaussians with the EM algorithm quite succesfully

In [None]:
from sklearn.mixture import GaussianMixture

model = GaussianMixture(n_components=2, verbose=True)

n_samples = 10_000_000 # take a susample to speed up training
indices = np.random.choice(np.arange(n_samples), size=n_samples)

X_train_ = X_train[indices, :]

model.fit(X_train_)

print("Found means: ", model.means_)
print("Found covariances: ", model.covariances_)

In [None]:
# predict and evaluate on the test set
pred_tot = model.predict(X_test)

# since this is unspervised, we need to flip the class (if accuracy on your machine is bad, just flip it back by uncommenting)
# pred_tot = np.where(pred_tot==1, 0, 1)

evaluate_MLE(y_test, pred_tot)