# 1. Hidden Markov Models (HMMs)
## 1-1. Text Classifier

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import string
from sklearn.model_selection import train_test_split

In [None]:
input_files = [
  'edgar_allan_poe.txt',
  'robert_frost.txt',
]

In [1]:
!head datasets/edgar_allan_poe.txt

LO! Death hath rear'd himself a throne
In a strange city, all alone,
Far down within the dim west
Where the good, and the bad, and the worst, and the best,
Have gone to their eternal rest.
 
There shrines, and palaces, and towers
Are not like any thing of ours
Oh no! O no! ours never loom
To heaven with that ungodly gloom!


In [2]:
!head datasets/robert_frost.txt

Two roads diverged in a yellow wood,
And sorry I could not travel both
And be one traveler, long I stood
And looked down one as far as I could
To where it bent in the undergrowth; 

Then took the other, as just as fair,
And having perhaps the better claim
Because it was grassy and wanted wear,
Though as for that the passing there


In [None]:
# collect data into lists
input_texts = []
labels = []

for label, f in enumerate(input_files):
  print(f"{f} corresponds to label {label}")

  for line in open(f):
    line = line.rstrip().lower()
    if line:
      # remove punctuation
      line = line.translate(str.maketrans('', '', string.punctuation))

      input_texts.append(line)
      labels.append(label)

In [None]:
train_text, test_text, Ytrain, Ytest = train_test_split(input_texts, labels)

In [None]:
len(Ytrain), len(Ytest)

In [None]:
train_text[:5]

In [None]:
Ytrain[:5]

In [None]:
idx = 1
word2idx = {'<unk>': 0}

In [None]:
# populate word2idx
for text in train_text:
    tokens = text.split()
    for token in tokens:
      if token not in word2idx:
        word2idx[token] = idx
        idx += 1

In [None]:
word2idx

In [None]:
len(word2idx)

In [None]:
# convert data into integer format
train_text_int = []
test_text_int = []

for text in train_text:
  tokens = text.split()
  line_as_int = [word2idx[token] for token in tokens]
  train_text_int.append(line_as_int)

for text in test_text:
  tokens = text.split()
  line_as_int = [word2idx.get(token, 0) for token in tokens]
  test_text_int.append(line_as_int)

In [None]:
train_text_int[100:105]

In [None]:
# initialize A and pi matrices - for both classes
V = len(word2idx)

A0 = np.ones((V, V))
pi0 = np.ones(V)

A1 = np.ones((V, V))
pi1 = np.ones(V)

In [None]:
# compute counts for A and pi
def compute_counts(text_as_int, A, pi):
  for tokens in text_as_int:
    last_idx = None
    for idx in tokens:
      if last_idx is None:
        # it's the first word in a sentence
        pi[idx] += 1
      else:
        # the last word exists, so count a transition
        A[last_idx, idx] += 1

      # update last idx
      last_idx = idx


compute_counts([t for t, y in zip(train_text_int, Ytrain) if y == 0], A0, pi0)
compute_counts([t for t, y in zip(train_text_int, Ytrain) if y == 1], A1, pi1)

In [None]:
# normalize A and pi so they are valid probability matrices
# convince yourself that this is equivalent to the formulas shown before
A0 /= A0.sum(axis=1, keepdims=True)
pi0 /= pi0.sum()

A1 /= A1.sum(axis=1, keepdims=True)
pi1 /= pi1.sum()

In [None]:
# log A and pi since we don't need the actual probs
logA0 = np.log(A0)
logpi0 = np.log(pi0)

logA1 = np.log(A1)
logpi1 = np.log(pi1)

In [None]:
# compute priors
count0 = sum(y == 0 for y in Ytrain)
count1 = sum(y == 1 for y in Ytrain)
total = len(Ytrain)
p0 = count0 / total
p1 = count1 / total
logp0 = np.log(p0)
logp1 = np.log(p1)
p0, p1

In [None]:
# build a classifier
class Classifier:
  def __init__(self, logAs, logpis, logpriors):
    self.logAs = logAs
    self.logpis = logpis
    self.logpriors = logpriors
    self.K = len(logpriors) # number of classes

  def _compute_log_likelihood(self, input_, class_):
    logA = self.logAs[class_]
    logpi = self.logpis[class_]

    last_idx = None
    logprob = 0
    for idx in input_:
      if last_idx is None:
        # it's the first token
        logprob += logpi[idx]
      else:
        logprob += logA[last_idx, idx]
      
      # update last_idx
      last_idx = idx
    
    return logprob
  
  def predict(self, inputs):
    predictions = np.zeros(len(inputs))
    for i, input_ in enumerate(inputs):
      posteriors = [self._compute_log_likelihood(input_, c) + self.logpriors[c] \
             for c in range(self.K)]
      pred = np.argmax(posteriors)
      predictions[i] = pred
    return predictions

In [None]:
# each array must be in order since classes are assumed to index these lists
clf = Classifier([logA0, logA1], [logpi0, logpi1], [logp0, logp1])

Ptrain = clf.predict(train_text_int)
print(f"Train acc: {np.mean(Ptrain == Ytrain)}")

In [None]:
Ptest = clf.predict(test_text_int)
print(f"Test acc: {np.mean(Ptest == Ytest)}")

In [None]:
from sklearn.metrics import confusion_matrix, f1_score

cm = confusion_matrix(Ytrain, Ptrain)
cm

In [None]:
cm_test = confusion_matrix(Ytest, Ptest)
cm_test

In [None]:
f1_score(Ytrain, Ptrain)

In [None]:
f1_score(Ytest, Ptest)

## 1-2. Poetry Generator

In [None]:
import numpy as np
import string

np.random.seed(1234)

In [None]:
initial = {} # start of a phrase
first_order = {} # second word only
second_order = {}

In [None]:
def remove_punctuation(s):
    return s.translate(str.maketrans('','',string.punctuation))

In [None]:
def add2dict(d, k, v):
  if k not in d:
    d[k] = []
  d[k].append(v)

# [cat, cat, dog, dog, dog, dog, dog, mouse, ...]

In [None]:
for line in open('robert_frost.txt'):
  tokens = remove_punctuation(line.rstrip().lower()).split()

  T = len(tokens)
  for i in range(T):
    t = tokens[i]
    if i == 0:
      # measure the distribution of the first word
      initial[t] = initial.get(t, 0.) + 1
    else:
      t_1 = tokens[i-1]
      if i == T - 1:
        # measure probability of ending the line
        add2dict(second_order, (t_1, t), 'END')
      if i == 1:
        # measure distribution of second word
        # given only first word
        add2dict(first_order, t_1, t)
      else:
        t_2 = tokens[i-2]
        add2dict(second_order, (t_2, t_1), t)

In [None]:
# normalize the distributions
initial_total = sum(initial.values())
for t, c in initial.items():
    initial[t] = c / initial_total

In [None]:
# convert [cat, cat, cat, dog, dog, dog, dog, mouse, ...]
# into {cat: 0.5, dog: 0.4, mouse: 0.1}

def list2pdict(ts):
  # turn each list of possibilities into a dictionary of probabilities
  d = {}
  n = len(ts)
  for t in ts:
    d[t] = d.get(t, 0.) + 1
  for t, c in d.items():
    d[t] = c / n
  return d

In [None]:
for t_1, ts in first_order.items():
  # replace list with dictionary of probabilities
  first_order[t_1] = list2pdict(ts)

In [None]:
for k, ts in second_order.items():
  second_order[k] = list2pdict(ts)

In [None]:
def sample_word(d):
  # print "d:", d
  p0 = np.random.random()
  # print "p0:", p0
  cumulative = 0
  for t, p in d.items():
    cumulative += p
    if p0 < cumulative:
      return t
  assert(False) # should never get here

In [None]:
def generate():
  for i in range(4): # generate 4 lines
    sentence = []

    # initial word
    w0 = sample_word(initial)
    sentence.append(w0)

    # sample second word
    w1 = sample_word(first_order[w0])
    sentence.append(w1)

    # second-order transitions until END
    while True:
      w2 = sample_word(second_order[(w0, w1)])
      if w2 == 'END':
        break
      sentence.append(w2)
      w0 = w1
      w1 = w2
    print(' '.join(sentence))

In [None]:
generate()

## 1-3. Article Spinner

In [None]:
import numpy as np
import pandas as pd
import textwrap
import nltk
from nltk import word_tokenize
from nltk.tokenize.treebank import TreebankWordDetokenizer

In [None]:
df = pd.read_csv('datasets/bbc_text_cls.csv')
df.head()

In [None]:
labels = set(df['labels'])
labels

In [None]:
# Pick a label whose data we want to train from
label = 'business'

In [None]:
texts = df[df['labels'] == label]['text']
texts.head()

In [None]:
# collect counts
probs = {} # key: (w(t-1), w(t+1)), value: {w(t): count(w(t))}

for doc in texts:
  lines = doc.split("\n")
  for line in lines:
    tokens = word_tokenize(line)
    for i in range(len(tokens) - 2):
      t_0 = tokens[i]
      t_1 = tokens[i + 1]
      t_2 = tokens[i + 2]
      key = (t_0, t_2)
      if key not in probs:
        probs[key] = {}
      
      # add count for middle token
      if t_1 not in probs[key]:
        probs[key][t_1] = 1
      else:
        probs[key][t_1] += 1

In [None]:
# normalize probabilities
for key, d in probs.items():
  # d should represent a distribution
  total = sum(d.values())
  for k, v in d.items():
    d[k] = v / total

In [None]:
probs

In [None]:
texts.iloc[0].split("\n")

In [None]:
def spin_document(doc):
  # split the document into lines (paragraphs)
  lines = doc.split("\n")
  output = []
  for line in lines:
    if line:
      new_line = spin_line(line)
    else:
      new_line = line
    output.append(new_line)
  return "\n".join(output)

In [None]:
detokenizer = TreebankWordDetokenizer()

In [None]:
texts.iloc[0].split("\n")[2]

In [None]:
detokenizer.detokenize(word_tokenize(texts.iloc[0].split("\n")[2]))

In [None]:
def sample_word(d):
  p0 = np.random.random()
  cumulative = 0
  for t, p in d.items():
    cumulative += p
    if p0 < cumulative:
      return t
  assert(False) # should never get here

In [None]:
def spin_line(line):
  tokens = word_tokenize(line)
  i = 0
  output = [tokens[0]]
  while i < (len(tokens) - 2):
    t_0 = tokens[i]
    t_1 = tokens[i + 1]
    t_2 = tokens[i + 2]
    key = (t_0, t_2)
    p_dist = probs[key]
    if len(p_dist) > 1 and np.random.random() < 0.3:
      # let's replace the middle word
      middle = sample_word(p_dist)
      output.append(t_1)
      output.append("<" + middle + ">")
      output.append(t_2)

      # we won't replace the 3rd token since the middle
      # token was dependent on it
      # instead, skip ahead 2 steps
      i += 2
    else:
      # we won't replace this middle word
      output.append(t_1)
      i += 1
  # append the final token - only if there was no replacement
  if i == len(tokens) - 2:
    output.append(tokens[-1])
  return detokenizer.detokenize(output)

In [None]:
np.random.seed(1234)

In [None]:
i = np.random.choice(texts.shape[0])
doc = texts.iloc[i]
new_doc = spin_document(doc)

In [None]:
print(textwrap.fill(
    new_doc, replace_whitespace=False, fix_sentence_endings=True))

## 1-4. Cipher Detection

In [None]:
# create substitution cipher

# one will act as the key, other as the value
letters1 = list(string.ascii_lowercase)
letters2 = list(string.ascii_lowercase)

true_mapping = {}

# shuffle second set of letters
random.shuffle(letters2)

# populate map
for k, v in zip(letters1, letters2):
  true_mapping[k] = v

In [None]:
# the language model

# initialize Markov matrix
M = np.ones((26, 26))

# initial state distribution
pi = np.zeros(26)

# a function to update the Markov matrix
def update_transition(ch1, ch2):
  # ord('a') = 97, ord('b') = 98, ...
  i = ord(ch1) - 97
  j = ord(ch2) - 97
  M[i,j] += 1

# a function to update the initial state distribution
def update_pi(ch):
  i = ord(ch) - 97
  pi[i] += 1

# get the log-probability of a word / token
def get_word_prob(word):
  # print("word:", word)
  i = ord(word[0]) - 97
  logp = np.log(pi[i])

  for ch in word[1:]:
    j = ord(ch) - 97
    logp += np.log(M[i, j]) # update prob
    i = j # update j

  return logp

# get the probability of a sequence of words
def get_sequence_prob(words):
  # if input is a string, split into an array of tokens
  if type(words) == str:
    words = words.split()

  logp = 0
  for word in words:
    logp += get_word_prob(word)
  return logp

In [None]:
# create a markov model based on an English dataset
# is an edit of https://www.gutenberg.org/ebooks/2701
# (I removed the front and back matter)

# download the file
if not os.path.exists('moby_dick.txt'):
  print("Downloading moby dick...")
  r = requests.get('https://lazyprogrammer.me/course_files/moby_dick.txt')
  with open('moby_dick.txt', 'w') as f:
    f.write(r.content.decode())

In [None]:
# for replacing non-alpha characters
regex = re.compile('[^a-zA-Z]')

# load in words
for line in open('moby_dick.txt'):
  line = line.rstrip()

  # there are blank lines in the file
  if line:
    line = regex.sub(' ', line) # replace all non-alpha characters with space

    # split the tokens in the line and lowercase
    tokens = line.lower().split()

    for token in tokens:
      # update the model

      # first letter
      ch0 = token[0]
      update_pi(ch0)

      # other letters
      for ch1 in token[1:]:
        update_transition(ch0, ch1)
        ch0 = ch1

# normalize the probabilities
pi /= pi.sum()
M /= M.sum(axis=1, keepdims=True)

In [None]:
# encode a message

# this is a random excerpt from Project Gutenberg's
# The Adventures of Sherlock Holmes, by Arthur Conan Doyle
# https://www.gutenberg.org/ebooks/1661

original_message = '''I then lounged down the street and found,
as I expected, that there was a mews in a lane which runs down
by one wall of the garden. I lent the ostlers a hand in rubbing
down their horses, and received in exchange twopence, a glass of
half-and-half, two fills of shag tobacco, and as much information
as I could desire about Miss Adler, to say nothing of half a dozen
other people in the neighbourhood in whom I was not in the least
interested, but whose biographies I was compelled to listen to.

In [None]:
# a function to encode a message
def encode_message(msg):
  # downcase
  msg = msg.lower()

  # replace non-alpha characters
  msg = regex.sub(' ', msg)

  # make the encoded message
  coded_msg = []
  for ch in msg:
    coded_ch = ch # could just be a space
    if ch in true_mapping:
      coded_ch = true_mapping[ch]
    coded_msg.append(coded_ch)

  return ''.join(coded_msg)


encoded_message = encode_message(original_message)


# a function to decode a message
def decode_message(msg, word_map):
  decoded_msg = []
  for ch in msg:
    decoded_ch = ch # could just be a space
    if ch in word_map:
      decoded_ch = word_map[ch]
    decoded_msg.append(decoded_ch)

  return ''.join(decoded_msg)

In [None]:
# run an evolutionary algorithm to decode the message

# this is our initialization point
dna_pool = []
for _ in range(20):
  dna = list(string.ascii_lowercase)
  random.shuffle(dna)
  dna_pool.append(dna)

In [None]:
def evolve_offspring(dna_pool, n_children):
  # make n_children per offspring
  offspring = []

  for dna in dna_pool:
    for _ in range(n_children):
      copy = dna.copy()
      j = np.random.randint(len(copy))
      k = np.random.randint(len(copy))

      # switch
      tmp = copy[j]
      copy[j] = copy[k]
      copy[k] = tmp
      offspring.append(copy)

  return offspring + dna_pool

In [None]:
num_iters = 1000
scores = np.zeros(num_iters)
best_dna = None
best_map = None
best_score = float('-inf')
for i in range(num_iters):
  if i > 0:
    # get offspring from the current dna pool
    dna_pool = evolve_offspring(dna_pool, 3)

  # calculate score for each dna
  dna2score = {}
  for dna in dna_pool:
    # populate map
    current_map = {}
    for k, v in zip(letters1, dna):
      current_map[k] = v

    decoded_message = decode_message(encoded_message, current_map)
    score = get_sequence_prob(decoded_message)

    # store it
    # needs to be a string to be a dict key
    dna2score[''.join(dna)] = score

    # record the best so far
    if score > best_score:
      best_dna = dna
      best_map = current_map
      best_score = score

  # average score for this generation
  scores[i] = np.mean(list(dna2score.values()))

  # keep the best 5 dna
  # also turn them back into list of single chars
  sorted_dna = sorted(dna2score.items(), key=lambda x: x[1], reverse=True)
  dna_pool = [list(k) for k, v in sorted_dna[:5]]

  if i % 200 == 0:
    print("iter:", i, "score:", scores[i], "best so far:", best_score)

In [None]:
# use best score
decoded_message = decode_message(encoded_message, best_map)

print("LL of decoded message:", get_sequence_prob(decoded_message))
print("LL of true message:", get_sequence_prob(regex.sub(' ', original_message.lower())))


# which letters are wrong?
for true, v in true_mapping.items():
  pred = best_map[v]
  if true != pred:
    print("true: %s, pred: %s" % (true, pred))

In [None]:
# print the final decoded message
print("Decoded message:\n", textwrap.fill(decoded_message))

print("\nTrue message:\n", original_message)

In [None]:
plt.plot(scores)
plt.show()

# 2. Naive Bayes
## 2-1. Spam Detection

In [None]:
import numpy as np
import pandas as pd
import seaborn as sn
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, f1_score, confusion_matrix
from sklearn.naive_bayes import MultinomialNB
from wordcloud import WordCloud

In [None]:
# https://www.kaggle.com/uciml/sms-spam-collection-dataset
!wget https://lazyprogrammer.me/course_files/spam.csv

In [None]:
# file contains some invalid chars
# depending on which version of pandas you have
# an error may be thrown
df = pd.read_csv('spam.csv', encoding='ISO-8859-1')
df.head()

In [None]:
# drop unnecessary columns
df = df.drop(["Unnamed: 2", "Unnamed: 3", "Unnamed: 4"], axis=1)
df.head()

In [None]:
# rename columns to something better
df.columns = ['labels', 'data']
df.head()

In [None]:
df['labels'].hist()

In [None]:
# create binary labels
df['b_labels'] = df['labels'].map({'ham': 0, 'spam': 1})
Y = df['b_labels'].to_numpy()

In [None]:
# split up the data
df_train, df_test, Ytrain, Ytest = train_test_split(
    df['data'], Y, test_size=0.33)

In [None]:
# try multiple ways of calculating features
# featurizer = TfidfVectorizer(decode_error='ignore')
# Xtrain = featurizer.fit_transform(df_train)
# Xtest = featurizer.transform(df_test)

featurizer = CountVectorizer(decode_error='ignore')
Xtrain = featurizer.fit_transform(df_train)
Xtest = featurizer.transform(df_test)

In [None]:
Xtrain

In [None]:
# create the model, train it, print scores
model = MultinomialNB()
model.fit(Xtrain, Ytrain)
print("train acc:", model.score(Xtrain, Ytrain))
print("test acc:", model.score(Xtest, Ytest))

In [None]:
Ptrain = model.predict(Xtrain)
Ptest = model.predict(Xtest)
print("train F1:", f1_score(Ytrain, Ptrain))
print("test F1:", f1_score(Ytest, Ptest))

In [None]:
Prob_train = model.predict_proba(Xtrain)[:,1]
Prob_test = model.predict_proba(Xtest)[:,1]
print("train AUC:", roc_auc_score(Ytrain, Prob_train))
print("test AUC:", roc_auc_score(Ytest, Prob_test))

In [None]:
cm = confusion_matrix(Ytrain, Ptrain)
cm

In [None]:
# Scikit-Learn is transitioning to V1 but it's not available on Colab
# The changes modify how confusion matrices are plotted
def plot_cm(cm):
  classes = ['ham', 'spam']
  df_cm = pd.DataFrame(cm, index=classes, columns=classes)
  ax = sn.heatmap(df_cm, annot=True, fmt='g')
  ax.set_xlabel("Predicted")
  ax.set_ylabel("Target")

plot_cm(cm)

In [None]:
cm_test = confusion_matrix(Ytest, Ptest)
plot_cm(cm_test)

In [None]:
# visualize the data
def visualize(label):
  words = ''
  for msg in df[df['labels'] == label]['data']:
    msg = msg.lower()
    words += msg + ' '
  wordcloud = WordCloud(width=600, height=400).generate(words)
  plt.imshow(wordcloud)
  plt.axis('off')
  plt.show()

In [None]:
visualize('spam')

In [None]:
visualize('ham')

In [None]:
# see what we're getting wrong
X = featurizer.transform(df['data'])
df['predictions'] = model.predict(X)

In [None]:
# things that should be spam
sneaky_spam = df[(df['predictions'] == 0) & (df['b_labels'] == 1)]['data']
for msg in sneaky_spam:
  print(msg)

In [None]:
# things that should not be spam
# perhaps some are mislabeled?
not_actually_spam = df[(df['predictions'] == 1) & (df['b_labels'] == 0)]['data']
for msg in not_actually_spam:
  print(msg)

# 3. Logistic Regression
## 3-1. Sentiment Analysis

# 4. PCA & SVD
## 4-1. Latent Semantic Indexing

# 5. Latent Dirichlet Allocation
## 5-1. Topic Modeling

# 6. Text Classification