In [1]:
import numpy as np
import re
import random

In [2]:
class NaiveBayes():
    def load_data_set(self):
        # fake dataset created by ourself
        """
        posting_list: feature dataset
        class_vec: labels
        """
        posting_list = [
        ['my', 'dog', 'has', 'flea', 'problems', 'help', 'please'],
        ['maybe', 'not', 'take', 'him', 'to', 'dog', 'park', 'stupid'],
        ['my', 'dalmation', 'is', 'so', 'cute', 'I', 'love', 'him'],
        ['stop', 'posting', 'stupid', 'worthless', 'gar e'],
        ['mr', 'licks', 'ate', 'my', 'steak', 'how', 'to', 'stop', 'him'],
        ['quit', 'buying', 'worthless', 'dog', 'food', 'stupid']]
        class_vec = [0, 1, 0, 1, 0, 1]  # 1 is 侮辱性的文字, 0 is not
        return posting_list, class_vec
    
    def createVocabList(self, dataSet):
        """
        return the unique vocabulary list
        """
        vocabSet = set([])
        for document in dataSet:
            vocabSet = vocabSet | set(document)
        return list(vocabSet)
    
    def setOfWords2Vec(self, vocabList, inputSet):
        """
        convert document into vector (if a vocabulary occurred, then the corresponding
        feature will be set as 1)
        """
        returnVec = [0] * len(vocabList) # the length of feature vector = the length of vocabList
        for word in inputSet:
            if word in vocabList:
                returnVec[vocabList.index(word)] = 1
            else:
                print ("the word: %s is not in my VocabList" % word)
        return returnVec
    
    def trainNB0(self, trainMatrix, trainCategory):
        """
        input:
        1. trainMatrix: training documents
        2. trainCategory: corresponding training labels
        return:
        1. p0Vect: vector, each element is p(wi|C0).
        2. p1Vect: vector, each element is p(wi|C1).
        3. pAbusive: p(C1)
        
        Note: p(wi|C0) represents the probability that the word (VocalList[i]) 
                occur in all C0 documents
        """
        numTrainDocs = len(trainMatrix) # row number, the trianing file number
        numWords = len(trainMatrix[0]) # column number, vocabList length
        """calculate the probability of Abusive file, p(ci)"""
        pAbusive = sum(trainCategory) / float(numTrainDocs) # the probability of Abusive file
        
        # generate the empty vector to store the probability for each word in every category
#         p0Num = np.zero(numWords)
#         p1Num = np.zero(numWords)
        """we set the initial frequence as 1 for every word to avoid 0"""
        p0Num = np.ones(numWords)
        p1Num = np.ones(numWords)
        
        # create variable to store the total word number in each category
#         p0Denom = 0.0
#         p1Denom = 0.0
        """we also set the initial total number as 2 to avoid 0"""
        p0Denom = 2.0
        p1Denom = 2.0
        
        # count the frequence of each word and the total word number
        for i in range(numTrainDocs):
            if trainCategory[i] == 1: # this document belongs to category 1
                p1Num += trainMatrix[i] # cumsum the trainMatrix vector
                p1Denom += sum(trainMatrix[i]) # cumsum the total word number of each document
            else:
                p0Num += trainMatrix[i]
                p0Denom += sum(trainMatrix[i])
        
        # calculate p(wi|C0) and p(wi|C1)
#         p0Vect = p0Num / p0Denom
#         p1Vect = p1Num / p1Denom
        
        """
        另一个遇到的问题是下溢出，这是由于太多很小的数相乘造成的。当计算乘积 
        p(w0|ci) * p(w1|ci) * p(w2|ci)... p(wn|ci) 时，由于大部分因子都非
        常小，所以程序会下溢出或者得到不正确的答案。（用 Python 尝试相乘许多
        很小的数，最后四舍五入后会得到 0）。一种解决办法是对乘积取自然对数。
        在代数中有 ln(a * b) = ln(a) + ln(b), 于是通过求对数可以避免下溢出或
        者浮点数舍入导致的错误。同时，采用自然对数进行处理不会有任何损失。
        """
        
        # calculate In(p(wi|C0)) and In(p(wi|C1))
        p0Vect = np.log(p0Num / p0Denom) #numpy.log: Natural logarithm, element-wise.
        p1Vect = np.log(p1Num / p1Denom)
        
        return p0Vect, p1Vect, pAbusive
    
    def classifyNB(self, vec2Classify, p0Vec, p1Vec, pClass1):
        """
        function: classify the input document
        input:
        1. vec2Classify: the document waiting to be classified
        2. p0Vec: In(p(wi|C0)) vector
        3. p1Vec: In(p(wi|C1)) vector
        4. pClass1: p(C1)
        return:
        classified result, 1 or 0
        """
        
        # calculate p(C1|w) and p(C0|w), w is a vector
        # vector multiplication, element i (vector 0) * element i (vector 1)
        p1 = sum(vec2Classify * p1Vec) + np.log(pClass1)
        p0 = sum(vec2Classify * p0Vec) + np.log(1 - pClass1)
        
        if p1 > p0:
            return 1
        else:
            return 0
        
    def testNB(self):
        listOPosts, listClasses = self.load_data_set()
        myVocablist = self.createVocabList(listOPosts)
        trainMat = []

        # create the training matrix 
        for postinDoc in listOPosts:
            trainMat.append(self.setOfWords2Vec(myVocablist, postinDoc))

        # calculate the probability vector and category probability
        p0v, p1v, pAb = NB.trainNB0(trainMat, listClasses)

        # test the model, classify the test document
        testEntry = ['love', 'my', 'dalmation']
        thisDoc = np.array(self.setOfWords2Vec(myVocablist, testEntry))
        print('the result is: {}'.format(self.classifyNB(thisDoc, p0v, p1v, pAb)))

        testEntry = ['stupid', 'garbage']
        thisDoc = np.array(NB.setOfWords2Vec(myVocablist, testEntry))
        print('the result is: {}'.format(self.classifyNB(thisDoc, p0v, p1v, pAb)))

In [3]:
NB = NaiveBayes()
NB.testNB()

the result is: 0
the word: garbage is not in my VocabList
the result is: 1


In [4]:
class spamTestClassifier():
    def __init__(self):
        self.reEx = r'\W+' # define the regular expression used in spliting string
        
    def textParse(self, bigString):
        listOfTokens = re.split(self.reEx, bigString)
        # remain the token which length exceeds 2.
        return [tok.lower() for tok in listOfTokens if len(tok)>2]
    
    def spam_test(self):
        doc_list = [] # store the vocabulary list for each text
        full_text = [] # flip all text and store all vocabulary in every text
        class_list = [] # store the label
        for i in range(1, 26):
            # split the spam text and get the vocabulary set 
            try:
                words = self.textParse(open('..\data\Ch04\email\spam\{}.txt'.format(i)).read())
            except:
                words = self.textParse(open('..\data\Ch04\email\spam\{}.txt'.format(i), 
                                            encoding='Windows 1252').read())
            doc_list.append(words)
            full_text.extend(words)
            class_list.append(1)
            
            # split the ham text and get the vocabulary set 
            try:
                words = self.textParse(open('..\data\Ch04\email\ham\{}.txt'.format(i)).read())
            except:
                words = self.textParse(open('..\data\Ch04\email\ham\{}.txt'.format(i), 
                                            encoding='Windows 1252').read())
            doc_list.append(words)
            full_text.extend(words)
            class_list.append(0)
        
        # generate unique vocabulary list
        NB = NaiveBayes()
        vocabList = NB.createVocabList(doc_list)
        
        # generate 10 integer as the test set index
        testSetIndex = [int(num) for num in random.sample(range(50), 10)]
        trainingSetIndex = list(set(range(50)) - set(testSetIndex)) # get the training set index
        
        # convert the text in training set into feature vector
        trainingMat = []
        trainingClass = []
        for doc_index in trainingSetIndex:
            trainingMat.append(NB.setOfWords2Vec(vocabList, doc_list[doc_index]))
            trainingClass.append(class_list[doc_index])
            
        # the trainingMat is list-type, we need to convert it into np.array type
        p0v, p1v, p_spam = NB.trainNB0(
            np.array(trainingMat),
            np.array(trainingClass)
        )
        # count the error classified result and print the error rate
        errorCount = 0
        for doc_index in testSetIndex:
            # convert the doc in test set into feature vector
            wordVec = NB.setOfWords2Vec(vocabList, doc_list[doc_index])
            if NB.classifyNB(np.array(wordVec), p0v, p1v, p_spam) != class_list[doc_index]:
                errorCount += 1
        print ('the error rate is {}'.format(errorCount / len(testSetIndex)))
       

In [5]:
spamClassifier = spamTestClassifier()
spamClassifier.spam_test()

the error rate is 0.0
