# Using PaaS to Accelerate Data Science - DC/OS Data Science Engine

This is part of a series of posts on using PaaS to accelerate enterprise data science teams.

In [None]:
import gc
import json
import os
import re

import boto3
import numpy as np
import pandas as pd
import pyarrow
from pyarrow.parquet import ParquetDataset
import s3fs
import tensorflow as tf

# S3 bucket that holds every dataset this notebook reads
BUCKET = 'stackoverflow-events'

# Render DataFrame HTML tables without a border (works around a display issue)
pd.options.display.html.border = 0

### Always use a random seed for reproducible results

In [None]:
# Seed NumPy's global random generator so NumPy-based sampling repeats between runs.
# NOTE(review): this does not seed TensorFlow's own random generator — confirm
# whether a TensorFlow seed is also wanted for fully repeatable training.
np.random.seed(1337)

### Test GPU Support for Tensorflow/Keras

In [None]:
from tensorflow.python.client import device_lib

# Ask TensorFlow whether it can see at least one GPU (CUDA or otherwise)
gpu_avail = tf.test.is_gpu_available(
    cuda_only=False,
    min_cuda_compute_capability=None
)
print(f'1 or more GPUs is available: {gpu_avail}')

# Pick GPUs by device type instead of by a fixed position in the device list:
# the position of the GPU entry depends on how many CPU/XLA devices the host
# exposes, so a hard-coded index breaks on any other machine layout.
local_devices = device_lib.list_local_devices()
gpu_devices = [device for device in local_devices if device.device_type == 'GPU']

if gpu_devices:
    for gpu in gpu_devices:
        print(f"{gpu.name} is a {gpu.device_type} with {gpu.memory_limit / 1024 / 1024 / 1024:.2f}GB RAM")
else:
    print('No GPU devices were found by TensorFlow')

### Load the Stackoverflow Questions

For the moment, `pandas.read_parquet` doesn't support loading directories on S3.
See: [pandas/issues/28490](https://github.com/pandas-dev/pandas/issues/28490)

Because the questions are loading from S3, this can take a few minutes, especially outside of AWS. Inside AWS it shouldn't take very long.

In [None]:
# Location of the Stack Overflow questions on S3 and the filesystem used to reach it
s3_fs = s3fs.S3FileSystem()
s3_parquet_path = f's3://{BUCKET}/08-05-2019/Questions.Stratified.Final.2000.parquet'

# pandas.read_parquet cannot load an S3 directory, so read through pyarrow's
# ParquetDataset and convert the result to a pandas.DataFrame
questions_dataset = ParquetDataset(s3_parquet_path, filesystem=s3_fs)
posts_df = questions_dataset.read().to_pandas()

# Keep only the first 500,000 rows so the data fits a p3.2xlarge
posts_df = posts_df[:500000]

posts_df.head(3)

In [None]:
# Report how many questions were loaded, with thousands separators
print(f'{len(posts_df.index):,} Stackoverflow questions with a tag having at least 2,000 occurrences')

### Load the Precomputed Indexes for Converting Between Tag Indexes and Tags

We use these to display the actual labels predicted at the end of this tutorial.

In [None]:
# Handle used to fetch the tag indexes from S3. Despite its name this is the
# object returned by boto3.resource(), not a low-level boto3 client.
s3_client = boto3.resource('s3')


def json_from_s3(bucket, key):
    """Download a JSON object from S3 and return it parsed.

    Args:
        bucket: Name of the S3 bucket holding the object.
        key: Key of the JSON object inside the bucket.

    Returns:
        The value produced by ``json.loads`` for the object's UTF-8 body.
    """
    # Fetch the body exactly once: every get() is a separate S3 request, so
    # reading it a second time would download the whole object again.
    body = s3_client.Object(bucket, key).get()['Body'].read()
    return json.loads(body.decode('utf-8'))


# Precomputed lookup tables; index_tag is keyed by the label index as a string.
# NOTE(review): tag_index is presumably the reverse mapping (tag -> index) — confirm.
tag_index = json_from_s3(BUCKET, '08-05-2019/tag_index.2000.json')
index_tag = json_from_s3(BUCKET, '08-05-2019/index_tag.2000.json')

# Preview the first five entries of each table
tuple(list(table.items())[:5] for table in (tag_index, index_tag))

In [None]:
# The two lookup tables must describe the same number of tags
assert len(tag_index) == len(index_tag)

## Trim the Data into a Multiple of Batch Sizes

This is a requirement for Tensorflow/Keras to split work among multiple GPUs. We also join the previously tokenized text into a single string to use `tf.keras.preprocessing.text.Tokenizer`.

In [None]:
import math

# Hyperparameters used by the preprocessing and training cells
BATCH_SIZE = 64
MAX_LEN = 200
TOKEN_COUNT = 10000
EMBED_SIZE = 50
TEST_SPLIT = 0.3

# Every column after the first is a label column; gather them into one numpy matrix
label_columns = posts_df.columns[1:]
labels = posts_df[label_columns].to_numpy()

# Keep the largest row count that is a whole multiple of BATCH_SIZE * MAX_LEN
rows_per_unit = BATCH_SIZE * MAX_LEN
highest_factor = len(posts_df.index) // rows_per_unit
training_count = highest_factor * rows_per_unit
print(f'Highest Factor: {highest_factor:,} Training Count: {training_count:,}')

# The bodies are stored pre-tokenized; glue each one back into a single
# space-separated string so tf.keras.preprocessing.text.Tokenizer can consume it
documents = [
    ' '.join(body.tolist())
    for body in posts_df['_Body'].values[:training_count]
]

labels = labels[:training_count]

# Conserve RAM: the DataFrame is no longer needed
del posts_df
gc.collect()

# Features and labels must line up row for row
assert len(documents) == training_count and labels.shape[0] == training_count

### Trim/Pad the Questions to 200 Words

The data has already been truncated to 200 words per post but the tokenization using the top 10K words reduces this to below 200 in some documents. If any documents vary from 200 words, the data won't convert properly into a numpy matrix below.

Note that the string `__PAD__` has been used previously to pad the documents, so we reuse it here.

In [None]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Fit a vocabulary capped at TOKEN_COUNT words and turn each document into word ids;
# words outside the vocabulary are replaced by the '__PAD__' token
tokenizer = Tokenizer(num_words=TOKEN_COUNT, oov_token='__PAD__')
tokenizer.fit_on_texts(documents)
sequences = tokenizer.texts_to_sequences(documents)

# Cut or fill every sequence at its end so each one is exactly MAX_LEN ids long.
# NOTE(review): the fill value 1 is presumably the id the Tokenizer assigns to the
# oov_token '__PAD__' — confirm against tokenizer.word_index.
padded_sequences = pad_sequences(
    sequences,
    maxlen=MAX_LEN,
    padding='post',
    truncating='post',
    value=1,
    dtype='int32',
)

# Conserve RAM: only the padded matrix is needed from here on
del documents, sequences
gc.collect()

# Every row must be exactly MAX_LEN wide
row_lengths = [len(row) for row in padded_sequences]
assert min(row_lengths) == MAX_LEN == max(row_lengths)

# Show that the padded sequences form a single two dimensional array
padded_sequences.shape

## Split Data into Test/Train Datasets

In [None]:
from sklearn.model_selection import train_test_split

# Hold out TEST_SPLIT of the rows for evaluation; the fixed random_state keeps
# the split identical between runs
X_train, X_test, y_train, y_test = train_test_split(
    padded_sequences, labels, test_size=TEST_SPLIT, random_state=1337
)

# Conserve RAM: the unsplit arrays are no longer needed
del padded_sequences, labels
gc.collect()

# Each split must pair one label row with every feature row, MAX_LEN ids wide
for features, targets in ((X_train, y_train), (X_test, y_test)):
    assert features.shape[0] == targets.shape[0]
    assert features.shape[1] == MAX_LEN

## Use Class Weights to Balance Uneven Label Counts

Although there has already been filtering and up-sampling of the data to restrict it to a sample of questions with at least one tag that occurs more than 2,000 times, there are still uneven ratios between common and uncommon labels. Without class weights, the most common label will be much more likely to be predicted than the least common. Class weights will make the loss function consider uncommon classes more than frequent ones.

In [None]:
# Positive count per label column in the training labels
label_counts = np.sum(y_train, axis=0)

# Weight each label by (largest label count / its own count), so the most
# frequent label gets weight 1 and rarer labels get proportionally more
train_weight_vec = list(np.max(label_counts) / label_counts)
train_class_weights = dict(enumerate(train_weight_vec))

# Show the ten smallest weights, i.e. the most frequent labels
sorted(train_class_weights.items(), key=lambda item: item[1])[:10]

## Train a CNN Model to Classify Questions to their Corresponding Tags

Now we’re ready to train a model to classify/label questions with tag categories. The model is based on [Kim-CNN](https://arxiv.org/abs/1408.5882), a commonly used convolutional neural network for sentence and document classification. We use the functional API and we’ve heavily parametrized the code so as to facilitate experimentation. 

In Kim-CNN, we start by encoding the sequences using an *Embedding*, followed by a *Dropout* layer to reduce overfitting. Next we split the graph into multiple *Conv1D* layers with different widths, each followed by *MaxPool1D*. These are joined by concatenation and are intended to characterize patterns of different size sequence lengths in the documents. There follows another *Conv1D*/*GlobalMaxPool1D* layer to summarize the most important of these patterns. This is followed by flattening into a *Dense* layer and then on to the final *sigmoid* output layer. Otherwise we use *selu* throughout.

In [None]:
from tensorflow.keras.initializers import RandomUniform
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint
from tensorflow.keras.layers import (
    Dense, Activation, Embedding, Flatten, MaxPool1D, GlobalMaxPool1D, Dropout, Conv1D, Input, concatenate
)
from tensorflow.keras.losses import binary_crossentropy
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam


# --- Convolution settings ---
# NOTE(review): FILTER_LENGTH is not referenced anywhere in the visible notebook — confirm it is still needed.
FILTER_LENGTH = 300
FILTER_COUNT = 128         # filters per parallel Conv1D branch
FILTER_SIZES = [3, 4, 5]   # one Conv1D branch is built per kernel size
CONV_PADDING = 'same'
ACTIVATION = 'selu'

# --- Embedding settings ---
EMBED_SIZE = 50            # same value as assigned in the preprocessing cell

# --- Regularisation ---
EMBED_DROPOUT_RATIO = 0.1  # dropout applied right after the embedding
CONV_DROPOUT_RATIO = 0.1   # dropout applied after the pooled convolution features

# --- Training settings ---
EPOCHS = 8
LOSS = 'binary_crossentropy'
OPTIMIZER = 'adam'

Next we compile and fit our model. We use a variety of metrics, because no one metric summarizes model performance, and we need to drill down into the true and false positives and negatives. We also use the *ReduceLROnPlateau*, *EarlyStopping* and *ModelCheckpoint* callbacks to improve performance once we hit a plateau, then to stop early, and to persist only the very best model in terms of the validation categorical accuracy. 

Categorical accuracy is the best fit for gauging our model’s performance because it gives points for each row separately for each class we’re classifying. This means that if we miss one, but get the others right, this is a great result. With binary accuracy, the entire row is scored as incorrect.

We give the fit method the class weights we computed earlier, which really helps model performance.

In [None]:
def _conv_branch(inputs, kernel_size):
    """Apply one Conv1D of the given kernel size followed by a MaxPool1D."""
    conv = Conv1D(
        filters=FILTER_COUNT,
        kernel_size=kernel_size,
        padding=CONV_PADDING,
        activation=ACTIVATION,
    )(inputs)
    return MaxPool1D()(conv)


# The network consumes one row of padded token ids per question
sequence_length = X_train.shape[1]
padded_input = Input(shape=(sequence_length,), dtype='int32')

# Embed the token ids (RandomUniform initialization), then apply dropout
emb = Embedding(
    TOKEN_COUNT,
    EMBED_SIZE,
    input_length=sequence_length,
    embeddings_initializer=RandomUniform(),
)(padded_input)
drp = Dropout(EMBED_DROPOUT_RATIO)(emb)

# Create convolution/max pool branches with different kernel sizes to
# summarize features spanning different word counts in the document
convs = [_conv_branch(drp, filter_size) for filter_size in FILTER_SIZES]

# Join the branches, then summarize them with one more convolution and a global max pool
l_merge = concatenate(convs, axis=1)
l_conv = Conv1D(filters=128, kernel_size=5, activation=ACTIVATION)(l_merge)
l_pool = GlobalMaxPool1D()(l_conv)
l_flat = Flatten()(l_pool)
l_drop = Dropout(CONV_DROPOUT_RATIO)(l_flat)

# Dense head: one hidden layer, then one sigmoid output per label column
l_dense = Dense(units=128, activation=ACTIVATION)(l_drop)
out_dense = Dense(units=y_train.shape[1], activation='sigmoid')(l_dense)

model = Model(inputs=padded_input, outputs=out_dense)

# Compile with the configured optimizer and loss, tracking several metrics.
# BinaryAccuracy is used rather than the plain Accuracy metric: Accuracy checks
# predictions for exact equality with the labels, which is not meaningful for
# float sigmoid outputs compared against 0/1 labels, whereas BinaryAccuracy
# thresholds each output first.
model.compile(
    optimizer=OPTIMIZER,
    loss=LOSS,
    metrics=[
        tf.keras.metrics.CategoricalAccuracy(),
        tf.keras.metrics.Precision(),
        tf.keras.metrics.Recall(),
        tf.keras.metrics.BinaryAccuracy(),
        tf.keras.metrics.TruePositives(),
        tf.keras.metrics.FalsePositives(),
        tf.keras.metrics.TrueNegatives(),
        tf.keras.metrics.FalseNegatives(),
    ]
)
model.summary()

# All three callbacks watch the same validation metric
monitored_metric = 'val_categorical_accuracy'

callbacks = [
    # Multiply the learning rate by 0.1 once the metric stalls (patience of 1 epoch)
    ReduceLROnPlateau(monitor=monitored_metric, factor=0.1, patience=1),
    # Stop training once the metric stalls (patience of 2 epochs)
    EarlyStopping(monitor=monitored_metric, patience=2),
    # Save to disk only when the metric is the best seen so far
    ModelCheckpoint(
        filepath='kim_cnn_tagger.weights.hdf5',
        monitor=monitored_metric,
        save_best_only=True,
    ),
]

# Train with the class weights computed earlier, validating on the held-out split
history = model.fit(
    X_train,
    y_train,
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
    class_weight=train_class_weights,
    validation_data=(X_test, y_test),
    callbacks=callbacks,
)

# Conserve RAM: the training arrays are not used after fitting
del X_train
del y_train

### Load the Best Model from Training Epochs

Because we used `ModelCheckpoint(save_best_only=True)`, the epoch with the best validation `CategoricalAccuracy` is the one that was saved. We want to use that instead of the last epoch's model, which is what is stored in `model` above. So we load the file before evaluating our model.

In [None]:
# Replace the last-epoch model with the checkpoint file, then score it on the test split
best_model_path = 'kim_cnn_tagger.weights.hdf5'
model = tf.keras.models.load_model(best_model_path)
metrics = model.evaluate(X_test, y_test)

### Parse and Print Final Metrics

Metrics include names like *precision_66* which aren't consistent between runs. We fix these to clean up our report on training the model. We also add an f1 score.

In [None]:
def fix_metric_name(name):
    """Remove a trailing numeric suffix from a metric name.

    Only a final underscore followed by digits is stripped, e.g.
    ``precision_86`` becomes ``precision``. Names that merely end in a digit
    (``f1``), names without a numeric suffix, and the empty string are
    returned unchanged.

    Args:
        name: Metric name as reported by the model or training history.

    Returns:
        The name without its trailing ``_NN`` counter.
    """
    # Anchor on "_<digits>" at the very end so that nothing else is removed
    return re.sub(r'_\d+$', '', name)

def fix_value(val):
    """Convert a NumPy scalar to the equivalent built-in Python value.

    Args:
        val: A metric value; either a NumPy scalar of any dtype or any other object.

    Returns:
        ``val.item()`` when ``val`` is a NumPy scalar (so ``np.float32`` and
        ``np.float64`` become ``float``, ``np.int64`` becomes ``int``);
        otherwise ``val`` unchanged.
    """
    # np.generic is the common base class of all NumPy scalar types
    return val.item() if isinstance(val, np.generic) else val

def fix_metric(name, val):
    """Return the cleaned ``(name, value)`` pair for one metric.

    The name goes through ``fix_metric_name`` and the value through ``fix_value``.
    """
    return fix_metric_name(name), fix_value(val)

# Build a one-row report of the evaluation metrics under their cleaned names
log = {}
for name, val in zip(model.metrics_names, metrics):
    repeat_name, py_val = fix_metric(name, val)
    log[repeat_name] = py_val

# F1 is the harmonic mean of precision and recall: 2 * P * R / (P + R).
# When both are zero the ratio is undefined, so report 0.0 instead of dividing by zero.
precision_recall_sum = log['precision'] + log['recall']
if precision_recall_sum:
    f1_score = 2 * log['precision'] * log['recall'] / precision_recall_sum
else:
    f1_score = 0.0
log.update({'f1': f1_score})

pd.DataFrame([log])

### Plot the Epoch Accuracy

We want to know the performance at each epoch so that we don't train needlessly large numbers of epochs. 

In [None]:
%matplotlib inline

import matplotlib.pyplot as plt

# Re-key the training history by cleaned metric name. The per-epoch series is
# bound to `values` rather than `metrics`, so the evaluation results held in the
# module-level `metrics` variable are not overwritten by this cell.
new_history = {
    fix_metric_name(key): values
    for key, values in history.history.items()
}

viz_keys = ['val_categorical_accuracy', 'val_precision', 'val_recall']
# summarize history for accuracy
for key in viz_keys:
    plt.plot(new_history[key])
plt.title('model accuracy')
plt.ylabel('metric')
plt.xlabel('epoch')
plt.legend(viz_keys, loc='upper left')
plt.show()

# summarize history for loss
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

### Check the Actual Prediction Outputs

It is not enough to know theoretical performance. We need to see the actual output of the tagger at different confidence thresholds.

In [None]:
# Number of test rows to inspect by hand
TEST_COUNT = 1000

# Turn the first TEST_COUNT id sequences back into text
X_test_text = tokenizer.sequences_to_texts(X_test[:TEST_COUNT])

# For each of those rows, collect the tag of every label column set to 1
y_test_tags = [
    [index_tag[str(i)] for i, col in enumerate(row) if col == 1]
    for row in y_test[:TEST_COUNT].tolist()
]

In [None]:
# Minimum sigmoid output for a tag to count as predicted
CLASSIFY_THRESHOLD = 0.5

# Apply the threshold once, turning the probabilities into a 0/1 integer matrix
y_pred = model.predict(X_test[:TEST_COUNT])
y_pred = (y_pred > CLASSIFY_THRESHOLD) * 1

# y_pred now holds only 0s and 1s, so select the flagged columns directly
# instead of comparing the flags against the probability threshold again
y_pred_tags = []
for row in y_pred.tolist():
    tags = [index_tag[str(i)] for i, col in enumerate(row) if col == 1]
    y_pred_tags.append(tags)

### View Prediction Results

It is better to view the results in a DataFrame.

In [None]:
# One record per inspected question: its text, its actual tags and the predicted tags
prediction_tests = [
    {
        'Question': question,
        'Actual': ' '.join(sorted(actual_tags)),
        'Predictions': ' '.join(sorted(predicted_tags)),
    }
    for question, predicted_tags, actual_tags in zip(X_test_text, y_pred_tags, y_test_tags)
]

# Show full cell contents. None means "no limit"; the older -1 spelling is
# deprecated and rejected by current pandas releases.
pd.set_option('display.max_colwidth', None)
pd.DataFrame(prediction_tests)

## The Big Finish

That is the big finish! Hopefully you've gotten a sense of how to train neural networks using JupyterLab via the DC/OS Data Science Engine.

If you like my writing, check out *Weakly Supervised Learning* (O'Reilly, 2020).