<a href="https://colab.research.google.com/github/lawun330/FCC-ML-Journey/blob/main/fcc_sms_text_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# import libraries
try:
  # %tensorflow_version only exists in Colab.
  !pip install tf-nightly
except Exception:
  pass
import tensorflow as tf
import pandas as pd
from tensorflow import keras
!pip install tensorflow-datasets
import tensorflow_datasets as tfds
import numpy as np
import matplotlib.pyplot as plt

print(tf.__version__)

In [None]:
# get data files
# `!` runs a shell command from the notebook; each wget call downloads one
# tab-separated SMS data file.
!wget https://cdn.freecodecamp.org/project-data/sms/train-data.tsv
!wget https://cdn.freecodecamp.org/project-data/sms/valid-data.tsv

# File names of the two downloads, as used by the loading cell.
# Note: the "valid" file is the one this notebook treats as its test set.
train_file_path = "train-data.tsv"
test_file_path = "valid-data.tsv"

In [None]:
# load both tab-separated files into dataframes; the files carry no header row,
# so the two columns are named explicitly: 'Type' (label) and 'Message' (text)
train_dataset = pd.read_csv(train_file_path, sep='\t', names=['Type', 'Message'])
test_dataset = pd.read_csv(test_file_path, sep='\t', names=['Type', 'Message'])

# split every dataframe into its label column and its feature (message) column
train_labels = train_dataset['Type']
train_features = train_dataset['Message']
test_labels = test_dataset['Type']
test_features = test_dataset['Message']

In [None]:
# encoding
def bag_of_words(list_of_sentences, word_encoding_integer=1):
    """Integer-encode sentences and count how often each token occurs.

    Every sentence is lower-cased and split on single space characters, so
    punctuation stays attached to its word and consecutive spaces produce an
    empty-string token. Tokens receive consecutive integer ids in order of
    first appearance, starting at ``word_encoding_integer``.

    Parameters
    ----------
    list_of_sentences : iterable of str
        The sentences to encode.
    word_encoding_integer : int, optional
        Id given to the first new token (default 1).

    Returns
    -------
    tuple
        ``(vocab, bag, encoded_sentences)`` where ``vocab`` maps token -> id,
        ``bag`` maps id -> number of occurrences over all sentences, and
        ``encoded_sentences`` holds one list of ids per input sentence.
    """
    vocab = {}
    bag = {}
    encoded_sentences = []
    next_id = word_encoding_integer  # id that the next unseen token will get

    for sentence in list_of_sentences:
        sentence_ids = []

        for token in sentence.lower().split(" "):
            token_id = vocab.get(token)
            if token_id is None:  # first time this token is seen
                token_id = next_id
                vocab[token] = token_id
                next_id += 1

            # frequency is tracked per id, not per token string
            bag[token_id] = bag.get(token_id, 0) + 1
            sentence_ids.append(token_id)

        encoded_sentences.append(sentence_ids)

    return vocab, bag, encoded_sentences

In [None]:
# decoding
example_vocab = {}
def decode_int(encoded_sentence=(), vocab_name=example_vocab):
    """Translate a sequence of encoded integers back into words.

    Parameters
    ----------
    encoded_sentence : iterable of int, optional
        The encoded values to decode (default: empty).
    vocab_name : dict, optional
        Mapping word -> encoded integer (default: the module-level
        ``example_vocab``).

    Returns
    -------
    str
        The decoded words in the same order as ``encoded_sentence``, each one
        preceded by a single space. Integers that have no entry in
        ``vocab_name`` are skipped.
    """
    # Invert the vocabulary once so every id is a single dictionary lookup.
    # setdefault keeps the first word if two words ever share the same id.
    words_by_id = {}
    for word, encoded_int in vocab_name.items():
        words_by_id.setdefault(encoded_int, word)

    # Walk the sentence (not the vocabulary) so the original word order and
    # repeated words are preserved.
    return "".join(
        " " + str(words_by_id[encoded_int])
        for encoded_int in encoded_sentence
        if encoded_int in words_by_id
    )

In [None]:
# encoding the features
# the encoding integer 0 represents lacking of a word, the encoding integer value should start from 1
vocab, vocab_repetition, encoded_sentences = bag_of_words(train_features) # encoding integer value starts from 1, the default

# The test messages must be encoded with the TRAINING vocabulary. Building a
# second vocabulary from the test set would give the same word a different id
# in train and test, so the model would be evaluated on ids it never learned.
# Tokenisation mirrors bag_of_words (lower-case, split on single spaces);
# words that never occurred in training get 0, the "no word" id.
encoded_test_sentences = [
    [vocab.get(word, 0) for word in sentence.lower().split(" ")]
    for sentence in test_features
]
# vocabulary/frequency statistics of the test set alone, kept for inspection
# only -- these ids are NOT compatible with `vocab` and must not be used to
# encode or decode model inputs
test_vocab, test_vocab_repetition = bag_of_words(test_features)[:2]

# encoding the labels
# with "sigmoid" activation the encoding integers should be 0 and 1
label_vocab, label_vocab_repetition, encoded_labels = bag_of_words(train_labels,0) # encoding integer value starts from 0

# Test labels reuse the training label mapping for the same reason: a separate
# mapping depends on which label happens to come first in the test file and
# could swap 0 and 1. A label that is absent from training raises KeyError.
encoded_t_labels = [
    [label_vocab[word] for word in label.lower().split(" ")]
    for label in test_labels
]
# label statistics of the test set alone, for inspection only (see note above)
label_test_vocab, label_test_vocab_repetition = bag_of_words(test_labels,0)[:2]

# test the functions to features and labels
print(train_features[0]) # original
print(encoded_sentences[0]) # encoded
print(decode_int(encoded_sentences[0], vocab_name=vocab)) # decoded

print()
print(test_features[0]) # original
print(encoded_test_sentences[0]) # encoded
# decoded with the training vocabulary; unknown words (id 0) have no entry
# in `vocab` and are therefore missing from the decoded text
print(decode_int(encoded_test_sentences[0], vocab_name=vocab)) # decoded

print()
print(train_labels[0], test_labels[0]) # original
print(encoded_labels[0], encoded_t_labels[0]) # encoded
print(decode_int(encoded_labels[0], vocab_name=label_vocab), decode_int(encoded_t_labels[0], vocab_name=label_vocab)) # decoded

In [None]:
# Finding MAX_LEN by the average item length
def find_MAX_LEN(list_of_sentences):
    """Return the average ``len()`` of the given items, rounded down.

    The function only sums ``len(item)``, so the unit of the result depends on
    what is passed in: lists of tokens give an average token count, raw
    strings give an average character count.

    Parameters
    ----------
    list_of_sentences : sized iterable
        Items that each support ``len()``.

    Returns
    -------
    int
        Floor of the mean item length, or 0 when there are no items.
    """
    sentence_count = len(list_of_sentences)
    # len() is compared explicitly because truth-testing a pandas Series raises
    if sentence_count == 0:
        return 0  # avoid ZeroDivisionError on empty input

    total_length = sum(len(sentence) for sentence in list_of_sentences)
    return total_length // sentence_count

# Measure the tokenised messages rather than the raw strings: len() of a raw
# message is its number of characters, whereas MAX_LEN is later used as a
# number of tokens when the encoded sentences are padded.
# NOTE: the variables cell below sets MAX_LEN explicitly, so this value is
# informational.
MAX_LEN = find_MAX_LEN(encoded_sentences)
MAX_LEN

In [None]:
# variables
# Length (in token ids) that every encoded message is padded/truncated to.
# This assignment replaces any MAX_LEN value computed in an earlier cell.
MAX_LEN = 80
# NOTE(review): BATCH_SIZE is not passed to the visible model.fit call --
# confirm whether it was meant to be used as batch_size.
BATCH_SIZE = 64
# Number of distinct tokens in the training vocabulary.
VOCAB_SIZE = len(vocab) #11331 (value presumably observed by the author in a previous run)
# Number of passes over the training data.
epochs = 10

In [None]:
# preprocessing the data

# bring every encoded message to exactly MAX_LEN ids and hold the result as a
# numpy array
train_data = np.array(keras.utils.pad_sequences(encoded_sentences, MAX_LEN))
test_data = np.array(keras.utils.pad_sequences(encoded_test_sentences, MAX_LEN))

# every label was encoded as its own small list (e.g. [[0], [1], ...]);
# flatten the nesting into one 1-D numpy array per split for the model
encoded_train_labels = np.array(
    [label_id for label_ids in encoded_labels for label_id in label_ids]
)
encoded_test_labels = np.array(
    [label_id for label_ids in encoded_t_labels for label_id in label_ids]
)

In [None]:
# build the model layer by layer: embedding -> LSTM -> one sigmoid unit
model = tf.keras.Sequential()
# VOCAB_SIZE+1 rows because word ids start at 1 and id 0 means "no word"
model.add(tf.keras.layers.Embedding(VOCAB_SIZE+1, 32))
model.add(tf.keras.layers.LSTM(32))
# a single sigmoid output matches the 0/1 label encoding
model.add(tf.keras.layers.Dense(1, activation="sigmoid"))
model.summary()

# compile
model.compile(
    optimizer="rmsprop",
    loss="binary_crossentropy",
    metrics=['accuracy'],
)

# train, holding back 20% of the training data for validation
history = model.fit(
    train_data,
    encoded_train_labels,
    validation_split=0.2,
    epochs=epochs,
)

In [None]:
# evaluate
# Score the trained model on the padded test messages and their 0/1 labels.
# The model was compiled with metrics=['accuracy'], so `results` holds the
# loss followed by the accuracy.
results = model.evaluate(test_data, encoded_test_labels)

In [None]:
# function to predict messages based on model
# (should return list containing prediction and label, ex. [0.008318834938108921, 'ham'])
def predict_message(pred_text):
    """Classify a single SMS text with the trained model.

    Parameters
    ----------
    pred_text : str
        The raw message to classify.

    Returns
    -------
    list
        ``[probability, label]`` where ``probability`` is the model's sigmoid
        output as a Python float and ``label`` is ``"ham"`` when the
        probability is below 0.5 and ``"spam"`` otherwise.
    """
    # Tokenise with the same rule that built the training vocabulary in
    # bag_of_words (lower-case, split on single spaces). The previously used
    # keras text_to_word_sequence helper is deprecated and also strips
    # punctuation, so tokens such as "today?" no longer matched the ids the
    # model was trained on. Words missing from `vocab` map to 0 ("no word").
    tokens = [vocab.get(word, 0) for word in pred_text.lower().split(" ")]

    # pad_sequences on a one-element batch already yields shape (1, MAX_LEN),
    # which is what model.predict expects
    encoded_pred_text = keras.utils.pad_sequences([tokens], MAX_LEN)

    predicted_val = float(model.predict(encoded_pred_text)[0][0])

    # NOTE(review): assumes "ham" was encoded as 0 and "spam" as 1 when the
    # labels were prepared -- confirm against label_vocab.
    # A plain else guarantees that a label is always returned.
    label = "ham" if predicted_val < 0.5 else "spam"
    return [predicted_val, label]

pred_text = "how are you doing today?"

prediction = predict_message(pred_text)
print(prediction)

In [None]:
# Run this cell to test your function and model. Do not modify contents.
# Checks predict_message against seven fixed messages and prints whether
# every predicted label matched the expected one.
def test_predictions():
  # sample messages, paired by position with test_answers below
  test_messages = ["how are you doing today",
                   "sale today! to stop texts call 98912460324",
                   "i dont want to go. can we try it a different day? available sat",
                   "our new mobile video service is live. just install on your phone to start watching.",
                   "you have won Â£1000 cash! call to claim your prize.",
                   "i'll bring it tomorrow. don't forget the milk.",
                   "wow, is your arm alright. that happened to me one time too"
                  ]

  test_answers = ["ham", "spam", "ham", "spam", "spam", "ham", "ham"]
  passed = True

  for msg, ans in zip(test_messages, test_answers):
    prediction = predict_message(msg)
    # prediction is [probability, label]; only the label is compared
    if prediction[1] != ans:
      passed = False

  if passed:
    print("You passed the challenge. Great job!")
  else:
    print("You haven't passed yet. Keep trying.")

test_predictions()
