# Boosting

In [None]:
%matplotlib inline
import torch
# import torchvision
import torchvision.transforms.functional as fn
from torch.distributions.multivariate_normal import MultivariateNormal
import os, os.path
import numpy as np
from matplotlib import pyplot as plt
from PIL import Image

## Restructure images and prepare training, validation and test dataset

In [None]:
# When re-running the cell below with a different dimension while saving files to the xray folder, run this first:
# !rmdir /s /q xray

In [None]:
# by inspection of files, min Width = 384, min Height = 127
def process_images(crop, dimension,zeroOne):
    """Load the chest X-ray images and build the train/validation/test tensors.

    Every image found under ``chest_xray/<split>/<class>`` is converted to
    grayscale, center-cropped or resized to ``dimension``, flattened to one
    row and given a trailing label column (+1 for NORMAL, -1 otherwise).

    Args:
        crop: truthy -> center-crop each image to ``dimension``;
            falsy -> resize each image to ``dimension``.
        dimension: target image size, one entry per image axis (a tensor or
            any sequence of ints); its product is the number of pixel columns.
        zeroOne: truthy -> binarize the scaled pixels to {0, 1} with a
            threshold of 0.5.

    Returns:
        ``(train, validation, test)``: float tensors of shape
        ``(n_images, prod(dimension) + 1)``. Pixel columns are scaled to
        [0, 1], the last column is the label, and rows are randomly shuffled.
        A split with no images has shape ``(0, prod(dimension) + 1)``.
    """
    size = torch.as_tensor(dimension).reshape(-1).tolist()  # plain Python ints
    n_pixels = int(np.prod(size))

    in_directory = ["chest_xray/test/NORMAL","chest_xray/test/PNEUMONIA",\
                "chest_xray/val/NORMAL","chest_xray/val/PNEUMONIA",\
                "chest_xray/train/NORMAL","chest_xray/train/PNEUMONIA"]

    out_directory = ["xray/test/NORMAL","xray/test/PNEUMONIA",\
                "xray/val/NORMAL","xray/val/PNEUMONIA",\
                "xray/train/NORMAL","xray/train/PNEUMONIA"]

    valid_images = {".jpeg", ".jpg", ".png"}

    # Per-split lists of (n_images, n_pixels + 1) blocks; concatenated once at
    # the end instead of re-copying the whole dataset for every image.
    blocks = {"train": [], "val": [], "test": []}

    for src_dir, dst_dir in zip(in_directory, out_directory):
        # A source directory is skipped when its output directory already exists.
        if os.path.exists(dst_dir):
            continue
        # os.makedirs(dst_dir) #if you are saving to the xray folder
        split_name, class_name = src_dir.split("/")[1:]
        label = 1. if class_name == "NORMAL" else -1.

        rows = []
        for f in os.listdir(src_dir):
            if os.path.splitext(f)[1].lower() not in valid_images:
                continue
            # The context manager closes the file handle; convert() returns an
            # independent, fully loaded grayscale copy.
            with Image.open(os.path.join(src_dir, f)) as raw:
                img = raw.convert("L")

            if crop:
                img = fn.center_crop(img, output_size=size) #crop
            else:
                img = fn.resize(img, size=size) # resize

            # img.save(f"{dst_dir}/{f}") # optional: save image to folder
            rows.append(torch.from_numpy(np.array(img, dtype=np.float32)).reshape(n_pixels))

        if rows:
            labels = torch.full((len(rows), 1), label)
            target = split_name if split_name in ("train", "val") else "test"
            blocks[target].append(torch.hstack((torch.stack(rows), labels)))

    def _merge(parts):
        # Concatenate one split's blocks, or return an empty (0, n_pixels + 1) tensor.
        return torch.vstack(parts) if parts else torch.zeros((0, n_pixels + 1))

    train, validation, test = _merge(blocks["train"]), _merge(blocks["val"]), _merge(blocks["test"])

    # Scale pixel columns from [0, 255] to [0, 1]; the label column is left untouched.
    train[:,:-1] /= 255
    validation[:,:-1] /= 255
    test[:,:-1] /= 255
    if zeroOne:
        train[:,:-1] = (train[:,:-1]>=0.5).float()
        validation[:,:-1] = (validation[:,:-1]>=0.5).float()
        test[:,:-1] = (test[:,:-1]>=0.5).float()
    # remove if you shuffle elsewhere
    train = train[torch.randperm(train.shape[0])]
    validation = validation[torch.randperm(validation.shape[0])]
    test = test[torch.randperm(test.shape[0])]
    return train, validation, test


In [None]:
# Target size shared by every image; its product is the number of pixel features.
dimension = torch.tensor((128, 128))
# crop falsy -> images are resized; crop truthy -> images are center-cropped.
train, validation, test = process_images(dimension=dimension, crop=False, zeroOne=False)
train.shape, validation.shape, test.shape

## Use a Naive Bayes weak classifier with Adaptive Boosting (AdaBoost)

In [None]:
def rescale_linear(array, new_min, new_max):
    """Rescale a tensor linearly so that its values span [new_min, new_max].

    The smallest element maps to ``new_min`` and the largest to ``new_max``.
    A constant tensor (max == min) has no range to stretch, so every element
    is mapped to ``new_min`` instead of dividing by zero.

    Args:
        array: non-empty tensor to rescale.
        new_min: value the current minimum is mapped to.
        new_max: value the current maximum is mapped to.

    Returns:
        A new tensor with the same shape as ``array``.
    """
    minimum, maximum = torch.min(array), torch.max(array)
    span = maximum - minimum
    if span == 0:
        # Integer inputs are promoted to floating point by the regular path;
        # do the same here so the result dtype does not depend on the values.
        out_dtype = array.dtype if array.is_floating_point() else torch.get_default_dtype()
        return torch.full_like(array, new_min, dtype=out_dtype)
    m = (new_max - new_min) / span
    b = new_min - m * minimum
    return m * array + b

def bayespost(data,px,py):
    """Return the Naive Bayes class posterior p(y | x) for one sample.

    Each feature is modelled as a Bernoulli variable, so the log-likelihood
    of class ``c`` is ``sum_d x_d*log(px[d,c]) + (1-x_d)*log(1-px[d,c])``.

    Args:
        data: 1-D tensor with the D feature values of a single sample.
        px: (D, C) tensor; ``px[d, c]`` is p(x_d = 1 | y = c).
        py: (C,) tensor with the class priors p(y).

    Returns:
        (C,) tensor of posterior probabilities that sums to 1.
    """
    # we need to incorporate the prior probability p(y) since p(y|x) is
    # proportional to p(x|y) p(y)
    data = data.reshape((len(data),1))
    logpx = torch.log(px)
    logpxneg = torch.log(1-px)
    logpost = torch.log(py).clone()
    # Both likelihood terms are summed over the feature axis so that one
    # log-likelihood per class is added to the log-prior.
    logpost += (logpx * data + logpxneg * (1-data)).sum(0)
    # normalize to prevent overflow or underflow by subtracting the largest value
    logpost -= torch.max(logpost)
    # and compute the softmax of the shifted log-posterior
    post = torch.exp(logpost)
    post /= torch.sum(post)
    return post

Plain (un-boosted) Naive Bayes classifier (for comparison)

In [None]:
# Fit plain (un-boosted) Naive Bayes classifiers on the training split and
# report the misclassification rate / accuracy on every split.
X_train, y_train = train[:,:-1], train[:,-1]
idx1 = (y_train==1.)  # rows labelled +1 -> class index 0
idx0 = ~idx1          # remaining rows (label -1) -> class index 1
ycount = torch.ones((2))
ycount[0],ycount[1] = idx1.sum(), idx0.sum()
py = ycount / ycount.sum()  # class priors p(y)

# (caption, dataset, text printed before the first report line)
splits = (("Training", train, ""), ("Validation", validation, "\n"), ("Test", test, "\n"))


def _gaussian_nb_predict(features, means, variances, py):
    # Return the +1/-1 label with the largest Gaussian log-joint for each row.
    # features: (N, D); means / variances: (D, 2); py: (2,).
    # One class at a time keeps the temporaries at (N, D).
    log_lik = [
        -0.5 * (torch.log(2 * np.pi * variances[:, c])
                + (features - means[:, c]) ** 2 / variances[:, c]).sum(1)
        for c in range(means.shape[1])
    ]
    best = (torch.stack(log_lik, dim=1) + torch.log(py)).argmax(1)
    return 1 - 2 * best  # class index 0 -> +1, class index 1 -> -1


def _report_error(caption, prefix, labels, pred):
    # Print the misclassification rate and accuracy of pred against labels.
    err_rate = (labels != pred).sum()/len(pred)
    print(f"{prefix}{caption} Misclassification rate: {err_rate:.{4}}")
    print(f"{caption} Accuracy: {100*(1-err_rate):.{4}}%")
    return err_rate


for gauss in [True,False]:
    if gauss:
        print("Gaussian Naive Bayes")
        ## For continuous [0-1] pixel images (Gaussian)
        means = torch.ones((torch.prod(dimension),2))
        variances = torch.ones((torch.prod(dimension),2))
        means[:,0] = X_train[idx1,:].mean(axis=0)
        means[:,1] = X_train[idx0,:].mean(axis=0)
        variances[:,0] = X_train[idx1,:].var(axis=0)
        variances[:,1] = X_train[idx0,:].var(axis=0)
        # A pixel that is constant within a class has zero variance, which
        # would turn the log-likelihood into inf/NaN. Add a tiny fraction of
        # the largest pixel variance, as scikit-learn's GaussianNB does.
        variances += 1e-9 * X_train.var(axis=0).max()

        for caption, split, prefix in splits:
            pred = _gaussian_nb_predict(split[:,:-1], means, variances, py)
            err_rate = _report_error(caption, prefix, split[:,-1], pred)

    else:
        print("*****************************\nMultinomial Naive Bayes")
        # For 0 to 1 pixel images: per-pixel "on" probability for each class
        xcount = torch.ones((torch.prod(dimension),2)) #Laplace smoothing
        xcount[:,0] += X_train[idx1,:].sum(axis=0)
        xcount[:,1] += X_train[idx0,:].sum(axis=0)
        # NOTE(review): (1 + sum) / count exceeds 1 when a pixel is ~1 in every
        # image of a class, which makes log(1 - px) undefined -- confirm
        # whether the denominator should be count + 2.
        px = (xcount / ycount.reshape(1,2)) #broadcasting

        for caption, split, prefix in splits:
            post = torch.ones((len(split),2))
            for i, row in enumerate(split[:,:-1]):
                post[i,:] = bayespost(row, px, py)
            # class 0 strictly more probable -> +1, otherwise -1
            pred_label = 2 * (post[:,0] > post[:,1]).long() - 1
            err_rate = _report_error(caption, prefix, split[:,-1], pred_label)

In [None]:
# Reference results from scikit-learn's Naive Bayes implementations,
# fitted on the training split and scored on all three splits.
from sklearn.naive_bayes import GaussianNB, MultinomialNB

score_lines = (
    ("Train Naive Bayes accuracy: %.2f%%", train),
    ("Validation Naive Bayes accuracy: %.2f%%", validation),
    ("Test Naive Bayes accuracy: %.2f%%", test),
)

for header, nb in (("Gaussian Naive Bayes", GaussianNB()),
                   ("\nMultinomial Naive Bayes", MultinomialNB())):
    nb.fit(train[:,:-1], train[:,-1])
    print(header)
    for template, split in score_lines:
        # last column is the label, everything before it is the feature row
        print(template % (100*nb.score(split[:,:-1], split[:,-1])))


AdaBoost

In [None]:
class AdaBoost:
    """AdaBoost ensemble of Naive Bayes weak learners for labels in {+1, -1}.

    Each boosting round fits a Naive Bayes model on a bootstrap resample drawn
    according to the current sample weights, scores it on the full dataset,
    and re-weights the samples so misclassified rows gain weight. The final
    prediction is the sign of the alpha-weighted sum of the per-round votes.

    Attributes set by ``fit``:
        alphas: (k,) vote weight of each of the k kept rounds.
        probs: (N, k') sample weights; column j holds the weights used in round j.
        pred_labels: (N, k) per-round +1/-1 predictions on the fitted data.
        final_pred: (N,) ensemble prediction on the fitted data.
        py, px, means, vars: per-round Naive Bayes parameters; the last axis
            is the round index.
    """

    # Fraction of the largest feature variance added to every Gaussian
    # variance so that constant features do not produce inf/NaN likelihoods.
    var_smoothing = 1e-9

    # initialization
    def __init__(self):

        self.num_classes = 2
        self.max_iter = 0
        self.prob = torch.tensor(0.)
        self.probs = torch.tensor(0.)
        self.alphas = torch.tensor(0.)
        self.pred_labels = torch.tensor(0.)

    def _weak_predict(self, features, ctr):
        """Return the +1/-1 predictions of the round-``ctr`` weak learner.

        Args:
            features: (N, D) tensor of feature rows (no label column).
            ctr: index of the boosting round whose parameters are used.

        Returns:
            (N,) integer tensor; class index 0 maps to +1, index 1 to -1.
        """
        if self.gauss:
            log_lik = []
            for c in range(self.num_classes):
                var = self.vars[:, c, ctr]
                sq_dist = (features - self.means[:, c, ctr]) ** 2 / var
                log_lik.append(-0.5 * (torch.log(2 * np.pi * var) + sq_dist).sum(1))
            log_joint = torch.stack(log_lik, dim=1) + torch.log(self.py[:, ctr])
            return 1 - 2 * log_joint.argmax(1)

        self.pred = torch.ones((len(features), self.num_classes))
        for i, row in enumerate(features):
            self.pred[i, :] = bayespost(row, self.px[:, :, ctr], self.py[:, ctr])
        # class 0 strictly more probable -> +1, otherwise -1
        return 2 * (self.pred[:, 0] > self.pred[:, 1]).long() - 1

    def fit(self,data,dimension,gauss,max_iter = 100):
        """Run up to ``max_iter`` boosting rounds on ``data``.

        Args:
            data: (N, D + 1) tensor; the last column is the +1/-1 label.
            dimension: image size whose product is D (tensor or sequence of ints).
            gauss: truthy -> Gaussian Naive Bayes weak learner on continuous
                pixels; falsy -> per-pixel probability model evaluated with
                ``bayespost``.
            max_iter: maximum number of boosting rounds.

        Prints the training misclassification rate and accuracy of the
        ensemble. Boosting stops early when a weak learner's weighted error
        reaches 0.5 or the updated weights sum to zero.
        """
        n_samples = len(data)
        features, labels = data[:, :-1], data[:, -1]

        self.gauss = gauss
        self.max_iter = max_iter
        self.dimension = torch.prod(torch.as_tensor(dimension))
        n_features = int(self.dimension)
        self.prob = torch.ones((n_samples, 1)) / n_samples  # initialize weights to 1/N
        self.py = torch.ones((self.num_classes, self.max_iter))
        self.px = torch.ones((n_features, self.num_classes, self.max_iter))
        self.means = torch.ones((n_features, self.num_classes, self.max_iter))
        self.vars = torch.ones((n_features, self.num_classes, self.max_iter))

        weights = self.prob[:, 0].clone()
        weight_history = [weights]
        alphas, votes = [], []

        for ctr in range(max_iter):
            # get samples of training data with replacement and find px, py
            idx = weights.multinomial(num_samples=n_samples, replacement=True)
            X_train, y_train = features[idx], labels[idx]
            idx1 = (y_train == 1.)
            idx0 = ~idx1

            ycount = torch.ones(self.num_classes)
            ycount[0], ycount[1] = idx1.sum(), idx0.sum()
            self.py[:, ctr] = ycount / ycount.sum()

            if self.gauss:
                ## For continuous [0-1] pixel images (Gaussian)
                self.means[:, 0, ctr] = X_train[idx1].mean(axis=0)
                self.means[:, 1, ctr] = X_train[idx0].mean(axis=0)
                self.vars[:, 0, ctr] = X_train[idx1].var(axis=0)
                self.vars[:, 1, ctr] = X_train[idx0].var(axis=0)
                self.vars[:, :, ctr] += self.var_smoothing * X_train.var(axis=0).max()
            else:
                ## For 0 to 1 pixel images: per-pixel probability for each class
                xcount = torch.ones(n_features, self.num_classes)  # Laplace smoothing
                xcount[:, 0] += X_train[idx1].sum(axis=0)
                xcount[:, 1] += X_train[idx0].sum(axis=0)
                self.px[:, :, ctr] = xcount / ycount.reshape(1, self.num_classes)  # broadcasting

            # Score the weak learner on the whole dataset (not the resample) so
            # prediction i lines up with label i and weight i below.
            self.pred_label = self._weak_predict(features, ctr)

            # weighted misclassification error of this round
            err_idx = (labels != self.pred_label)
            err = (err_idx * weights).sum() + 1e-12  # keeps log((1-err)/err) finite
            if err >= 1 - 1 / self.num_classes:
                print("Worse than random guess. Stopped Boosting")
                break

            # alpha measures the weak learner's quality: 0 at err = 0.5 and
            # growing as err -> 0, so better learners get a larger vote.
            alpha = 0.5 * torch.log((1 - err) / err)
            alphas.append(alpha)
            votes.append(self.pred_label)

            # label * prediction = -1 (misclassified) -> factor e^(+alpha) -> weight increased
            # label * prediction = +1 (correct)       -> factor e^(-alpha) -> weight decreased
            prob = weights * torch.exp(-alpha * labels * self.pred_label)
            if prob.sum() <= 0:
                print("prob sum invalid")
                break
            prob /= prob.sum()  # normalize weights
            if ctr + 1 >= max_iter:
                break  # no further round will use the updated weights
            weights = prob
            weight_history.append(weights)

        self.alphas = torch.stack(alphas) if alphas else torch.zeros(0)
        self.probs = torch.stack(weight_history, dim=1)
        if votes:
            self.pred_labels = torch.stack(votes, dim=1).float()
        else:
            self.pred_labels = torch.zeros((n_samples, 0))

        # adaboost = final classifier is the sign of the performance weighted sum of predicted values at different iterations
        self.final_pred = torch.sign((self.alphas * self.pred_labels).sum(1))
        err_rate = (labels != self.final_pred).sum()/len(self.final_pred)
        print(f"Training Misclassification rate: {err_rate:.{4}}")
        print(f"Training Accuracy: {100*(1-err_rate):.{4}}%")

    def predict(self,data):
        """Return the ensemble's +1/-1 (0 on a tie) prediction for each row.

        Args:
            data: (N, D + 1) tensor laid out like the ``fit`` input; the last
                column is ignored.

        Returns:
            (N,) float tensor: sign of the alpha-weighted sum of weak votes.
        """
        features = data[:, :-1]
        votes = [self._weak_predict(features, ctr).float()
                 for ctr in range(len(self.alphas))]
        if not votes:
            # No round was kept, so every weighted sum is zero.
            return torch.zeros(len(data))
        return torch.sign((self.alphas * torch.stack(votes, dim=1)).sum(1))


In [None]:
# Number of boosting rounds passed to AdaBoost.fit as max_iter.
M = 100
for gauss in (True, False):
    # Header naming the weak learner used in this pass.
    print("Gaussian Naive Bayes" if gauss
          else "*****************************\nMultinomial Naive Bayes")

    # train (fit prints the training metrics itself)
    AB = AdaBoost()
    AB.fit(train, dimension, gauss, max_iter=M)

    # validate
    val_pred = AB.predict(validation)
    val_truth = validation[:, -1]
    err_rate = torch.ne(val_truth, val_pred).sum() / len(val_pred)
    print(f"\nValidation Misclassification rate: {err_rate:.{4}}")
    print(f"Validation Accuracy: {100*(1-err_rate):.{4}}%")

    # test
    test_pred = AB.predict(test)
    test_truth = test[:, -1]
    err_rate = torch.ne(test_truth, test_pred).sum() / len(test_pred)
    print(f"\nTest Misclassification rate: {err_rate:.{4}}")
    print(f"Test Accuracy: {100*(1-err_rate):.{4}}%")

## Compare with scikit AdaBoost Classifier

In [None]:
from sklearn.ensemble import AdaBoostClassifier

# Split every dataset into its feature columns and its trailing label column.
X_tr, y_tr = train[:,:-1], train[:,-1]
X_val, y_val = validation[:,:-1], validation[:,-1]
X_te, y_te = test[:,:-1], test[:,-1]

# train: same number of estimators (M) as the hand-written AdaBoost above
clf = AdaBoostClassifier(n_estimators=M, random_state=0)
clf.fit(X_tr, y_tr)
t_acc = 100*clf.score(X_tr, y_tr)

# validate
v_p = clf.predict(X_val)
v_acc = 100*clf.score(X_val, y_val)

# test
t_p = clf.predict(X_te)
te_acc = 100*clf.score(X_te, y_te)

print("Training Accuracy: %.2f%%, Validation Accuracy: %.2f%%, Testing Accuracy %.2f%%" % (t_acc, v_acc, te_acc))