<a href="https://colab.research.google.com/github/phuongdoan13/ML-algo-from-scratch/blob/main/NaiveBayes_vs_JunkMail.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction
This notebook illustrates an implementation of Naive Bayes algorithm in combatting junk mail. 

The dataset source can be found at: https://www.kaggle.com/venky73/spam-mails-dataset. To use the code below, please download the dataset, then put it in the /content/ folder (i.e. import the data to google colab at the highest folder)

Reference: https://courses.cs.washington.edu/courses/cse312/18sp/lectures/naive-bayes/naivebayesnotes.pdf

# Classes

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
from nltk import download
download('punkt')
download('wordnet')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [None]:
import numpy as np
import pandas as pd
from nltk.stem.wordnet import WordNetLemmatizer
from nltk.tokenize import RegexpTokenizer
from nltk.tokenize import word_tokenize

In [None]:
class DataPrepration:
  """
    Class for preparing data
  """
  def loadDataset(self, path):
    """
      Load the dataset from path
      Param:
        str path: to the data
      Return:
        Dataframe_obj df
    """
    
    return pd.read_csv(path, header=0, index_col = 0)
  
  def splitDataset(self, dataset, train_ratio = 0.8, seed = 1):
    """
      Split a data frame into train, valid, and test set
      Param:
        # # Dataframe_obj dataset: the dataset
        # float train_ratio (default: 0.8): the proportion of the train/dataset
        # int seed (default: 1): the randomness seed for shuffling the dataset
      Return:
        # Dataframe_obj train
        # Dataframe_obj test
        # Dataframe_obj valid
    """
    train, test = np.split(dataset.sample(frac=1, random_state= seed), [int(train_ratio*len(dataset))])
    # X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0., random_state=42)
    return train, test
  

In [None]:
class DataAnalysis:
  """
    Class to analyse the dataset

  """
  def __init__(self):
    """
      Constructor
      Param:
        # Dataframe_obj dataset 
    """
    self.dataset = None
    self.S_count = 0 
    self.H_count = 0
    self.wordsSpamList = []
    self.wordsHamList = []
    self.wordsSpamSet = ()
    self.wordsHamSet = ()

  def analyse(self, dataset):
    """
      Analyse a dataset
      Param:
        # Dataframe_obj dataset
    """
    self.dataset = dataset
    self.__set_SandH_count()
    self.__set_SandH_wordsList()
  
  def __set_SandH_count(self):
    """
      Set the S_count and H_count 
    """
    freq_table = self.dataset[GLOBAL_label].value_counts().to_dict()
    self.S_count = freq_table[GLOBAL_label_yes] if(GLOBAL_label_yes in freq_table.keys()) else 0
    self.H_count = freq_table[GLOBAL_label_no] if(GLOBAL_label_no in freq_table.keys()) else 0

  def __set_SandH_wordsList(self):
    """
      
    """
    lmtzr = WordNetLemmatizer()
    self.wordsSpamList = [[lmtzr.lemmatize(word) for word in word_tokenize(row)] for row in self.dataset[self.dataset[GLOBAL_label] == GLOBAL_label_yes][GLOBAL_value]]
    self.wordsSpamList = [item for sublist in self.wordsSpamList for item in sublist] # flatten the nested list
    self.wordsSpamSet = set(self.wordsSpamList)

    self.wordsHamList = [[lmtzr.lemmatize(word) for word in word_tokenize(row)] for row in self.dataset[self.dataset[GLOBAL_label] == GLOBAL_label_no][GLOBAL_value]]
    self.wordsHamList = [item for sublist in self.wordsHamList for item in sublist] # flatten the nested list
    self.wordsHamSet = set(self.wordsHamList)

In [None]:
class NaiveBayes:
  """
    Class for the Naive Bayes model
    Attributes:
      # dict wS_freq: the frequency of words given Spam
      # dict wH_freq: the frequency of words given Ham
      # dict wS_chance: the P(w|S) value for each word
      # dict wH_chance: the P(w|H) value for each word
      # float S_chance: the P(S)
      # float H_chane: the P(H) 
      # DataAnalysis_obj dtAnalysis
  """

  def __init__(self):
    """
      Constructor
    """
    self.dataAnalysis = None
    self.wS_freq = {}
    self.wH_freq = {}
    self.wS_chance = {}
    self.wH_chance = {}
    self.S_chance = 0
    self.H_chance = 0
    
  def fit(self, train_dataset):
    """
      Train the model
      Param:
        # Dataframe_obj train_dataset
    """
    self.dataAnalysis = DataAnalysis()
    self.dataAnalysis.analyse(train_dataset)
    
    self.__calc_wS_freq()
    self.__calc_wS_chance()
    
    self.__calc_wH_freq()
    self.__calc_wH_chance()

    self.__calc_S_chance()
    self.__calc_H_chance()
    
  def __calc_wS_freq(self):
    """
      Create the dictionary of p(w|S)
    """
    for word in self.dataAnalysis.wordsSpamList:
      if(word in self.wS_freq):
        self.wS_freq[word] += 1
      else:
        self.wS_freq[word] = 1
  
  def __calc_wH_freq(self):
    """
      Create the dictionary of p(w|H)
    """
    for word in self.dataAnalysis.wordsHamList:
      if(word in self.wH_freq):
        self.wH_freq[word] += 1
      else:
        self.wH_freq[word] = 1

  def __calc_wS_chance(self):
    """
      Create the dictionary of p(w|S)
    """
    for word, freq in self.wS_freq.items():
      self.wS_chance[word] = (freq + 1)/(self.dataAnalysis.S_count + 2)
  
  def __calc_wH_chance(self):
    """
      Create the dictionary of p(w|H)
    """
    for word, freq in self.wH_freq.items():
      self.wH_chance[word] = (freq + 1)/(self.dataAnalysis.H_count + 2)

  def __calc_S_chance(self):
    """
      Calc the ratio of S over the total emails
    """
    self.S_chance = self.dataAnalysis.S_count / (self.dataAnalysis.S_count + self.dataAnalysis.H_count)

  def __calc_H_chance(self):
    """
      Calc the ratio of H over the total emails
    """
    self.H_chance = self.dataAnalysis.H_count / (self.dataAnalysis.S_count + self.dataAnalysis.H_count)

  def calc_ultimate_possibility(self, test_sample):
    """
      Calculate the ultimate possibility of P(S|x)
      Param:
        # Dataframe_obj test_sample: this is expect to contains 1 sample only
    """
    # Analyse the test_dataset
    test_dataAnalysis = DataAnalysis()
    test_dataAnalysis.analyse(test_sample.transpose()) 
    test_wordsSpamSet = test_dataAnalysis.wordsSpamSet
    test_wordsHamSet = test_dataAnalysis.wordsHamSet

    # Calculate the product of all p(x|S) of the test dataset
    wS_common = set(self.wS_chance.keys()).intersection(test_wordsSpamSet) # find common element between sets of words in train and test
    wS_product = 1
    for word in wS_common:
      wS_product *= self.wS_chance[word]

    # Calculate the product of all p(x|H) of the test dataset
    wH_common = set(self.wH_chance.keys()).intersection(test_wordsHamSet) # find common element between sets of words in train and test
    wH_product = 1
    for word in wH_common:
      wH_product *= self.wH_chance[word]

    return np.log(self.S_chance * wS_product) / (np.log(self.S_chance * wS_product) + np.log(self.H_chance * wH_product))
    
  def classify(self, test_dataset):
    """
      Classify the unseen dataset
      Param:
        # DataFrame_obj test_dataset
    """ 
    return test_dataset.apply(lambda row: GLOBAL_label_yes if self.calc_ultimate_possibility(row.to_frame()) > GLOBAL_spamThreshold else GLOBAL_label_no, axis = 1)

In [None]:
from sklearn.metrics import accuracy_score, recall_score, precision_score, roc_auc_score
class Evaluation:
  """
    Class for evaluating models
  """
  def getEvaluation(self, true, pred):
    """
      Return metric scores for the model
      Param:
        # list true: the true list
        # list pred: the predicted list
      Return:
        # dict _: the dictionary of metrics
    """
    return {
        "accuracy": accuracy_score(true, pred),
        "precision": precision_score(true, pred),
        "recall": recall_score(true, pred),
        "auc": roc_auc_score(true, pred)
    }

# Implementation

In [None]:
"""
  Global value
"""
GLOBAL_label = "label_num"
GLOBAL_label_yes = 1
GLOBAL_label_no = 0
GLOBAL_value = "text"
GLOBAL_spamThreshold = 0.5
GLOBAL_pred = "pred"

""" 
  Prepare data
"""
dataPreperation = DataPrepration()
df = dataPreperation.loadDataset('/content/spam_ham_dataset.csv')
train, test = dataPreperation.splitDataset(dataset = df, train_ratio = 0.8, seed = 2)

"""
  Analyse data. 
  This DataAnalysis object is a copy of the DataAnalysis object created while training the model.
  This obj is used for visualisation only.
"""
# dataAnalysis = DataAnalysis()
# dataAnalysis.analyse(train)

"""
  Train model
"""
model = NaiveBayes()
model.fit(train)

"""
  Test and evaluate
"""
print("Test dataset")
test[GLOBAL_pred] = model.classify(test)
test_metrics = Evaluation().getEvaluation(test[GLOBAL_label], test[GLOBAL_pred])
print(test_metrics)

print("Train dataset")
train[GLOBAL_pred] = model.classify(train)
train_metrics = Evaluation().getEvaluation(train[GLOBAL_label], train[GLOBAL_pred])
print(train_metrics)

Test dataset
{'accuracy': 0.961352657004831, 'precision': 0.996415770609319, 'recall': 0.8769716088328076, 'auc': 0.9377894255863202}
Train dataset
{'accuracy': 0.9540618955512572, 'precision': 0.9979919678714859, 'recall': 0.8409475465313029, 'auc': 0.9201352492304449}
