In [None]:
import numpy as np
import numpy as np
import sklearn.metrics as metrics

In [2]:
import torch as tc

In [3]:
import os
import sys
module_path = os.path.abspath(os.path.join('/Users/wushuangyan/Desktop/Linear models/base'))
if module_path not in sys.path:
    sys.path.append(module_path)

In [4]:
os.chdir('/Users/wushuangyan/Desktop/Linear models')

In [5]:
# Logistic Regression Model
from base.module import GradientClassifier  # import from local files

class LogisticRegression(GradientClassifier):
    def __init__(self, feat_dim=4):
        GradientClassifier.__init__(self, "LogReg")
        self._features = feat_dim
        self.weights = tc.nn.Parameter(tc.randn((feat_dim, 1), dtype=tc.float32))
        self.bias = tc.nn.Parameter(tc.zeros((1, ), dtype=tc.float32) + 0.1)

    def forward(self, x):
        assert len(x.shape) == 2 and x.shape[1] == self._features
        # Implement a linear model for the log odds.
        logits = tc.mm(x, self.weights) + self.bias 
        return tc.sigmoid(logits)

In [8]:
# Multilayer Perceptron Neural Networks
from base.module import GradientClassifier

class MLPBinaryClassifier(GradientClassifier):
    def __init__(self, in_dim=4, hide_dim=64, device="cpu"):
        GradientClassifier.__init__(self, "MLP")
        self._features = in_dim
        self.weights_hide = tc.nn.Parameter(tc.randn((in_dim, hide_dim)))
        self.bias_hide = tc.nn.Parameter(tc.zeros((hide_dim, ), dtype=tc.float32) + 0.1)
        self.weights_clf = tc.nn.Parameter(tc.randn((hide_dim, 1)))
        self.bias_clf = tc.nn.Parameter(tc.zeros((1, ), dtype=tc.float32) + 0.1)

    def forward(self, x): 
        assert len(x.shape) == 2 and x.shape[1] == self._features
        # Implement the MLP model.
        logits = tc.mm((tc.mm (x, self.weights_hide) + self.bias_hide), self.weights_clf)+ self.bias_clf
        return tc.sigmoid(logits)

In [9]:
# Naive Bayes model
from base.module import StatisticClassifier

class BayesianClassifier(StatisticClassifier):
    
    def __init__(self):
        StatisticClassifier.__init__(self, "NaiveBayesian")
        self._probas = None

    def forward(self, sample):
        target_y, target_prob = 0, 0.0
        for y, (postior, cond_probs) in enumerate(self._probas):
            # P(Y=y|x1, x2, ...) ~= P(Y=y) * P(X1=x1|Y=y) * P(X2=x2|Y=y) * ...
            for x_idx, x_cond_prob_dict in enumerate(cond_probs):
                postior *= x_cond_prob_dict.get(sample[x_idx], 0.0)
            if  postior > target_prob:       
                # Find the biggest postior probability
                target_y, target_prob = y, postior           
        return target_y            

    def _fit(self, X, Y):
        # the dimsension of is X = (num_samples, num_featues)
        self._probas = [] # store P(y) and P(X|Y=y) as a tuple
        for y in range(self._num_cls): # y= 0,1,2,3...
            # calculate P(X|Y=y) by storing them as a sequence
            subX = X[Y == y] # records with label y
            y_cond_prob = [] # store [P(X1|Y=y), P(X2|Y=y), ...] as list
            for subx_seq in subX.T:
                counts = {}  # dict
                for val in subx_seq.tolist():
                    if val not in counts:
                        counts[val] = 0
                    counts[val] += 1
                    # Calcluate the P(X1=x11|Y=y), P(X1=x21|Y=y)
                x_probs = {key: value / len(subX) for key, value in counts.items()}
                y_cond_prob.append(x_probs) 
                
            Py = len(subX) / len(X) # calculate P(Y=y)
            self._probas.append((Py, y_cond_prob))


In [10]:
# Decision Tree 
from base.module import StatisticClassifier

class _Tree:
    
    """basic node structure for construction tree"""
    
    def __init__(self, label, feature=None):
        assert (isinstance(feature, int) and 0 <= feature) or feature is None
        assert isinstance(label, (int, np.int32, np.int64)) and 0 <= label
        self._feature = feature
        self._label = label
        self._children = {} #dict

    @property
    def label(self):
        return self._label
    
    @property
    def feature(self):
        return self._feature
        
    def __getitem__(self, condition):
        return self._children.get(condition, self._label)

    def __setitem__(self, condition, children):
        assert not self.is_leaf(), 'current node is a leaf!'
        assert isinstance(children, _Tree)
        self._children[condition] = children

    def is_leaf(self):
        return self._feature is None


def standard_entropy(seq):
    assert len(seq.shape) == 1
    uniqs, counts = np.unique(seq, return_counts=True)
    probs = counts / seq.size
    # entropy = -sum[P(y) * log(P(y))]
    entropy = - sum (probs * np.log2(probs))     
    return entropy

def conditional_entropy(x_seq, y_seq):
    assert len(x_seq.shape) == len(y_seq.shape) == 1
    assert x_seq.size == y_seq.size
    entropy = 0.0
    x_uniqs, x_counts = np.unique(x_seq, return_counts=True)
    for x_label, x_count in zip(x_uniqs, x_counts):
        x_prob = x_count / x_seq.size
        y_controled_by_x = y_seq[x_seq == x_label] 
        # conditional entropy = - Px * sum[P(y|x) * logP(y|x)]
        entropy += x_prob * (sum (((np.unique(y_controled_by_x, return_counts=True)[1])/len(y_controled_by_x))*
                                 (np.log2((np.unique(y_controled_by_x, return_counts=True)[1])/len(y_controled_by_x)))))          
    return - entropy
        

class DecisionTreeClassifier(StatisticClassifier):
    def __init__(self):
        StatisticClassifier.__init__(self, "DecisionTree")
        self._tree = None

    def forward(self, sample):
        tree = self._tree
        while not tree.is_leaf():
            tree = tree[sample[tree.feature]]
        return tree.label

    def _construct_tree(self, X, Y, used_fids):
        # Decision tree are generated recursively. Specifically, there
        # are three steps as following:
        # Step-1: check whether we stop the recursion.
        # Step-2: calculate information gain of each variable to pick up
        #        the best variable to split data.
        # Step-3: recursively call this function to generate children
        #        by using the best splitting variable.

        # Step-1: check whether we need to generate children
        most_freq_y = np.bincount(Y.astype(np.int32)).argmax() # y = 0,1,2,..
        entire_entropy = standard_entropy(Y)
        stop_cond_1 = (len (np.unique(Y))==1)
                     # Assign a bool value to identify: whether there is only a single y in Y.
        stop_cond_2 = (np.unique(X, axis=0).shape[0] == 1)
                     # Assign a bool value to identify: whether we use out all features.
        if stop_cond_1 or stop_cond_2:
            return _Tree(most_freq_y) 
        
        # Step-2: find out the best splitting feature
        max_info_gain, best_fid, best_seq = -float("inf"), 0, None
        for fid, x_seq in enumerate(X.T):
            if fid in used_fids:
                continue
            x_info_gain = entire_entropy - conditional_entropy(x_seq, Y)
            if x_info_gain > max_info_gain:
                max_info_gain, best_fid, best_seq = x_info_gain, fid, x_seq

        # Step-3: recursively generate children of the current tree
        root = _Tree(most_freq_y, best_fid)
        used_fids = used_fids | {best_fid}
        for uniq_val in np.unique(best_seq):
            uniq_idx = best_seq == uniq_val
            subX, subY = X[uniq_idx], Y[uniq_idx]
            # Recursively call `self._construct_tree` to generate children."
            root[uniq_val] = self._construct_tree(subX, subY, used_fids)
        return root

    def _fit(self, X, Y):
        self._tree = self._construct_tree(X, Y, set())


In [22]:
# Build pipeline for all 4 ML methods
from base.utils import prepare_dataset, scoring

def pipeline(model, train, test):
    name = model.name
    trainX, trainY = train[:, :-1], train[:, -1]
    print("")
    print("Model: %s\n" % name + "-" * 80)
    model.fit(trainX, trainY)
    
    acc, f1, auc = scoring(trainY, model.predict(trainX))
    print("Train Accuracy=%.4f | F1=%.4f | AUC=%.4f" % (acc, f1, auc))

    acc, f1, auc = scoring(test[:, -1].astype(np.int32), model.predict(test[:, :-1]))
    print("Test Accuracy=%.4f | F1=%.4f | AUC=%.4f" % (acc, f1, auc))

    
if __name__ == "__main__":
    train, test, labels = prepare_dataset("./iris.csv", do_normalize=True)
    for architect in [LogisticRegression, MLPBinaryClassifier]:
        pipeline(architect(), train, test)
    train, test, labels = prepare_dataset("./iris.csv", do_discretize=True)
    for architect in [BayesianClassifier, DecisionTreeClassifier]:
        pipeline(architect(), train, test)
    


Model: LogReg
--------------------------------------------------------------------------------
Train Accuracy=0.9714 | F1=0.9737 | AUC=0.9967
Test Accuracy=0.9667 | F1=0.9600 | AUC=0.9955

Model: MLP
--------------------------------------------------------------------------------
Train Accuracy=0.9857 | F1=0.9867 | AUC=1.0000
Test Accuracy=0.9667 | F1=0.9600 | AUC=0.9910

Model: NaiveBayesian
--------------------------------------------------------------------------------
Train Accuracy=0.9286 | F1=0.9296 | AUC=0.9308
Test Accuracy=0.9333 | F1=0.9231 | AUC=0.9321

Model: DecisionTree
--------------------------------------------------------------------------------
Train Accuracy=0.9857 | F1=0.9867 | AUC=0.9848
Test Accuracy=0.9000 | F1=0.8800 | AUC=0.8937


The model accuracy, F1 score, and AUC order for the test set is: Logistic Regression >= MLP > Naive Bayes > Decision Tree. Although the MLP nerual network model performs well on the training and test sets, it took much longer time to train the model. And the decision tree model seems to overfit the training data, since the model performance (accuracy, F1, and AUC) shows a large difference between the training and test sets.  