In [None]:
# import libraries
#try:
  # %tensorflow_version only exists in Colab.
  #!pip install tf-nightly
#except Exception:
#  pass
import tensorflow as tf
import pandas as pd
from tensorflow import keras
#!pip install tensorflow-datasets
#import tensorflow_datasets as tfds
import numpy as np
import matplotlib.pyplot as plt

# Show which TensorFlow version this kernel is running.
print(tf.__version__)

In [None]:
# get data files
!wget https://cdn.freecodecamp.org/project-data/sms/train-data.tsv
!wget https://cdn.freecodecamp.org/project-data/sms/valid-data.tsv

train_file_path = "train-data.tsv"
test_file_path = "valid-data.tsv"

In [None]:
def _load_sms_tsv(file_path, split_name):
    """Read a tab-separated SMS file into parallel message and label lists.

    Every useful line has the form ``<label>\\t<message>`` where label is
    'ham' or 'spam'.

    Args:
        file_path: Path of the TSV file; it is opened as UTF-8 text.
        split_name: Short name of the split ('train' or 'test'), used only in
            the warning printed for lines that cannot be parsed.

    Returns:
        A ``(messages, labels)`` tuple of equally long lists: the stripped
        message strings and their integer labels (0 for ham, 1 for spam).
    """
    label_ids = {'ham': 0, 'spam': 1}
    messages = []
    labels = []
    with open(file_path, mode = 'r', encoding = 'utf-8') as fl:
        for lyn in fl:
            if not lyn.strip():
                continue    # blank line (e.g. trailing newline at end of file)
            # partition never raises, unlike unpacking split() on a line without a tab
            cat, tab, sms = lyn.partition('\t')
            if not tab or cat not in label_ids:
                # skip the whole record: appending the message without a label
                # would leave the two lists misaligned
                print(f'Some problem with reading {split_name}-data in line:', lyn)
                continue
            messages.append(sms.strip())
            labels.append(label_ids[cat])
    return messages, labels


dataset_train, label_train = _load_sms_tsv(train_file_path, 'train')
dataset_test, label_test = _load_sms_tsv(test_file_path, 'test')

# each dataset element is a (message string, integer label) pair
tfds_train = tf.data.Dataset.from_tensor_slices((dataset_train, label_train))
tfds_test = tf.data.Dataset.from_tensor_slices((dataset_test, label_test))

In [None]:
BUFFER_SIZE = 10000
BATCH_SIZE = 199

# Both pipelines are rebuilt from the raw Python lists instead of re-wrapping the
# existing tfds_* variables, so running this cell twice does not stack a second
# shuffle/batch on top of an already batched dataset.
tfds_train = (
    tf.data.Dataset.from_tensor_slices((dataset_train, label_train))
    # shuffle within a buffer of BUFFER_SIZE elements; use dataset.cardinality() as the
    # buffer size for a uniform shuffle (at the cost of loading all data into memory).
    # reshuffle_each_iteration = False keeps the same shuffled order on every pass.
    .shuffle(BUFFER_SIZE, reshuffle_each_iteration = False)
    .batch(BATCH_SIZE)    # group elements into batches of BATCH_SIZE
    # overlap data fetching with training; see https://www.tensorflow.org/guide/data_performance
    .prefetch(tf.data.AUTOTUNE)
)
tfds_test = (
    tf.data.Dataset.from_tensor_slices((dataset_test, label_test))
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

In [None]:
VOCAB_SIZE = 1024

# Text vectorization layer.
#   max_tokens  - caps the vocabulary size; when omitted every word seen is kept
#   standardize - lower-cases the text and removes punctuation before tokenizing
encoder = tf.keras.layers.TextVectorization(
    max_tokens = VOCAB_SIZE,
    standardize = 'lower_and_strip_punctuation',
)
# learn the vocabulary from the training messages
encoder.adapt(dataset_train)

# vocabulary is sorted by frequency; [UNK] stands for unknown words
vocab = np.array(encoder.get_vocabulary())
# size of the vocabulary and its first 10 entries
print(len(vocab), vocab[:10])
# encoding a sentence yields the vocabulary index of each of its words
print(encoder(dataset_train[1]))

In [None]:
# Model: raw text -> token ids -> embeddings -> bidirectional LSTM -> dense head -> one logit
model = tf.keras.Sequential([
    # text vectorization layer (string in, word indices out)
    encoder,
    # trainable word vectors: input_dim is the vocabulary size, output_dim the length of
    # each vector; mask_zero is needed to deal with 0-padding in variable length input
    tf.keras.layers.Embedding(
        input_dim = len(encoder.get_vocabulary()),
        output_dim = 32,
        mask_zero = True,
    ),
    # the Bidirectional wrapper runs the sequence forwards and backwards through the
    # LSTM layer; 32 is the dimensionality of the LSTM output space
    tf.keras.layers.Bidirectional(
        tf.keras.layers.LSTM(32)
    ),
    tf.keras.layers.Dense(
        32,
        activation = 'relu',
        kernel_initializer = 'he_uniform',
    ),
    # single output unit without activation: the model emits a raw logit
    tf.keras.layers.Dense(1),
])

# from_logits = True because the last layer applies no sigmoid (binary classification)
model.compile(
    loss = tf.keras.losses.BinaryCrossentropy(from_logits = True),
    optimizer = tf.keras.optimizers.Adam(learning_rate = 1.0e-3),
    metrics = ['accuracy'],
)

# check summary of the model
model.summary()

In [None]:
# Train for 8 epochs on the training pipeline, passing the other split as validation data.
history = model.fit(
    tfds_train,
    validation_data = tfds_test,
    epochs = 8,
)

In [None]:
# function to predict messages based on model
# (should return list containing prediction and label, ex. [0.008318834938108921, 'ham'])
def predict_message(pred_text):
    """Classify a single SMS message as 'ham' or 'spam'.

    Args:
        pred_text: The message text, a single string.

    Returns:
        A two-element list ``[probability, label]``: the sigmoid of the model's
        output logit as a Python float, and 'spam' when that probability is
        above 0.5, otherwise 'ham'.
    """
    # Feed an explicit string tensor holding one message. A bare Python list of
    # strings is not handled uniformly by predict() across Keras releases,
    # while a tensor input is.
    logits = model.predict(tf.constant([pred_text]), verbose = 0)
    # the model outputs a logit (its loss uses from_logits = True), so squash it to
    # a probability; [0, 0] picks the only output of the only message
    spam_probability = float(tf.nn.sigmoid(logits).numpy()[0, 0])
    label = 'ham' if spam_probability <= 0.5 else 'spam'
    return [spam_probability, label]

# Try the classifier on one sample message and show the [probability, label] result.
pred_text = "how are you doing today?"
prediction = predict_message(pred_text)
print(prediction)

In [None]:
# Run this cell to test your function and model. Do not modify contents.
def test_predictions():
  """Check predict_message against seven messages with known labels.

  Prints a success message when every predicted label matches the expected
  one, otherwise prints a message asking to keep trying. Returns nothing.
  """
  test_messages = ["how are you doing today",
                   "sale today! to stop texts call 98912460324",
                   "i dont want to go. can we try it a different day? available sat",
                   "our new mobile video service is live. just install on your phone to start watching.",
                   "you have won £1000 cash! call to claim your prize.",
                   "i'll bring it tomorrow. don't forget the milk.",
                   "wow, is your arm alright. that happened to me one time too"
                  ]

  # expected label for each message above, in the same order
  test_answers = ["ham", "spam", "ham", "spam", "spam", "ham", "ham"]
  passed = True

  for msg, ans in zip(test_messages, test_answers):
    prediction = predict_message(msg)
    # only the label (second element) is compared; the probability is ignored
    if prediction[1] != ans:
      passed = False

  if passed:
    print("You passed the challenge. Great job!")
  else:
    print("You haven't passed yet. Keep trying.")

test_predictions()
