<a href="https://colab.research.google.com/github/prof79/RPKerasNLPDemo/blob/main/RPKerasNLPDemo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Learn Text Classification With Python and Keras
### by Douglas Starnes (Real Python)

URL: https://realpython.com/courses/text-classification-with-keras/

Data Set Source: https://archive.ics.uci.edu/ml/datasets/Sentiment+Labelled+Sentences#

## Lesson 2

In [None]:
# Zip archive of the "Sentiment Labelled Sentences" data set (see "Data Set Source" above).
DATA_SET_URL = 'https://archive.ics.uci.edu/ml/machine-learning-databases/00331/sentiment%20labelled%20sentences.zip'

In [None]:
import requests
import zipfile

def get_data(url, fn, chunk_size=1 << 20, timeout=60):
  """Download `url` and store the response body in the local file `fn`.

  The body is streamed to disk chunk by chunk instead of being held in
  memory as a whole, so large archives can be fetched as well.

  Args:
    url: Address of the resource to download.
    fn: Path of the file to write; an existing file is overwritten.
    chunk_size: Number of bytes requested per streamed chunk.
    timeout: Seconds to wait for the connection / for the next bytes
      before `requests` gives up.

  Raises:
    requests.HTTPError: If the server answers with an error status, so an
      error page is never saved under `fn`.
  """
  with requests.get(url, stream=True, timeout=timeout) as response:
    response.raise_for_status()
    # `with` guarantees the file is closed even if the transfer fails midway.
    with open(fn, 'wb') as f:
      for chunk in response.iter_content(chunk_size=chunk_size):
        f.write(chunk)

def extract_data(fn):
  """Unpack every member of the zip archive `fn` into the current working directory."""
  with zipfile.ZipFile(fn) as archive:
    archive.extractall(path=None)

In [None]:
# Local file name under which the downloaded archive is stored.
DATA_SET_FILE_NAME = 'dataset.zip'

# Fetch the archive, then unpack it into the current working directory.
get_data(DATA_SET_URL, DATA_SET_FILE_NAME)
extract_data(DATA_SET_FILE_NAME)

In [None]:
import os

def rename_data_folder(fn, new_name):
  """Rename the folder `fn` to `new_name`, tolerating repeated execution.

  Nothing happens when `fn` does not exist. Nothing happens either when
  `new_name` already exists: that is the situation on a second run, after
  extraction has recreated `fn` while the previously renamed folder is
  still in place, and `os.rename` would otherwise fail on the populated
  destination.

  Args:
    fn: Current path of the folder.
    new_name: Path the folder should be moved to.
  """
  if not os.path.exists(fn):
    return
  if os.path.exists(new_name):
    # Keep the existing destination untouched instead of raising.
    return
  os.rename(fn, new_name)


In [None]:
# Folder name to be renamed; the later cells read their files from 'data/'.
DATA_SET_FOLDER = 'sentiment labelled sentences'

rename_data_folder(DATA_SET_FOLDER, 'data')

In [None]:
import pandas as pd

# One tab-separated file per review source.
filepath_dict = dict(
    yelp='data/yelp_labelled.txt',
    amazon='data/amazon_cells_labelled.txt',
    imdb='data/imdb_labelled.txt',
)

df_list = []

for source, filepath in filepath_dict.items():
  frame = pd.read_csv(filepath, sep='\t', names=['sentence', 'label'])
  # Tag every row with the source it was read from.
  df_list.append(frame.assign(source=source))

# Stack the per-source frames into one combined frame.
df = pd.concat(df_list)

df.head()

## Lesson 3

In [None]:
# Two-sentence toy corpus for the CountVectorizer demonstration below.
sentences = ['John likes ice cream', 'John hates chocolate']

In [None]:
from sklearn.feature_extraction.text import CountVectorizer

# min_df=1 keeps every token that occurs in at least one sentence, i.e. the
# same vocabulary the former min_df=0 produced. scikit-learn documents min_df
# as an int >= 1 or a float in [0.0, 1.0]; newer releases validate this and
# reject the integer 0.
# lowercase=False leaves the casing of the tokens untouched.
vectorizer = CountVectorizer(min_df=1, lowercase=False)
vectorizer.fit(sentences)
vectorizer.vocabulary_

In [None]:
# Encode the toy sentences with the fitted vocabulary and show the result as a dense array.
vectorizer.transform(sentences).toarray()

## Lesson 4

In [None]:
# Keep only the rows that were read from the Yelp file.
df_yelp = df.loc[df['source'].eq('yelp')]

sentences, labels = (df_yelp[column].values for column in ('sentence', 'label'))

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 25% of the sentences for testing; the fixed random_state keeps the split repeatable.
sentences_train, sentences_test, y_train, y_test = train_test_split(sentences, labels, test_size=0.25, random_state=1000)

In [None]:
# Fit the vocabulary on the training sentences only, then encode both splits with it.
vectorizer = CountVectorizer().fit(sentences_train)

X_train, X_test = (
    vectorizer.transform(split) for split in (sentences_train, sentences_test)
)

X_train

In [None]:
from sklearn.linear_model import LogisticRegression

# Baseline: logistic regression on the bag-of-words features, scored on the test split.
classifier = LogisticRegression().fit(X_train, y_train)
score = classifier.score(X_test, y_test)

print(score)

In [None]:
# Repeat the bag-of-words + logistic regression baseline for every source.
for source in df['source'].unique():
  df_source = df.loc[df['source'].eq(source)]
  sentences, labels = df_source['sentence'].values, df_source['label'].values

  sentences_train, sentences_test, y_train, y_test = train_test_split(
      sentences, labels, test_size=0.25, random_state=1000)

  # The vocabulary is learned from this source's training sentences only.
  vectorizer = CountVectorizer().fit(sentences_train)
  X_train = vectorizer.transform(sentences_train)
  X_test = vectorizer.transform(sentences_test)

  classifier = LogisticRegression().fit(X_train, y_train)
  score = classifier.score(X_test, y_test)

  print(f'{source}: {score:.4f}')

## Lesson 6

In [None]:
# Rebuild the Yelp split and its bag-of-words features: the per-source loop
# above rebound sentences_train, y_train, X_train, etc. on every iteration.
df_yelp = df[df['source'] == 'yelp']

sentences = df_yelp['sentence'].values
labels = df_yelp['label'].values

sentences_train, sentences_test, y_train, y_test = train_test_split(sentences, labels, test_size=0.25, random_state=1000)

vectorizer = CountVectorizer()
vectorizer.fit(sentences_train)

X_train = vectorizer.transform(sentences_train)
X_test = vectorizer.transform(sentences_test)

# (number of training sentences, vocabulary size)
X_train.shape

In [None]:
import tensorflow.keras.models as models
import tensorflow.keras.layers as layers

In [None]:
# One hidden layer of 10 ReLU units on the bag-of-words vector, sigmoid output for the binary label.
model = models.Sequential([
    layers.Dense(10, input_dim=X_train.shape[1], activation='relu'),
    layers.Dense(1, activation='sigmoid'),
])

In [None]:
# Print the layer overview of the model built above.
model.summary()

In [None]:
# Binary cross-entropy matches the single sigmoid output; accuracy is tracked as the metric.
model.compile(
    loss='binary_crossentropy',
    optimizer='adam',
    metrics=['accuracy']
)

In [None]:
# Train for 100 epochs; the test split is passed as the validation data.
history = model.fit(
    X_train, 
    y_train, 
    epochs=100, 
    validation_data=(X_test, y_test)
)

In [None]:
# evaluate() yields two values for the model compiled above (loss, then the
# 'accuracy' metric); only the accuracy is kept.
_, train_accuracy = model.evaluate(X_train, y_train)
_, test_accuracy = model.evaluate(X_test, y_test)
print(f'Training accuracy: {train_accuracy:.4f}')
print(f'Testing accuracy: {test_accuracy:.4f}')

In [None]:
import matplotlib.pyplot as plt
plt.style.use('ggplot')

def plot_history(history):
    """Draw accuracy and loss curves (training vs. validation) side by side.

    `history.history` must provide the series 'accuracy', 'val_accuracy',
    'loss' and 'val_loss'; they are plotted against epoch numbers starting
    at 1.
    """
    series = history.history
    # All four series are looked up before any figure is created.
    panels = (
        (series['accuracy'], series['val_accuracy'],
         'Training acc', 'Validation acc',
         'Training and validation accuracy'),
        (series['loss'], series['val_loss'],
         'Training loss', 'Validation loss',
         'Training and validation loss'),
    )
    epoch_numbers = range(1, len(panels[0][0]) + 1)

    plt.figure(figsize=(12, 5))
    for position, (train, val, train_label, val_label, title) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(epoch_numbers, train, 'b', label=train_label)
        plt.plot(epoch_numbers, val, 'r', label=val_label)
        plt.title(title)
        plt.legend()

# Plot the curves recorded while fitting the dense model.
plot_history(history)

In [None]:
# Inspect the type of the object plot_history reads its series from.
type(history.history)

In [None]:
 len(df)

## Lesson 7

In [None]:
# Toy categorical feature with repeated values.
cities = ['London', 'Berlin', 'Berlin', 'New York', 'London']

from sklearn.preprocessing import LabelEncoder

# Encode the city names as labels.
encoder = LabelEncoder()
city_labels = encoder.fit_transform(cities)

city_labels

In [None]:
from sklearn.preprocessing import OneHotEncoder

# The encoder's default output is a sparse matrix; .toarray() densifies it for
# display. This avoids the `sparse=` keyword, which scikit-learn renamed to
# `sparse_output` (deprecated in 1.2 and removed later), so the cell runs on
# old and new releases alike.
one_hot_encoder = OneHotEncoder()
# reshape(-1, 1) turns the labels into a single feature column without
# hard-coding the number of samples.
city_labels = city_labels.reshape(-1, 1)
one_hot_encoder.fit_transform(city_labels).toarray()

In [None]:
from keras.preprocessing.text import Tokenizer

# Fit the tokenizer on the training sentences, then turn both splits into integer sequences.
tokenizer = Tokenizer(num_words=5000)
tokenizer.fit_on_texts(sentences_train)

X_train, X_test = (
    tokenizer.texts_to_sequences(texts)
    for texts in (sentences_train, sentences_test)
)

# Show one training sentence next to its encoded form.
print(sentences_train[2])
print(X_train[2])

In [None]:
from tensorflow.keras.utils import pad_sequences

# Bring every sequence to length 100 (padding='post') so both splits have a uniform width.
X_train = pad_sequences(X_train, padding='post', maxlen=100)
X_test = pad_sequences(X_test, padding='post', maxlen=100)

# Same sample as before, now in its padded form.
print(sentences_train[2])
print(X_train[2])

In [None]:
# Embedding layer learned from scratch, flattened and fed into the small dense classifier.
model = models.Sequential([
    layers.Embedding(
        input_dim=len(tokenizer.word_index) + 1,
        output_dim=50,
        input_length=100,
    ),
    layers.Flatten(),
    layers.Dense(10, activation='relu'),
    layers.Dense(1, activation='sigmoid'),
])
model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

In [None]:
# Train for 20 epochs in batches of 10; the test split is passed as the validation data.
history = model.fit(X_train, y_train, epochs=20, validation_data=(X_test, y_test), batch_size=10)

In [None]:
# Keep only the accuracy (second value returned by evaluate) for both splits, then plot the curves.
_, train_accuracy = model.evaluate(X_train, y_train)
print(f'Training Accuracy: {train_accuracy:.4f}')
_, test_accuracy = model.evaluate(X_test, y_test)
print(f'Testing Accuracy: {test_accuracy:.4f}')
plot_history(history)

In [None]:
# Same embedding as in the previous model, followed by global max pooling.
model = models.Sequential()
model.add(
    layers.Embedding(
        input_dim=len(tokenizer.word_index) + 1,
        output_dim=50,
        input_length=100
    )
)
# GlobalMaxPool1D already reduces the sequence axis, so its output is one
# vector per sample. The Flatten layer that used to follow it did nothing and
# has been dropped, matching the later pooling models in this notebook.
model.add(layers.GlobalMaxPool1D())
model.add(layers.Dense(10, activation='relu'))
model.add(layers.Dense(1, activation='sigmoid'))
model.compile(
    loss='binary_crossentropy',
    optimizer='adam',
    metrics=['accuracy']
)

# Train for 20 epochs in batches of 10; the test split is passed as the validation data.
history = model.fit(
    X_train,
    y_train,
    epochs=20,
    validation_data=(X_test, y_test),
    batch_size=10
)

# Keep only the accuracy (second value returned by evaluate), then plot the curves.
_, train_accuracy = model.evaluate(X_train, y_train)
print(f'Training Accuracy: {train_accuracy:.4f}')
_, test_accuracy = model.evaluate(X_test, y_test)
print(f'Testing Accuracy: {test_accuracy:.4f}')
plot_history(history)

## Lesson 8

In [None]:
# Fetch the GloVe archive over HTTPS: the file is extracted right after the
# download, so it should not travel over an unauthenticated plain-HTTP link.
GLOVE_URL = 'https://nlp.stanford.edu/data/glove.6B.zip'

# Local file name under which the downloaded archive is stored.
GLOVE_FILE_NAME = 'glove.zip'

get_data(GLOVE_URL, GLOVE_FILE_NAME)
extract_data(GLOVE_FILE_NAME)

In [None]:
import numpy as np 

def create_embedding_matrix(filepath, word_index, embedding_dim):
  """Build an embedding matrix for `word_index` from a word-vector text file.

  Every non-empty line of `filepath` is read as a word followed by its
  whitespace-separated vector components.

  Args:
    filepath: Path of the UTF-8 encoded word-vector file.
    word_index: Mapping from word to its integer row index.
    embedding_dim: Number of leading vector components to keep per word.

  Returns:
    A NumPy array of shape (len(word_index) + 1, embedding_dim). Row `i`
    holds the vector of the word whose index is `i`; rows of words that do
    not occur in the file stay all-zero.
  """
  # One extra row because index 0 is reserved (no word maps to it).
  vocab_size = len(word_index) + 1
  embedding_matrix = np.zeros((vocab_size, embedding_dim))

  # Explicit encoding: the platform default may not be able to decode the file.
  with open(filepath, encoding='utf-8') as f:
    for line in f:
      parts = line.split()
      if not parts:
        # Blank line (e.g. trailing newline at the end of the file).
        continue
      idx = word_index.get(parts[0])
      if idx is None:
        continue
      # Slice before converting so only the needed components are parsed.
      embedding_matrix[idx] = np.array(
          parts[1:embedding_dim + 1], dtype=np.float32)

  return embedding_matrix

In [None]:
# Look up the 50-dimensional vectors for the words known to the tokenizer.
embedding_matrix = create_embedding_matrix('glove.6B.50d.txt', tokenizer.word_index, 50)

In [None]:
# (len(tokenizer.word_index) + 1, 50)
embedding_matrix.shape

In [None]:
# Inner count_nonzero: non-zero entries per row; outer: number of rows that have at least one.
nonzero_elements = np.count_nonzero(np.count_nonzero(embedding_matrix, axis=1))
# Share of matrix rows (vocabulary plus the reserved row 0) that received a vector.
nonzero_elements / (len(tokenizer.word_index) + 1)

In [None]:
# Embedding initialised from embedding_matrix and kept fixed (trainable=False).
model = models.Sequential([
    layers.Embedding(
        input_dim=len(tokenizer.word_index) + 1,
        output_dim=50,
        input_length=100,
        weights=[embedding_matrix],
        trainable=False,
    ),
    layers.GlobalMaxPool1D(),
    layers.Dense(10, activation='relu'),
    layers.Dense(1, activation='sigmoid'),
])
model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

In [None]:
# Train for 50 epochs in batches of 10; the test split is passed as the validation data.
history = model.fit(
    X_train, 
    y_train, 
    epochs=50, 
    validation_data=(X_test, y_test), 
    batch_size=10
)

# Keep only the accuracy (second value returned by evaluate), then plot the curves.
_, train_accuracy = model.evaluate(X_train, y_train)
print(f'Training Accuracy: {train_accuracy:.4f}')
_, test_accuracy = model.evaluate(X_test, y_test)
print(f'Testing Accuracy: {test_accuracy:.4f}')
plot_history(history)

In [None]:
# Same architecture as the previous model, but the embedding weights loaded
# from embedding_matrix may now be updated during training (trainable=True).
model = models.Sequential()
model.add(layers.Embedding(
    input_dim=len(tokenizer.word_index) + 1, 
    output_dim=50, 
    input_length=100,
    weights=[embedding_matrix],
    trainable=True))
model.add(layers.GlobalMaxPool1D())
model.add(layers.Dense(10, activation='relu'))
model.add(layers.Dense(1, activation='sigmoid'))
model.compile(
    loss='binary_crossentropy', 
    optimizer='adam', 
    metrics=['accuracy']
)

In [None]:
# Train for 50 epochs in batches of 10; the test split is passed as the validation data.
history = model.fit(
    X_train, 
    y_train, 
    epochs=50, 
    validation_data=(X_test, y_test), 
    batch_size=10
)

# Keep only the accuracy (second value returned by evaluate), then plot the curves.
_, train_accuracy = model.evaluate(X_train, y_train)
print(f'Training Accuracy: {train_accuracy:.4f}')
_, test_accuracy = model.evaluate(X_test, y_test)
print(f'Testing Accuracy: {test_accuracy:.4f}')
plot_history(history)

## Lesson 9

In [None]:
# Convolutional variant: 100-dimensional embedding, 128 filters of width 5, global max pooling.
model = models.Sequential([
    layers.Embedding(
        input_dim=len(tokenizer.word_index) + 1,
        output_dim=100,
        input_length=100,
    ),
    layers.Conv1D(128, 5, activation='relu'),
    layers.GlobalMaxPool1D(),
    layers.Dense(10, activation='relu'),
    layers.Dense(1, activation='sigmoid'),
])
model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

In [None]:
# Print the layer overview of the convolutional model.
model.summary()

In [None]:
# Train for 10 epochs in batches of 10; the test split is passed as the validation data.
history = model.fit(
    X_train, 
    y_train, 
    epochs=10, 
    validation_data=(X_test, y_test), 
    batch_size=10
)

# Keep only the accuracy (second value returned by evaluate), then plot the curves.
_, train_accuracy = model.evaluate(X_train, y_train)
print(f'Training Accuracy: {train_accuracy:.4f}')
_, test_accuracy = model.evaluate(X_test, y_test)
print(f'Testing Accuracy: {test_accuracy:.4f}')
plot_history(history)

In [None]:
def create_model(num_filters, kernel_size, vocab_size, embedding_dim, maxlen):
  """Build and compile the convolutional text classifier.

  Args:
    num_filters: Number of filters of the Conv1D layer.
    kernel_size: Width of the Conv1D kernel.
    vocab_size: Input dimension of the Embedding layer.
    embedding_dim: Output dimension of the Embedding layer.
    maxlen: Input length of the Embedding layer.

  Returns:
    The compiled Sequential model (adam optimizer, binary cross-entropy
    loss, accuracy metric).
  """
  network = models.Sequential([
      layers.Embedding(vocab_size, embedding_dim, input_length=maxlen),
      layers.Conv1D(num_filters, kernel_size, activation='relu'),
      layers.GlobalMaxPooling1D(),
      layers.Dense(10, activation='relu'),
      layers.Dense(1, activation='sigmoid'),
  ])
  network.compile(
      loss='binary_crossentropy',
      optimizer='adam',
      metrics=['accuracy'],
  )
  return network

In [None]:
# Candidate hyper-parameter values; every key matches a create_model argument.
param_grid = {
    'num_filters': [32, 64, 128],
    'kernel_size': [3, 5, 7],
    'vocab_size': [5000],
    'embedding_dim': [50],
    'maxlen': [100],
}

In [None]:
from keras.wrappers.scikit_learn import KerasClassifier
from sklearn.model_selection import RandomizedSearchCV

# Main settings
epochs = 20
embedding_dim = 50
maxlen = 100
output_file = 'data/output.txt'

# Run a randomized hyper-parameter search for each source (yelp, amazon, imdb)
for source, frame in df.groupby('source'):
    print('Running grid search for data set :', source)
    # Use the rows of the current source only. Reading from `df` here would
    # repeat the search on the full combined data set in every iteration.
    sentences = frame['sentence'].values
    y = frame['label'].values

    # Train-test split
    sentences_train, sentences_test, y_train, y_test = train_test_split(
        sentences, y, test_size=0.25, random_state=1000)

    # Tokenize words
    tokenizer = Tokenizer(num_words=5000)
    tokenizer.fit_on_texts(sentences_train)
    X_train = tokenizer.texts_to_sequences(sentences_train)
    X_test = tokenizer.texts_to_sequences(sentences_test)

    # Adding 1 because of reserved 0 index
    vocab_size = len(tokenizer.word_index) + 1

    # Pad sequences with zeros
    X_train = pad_sequences(X_train, padding='post', maxlen=maxlen)
    X_test = pad_sequences(X_test, padding='post', maxlen=maxlen)

    # Parameter grid for the search; vocab_size depends on the current source
    param_grid = dict(num_filters=[32, 64, 128],
                      kernel_size=[3, 5, 7],
                      vocab_size=[vocab_size],
                      embedding_dim=[embedding_dim],
                      maxlen=[maxlen])
    model = KerasClassifier(build_fn=create_model,
                            epochs=epochs, batch_size=10,
                            verbose=False)
    # 5 randomly drawn parameter combinations, each scored with 4-fold CV
    grid = RandomizedSearchCV(estimator=model, param_distributions=param_grid,
                              cv=4, verbose=1, n_iter=5)
    grid_result = grid.fit(X_train, y_train)

    # Evaluate testing set
    test_accuracy = grid.score(X_test, y_test)

    # Save and evaluate results; any answer other than y/yes/true stops the
    # loop without writing the current result.
    prompt = input(f'finished {source}; write to file and proceed? [y/n]')
    if prompt.lower() not in {'y', 'true', 'yes'}:
        break
    with open(output_file, 'a') as f:
        s = ('Running {} data set\nBest Accuracy : '
             '{:.4f}\n{}\nTest Accuracy : {:.4f}\n\n')
        output_string = s.format(
            source,
            grid_result.best_score_,
            grid_result.best_params_,
            test_accuracy)
        print(output_string)
        f.write(output_string)