# Logistical Regression for Email Spam Detection through Homomorphic Encryption

Based on this tutorial: https://github.com/OpenMined/TenSEAL/blob/main/tutorials%2FTutorial%201%20-%20Training%20and%20Evaluation%20of%20Logistic%20Regression%20on%20Encrypted%20Data.ipynb

Setup Packages

In [2]:
pip install tenseal



In [3]:
import numpy as np
import pandas as pd
import string
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
import nltk
from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer
import torch
import tenseal as ts
import random
from time import time
#Run the below piece of code for the first time
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

# Load Dataset

Dataset: https://www.kaggle.com/datasets/uciml/sms-spam-collection-dataset/data

In [4]:
message_data = pd.read_csv("spam.csv",encoding = "latin")
message_data.head()
middle_index = len(message_data) // 2
# Split the DataFrame into half
message_data = message_data.iloc[:middle_index]

Remove Unnamed Columns

In [5]:
message_data = message_data.drop(['Unnamed: 2', 'Unnamed: 3', 'Unnamed: 4'], axis = 1)

# Label Columns

In [6]:
message_data = message_data.rename(columns = {'v1':'Spam/Not_Spam','v2':'message'})

In [7]:
message_data['Spam/Not_Spam'] = message_data['Spam/Not_Spam'].replace({'spam': 0, 'ham': 1})

In [8]:
message_data.groupby('Spam/Not_Spam').describe()

Unnamed: 0_level_0,message,message,message,message
Unnamed: 0_level_1,count,unique,top,freq
Spam/Not_Spam,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2
0,381,358,I don't know u and u don't know me. Send CHAT ...,3
1,2405,2300,"Sorry, I'll call later",19


In [9]:
message_data_copy = message_data['message'].copy()

In [10]:
def text_preprocess(text):
    text = text.translate(str.maketrans('', '', string.punctuation))
    text = [word for word in text.split() if word.lower() not in stopwords.words('english')]
    return " ".join(text)

In [11]:
message_data_copy = message_data_copy.apply(text_preprocess)

# Embed Dataset

In [12]:
message_data_copy

0       Go jurong point crazy Available bugis n great ...
1                                 Ok lar Joking wif u oni
2       Free entry 2 wkly comp win FA Cup final tkts 2...
3                     U dun say early hor U c already say
4             Nah dont think goes usf lives around though
                              ...                        
2781    likely called Mittelschmertz Google dont parac...
2782    Well right Im gonna get check todays steam sal...
2783                          arrived see couple days lt3
2784                                   K wat tht incident
2785                                   Yeah get unlimited
Name: message, Length: 2786, dtype: object

In [13]:
vectorizer = TfidfVectorizer(stop_words='english')

In [14]:
message_mat = vectorizer.fit_transform(message_data_copy)
message_mat
coo = message_mat.tocoo()

# Stem and Normalize length of the messages

In [15]:
# Stemming: Making text easier and consistent to process
def stemmer (text):
    text = text.split()
    words = ""
    for i in text:
            stemmer = SnowballStemmer(language='english')
            words += (stemmer.stem(i))+" "
    return words
message_data_copy = message_data_copy.apply(stemmer)
vectorizer = TfidfVectorizer(stop_words='english')
message_mat = vectorizer.fit_transform(message_data_copy)

In [16]:
# Normalization: Adding string length
message_data['length'] = message_data['message'].apply(len)
message_data.head()
length = message_data['length'].to_numpy()
new_mat = np.hstack((message_mat.todense(),length[:, None]))

# Get Training and Testing Data

In [17]:
new_mat = np.asarray(new_mat)
message_train, message_test, spam_nospam_train, spam_nospam_test = train_test_split(
    new_mat,
    message_data['Spam/Not_Spam'],
    test_size=0.3,
    random_state=20
)
print(type (spam_nospam_test))
print(type (spam_nospam_train))

<class 'pandas.core.series.Series'>
<class 'pandas.core.series.Series'>


Format data for pytorch

In [18]:
x_train = torch.FloatTensor(message_train)
x_test = torch.FloatTensor(message_test)
y_train = torch.FloatTensor(spam_nospam_train.values)
y_test = torch.FloatTensor(spam_nospam_test.values)

In [19]:
y_train = y_train.view(-1, 1)
y_test = y_test.view(-1, 1)

In [20]:
print("Shapes of the tensors:")
print(f"x_train has shape: {x_train.shape}")
print(f"y_train has shape: {y_train.shape}")
print(f"x_test has shape: {x_test.shape}")
print(f"y_test has shape: {y_test.shape}")

Shapes of the tensors:
x_train has shape: torch.Size([1950, 5360])
y_train has shape: torch.Size([1950, 1])
x_test has shape: torch.Size([836, 5360])
y_test has shape: torch.Size([836, 1])


# Setup LR Model (Unencrypted)

Class declaration for Logistical Regression

In [21]:
class LR(torch.nn.Module):

    def __init__(self, n_features):
        super(LR, self).__init__()
        self.lr = torch.nn.Linear(n_features, 1)

    def forward(self, x):
        out = torch.sigmoid(self.lr(x))
        return out

In [22]:
n_features = x_train.shape[1]
model = LR(n_features)
# use gradient descent with a learning_rate=1
optimizer = torch.optim.SGD(model.parameters(), lr=1)
# use Binary Cross Entropy Loss
criterion = torch.nn.BCEWithLogitsLoss()

# Train / Test Model

In [23]:
EPOCHS = 5

def train(model, optimizer, criterion, x, y, epochs=EPOCHS):
    model.train()
    for e in range(1, epochs + 1):
        optimizer.zero_grad()
        out = model(x)
        # Ensure 'out' is [1950, 1] if your model architecture doesn't already guarantee this
        loss = criterion(out, y)  # No need to squeeze out, y is [1950, 1]
        loss.backward()
        optimizer.step()
        print(f"Loss at epoch {e}: {loss.item()}")
    return model
model = train(model, optimizer, criterion, x_train, y_train)

Loss at epoch 1: 0.5376841425895691
Loss at epoch 2: 0.44197991490364075
Loss at epoch 3: 0.44197991490364075
Loss at epoch 4: 0.44197991490364075
Loss at epoch 5: 0.44197991490364075


In [24]:
def accuracy(model, x, y):
    out = model(x)
    correct = torch.abs(y - out) < 0.5
    return correct.float().mean()

plain_accuracy = accuracy(model, x_test, y_test)
print(f"Accuracy on plain test_set: {plain_accuracy}")

Accuracy on plain test_set: 0.8444976210594177


# Build Homomorphically Encrypted LR Model (Accepts Encrypted Query Data)

In [25]:
class EncryptedLR:

    def __init__(self, torch_lr):
        # TenSEAL processes lists and not torch tensors,
        # so we take out the parameters from the PyTorch model
        self.weight = torch_lr.lr.weight.data.tolist()[0]
        self.bias = torch_lr.lr.bias.data.tolist()

    def forward(self, enc_x):
        # We don't need to perform sigmoid as this model
        # will only be used for evaluation, and the label
        # can be deduced without applying sigmoid
        enc_out = enc_x.dot(self.weight) + self.bias
        return enc_out

    def __call__(self, *args, **kwargs):
        return self.forward(*args, **kwargs)

    ################################################
    ## You can use the functions below to perform ##
    ## the evaluation with an encrypted model     ##
    ################################################

    def encrypt(self, context):
        self.weight = ts.ckks_vector(context, self.weight)
        self.bias = ts.ckks_vector(context, self.bias)

    def decrypt(self, context):
        self.weight = self.weight.decrypt()
        self.bias = self.bias.decrypt()


eelr = EncryptedLR(model)

In [26]:
# parameters
poly_mod_degree = 4096
coeff_mod_bit_sizes = [40, 20, 40]
# create TenSEALContext
ctx_eval = ts.context(ts.SCHEME_TYPE.CKKS, poly_mod_degree, -1, coeff_mod_bit_sizes)
# scale of ciphertext to use
ctx_eval.global_scale = 2 ** 20
# this key is needed for doing dot-product operations
ctx_eval.generate_galois_keys()

# Encrypt Test Data

In [None]:
enc_x_test = [ts.ckks_vector(ctx_eval, x.tolist()) for x in x_test]

# Encrypt Model Parameters

In [28]:
eelr.encrypt(ctx_eval)

The following operations are disabled in this setup: matmul, matmul_plain, enc_matmul_plain, conv2d_im2col.
If you need to use those operations, try increasing the poly_modulus parameter, to fit your input.


# Test HE Model

In [29]:
def encrypted_evaluation(model, enc_x_test, y_test):
    t_start = time()

    correct = 0
    for enc_x, y in zip(enc_x_test, y_test):
        # encrypted evaluation
        enc_out = model(enc_x)
        # plain comparison
        out = enc_out.decrypt()
        out = torch.tensor(out)
        out = torch.sigmoid(out)
        if torch.abs(out - y) < 0.5:
            correct += 1

    t_end = time()
    print(f"Evaluated test_set of {len(x_test)} entries in {int(t_end - t_start)} seconds")
    print(f"Accuracy: {correct}/{len(x_test)} = {correct / len(x_test)}")
    return correct / len(x_test)


encrypted_accuracy = encrypted_evaluation(eelr, enc_x_test, y_test)
diff_accuracy = plain_accuracy - encrypted_accuracy
print(f"Difference between plain and encrypted accuracies: {diff_accuracy}")
if diff_accuracy < 0:
    print("Oh! We got a better accuracy on the encrypted test-set! The noise was on our side...")

Evaluated test_set of 836 entries in 31 seconds
Accuracy: 706/836 = 0.8444976076555024
Difference between plain and encrypted accuracies: 0.0


# Build Homomorphically Encrypted LR Model (Trains on Encrypted data)

In [30]:
class EncryptedLR:

    def __init__(self, torch_lr):
        self.weight = torch_lr.lr.weight.data.tolist()[0]
        self.bias = torch_lr.lr.bias.data.tolist()
        # we accumulate gradients and counts the number of iterations
        self._delta_w = 0
        self._delta_b = 0
        self._count = 0

    def forward(self, enc_x):
        enc_out = enc_x.dot(self.weight) + self.bias
        enc_out = EncryptedLR.sigmoid(enc_out)
        return enc_out

    def backward(self, enc_x, enc_out, enc_y):
        out_minus_y = (enc_out - enc_y)
        self._delta_w += enc_x * out_minus_y
        self._delta_b += out_minus_y
        self._count += 1

    def update_parameters(self):
        if self._count == 0:
            raise RuntimeError("You should at least run one forward iteration")
        # update weights
        # We use a small regularization term to keep the output
        # of the linear layer in the range of the sigmoid approximation
        self.weight -= self._delta_w * (1 / self._count) + self.weight * 0.05
        self.bias -= self._delta_b * (1 / self._count)
        # reset gradient accumulators and iterations count
        self._delta_w = 0
        self._delta_b = 0
        self._count = 0

    @staticmethod
    def sigmoid(enc_x):
        # We use the polynomial approximation of degree 3
        # sigmoid(x) = 0.5 + 0.197 * x - 0.004 * x^3
        # from https://eprint.iacr.org/2018/462.pdf
        # which fits the function pretty well in the range [-5,5]
        return enc_x.polyval([0.5, 0.197, 0, -0.004])

    def plain_accuracy(self, x_test, y_test):
        # evaluate accuracy of the model on
        # the plain (x_test, y_test) dataset
        w = torch.tensor(self.weight)
        b = torch.tensor(self.bias)
        out = torch.sigmoid(x_test.matmul(w) + b).reshape(-1, 1)
        correct = torch.abs(y_test - out) < 0.5
        return correct.float().mean()

    def encrypt(self, context):
        self.weight = ts.ckks_vector(context, self.weight)
        self.bias = ts.ckks_vector(context, self.bias)

    def decrypt(self):
        self.weight = self.weight.decrypt()
        self.bias = self.bias.decrypt()

    def __call__(self, *args, **kwargs):
        return self.forward(*args, **kwargs)

In [31]:
# parameters
poly_mod_degree = 8192
coeff_mod_bit_sizes = [40, 21, 21, 21, 21, 21, 21, 40]
# create TenSEALContext
ctx_training = ts.context(ts.SCHEME_TYPE.CKKS, poly_mod_degree, -1, coeff_mod_bit_sizes)
ctx_training.global_scale = 2 ** 21
ctx_training.generate_galois_keys()

Encrypt training data

In [None]:
t_start = time()
# Assuming x_train and y_train are lists of one-dimensional arrays
enc_x_train = [ts.ckks_vector(ctx_training, x) for x in x_train]
enc_y_train = [ts.ckks_vector(ctx_training, y) for y in y_train]
t_end = time()
print(f"Encryption of the training_set took {int(t_end - t_start)} seconds")

In [None]:
eelr = EncryptedLR(LR(n_features))
accuracy = eelr.plain_accuracy(x_test, y_test)
print(f"Accuracy at epoch #0 is {accuracy}")

times = []
for epoch in range(EPOCHS):
    eelr.encrypt(ctx_training)

    # if you want to keep an eye on the distribution to make sure
    # the function approximation is still working fine
    # WARNING: this operation is time consuming
    # encrypted_out_distribution(eelr, enc_x_train)

    t_start = time()
    for enc_x, enc_y in zip(enc_x_train, enc_y_train):
        enc_out = eelr.forward(enc_x)
        eelr.backward(enc_x, enc_out, enc_y)
    eelr.update_parameters()
    t_end = time()
    times.append(t_end - t_start)

    eelr.decrypt()
    accuracy = eelr.plain_accuracy(x_test, y_test)
    print(f"Accuracy at epoch #{epoch + 1} is {accuracy}")


print(f"\nAverage time per epoch: {int(sum(times) / len(times))} seconds")
print(f"Final accuracy is {accuracy}")

diff_accuracy = plain_accuracy - accuracy
print(f"Difference between plain and encrypted accuracies: {diff_accuracy}")
if diff_accuracy < 0:
    print("Oh! We got a better accuracy when training on encrypted data! The noise was on our side...")