In [1]:
"""
@ Author:         姜楠（小组成员：姜楠、王勃栋、李甜）
@ Create Date:    2021-12-01
@ Requirements:
1, Implmenting the naïve bayes classifier without using sk-learn.
2, Implementation need support both discrete and continuous features, and Gaussian, Multinomial, and Bernoulli models required.
3, Providing prediction probability is required 
4, classifier evaluation required.
5, Compare your implementation with Sk-learn regressor.
"""
import pandas as pd
import numpy as np
import math


# 定义gaussion model
class Gaussion:
    def separate_by_classes(self, X, y):
        ''' This function separates our dataset in subdatasets by classes '''
        self.classes = np.unique(y)
        classes_index = {}
        subdatasets = {}
        cls, counts = np.unique(y, return_counts=True)
        self.class_freq = dict(zip(cls, counts))
#         print(self.class_freq)
        for class_type in self.classes:
            classes_index[class_type] = np.argwhere(y==class_type)
            subdatasets[class_type] = X[classes_index[class_type], :]
            self.class_freq[class_type] = self.class_freq[class_type]/sum(list(self.class_freq.values()))
        return subdatasets
    
    def fit(self, X, y):
        ''' The fitting function '''
        separated_X = self.separate_by_classes(X, y)
        self.means = {}
        self.std = {}
        for class_type in self.classes:
            # Here we calculate the mean and the standart deviation from datasets
            self.means[class_type] = np.mean(separated_X[class_type], axis=0)[0]
            self.std[class_type] = np.std(separated_X[class_type], axis=0)[0]
            
    def calculate_probability(self, x, mean, stdev):
        ''' This function calculates the class probability using gaussian distribution '''
        exponent = math.exp(-((x - mean) ** 2 / (2 * stdev ** 2)))
        return (1 / (math.sqrt(2 * math.pi) * stdev)) * exponent
    
    def predict_proba(self, X):
        ''' This function predicts the probability for every class '''
        self.class_prob = {cls:math.log(self.class_freq[cls], math.e) for cls in self.classes}
        for cls in self.classes:
            for i in range(len(self.means)):
#                 print(X[i])
                self.class_prob[cls]+=math.log(self.calculate_probability(X[i], self.means[cls][i], self.std[cls][i]), math.e)
        self.class_prob = {cls: math.e**self.class_prob[cls] for cls in self.class_prob}
        return self.class_prob
    
    def predict(self, X):
        ''' This funtion predicts the class of a sample '''
        pred = []
        for x in X:
            pred_class = None
            max_prob = 0
            for cls, prob in self.predict_proba(x).items():
                if prob>max_prob:
                    max_prob = prob
                    pred_class = cls
            pred.append(pred_class)
        return pred

# 定义Multinomial model
class Multinomial:
    def __init__(self, k=0.5):
        self.k = k
        self.cat0_count = 0
        self.cat1_count = 0
        self.total_count = self.cat0_count + self.cat1_count
        self.cat_0_prior = 0
        self.cat_1_prior = 0
        self.cat_0_prior, self.cat_1_prior
        self.word_probs = []
        self.vocab = []

    def tokenize(self, document):
        """
        Take in a document and return a list of words
        """
        doc = document.lower()
#         doc = document
        # remove non-alpha characters
#         stop_chars = '''0123456789!()-[]{};:'"\,<>./?@#$%^&*_~'''

        tokens = ""
        # iterate through and make each token
        for char in doc:
#             if char not in stop_chars:
            tokens += char

        return tokens.split() # now a list of tokens

    def count_words(self, X, y):
        """
        X is an array of documents
        y is an array of targets, 0 or 1
        Output a dictionary of {word: (cat0_count, cat1_count)...}
        """
        counts = {}
        # need to figure our this loop, want to iterate over both of them, I see why it was paired before
        for document, category in zip(X, y):
            for token in self.tokenize(document):
              # Initialize a dict entry with 0 counts
              if token not in counts:
                counts[token] = [0,0]
              # Now that it exists, add to the category count for that word
              counts[token][category] += 1
        return counts

    def prior_prob(self, counts):

        # Iterate through counts dict and add up each word count by category
        cat0_word_count = cat1_word_count = 0
        for word, (cat0_count, cat1_count) in counts.items():
            cat0_word_count += cat0_count
            cat1_word_count += cat1_count

        # save attributes to the class
        self.cat0_count = cat0_word_count
        self.cat1_count = cat1_word_count
        self.total_count = self.cat0_count + self.cat1_count

        # Get the prior prob by dividing words in each cat by total words
        cat_0_prior = cat0_word_count / self.total_count
        cat_1_prior = cat1_word_count / self.total_count
        return cat_0_prior, cat_1_prior

    def word_probabilities(self, counts):
        """turn the word_counts into a list of triplets
        word, p(w | cat0), and p(w | cat1)"""
        # Here we apply the smoothing term, self.k, so that words that aren't in
        # the category don't get calculated as 0
        self.vocab = [word for word, (cat0, cat1) in counts.items()]
        return [(word,
        (cat0 + self.k) / (self.cat0_count + 2 * self.k),
        (cat1 + self.k) / (self.cat1_count + 2 * self.k))
        for word, (cat0, cat1) in counts.items()]

    def fit(self, X, y):
        # Take all these functions and establish probabilities of input
        counts = self.count_words(X, y)
        self.cat_0_prior, self.cat_1_prior = self.prior_prob(counts)
        self.word_probs = self.word_probabilities(counts)

    def predict(self, test_corpus):
        # Split the text into tokens,
        # For each category: calculate the probability of each word in that cat
        # find the product of all of them and the prior prob of that cat
        y_pred = []
        for document in test_corpus:
          # Every document get their own prediction probability
            log_prob_cat0 = log_prob_cat1 = 0.0
            tokens = self.tokenize(document)
            # Iterate through the training vocabulary and add any log probs that match
            # if no match don't do anything. We just need a score for each category/doc
            for word, prob_cat0, prob_cat1 in self.word_probs:
                if word in tokens:
                  # Because of 'overflow' best to add the log probs together and exp
                    log_prob_cat0 += np.log(prob_cat0)
                    log_prob_cat1 += np.log(prob_cat1)
            # get each of the category predictions including the prior
            cat_0_pred = self.cat_0_prior * np.exp(log_prob_cat0)
            cat_1_pred = self.cat_1_prior * np.exp(log_prob_cat1)
            if cat_0_pred >= cat_1_pred:
                y_pred.append(0)
            else:
                y_pred.append(1)
        return y_pred


# 定义Bernoulli model
class Bernoulli:
    def __init__(self, smooth=1):
        self._smooth = smooth
        self._feat_prob = {}
        self._class_prob = {}
        self._classDict = {}
        self._featureMap = {}
        self._Ncls = 0
        self._Nfeat = 0

    def computeClassProbability(self, totalRecords):
        for x in self._classDict.keys():
            self._class_prob[x] = (self._classDict[x] + self._smooth) / float(totalRecords + (2**self._smooth)) # 2 because bernoullli (2 options)

    def fit(self, X, y):

        classDict = self.makeClasses(y)
        numFeatures = X.shape[1]
        classMap = {k: {"count": 0, "probability": 0} for k in classDict.keys()}
        featureMap = {k: classMap for k in range(0,numFeatures)}
        #featureMap contains a dictionary for every feature
        #which contains a dictionary for every class, and stores zero_count given class

        #go through the array and add all the counts for 0 given Class C
        for row in range(0, X.shape[0]):
            for column in range(0, X.shape[1]):
                val = 0 | X[row][column]
                if(val == 0):
                    correspondingClassValue = y[row]
                    featureMap[column][correspondingClassValue]["count"] += 1
                if(row == X.shape[0] - 1):
                    # calculate probablity and account for alpha
                    featureMap[column][correspondingClassValue]["probability"] = (featureMap[column][correspondingClassValue]["count"] + self._smooth) / float(classDict[correspondingClassValue] + (2**self._smooth))

        self._featureMap = featureMap
        self._Ncls = len(classDict.keys())
        self._Nfeat = numFeatures
        self._classDict = classDict
        self._class_prob = self.computeClassProbability(X.shape[0])

    def getPointProbability(self, featureName, columnName, className, value):

        retrieved = self._featureMap[fname][str(className)]["probability"]
        if(val == 1):
            retrieved = 1 - retrieved
        return retrieved

    def predict(self, X):

        results = np.zeros([X.shape[0],1])
        for row in range(0, X.shape[0]):
            maxVal = 0
            finalChoice = -1
            for className in self._classDict.keys():
                partialTotal = self._class_prob[str(className)]
                for column in range(0, X.shape[1]):
                    partialTotal *= self.getPointProbability(row,column,className,X[row][column])
                #set maxVal/result
                if(maxVal<partialTotal):
                    maxVal = partialTotal
                    finalChoice = className
            results[row] = finalChoice

        return results

    def makeClasses(self, y):
        classes = {}
        for i in range(0,len(y)):
            val = y[i]
            if(val in classes):
                classes[val] += 1
            else:
                classes[val] = 1
        return classes

    

In [2]:
#数据集准备
from sklearn import datasets 
from sklearn.model_selection import train_test_split

# gaussian 使用sklearn iris数据集
X, y = datasets.load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.5, random_state=0)
# X_train

In [3]:
# 使用sklearn GaussianNB模型预测
from sklearn.naive_bayes import GaussianNB,BernoulliNB ,MultinomialNB

#GaussianNB
skg=GaussianNB()
skg.fit(X_train,y_train)
skg_pre = skg.predict(X_test)
print("GaussianNB predict:",skg_pre)

GaussianNB predict: [2 1 0 2 0 2 0 1 1 1 1 1 1 1 1 0 1 1 0 0 2 1 0 0 2 0 0 1 1 0 2 1 0 2 2 1 0
 1 1 1 2 0 2 0 0 1 2 2 1 2 1 2 1 1 2 1 1 2 1 2 1 0 2 1 1 1 1 2 0 0 2 1 0 0
 1]


In [4]:
# 使用编写的 GaussianNB模型预测
g=Gaussion()
g.fit(X_train,y_train)
g_pre = g.predict(X_test)
print("Gaussion predict:",g_pre)


Gaussion predict: [1, 1, 0, 2, 0, 2, 0, 2, 2, 1, 2, 1, 1, 1, 1, 0, 1, 1, 0, 0, 1, 1, 0, 0, 1, 0, 0, 1, 1, 0, 2, 1, 0, 2, 2, 1, 0, 1, 1, 1, 2, 0, 2, 0, 0, 1, 2, 2, 1, 2, 1, 2, 2, 1, 1, 2, 1, 1, 2, 2, 1, 0, 1, 1, 1, 1, 1, 2, 0, 0, 2, 1, 0, 0, 2]


In [5]:
# multiminol 使用 邮件数据集
from sklearn.feature_extraction.text import CountVectorizer

spam = pd.read_csv("spam.csv")
dummies = pd.get_dummies(spam.label)
spam = pd.concat([spam,dummies],axis="columns")
spam = spam.drop(["label","ham"],axis="columns")
print(spam.groupby("spam").describe())

# print(spam)
m_X_train, m_X_test, m_y_train, m_y_test = train_test_split(spam["text"], spam["spam"], test_size=0.7, random_state=0)
# v=CountVectorizer(analyzer='word',ngram_range=(2,2))
v=CountVectorizer(stop_words='english')

m_X_train_T=v.fit_transform(m_X_train.values)
m_X_test_T=v.transform(m_X_test.values)

print(m_X_train_T.toarray()[:1])
# v.get_feature_names()


      text                                                               
     count unique                                                top freq
spam                                                                     
0       82     82                        Anything lor... U decide...    1
1       17     17  As a valued customer, I am pleased to advise y...    1
[[0 0 0 0 0 0 0 0 0 0 1 0 0 1 0 0 1 0 0 0 0 0 2 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 2 0 0 0 0 0 0 0 0 0 0
  0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 1 0 0 0 0 1 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 2
  0 0 0 0 0 0 0 0 0 1 0 0 1 0 1 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0]]


In [6]:
# MultinomialNB

skm = MultinomialNB()
skm.fit(m_X_train_T, m_y_train)
skm_pre = skm.predict(m_X_test_T);
print("MultinomialNB predict:",skm_pre)
# skm.score(m_X_train_T,m_y_test)
# skm_pre.reshape(1,3)
# m_X_test_T.toarray()

MultinomialNB predict: [0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0
 1 0 0 0 0 1 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0]


In [7]:
#使用自编Multinomial
m=Multinomial()
m.fit(m_X_train, m_y_train)
m_pre = m.predict(m_X_test)
print("Multinomial predict:",m_pre)


Multinomial predict: [0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 1, 0, 1, 0, 0, 1, 0, 0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0]


In [8]:
# bernoulli数据集，使用multiminal转换
v=CountVectorizer(stop_words='english', binary=True)

b_X_train_T=v.fit_transform(m_X_train.values).toarray()
b_X_test_T=v.transform(m_X_test.values).toarray()
b_y_train=m_y_train
# print(b_X_train_T)

In [9]:
# skleran 实现，此处使用multiminal数据集
skb = BernoulliNB()
skb.fit(m_X_train_T, b_y_train)
print("BernoulliNB predict:",skb.predict(b_X_test_T))

BernoulliNB predict: [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]


In [11]:
#自编bernuoulli
alpha = 1
b = Bernoulli(alpha)
b.fit(b_X_train_T, np.array(b_y_train))

# print(b_X_train_T)
b_pre = b.predict(b_X_test_T)
# b_pre
# print 'alpha=%i accuracy = %f' %(alpha, np.mean((y_test-y_pred)==0))

TypeError: 'NoneType' object is not subscriptable