## Evaluating the trained models on our handmade dataset

Alex Ludwigson

In [1]:
import torch, torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import numpy as np
import pandas as pd
import nltk
import math
from tqdm import tqdm
import os 
import io
import transformers
from transformers import GPT2Config, GPT2ForSequenceClassification, GPT2Tokenizer, AutoTokenizer,  AutoModelForSequenceClassification

import matplotlib.pyplot as plt

Transformer's performance on the handmade dataset:

In [2]:
glove_file = "./datasets/glove.6B.100d.txt" #or 50d

embeddings_dict = {}

with open(glove_file, 'r', encoding='utf8') as f:
    for i, line in enumerate(f):
        if i == 0:
            print(line)
        line = line.strip().split(' ')
        word = line[0]
        embed = np.asarray(line[1:], "float")

        embeddings_dict[word] = embed

print('Loaded {} words from glove'.format(len(embeddings_dict)))

embedding_matrix = np.zeros((len(embeddings_dict)+2, 100)) #add 1 for padding

word2id = {}
for i, word in enumerate(embeddings_dict.keys()):

    word2id[word] = i                                #Map each word to an index
    embedding_matrix[i] = embeddings_dict[word]      #That index holds the Glove embedding in the embedding matrix

# Our joint vocabulary for both models / sanity check to see if we've loaded it correctly:
print(word2id['the'])
print(embedding_matrix[word2id['the']])

word2id['<pad>'] = embedding_matrix.shape[0] - 2
word2id['<start>'] = embedding_matrix.shape[0] - 1
print(embedding_matrix[word2id['<pad>']])

max_length = 120 #inclusive of start token
start_id = word2id['<start>']

class TransformerModel(nn.Module):

    def __init__(self, embedding_matrix, model_size, n_heads, n_layers, hidden_size, embedding_dims=100, vocab_size=None):
        super().__init__()

        if not (embedding_matrix is None): #glove
            self.embedding = nn.Embedding.from_pretrained(torch.FloatTensor(embedding_matrix))
        else:
            self.embedding = nn.Embedding(vocab_size, embedding_dims)
        self.pos_encoding = PositionalEncoding(embedding_dims, max_length)
        self.input_linear = nn.Linear(embedding_dims, model_size)
        encoder_layers = nn.TransformerEncoderLayer(model_size, n_heads, hidden_size, batch_first=True)
        self.encoder = nn.TransformerEncoder(encoder_layers, n_layers)
        #self.encoder = nn.Transformer(encoder_layers, n_layers, batch_first=True)
        self.output_hidden_1 = nn.Linear(model_size, hidden_size)
        self.relu = nn.ReLU()
        self.output_hidden_2 = nn.Linear(hidden_size, 2) #binary classification
        self.model_size = model_size

        #initialize
        initrange = 0.1
        self.input_linear.weight.data.uniform_(-initrange, initrange)
        self.input_linear.bias.data.zero_()
        self.output_hidden_1.weight.data.uniform_(-initrange, initrange)
        self.output_hidden_1.bias.data.zero_()
        self.output_hidden_2.weight.data.uniform_(-initrange, initrange)
        self.output_hidden_2.bias.data.zero_()

    def forward(self, input):

        #print("intput.shape: ", input.shape, len(input.shape))
        input = (self.embedding(input) * math.sqrt(self.model_size)) #recommended from documentation
        input = self.pos_encoding(input)
        #print("input after poe:", input.shape)

        input = self.input_linear(input) #get a representation that has the model size for the positionally encoded embeddings
        #print("after input linear: ", input.shape)
        
        output = self.encoder(input)[:,0] #take the last vector
        #print("after encoder: ", output.shape)
        output = self.output_hidden_1(output)

        output = self.relu(output)
        output = self.output_hidden_2(output)

        #print("after linear:", output.shape)

        return output

class PositionalEncoding(nn.Module):

    def __init__(self, model_size, max_len): #from torch documentation
        super().__init__()
        self.dropout = nn.Dropout(p=0.1)
        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, model_size, 2) * (-math.log(10000.0) / model_size))
        pe = torch.zeros(max_len, 1, model_size)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)
        
    def forward(self, x):
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

#Define hyperparameters
epochs = 6
batch_size = 32
print_frequency = 250
n_heads = 2
n_layers = 2
model_size = 28
hidden_size = 48
pos_weight_coeff = 1.05


def predict(model, valid_dataloader):

    softmax = nn.Softmax()

    model.eval()
    
    sigmoid = nn.Sigmoid()

    total_examples = 0
    total_positive = 0
    total_negative = 0

    true_positive = 0
    false_positive = 0
    true_negative = 0
    false_negative = 0

    for x, y in valid_dataloader:
        x = x.squeeze()
        if (len(x.shape) == 0): continue
        output = model(x)
        output = softmax(output)

        for i in range(output.shape[0]):
            if (output[i][0].item() >= 0.5):
                if (y[i].item() == 0):
                    true_negative += 1
                    total_negative += 1
                else:
                    false_negative += 1
                    total_positive += 1
                    #print(y)
            else:
                if (y[i].item() == 0):
                    false_positive += 1
                    total_negative += 1
                    #print(y)
                else:
                    true_positive += 1
                    total_positive += 1
        total_examples += output.shape[0]
        #print("total examples:", total_examples, "; T+:", true_positive, "; F+:", false_positive, "; T-:", true_negative, "; F-:", false_negative)

    accuracy = (true_positive + true_negative) / total_examples
    t_p = true_positive/total_examples
    f_p = false_positive/total_examples
    t_n = true_negative/total_examples
    f_n = false_negative/total_examples
    p = true_positive/(true_positive + false_positive)
    r = true_positive/(true_positive + false_negative)
    f_score = (2*p*r)/(p+r)

    print('accuracy: %s/%s = %s' % (true_positive+true_negative, total_examples, (true_positive + true_negative) / total_examples))
    print('True positive: %s' % t_p)
    print("False positive: %s" % f_p)
    print('True negative: %s' % t_n)
    print("False negative: %s" % f_n)
    print("(P, R, F-Score) = (%s, %s, %s)\n" % (p, r, f_score))
    return accuracy

def tokenize_example(line):
    example = [start_id]
    tokenized = nltk.word_tokenize(line)
    i = 0
    for token in tokenized:
        if not (token in word2id): continue #not using <unk> for spam dataset
        i += 1
        if (i >= max_length): break
        example.append(word2id[token])
        
    #add padding
    padding = word2id["<pad>"]
    for i in range(max_length - len(example)):
        example.append(padding)
    return np.array(example)

def tokenize(df):
    examples = []
    for index, row in df.iterrows():
        example = tokenize_example(row["text"])
        if (len(example) > 0 and len(example.shape) > 0): examples.append((example, row["label"]))
    return examples

#load the state dict 
transform = TransformerModel(embedding_matrix, model_size=model_size, n_heads=n_heads, n_layers=n_layers, hidden_size=hidden_size)
transform.load_state_dict(torch.load('./trained_models/spam.pt'))
#read the file
spam_hm = pd.read_csv("./HomebrewDataset.csv")
print(spam_hm.head())
spam_hm_tok = tokenize(spam_hm)
spam_valid_dataloader = torch.utils.data.DataLoader(spam_hm_tok, batch_size=batch_size)
predict(transform, spam_valid_dataloader)


the -0.038194 -0.24487 0.72812 -0.39961 0.083172 0.043953 -0.39141 0.3344 -0.57545 0.087459 0.28787 -0.06731 0.30906 -0.26384 -0.13231 -0.20757 0.33395 -0.33848 -0.31743 -0.48336 0.1464 -0.37304 0.34577 0.052041 0.44946 -0.46971 0.02628 -0.54155 -0.15518 -0.14107 -0.039722 0.28277 0.14393 0.23464 -0.31021 0.086173 0.20397 0.52624 0.17164 -0.082378 -0.71787 -0.41531 0.20335 -0.12763 0.41367 0.55187 0.57908 -0.33477 -0.36559 -0.54857 -0.062892 0.26584 0.30205 0.99775 -0.80481 -3.0243 0.01254 -0.36942 2.2167 0.72201 -0.24978 0.92136 0.034514 0.46745 1.1079 -0.19358 -0.074575 0.23353 -0.052062 -0.22044 0.057162 -0.15806 -0.30798 -0.41625 0.37972 0.15006 -0.53212 -0.2055 -1.2526 0.071624 0.70565 0.49744 -0.42063 0.26148 -1.538 -0.30223 -0.073438 -0.28312 0.37104 -0.25217 0.016215 -0.017099 -0.38984 0.87424 -0.72569 -0.51058 -0.52028 -0.1459 0.8278 0.27062

Loaded 400000 words from glove
0
[-0.038194 -0.24487   0.72812  -0.39961   0.083172  0.043953 -0.39141
  0.3344   -0.57545   0.087459  0

  return self._call_impl(*args, **kwargs)


0.7333333333333333

Bert's performance on the handmade dataset:

In [3]:
def remove_doublequotes(file_dir): # 
    raw_file_str = ''
    with open(file_dir, 'r', encoding='utf-8') as f:
        raw_file_str = f.read().replace('""', '"')
    with open(file_dir, 'w', encoding='utf-8') as f:
        f.write(raw_file_str)

#partition a dictionary
def split_dict (dict1, index):
    dict1c = dict1
    dict1 = dict(list(dict1c.items())[index:])
    dict2 = dict(list(dict1c.items())[:index])
    return dict1, dict2

device = torch.device("cpu") #sets your device. Code will run at a snails pace without gpu



class Datasets(Dataset):
    def __init__ (self, testpath=None, Emails = None, size =None, final_data = None , data_processed=False): 

        if (data_processed):
            self.length = len(final_data)
            self.test_set = final_data
            self.set_labels = [self.test_set[x] for x in self.test_set]
            self.set_text = list (self.test_set.keys())
            return

        if Emails == False:
            remove_doublequotes(testpath)


        if (size):
            self.test_set = pd.read_csv(testpath, nrows = size)
        else: 
            self.test_set = pd.read_csv(testpath)
 
        if (Emails == True):
            self.test_set = self.test_set.set_index('text')['label'].to_dict()

        else:
            self.test_set['Unnamed: 0'] = self.test_set ['Unnamed: 0'].apply(lambda x : 2 - x)
            self.test_set =  self.test_set.set_index('text_label')['Unnamed: 0'].to_dict() 

        self.length = len (self.test_set)

        self.set_labels = [self.test_set[x] for x in self.test_set]
        self.set_text = list (self.test_set.keys())

        return
    #homework 3 inspired validation data spilt function
    def split(self, ratio = .8 ):
        index = int(ratio*self.length)

        split,self.test_set = split_dict(self.test_set, index)

        self.set_labels = self.set_labels[:index]
        self.set_text = self.set_text[:index]
        self.length = len(self.test_set)

        return split
    #functions required by pytorch for handling data
    def __len__(self):
        return self.length

    def __getitem__(self, index):
        return {'text': self.set_text[index], 'label': self.set_labels[index]}
    

class _tokenize(object):
    def __init__(self,  use_tokenizer, max=512):
         self.use_tokenizer = use_tokenizer
         self.max_sequence_len =max
    #basically just calls the tokenizer, returning embeddings  dict
    def __call__(self, data):
        text= [x['text'] for x in data]
        label = [x ['label'] for x in data]


        embeddings = self.use_tokenizer(text=text, return_tensors = "pt", padding = True, truncation= True, max_length = self.max_sequence_len)
        embeddings.update({'labels' : torch.tensor(label)})
        return embeddings
    
def calculate_stats(labels, predictions):
    acc = 0.0
    fp =0.0
    fn = 0.0
    tp =0.0
    tn = 0.0
    size = len(labels)
    counter = 0

    for x in labels:
        if x == 1 and predictions[counter] == 1:
            tp+=1
            acc +=1
        elif x == 0 and predictions[counter] == 0:
            tn+=1
            acc+=1
        elif x == 1:
            fp  +=1
        elif x == 0:
            fn +=1 
        counter +=1

    return [acc/size, tp/size, tn/size, fp/size, fn/size]

def test (model, data, device, ):
    print ("Evaluating")

    total_loss = 0.0
    predictions = []
    labels= []

    model.eval()
    #print(data.get_item(0))
    for batch in tqdm (data, total=len(data)):
        labels += batch['labels'].numpy().flatten().tolist()
        batch = {i:j.type(torch.long).to(device) for i,j in batch.items()}
        with torch.no_grad():


            model_out = model(**batch)
            loss,logits =model_out[:2]
            total_loss += loss.item()

            logits = logits.detach().cpu().numpy() 

            predictions  += logits.argmax(axis = -1).flatten().tolist()
    total_loss = total_loss/len(data)


    stats= calculate_stats (labels, predictions)

    return total_loss, stats


berto = AutoTokenizer.from_pretrained("bert-base-uncased")
bert = AutoModelForSequenceClassification.from_pretrained('./trained_models/bert')#f model is on gpu, ignore this
bert.to(device)
bertoken = _tokenize(berto)
homebrewdataset = Datasets(testpath='./HomebrewDataset.csv', Emails=True)
homebrew_test = DataLoader(homebrewdataset, batch_size= 1, shuffle=False, collate_fn = bertoken)
print(len(homebrew_test))
optimizer = torch.optim.AdamW(bert.parameters(), lr= 2e-5, eps = 1e-8)
steps = len(homebrew_test)
scheduler = transformers.get_linear_schedule_with_warmup(optimizer, num_warmup_steps = 0 , num_training_steps = steps)


v_loss, stats_test = test(bert,homebrew_test, device)
print("ACCURACY:")
print(stats_test[0])

30
Evaluating


100%|██████████| 30/30 [00:06<00:00,  4.45it/s]

ACCURACY:
0.9333333333333333





Logistic Regressions's performance on handmade dataset:

In [4]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, accuracy_score, roc_curve, auc, precision_recall_curve, confusion_matrix
import matplotlib.pyplot as plt
#import seaborn as sns

tfidf_vectorizer = TfidfVectorizer(stop_words='english', max_features=5000)
split_ratio = 0.8

# Load datasets
spam_test = pd.read_csv('./datasets/SpamHam/test.csv').fillna('')
spam_test['label'] = spam_test['label']


spam_ftrain = pd.read_csv('./datasets/SpamHam/train.csv').fillna('')
spam_ftrain['label'] = spam_ftrain['label']
spamsize = int(split_ratio*spam_ftrain.shape[0])
spam_train=spam_ftrain.iloc[:spamsize]
spam_valid=spam_ftrain.iloc[spamsize:]

urls_test = pd.read_csv('datasets/PhishingURLs/test.csv').fillna('')
urls_test['label'] = urls_test['label'].apply(lambda x: 1 if x == 1 else 0)

urls_ftrain = pd.read_csv('datasets/PhishingURLs/test.csv').fillna('')
urls_ftrain['label'] = urls_ftrain['label'].apply(lambda x: 1 if x == 1 else 0)
urlsize = int(split_ratio*spam_ftrain.shape[0])
urls_train=urls_ftrain.iloc[:urlsize]
urls_valid=urls_ftrain.iloc[urlsize:]

homebrew_data = pd.read_csv('HomebrewDataset.csv').fillna('')
homebrew_data['label'] = homebrew_data['label'].apply(lambda x: 1 if x==1 else 0)

# Split each dataset into training and testing sets
#spam_train, spam_test = train_test_split(spam_full, test_size=0.2, random_state=42)
#urls_train, urls_test = train_test_split(urls_full, test_size=0.2, random_state=42)


# Combine the training data from all datasets
#combined_train = pd.concat([spam_train, urls_train], ignore_index=True)
#combined_valid = pd.concat([spam_valid, urls_valid], ignore_index=True)
# Feature extraction for combined training data
X_train_S = tfidf_vectorizer.fit_transform(spam_train['text'])
y_train_S = spam_train['label']
X_valid_S = tfidf_vectorizer.fit_transform(spam_valid['text'])
y_valid_S = spam_valid['label']

X_train_U = tfidf_vectorizer.fit_transform(urls_train['text'])
y_train_U = spam_train['label']
X_valid_U = tfidf_vectorizer.fit_transform(urls_valid['text'])
y_valid_U = spam_valid['label']
# Train Logistic Regression model
#print(y_train)
#print(X_train)
model = LogisticRegression(max_iter=1000, class_weight='balanced')
model.fit(X_train_S, y_train_S)
model.fit(X_train_U, y_train_U)

def evaluate_model(X, y, model, dataset_name):
    predictions = model.predict(X)
    accuracy = accuracy_score(y, predictions)
    conf_matrix = confusion_matrix(y, predictions)


    print(f"\n{dataset_name} Accuracy:", accuracy)
    print(conf_matrix)
    print(f"{dataset_name} Classification Report:\n", classification_report(y, predictions))

X_homebrew_test = tfidf_vectorizer.transform(homebrew_data['text'])
y_homebrew_test = homebrew_data['label']
evaluate_model(X_homebrew_test, y_homebrew_test, model, "Homebrew Test Data")


Homebrew Test Data Accuracy: 0.5333333333333333
[[10  3]
 [11  6]]
Homebrew Test Data Classification Report:
               precision    recall  f1-score   support

           0       0.48      0.77      0.59        13
           1       0.67      0.35      0.46        17

    accuracy                           0.53        30
   macro avg       0.57      0.56      0.52        30
weighted avg       0.58      0.53      0.52        30

