# read_file

In [33]:
#pip install parsivar

In [52]:
import collections, os
import pandas as pd
import numpy as np
from pandas import DataFrame
import string
from parsivar import Normalizer
from parsivar import FindStems, Tokenizer
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_selection import SelectKBest, chi2
from sklearn.metrics import classification_report, confusion_matrix,  accuracy_score
from collections import OrderedDict

In [3]:
def words(filename):
    with open(filename,'r', encoding='utf-8') as infile:
        return [line for line in infile if line != "\n"]
spam_training_directory = os.getcwd() + '/emails/spamtraining'
ham_training_directory  = os.getcwd() + '/emails/hamtraining'
spam_testing_directory = os.getcwd() + '/emails/spamtesting'
ham_testing_directory = os.getcwd() + '/emails/hamtesting'

# training set

## spam

In [4]:
spam_traning_set = []
files = os.listdir(spam_training_directory)
i = 0
for f_name in files:
        maile = words(spam_training_directory + '/' + f_name)
        text = ""
        for line in maile:
            text = text + line
        i += 1
        spam_traning_set.append([text,1])

## ham

In [5]:
ham_traning_set = []
files = os.listdir(ham_training_directory)
i = 0
for f_name in files:
    maile = words(ham_training_directory + '/' + f_name)
    text = ""
    for line in maile:
        text = text + line
    i += 1
    ham_traning_set.append([text,0])

# test set

## spam

In [6]:
spam_testing_set = []
files = os.listdir(spam_testing_directory)
i = 0
for f_name in files:
        maile = words(spam_testing_directory + '/' + f_name)
        text = ""
        for line in maile:
            text = text + line
        i += 1
        spam_testing_set.append([text,1])

## ham

In [7]:
ham_testing_set = []
files = os.listdir(ham_testing_directory)
i = 0
for f_name in files:
        maile = words(ham_testing_directory + '/' + f_name)
        text = ""
        for line in maile:
            text = text + line
        i += 1
        ham_testing_set.append([text,0])

## create a train and testing dataframe

In [19]:
spam_train = DataFrame(spam_traning_set,columns=['maile','spam'])
ham_train = DataFrame(ham_traning_set,columns=['maile','spam'])
train_set = pd.concat([spam_train, ham_train], ignore_index=True)
train_set.head()

Unnamed: 0,maile,spam
0,﻿\n *جدیدترین و قدرتمندترین محصول بزرگ کننده ...,1
1,﻿\n Ghahve JENSI :: قهوه جنسی\n \n \nدر صور...,1
2,﻿\n*درج **لینک در 7000 وبلاگ\n*\n درج لینک...,1
3,﻿\n*فارکس*\n \n \n*فارکس چیست ؟ * <http://fore...,1
4,﻿\n-------------------------------------------...,1


In [20]:
spam_test = DataFrame(spam_testing_set,columns=['maile','spam'])
ham_test = DataFrame(ham_testing_set,columns=['maile','spam'])
test_set = pd.concat([spam_test, ham_test], ignore_index=True)
test_set.head()

Unnamed: 0,maile,spam
0,﻿\n*agar ba farsi khandan moshkel darid\nbe ma...,1
1,﻿\n*agar ba farsi khandan moshkel darid\nbe ma...,1
2,﻿\nلطفا اگر این ایمیل را در اسپم مشاهده میکنید...,1
3,﻿\nارسال 90درصدی به اینباکس\nهر ثانيه يک ايميل...,1
4,﻿ عنوان بهترین گرافیست را از آن خود کنید!\n...,1


### check nan

In [16]:
train_df_missing = train_set.isna()
train_df_missing.sum()

maile    0
spam     0
dtype: int64

In [17]:
test_df_missing = test_set.isna()
test_df_missing.sum()

maile    0
spam     0
dtype: int64

In [21]:
train_set.describe()

Unnamed: 0,spam
count,600.0
mean,0.5
std,0.500417
min,0.0
25%,0.0
50%,0.5
75%,1.0
max,1.0


In [22]:
test_set.describe()

Unnamed: 0,spam
count,400.0
mean,0.5
std,0.500626
min,0.0
25%,0.0
50%,0.5
75%,1.0
max,1.0


### clean text

In [24]:
pun = list(string.punctuation)
english_w_l = list(string.ascii_lowercase)
english_w_u = list(string.ascii_uppercase)
num = ["1","2","3","4","5","6","7","8","9","0",] 
for i in range(0,len(train_set["maile"])):
    for p in pun:
        train_set.loc[i,"maile"] = train_set.loc[i,"maile"].replace(p," ")
    for w in english_w_l:
        train_set.loc[i,"maile"] = train_set.loc[i,"maile"].replace(w," ")
    for w in english_w_u:
        train_set.loc[i,"maile"] = train_set.loc[i,"maile"].replace(w," ")
    for n in num:
        train_set.loc[i,"maile"] = train_set.loc[i,"maile"].replace(n," ")

for i in range(0,len(test_set["maile"])):
    for p in pun:
        test_set.loc[i,"maile"] = test_set.loc[i,"maile"].replace(p," ")
    for w in english_w_l:
        test_set.loc[i,"maile"] = test_set.loc[i,"maile"].replace(w," ")
    for w in english_w_u:
        test_set.loc[i,"maile"] = test_set.loc[i,"maile"].replace(w," ")
    for n in num:
        test_set.loc[i,"maile"] = test_set.loc[i,"maile"].replace(n," ")

# normalize 

In [29]:
my_normalizer = Normalizer(statistical_space_correction=True)
for i in range(0,len(train_set["maile"])):
    train_set.loc[i,"maile"] = my_normalizer.normalize(train_set.loc[i,"maile"])
for i in range(0,len(test_set["maile"])):
    test_set.loc[i,"maile"] = my_normalizer.normalize(test_set.loc[i,"maile"])

# Feature Extraction

## stemmer and Tokenizer and preprocessor function

In [36]:
stemmer = FindStems()
my_tokenizer = Tokenizer()
def prep(txt):
    txt = txt.split()
    txt = [stemmer.convert_to_stem(t) for t in txt]
    txt = " ".join(txt)
    return txt

## stop words

In [37]:
with open("stop_w_p.txt",'r', encoding='utf-8') as infile:
    stop_word = [line for line in infile if line != "\n"]
for i in range (0,len(stop_word)):
    stop_word[i] = stop_word[i].replace("\n","")
stop_word = set(stop_word)

## vectorizer

In [38]:
training_text = train_set["maile"].to_list()
vectorizer = CountVectorizer(stop_words=stop_word, tokenizer= my_tokenizer.tokenize_words ,max_features=5000, ngram_range=(1,1),min_df=5,preprocessor=prep)
vectorizer.fit(training_text)



CountVectorizer(max_features=5000, min_df=5,
                preprocessor=<function prep at 0x000001EE0C3EDEE0>,
                stop_words={'آباد', 'آخ', 'آخر', 'آخرها', 'آخه', 'آدمهاست',
                            'آرام', 'آرام آرام', 'آره', 'آری', 'آزادانه',
                            'آسان', 'آسیب پذیرند', 'آشنایند', 'آشکارا', 'آقا',
                            'آقای', 'آقایان', 'آمد', 'آمدن', 'آمده', 'آمرانه',
                            'آن', 'آن گاه', 'آنان', 'آنانی', 'آنجا', 'آنرا',
                            'آنطور', 'آنقدر', ...},
                tokenizer=<bound method Tokenizer.tokenize_words of <parsivar.tokenizer.Tokenizer object at 0x000001EE091E2A30>>)

## fit vectorizer

In [39]:
x_train = vectorizer.transform(training_text).toarray()
x_train.shape

(600, 2203)

In [40]:
testing_text = test_set["maile"].to_list()
x_test = vectorizer.transform(testing_text).toarray()
x_test.shape

(400, 2203)

# Applying Chi2

In [42]:
y_train = train_set["spam"].to_list()
y_test = test_set["spam"].tolist()
transformer = SelectKBest(chi2, k=500).fit(x_train, y_train)

In [43]:
x_train = transformer.transform(x_train)
x_train.shape

(600, 500)

In [44]:
x_test = transformer.transform(x_test)
x_test.shape

(400, 500)

#  classifier

## cosin function

In [45]:
def cosin_sim(vec_a,vec_b):
    dot = sum(a*b for a, b in zip(vec_a, vec_b))
    norm_a = sum(a*a for a in vec_a) ** 0.5
    norm_b = sum(b*b for b in vec_b) ** 0.5
    return dot / (norm_a*norm_b)

# knn class for cosin_sim

In [46]:
class kNN():
    def __init__(self):
        pass

    def fit(self, X, y):
        self.data = X
        self.targets = y

    def cosin_distance(self, x):
        dists = []
        for ts in x:
            d = []
            for tr in self.data:
                d.append(cosin_sim(ts,tr))
            dists.append(np.array(d))
        return dists
    
    def predict_cosin(self, x, k=1):
        # compute distance between input and training data
        dists = self.cosin_distance(x)
        # find the k nearest neighbors and their classifications
        result = []
        for test_dis in dists:
            knn = np.argsort(test_dis)[(-k):]
            y_knn = []
            for index in knn:
                y_knn.append(y_train[index])
            result.append(max(y_knn, key=y_knn.count))
        return np.array(result)

# test model

In [47]:
knn = kNN()
knn.fit(x_train, y_train)
y_pred = knn.predict_cosin(x_test, k=5)

  return dot / (norm_a*norm_b)


## Compute accuracy and confusion_matrix

In [49]:
correct_pred = 0
true_spam = 0
true_ham = 0
false_spam = 0
false_ham = 0
for i in range(0,len(y_pred)):
    if(y_test[i] == y_pred[i] & y_pred[i] == 1):
        true_spam += 1
    else:
        if(y_test[i] == y_pred[i]):
            true_ham += 1
        else:
            if(y_pred[i] == 1):
                false_spam += 1
            else:
                false_ham += 1

In [50]:
correct_pred = true_spam + true_ham
print("accuracy : " + str(correct_pred/len(y_pred)) + "\n")
print("true_spam : " + str(true_spam))
print("false_spam : " + str(false_spam) + "\n")
print("true_ham : " + str(true_ham))
print("false_ham : " + str(false_ham) + "\n")

accuracy : 0.8175

true_spam : 132
false_spam : 5

true_ham : 195
false_ham : 68



In [51]:
print("\nconfusion_matrix")
print(confusion_matrix(y_test, y_pred))
print("\nclassification_report")
print(classification_report(y_test, y_pred))
print("\nAccuracy")
print(accuracy_score(y_test, y_pred)*100, '%')


confusion_matrix
[[195   5]
 [ 68 132]]

classification_report
              precision    recall  f1-score   support

           0       0.74      0.97      0.84       200
           1       0.96      0.66      0.78       200

    accuracy                           0.82       400
   macro avg       0.85      0.82      0.81       400
weighted avg       0.85      0.82      0.81       400


Accuracy
81.75 %


# tf_idf

## convert email to list of uniqe word

In [53]:
def vector_email(email):
    txt = my_tokenizer.tokenize_words(email)
    vec = [stemmer.convert_to_stem(t) for t in txt if t not in stop_word]
    return list(OrderedDict.fromkeys(vec))

## count number of word in an email

In [55]:
def count_words(list_of_word, email):
    e_words = my_tokenizer.tokenize_words(email)
    e_words = [stemmer.convert_to_stem(t) for t in e_words if t not in stop_word]
    res = [e_words.count(w) for w in list_of_word]
    return res

## compute tf_idf for an email

In [56]:
def my_tf_idf(list_train_text, email):
    word_of_email = vector_email(email)
    tf_idf_vec = []
    for train_mail in list_train_text:
        tf_idf_vec.append(count_words(word_of_email, train_mail))
    df_words = [0] * len(word_of_email)
    for email_train_vec in tf_idf_vec:
        for i in range(0, len(df_words)):
            if(email_train_vec[i] > 0):
                df_words[i] += 1
    dists = []
    n = len(tf_idf_vec)
    for email_train_vec in tf_idf_vec:
        score_email = 0
        for i in range(0,len(email_train_vec)):
            if(email_train_vec[i] > 0):
                tf = np.log(email_train_vec[i]) + 1
            else:
                tf = 0
            if(df_words[i] > 0):
                score_email += tf * (np.log(n / df_words[i]))
        dists.append(score_email)
    return dists

In [57]:
my_tf_idf(train_set["maile"][:3],test_set["maile"][0])

[10.420235002326969, 1.0986122886681098, 3.2364130910642706]

## knn tf_idf

In [58]:
class kNN_tfidf():
    def __init__(self):
        pass

    def fit(self, X, y):
        self.data = X
        self.targets = y

    def tf_idf_distance(self, x):
        dists = []
        for ts in x:
            dists.append(np.array(my_tf_idf(self.data, ts)))
        return dists
    
    def predict_tf_idf(self, x, k=1):
        # compute distance between input and training data
        dists = self.tf_idf_distance(x)
        # find the k nearest neighbors and their classifications
        result = []
        for test_dis in dists:
            knn = np.argsort(test_dis)[(-k):]
            y_knn = []
            for index in knn:
                y_knn.append(y_train[index])
            result.append(max(y_knn, key=y_knn.count))
        return np.array(result)

## test model

In [59]:
knn =  kNN_tfidf()
knn.fit(train_set["maile"], y_train)
y_pred = knn.predict_tf_idf(test_set["maile"], k=5)

# Compute accuracy and confusion_matrix

In [60]:
print("\nconfusion_matrix")
print(confusion_matrix(y_test, y_pred))
print("\nclassification_report")
print(classification_report(y_test, y_pred))
print("\nAccuracy")
print(accuracy_score(y_test, y_pred)*100, '%')


confusion_matrix
[[125  75]
 [  5 195]]

classification_report
              precision    recall  f1-score   support

           0       0.96      0.62      0.76       200
           1       0.72      0.97      0.83       200

    accuracy                           0.80       400
   macro avg       0.84      0.80      0.79       400
weighted avg       0.84      0.80      0.79       400


Accuracy
80.0 %


# navie bayes

In [61]:
class navei_bayes():
    def __init__(self):
        pass

    def fit(self, x, y):
        words_of_test_mail = []
        for test_mail in x:
            l = my_tokenizer.tokenize_words(test_mail)
            for w in l:
                words_of_test_mail.append(stemmer.convert_to_stem(w))
        words_of_test_mail = [w for w in words_of_test_mail if w not in stop_word]
        self.words = list(OrderedDict.fromkeys(words_of_test_mail))
        count_number_of_word_spam = []
        count_number_of_word_ham = []
        n_spam = 0
        n_ham = 0
        for i in range(0,len(y)):
            if y[i] == 1:
                n_spam += 1
                count_number_of_word_spam.append(count_words(self.words,x[i]))
            else:
                n_ham += 1
                count_number_of_word_ham.append(count_words(self.words,x[i]))

        n_word_spam = [0] * len(self.words)
        for l in count_number_of_word_spam:
            for i in range(0,len(l)):
                n_word_spam[i] += l[i]
        n_word_ham = [0] * len(self.words)
        for l in count_number_of_word_ham:
            for i in range(0,len(l)):
                n_word_ham[i] += l[i]
        self.sum_spam = sum(n_word_spam)
        self.sum_ham = sum(n_word_ham)
        self.p_ham = n_spam/(n_spam + n_ham)
        self.p_spam = n_spam/(n_spam + n_ham)
        self.n_word_spam = [w/self.sum_spam for w in n_word_spam]
        self.n_word_ham = [w/self.sum_spam for w in n_word_ham]
        
    def transform(self, x_test):
        result = []
        for test_mail in x_test:
            mail_spam = self.p_spam
            mail_ham = self.p_ham
            l = my_tokenizer.tokenize_words(test_mail)
            l = [stemmer.convert_to_stem(w) for w in l if w not in stop_word]
            for w_test in l:
                if w_test in self.words:
                    indx = self.words.index(w_test)
                    if(self.n_word_spam[indx] > 0):
                        mail_spam *= (self.n_word_spam[indx])/((self.p_spam*self.n_word_spam[indx]) + (self.n_word_ham[indx]*self.p_ham))
                    if(self.n_word_ham[indx] > 0):
                        mail_ham *= (self.n_word_ham[indx])/((self.p_spam*self.n_word_spam[indx]) + (self.n_word_ham[indx]*self.p_ham))
            if mail_spam > mail_ham :
                result.append(1)
            else : 
                if mail_spam < mail_ham :
                    result.append(0)
                else :
                    result.append(np.nan)
        return(result)

In [62]:
n_b = navei_bayes()
n_b.fit(train_set["maile"],train_set["spam"])

In [63]:
y_pred = n_b.transform(test_set["maile"])

In [64]:
from sklearn.metrics import classification_report, confusion_matrix,  accuracy_score
print("\nconfusion_matrix")
print(confusion_matrix(y_test, y_pred))
print("\nclassification_report")
print(classification_report(y_test, y_pred))
print("\nAccuracy")
print(accuracy_score(y_test, y_pred)*100, '%')


confusion_matrix
[[ 85 115]
 [  0 200]]

classification_report
              precision    recall  f1-score   support

           0       1.00      0.42      0.60       200
           1       0.63      1.00      0.78       200

    accuracy                           0.71       400
   macro avg       0.82      0.71      0.69       400
weighted avg       0.82      0.71      0.69       400


Accuracy
71.25 %
