In [None]:
# Encoder-Transformer

import re
from bs4 import BeautifulSoup
import zipfile
import nltk
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
# Fetch the NLTK resources this notebook relies on: tokenizer data ('punkt'),
# the WordNet lexicon used by the lemmatizer, and the stop-word lists.
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('stopwords')


# Module-level helpers shared by clean_text(): built once, reused for every call.
lemmatizer = WordNetLemmatizer()
stop_words = set(stopwords.words('english'))

def clean_text(file_content: str, lemmatize: bool, html_parser: bool, rm_numbers: bool):
  """Normalise a raw text string and return its remaining tokens.

  Parameters:
  - file_content: text to clean; it is lower-cased before anything else.
  - lemmatize: when True, each kept token is passed through the WordNet lemmatizer.
  - html_parser: when True, markup is stripped by taking BeautifulSoup's text.
  - rm_numbers: when True, every run of digits is deleted.

  Returns:
  - List of tokens from word_tokenize with English stop words removed.
  """
  text = file_content.lower()
  if html_parser:
    text = BeautifulSoup(text, "html.parser").text
  if rm_numbers:
    text = re.sub(r'\d+', '', text)

  # Collapse every whitespace run into a single space before tokenising.
  text = ' '.join(text.split())

  kept = []
  for word in word_tokenize(text):
    if word.lower() in stop_words:
      continue
    kept.append(lemmatizer.lemmatize(word) if lemmatize else word)
  return kept


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [None]:
# Input Embeddings
import numpy as np
from gensim.downloader import load
from scipy.sparse import vstack, csr_matrix

def create_embeddings(sent_tokens, model):
  """Look up one embedding row per token and return them as a sparse matrix.

  Parameters:
  - sent_tokens: iterable of tokens to look up in ``model``.
  - model: embedding lookup supporting ``token in model``, ``model[token]`` and
    a ``vector_size`` attribute.

  Returns:
  - csr_matrix of shape (number of tokens, model.vector_size). A token missing
    from ``model`` becomes an all-zero row; an empty token sequence yields a
    matrix with zero rows.
  """
  num_columns = model.vector_size
  rows = [
      model[token] if token in model else np.zeros(num_columns)
      for token in sent_tokens
  ]

  if not rows:
    # csr_matrix([]) would come back with shape (1, 0), which cannot be combined
    # with anything that is vector_size wide; build the empty matrix explicitly.
    return csr_matrix((0, num_columns))

  return csr_matrix(np.vstack(rows))

In [None]:
def positional_encodings(sent_tokens, model_size=50):
    """Build sinusoidal positional encodings, one row per token position.

    Columns come in (sin, cos) pairs that share a frequency: for pair index k,
    column 2k holds sin(pos / 10000 ** (2k / model_size)) and column 2k + 1
    holds the cosine of the same angle.

    Parameters:
    - sent_tokens: sequence whose length sets the number of rows.
    - model_size: number of columns in the result.

    Returns:
    - csr_matrix of shape (len(sent_tokens), model_size).
    """
    sent_len = len(sent_tokens)
    if sent_len == 0:
        # Keep the column count even when there are no positions to encode.
        return csr_matrix((0, model_size))

    positions = np.arange(sent_len, dtype=float)[:, np.newaxis]
    columns = np.arange(model_size)

    # 2 * (i // 2) maps columns 2k and 2k + 1 onto the same exponent; using the
    # raw column index instead would double the exponent and break the pairing.
    exponents = (2 * (columns // 2)) / model_size
    angles = positions / np.power(10000.0, exponents)

    encodings = np.where(columns % 2 == 0, np.sin(angles), np.cos(angles))
    return csr_matrix(encodings)

In [None]:
def sum_encodings_embeddings(X, Y):
  """Add X and Y element-wise and return the sum as a dense array.

  Parameters:
  - X, Y: operands whose sum exposes ``toarray()`` (sparse matrices in this notebook).

  Returns:
  - The dense array produced by calling ``toarray()`` on ``X + Y``.
  """
  combined = X + Y
  return combined.toarray()

In [None]:
def xavier_init(fan_in, fan_out):
    """Draw a (fan_in, fan_out) matrix uniformly from [-b, b), b = sqrt(6 / (fan_in + fan_out)).

    Uses NumPy's global random state, so results depend on the current seed.
    """
    bound = np.sqrt(6 / (fan_in + fan_out))
    shape = (fan_in, fan_out)
    return np.random.uniform(low=-bound, high=bound, size=shape)

def softmax(x):
    """Softmax over the last axis of x.

    The per-row maximum is subtracted before exponentiating so large inputs do
    not overflow; the result is unchanged by that shift.
    """
    shifted = x - np.max(x, axis=-1, keepdims=True)
    weights = np.exp(shifted)
    return weights / np.sum(weights, axis=-1, keepdims=True)

def self_attention(X, model_size=50, d_k = 50):
  """Single-head scaled dot-product self-attention over the rows of X.

  The query, key and value projections are drawn with xavier_init on every
  call (in that order), so two calls on the same X give different outputs
  unless the random state is reset.

  Parameters:
  - X: array whose last dimension is model_size; used as query, key and value source.
  - model_size: input width of each projection matrix.
  - d_k: output width of each projection; also sets the 1/sqrt(d_k) score scaling.

  Returns:
  - Array produced by weighting the projected values with the softmaxed scores.
  """
  # Generator unpacking evaluates left to right: W_Q, then W_K, then W_V.
  W_Q, W_K, W_V = (xavier_init(model_size, d_k) for _ in range(3))

  queries = np.dot(X, W_Q)
  keys = np.dot(X, W_K)
  values = np.dot(X, W_V)

  scaled_scores = np.dot(queries, keys.T) / np.sqrt(d_k)
  return np.dot(softmax(scaled_scores), values)

In [None]:
def layer_norm(x, eps=1e-6):
    """Standardise x along its last axis as (x - mean) / (std + eps).

    eps is added to the standard deviation (not the variance) to avoid
    division by zero on constant rows.
    """
    centered = x - np.mean(x, axis=-1, keepdims=True)
    spread = np.std(x, axis=-1, keepdims=True)
    return centered / (spread + eps)

# Residual connection followed by normalisation: Y is a sub-layer's output
# (e.g. the attention result) and X is the tensor it is added to.
def add_and_norm(X, Y):
  """Return layer_norm applied to the element-wise sum X + Y."""
  residual = X + Y
  return layer_norm(residual)

In [None]:

def feedforward_network(x, d_ff, d_model=50):
    """
    Apply a two-layer ReLU feedforward network with freshly drawn parameters.

    The same weights and biases are applied to every position: the biases are
    single rows that broadcast over the leading dimensions of x, so the
    parameter shapes depend only on d_model and d_ff, never on how many rows
    the input has.

    Parameters:
    - x: Input array whose last axis has size d_model, e.g.
      (sequence_length, d_model) or (batch_size, sequence_length, d_model)
    - d_ff: Dimensionality of the hidden layer in the feedforward network
    - d_model: Dimensionality of the model (input and output dimensions)

    Returns:
    - Output array with the same shape as input x
    """

    # Parameters are re-sampled on every call; nothing is learned or stored.
    W1 = xavier_init(d_model, d_ff)
    b1 = xavier_init(1, d_ff)      # one row, broadcast across all positions
    W2 = xavier_init(d_ff, d_model)
    b2 = xavier_init(1, d_model)   # one row, broadcast across all positions

    # Linear -> ReLU -> Linear
    hidden_layer = np.maximum(0, np.dot(x, W1) + b1)
    output_layer = np.dot(hidden_layer, W2) + b2

    return output_layer

In [None]:
# Load pre-trained word vectors through gensim's downloader. Despite the
# variable name, the identifier requested here names a GloVe vector set,
# not a Word2Vec model.
word2vec_model = load('glove-wiki-gigaword-50')

In [None]:


def encoder(input, model):
  """Run the tokens of one text through a single, randomly initialised encoder layer.

  Pipeline: embedding lookup + positional encodings, self-attention,
  add & norm, feedforward, add & norm, then a softmax over the last axis.

  Parameters:
  - input: sequence of tokens (the name shadows the builtin but is kept so
    existing callers keep working).
  - model: embedding lookup exposing ``vector_size``; forwarded to create_embeddings.

  Returns:
  - Array of shape (len(input), model.vector_size); each row sums to 1.
    An empty token sequence returns an array with zero rows.
  """
  # Take the width from the embedding model instead of assuming it is 50.
  d_model = model.vector_size
  if len(input) == 0:
    # The attention/softmax steps cannot reduce over zero positions.
    return np.zeros((0, d_model))

  input_embeddings = create_embeddings(input, model)
  pos_encodings = positional_encodings(input, model_size=d_model)
  input_with_pos = sum_encodings_embeddings(input_embeddings, pos_encodings)

  # d_k must equal d_model so the attention output can be added to its input.
  self_head_attention = self_attention(input_with_pos, model_size=d_model, d_k=d_model)
  adding_and_norm_one = add_and_norm(input_with_pos, self_head_attention)

  ff_layer = feedforward_network(adding_and_norm_one, d_ff=150, d_model=d_model)
  adding_and_norm_two = add_and_norm(ff_layer, adding_and_norm_one)
  return softmax(adding_and_norm_two)

In [None]:
# import tensorflow as tf
# from tensorflow.keras.datasets import imdb
# from tensorflow.keras.preprocessing.sequence import pad_sequences

# # Load IMDB dataset
# (train_data, train_labels), (test_data, test_labels) = imdb.load_data(num_words=10000)

# # Preprocess data: pad sequences to ensure uniform input size
# maxlen = 100
# train_data = pad_sequences(train_data, maxlen=maxlen)
# test_data = pad_sequences(test_data, maxlen=maxlen)


# Mount Google Drive at /content/drive (requires the google.colab runtime);
# the dataset is read from a path under this mount point.
from google.colab import drive
drive.mount('/content/drive')



Mounted at /content/drive


In [None]:
# # Get the word index dictionary
# word_index = imdb.get_word_index()

# # Invert the word index to get a mapping from integer indices to words
# reverse_word_index = {value: key for (key, value) in word_index.items()}

# # Decode a review (let's take the first one from train_data)
# decoded_review = ' '.join([reverse_word_index.get(i - 3, '?') for i in train_data[0]])

# Label that opens every spam line; it is followed by one separator character
# and then the message text.
SPAM_LABEL = "spam"

# Explicit encoding so decoding (e.g. of the "£" sign) does not depend on the
# platform's default locale.
with open("/content/drive/MyDrive/Colab Notebooks/SMSSpamCollection", "r", encoding="utf-8") as file:
  file_content = file.readlines()

# A line counts as spam only when it *starts* with the label followed by the
# separator; a plain substring test would also pick up any other line that
# merely mentions the word and then strip the wrong prefix from it.
spam_content = [
    line for line in file_content
    if line.startswith(SPAM_LABEL) and line[len(SPAM_LABEL):len(SPAM_LABEL) + 1].isspace()
]
#print(spam_content[:2])
# Drop the label and its separator, keeping the message (trailing newline included).
sp_content = [line[len(SPAM_LABEL) + 1:] for line in spam_content]
print(sp_content[:2])
# clean_text("/content/drive/MyDrive/Colab Notebooks/SMSSpamCollection", True , False, True)

["Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's\n", "FreeMsg Hey there darling it's been 3 week's now and no word back! I'd like some fun you up for it still? Tb ok! XxX std chgs to send, £1.50 to rcv\n"]


In [None]:
# The first 498 spam messages are used for training; the rest are held out.
split_at = 498
print(len(sp_content))
train_data, test_data = sp_content[:split_at], sp_content[split_at:]

747


In [None]:
# Peek at the first training message as a quick sanity check.
sample_text = train_data[0]
print(sample_text)

Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's



In [None]:
# Every message in train_data is spam, so every label is 1.
train_labels = [1] * len(train_data)
predicted_list = []
for i, message in enumerate(train_data):
  print(message)
  cleaned_text = clean_text(message, lemmatize=True, html_parser=True, rm_numbers=True)
  print(cleaned_text)
  result = encoder(cleaned_text, word2vec_model)
  # np.argmax without an axis returns an index into the flattened result.
  predicted_list.append(np.argmax(result))
  # Unconditional break: only the first message is ever processed.
  break

Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's

['free', 'entry', 'wkly', 'comp', 'win', 'fa', 'cup', 'final', 'tkts', 'st', 'may', '.', 'text', 'fa', 'receive', 'entry', 'question', '(', 'std', 'txt', 'rate', ')', '&', 'c', "'s", 'apply', "'s"]
(27, 50)
(27, 50)
(27, 50)
(27, 50)


In [None]:
# Show the argmax indices collected so far (one entry per processed message).
print(predicted_list)

[900]


In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score


def _message_features(messages, model):
  """Return one fixed-length feature row per message.

  encoder() takes the tokens of a single message and returns one row per
  token, so each message is cleaned and encoded on its own and then
  mean-pooled over its tokens to get a row a classifier can consume.
  """
  rows = []
  for message in messages:
    tokens = clean_text(message, True, True, True)
    if tokens:
      rows.append(np.mean(encoder(tokens, model), axis=0))
    else:
      # Nothing survived cleaning: fall back to an all-zero feature row.
      rows.append(np.zeros(model.vector_size))
  return np.vstack(rows) if rows else np.zeros((0, model.vector_size))


result = _message_features(train_data, word2vec_model)

# Only spam messages are loaded in this notebook, so the held-out messages get
# the same constant label that train_labels uses.
test_labels = [1 for _ in test_data]

# Example: Using Logistic Regression for classification
# NOTE(review): train_labels contains a single class here; LogisticRegression
# is expected to refuse to fit on one class, so non-spam examples (label 0)
# need to be added to the data before this cell can train — confirm.
classifier = LogisticRegression()
classifier.fit(result, train_labels)  # Ensure you use the correct labels here

# Predict on test data
test_result = _message_features(test_data, word2vec_model)
predictions = classifier.predict(test_result)

# Evaluate the model
accuracy = accuracy_score(test_labels, predictions)
print(f"Accuracy: {accuracy:.4f}")

TypeError: unhashable type: 'numpy.ndarray'

In [None]:
# Saving model
import joblib

# Save the classifier
joblib.dump(classifier, 'classifier_model.pkl')
# Save the encoder if needed
# NOTE(review): encoder is a plain function defined in this notebook and holds
# no fitted state (its weights are re-drawn via xavier_init inside each call),
# so this file likely adds nothing beyond the notebook code itself — confirm
# before relying on it.
joblib.dump(encoder, 'encoder_model.pkl')

In [None]:
# Loading model
# Load the classifier
# NOTE(review): joblib files are presumably pickle-based — only load files
# produced by a trusted run of this notebook.
classifier = joblib.load('classifier_model.pkl')