## Constructing a Data Pipeline in TensorFlow

This notebook shows how to go from a blank slate (an empty Jupyter Notebook in Amazon SageMaker) all the way to state-of-the-art results in Natural Language Processing using deep learning. This particular notebook uses TensorFlow to construct a data pipeline. The other notebook in this repository ("2-Sentiment-Classification-with-BERT.ipynb") builds on the work done here; it uses the data pipeline to classify Yelp reviews as positive or negative.

Notes: 

1) The notebook is constructed with teaching in mind, and as a result, all of the primary code is displayed within this notebook.

2) In order to teach effectively, this notebook emphasizes clarity over code efficiency. For example, in parts of the notebook, variables are duplicated in order to make the code easier to read and understand. 

3) This notebook was run on [Amazon SageMaker](https://aws.amazon.com/sagemaker/) using ml.t2.xlarge. Some of the imports may be slightly different if you are using Google Colab or some other system.

### Step 1: Import and Setup

Start off by loading the basics. This first step makes sure that your imports pick up the most recent changes to a file.

In [1]:
%load_ext autoreload
%autoreload 2

Next, many Python packages are installed with a tool called pip. The following installs the most current version of pip.

In [2]:
!pip install --upgrade pip

Requirement already up-to-date: pip in /home/ec2-user/anaconda3/envs/tensorflow_p36/lib/python3.6/site-packages (19.3.1)


In this exercise, we are just going to put together a data pipeline. For the example shown, a CPU (ml.t2.xlarge on Amazon SageMaker) is sufficient.
As a result, I install TensorFlow 2.0 without the GPU features.

Please note that this notebook only involves the construction of the pipeline. As a result, I don't handle the error associated with the tensorflow-serving-api (which focuses on the deployment of a model).

In [3]:
!yes | pip uninstall -q tensorflow
!pip install -q tensorflow==2.0.0rc1

Proceed (y/n)? yes: standard output: Broken pipe
yes: write error
ERROR: tensorflow-serving-api 1.14.0 has requirement tensorflow~=1.14.0, but you'll have tensorflow 2.0.0rc1 which is incompatible.


In [4]:
import tensorflow as tf
import pandas as pd
from tensorflow.python.lib.io.tf_record import TFRecordWriter


In [5]:
tf.__version__

'2.0.0-rc1'

### Step 2 (Optional): Constructing the Yelp Dataset

The following code constructs the tab-delimited data that we are going to use in our example. In this example, we look at Yelp in order to
understand how to classify comments based on sentiment (either 'Positive' or 'Negative'). To make sure that our process is correct, we can verify our results against the rating
that a customer gives on Yelp ('the true representation of sentiment').

The approach here can then be easily extended to other types of social media (e.g. Twitter, Blogs, etc).

The code uses a Python package called requests to retrieve up to 50 stores from each of Arizona, California, Michigan, Texas, Washington, Florida, and Ohio. The code then pulls 3 reviews from each store. The goal is to get at least 750 instances of good data (the rough math is that we would get 50*3*7=1050 reviews, but some of the data will be defective, which reduces the final amount of data collected).
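The rough math above can be written out as a short calculation. This is only a sketch of the arithmetic; the variable names are illustrative and are not used elsewhere in the notebook.

```python
# Upper bound on the number of reviews the collection step can return.
num_states = 7          # AZ, CA, MI, TX, WA, FL, OH
stores_per_state = 50   # the 'limit' passed to the business search
reviews_per_store = 3   # reviews pulled for each store

max_reviews = num_states * stores_per_state * reviews_per_store
target_reviews = 750    # the minimum amount of good data mentioned above

# Share of reviews that can be missing or defective while still reaching the target.
allowed_loss = 1 - target_reviews / max_reviews

print(max_reviews)             # 1050
print(f"{allowed_loss:.1%}")   # 28.6%
```

In other words, the 750 target leaves room for a little over a quarter of the requested reviews to be missing or unusable.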


Note: In order for this code to work, you will need to go onto the Yelp website, register for a developer account, and get your own api_key. 

In [6]:
! pip install -q requests

In [7]:
import requests
import json
import csv
import time

In [8]:
api_key='<put your YELP api key here>'
headers = {'Authorization': 'Bearer %s' % api_key}

In [9]:
states = ['AZ', 'CA', 'MI', 'TX', 'WA', 'FL', 'OH']

api_calls = []

for state in states:
    params={'term': 'AT&T', 'location' : state, 'limit': '50'}
    req = requests.get('https://api.yelp.com/v3/businesses/search', 
                            params=params, headers=headers)
    api_calls.append(req)
    time.sleep(5)
    

In [10]:
att_ids = []

for api_call in api_calls:
    try:
        yelp_json = json.loads(api_call.text)
        for shop_json in yelp_json['businesses']:
            att_ids.append(shop_json['id'])
    except (ValueError, KeyError):  # response was not JSON or had no 'businesses' key
        continue
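The ID extraction above can also be sketched as a small function that can be tried on sample payloads without calling the API. The function name `extract_business_ids` and both sample payloads are hypothetical; only the `businesses` and `id` keys come from the code above.

```python
import json


def extract_business_ids(response_text):
    """Return the 'id' of every business in a search response body.

    Returns an empty list when the body is not valid JSON or does not
    have the expected 'businesses' structure.
    """
    try:
        payload = json.loads(response_text)
        return [shop["id"] for shop in payload["businesses"]]
    except (json.JSONDecodeError, KeyError, TypeError):
        return []


# Hypothetical payloads, shaped like the fields used in this notebook.
sample_ok = '{"businesses": [{"id": "abc"}, {"id": "def"}]}'
sample_error = '{"error": {"code": "EXAMPLE_ERROR"}}'

print(extract_business_ids(sample_ok))     # ['abc', 'def']
print(extract_business_ids(sample_error))  # []
```

Catching only the specific exceptions keeps unrelated problems (for example, a keyboard interrupt) from being silently swallowed.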

Note that public APIs often don't respond well when requests are made without any kind of delay. In the code below, I put ```time.sleep(1)``` before each request to increase the chances that the public API returns usable information.

In [11]:
idx = 1
negative = '1'
neutral = '2'
positive = '3'

sentiment_lookup = {'1': negative, '2': negative, '3': neutral, '4': positive, '5': positive}


with open('data/Yelp-ATT-Social-Media.tsv', 'wt') as out_file:
    tsv_writer = csv.writer(out_file, delimiter='\t')
    tsv_writer.writerow(['idx', 'utterance', 'sentiment'])
    
    for att_id in att_ids:
        url = f'https://api.yelp.com/v3/businesses/{att_id}/reviews'
        # Get the 3 reviews per shop
        try:
            time.sleep(1)
            req = requests.get(url,  headers=headers)
            yelp_json = json.loads(req.text)

            for review in yelp_json['reviews']:
                sentiment = sentiment_lookup[str(review['rating'])]
                tsv_writer.writerow([str(idx), review['text'], sentiment])
                idx = idx + 1
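        except (requests.RequestException, ValueError, KeyError):  # skip shops whose reviews could not be fetched or parsed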
            continue
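The fixed one-second pause used above is the simplest way to be polite to a public API. As a possible extension (not what this notebook does), a failed call can be retried with a delay that grows after each failure. The sketch below is a minimal version of that idea; `call_with_retries` and `flaky_fetch` are hypothetical names, and the `sleep` parameter exists only so the demo can run without actually waiting.

```python
import time


def call_with_retries(fetch, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call fetch() until it succeeds, doubling the wait after each failure."""
    for attempt in range(max_attempts):
        try:
            return fetch()
        except Exception:
            # Give up and re-raise once the last attempt has failed.
            if attempt == max_attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)  # waits 1s, 2s, 4s, ...


# Demo with a stand-in for an API call that fails twice and then succeeds.
calls = {"count": 0}


def flaky_fetch():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated failure")
    return {"reviews": []}


waits = []  # record the requested delays instead of sleeping
result = call_with_retries(flaky_fetch, sleep=waits.append)
print(result, waits)  # {'reviews': []} [1.0, 2.0]
```

In real use, `fetch` would wrap the `requests.get` call, and the default `time.sleep` would be left in place.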

### Step 3: Preparing the data

Even if you have not run the above code, I have already aggregated this information (for "public use" and non-commercial, research purposes). This aggregate information is in 
the data folder in the Yelp-ATT-Social-Media.tsv file. 

It is important to note that this file really just represents Yelp comments that were pulled with the keyword AT&T. Because of the way that the Yelp API is set up, this means that the
information comes from more than just AT&T stores.

In [12]:
df = pd.read_csv("data/Yelp-ATT-Social-Media.tsv", sep="\t", header=0)

As seen in the Yelp code above and in the following Pandas dataframe, sentiment is coded as follows: '1': Negative, '2': Neutral, '3': Positive. For simplicity and 
teaching purposes, we will only look at the positive and negative responses. 

In [13]:
pd.set_option('display.max_colwidth', -1)
df.head()

| | idx | utterance | sentiment |
|---|---|---|---|
| 0 | 1 | Ok so I finally amped myself up enough to go into the store about a wifi issue I'm having with my phone. I was ready to go down the list of everything I... | 3 |
| 1 | 2 | Came into this location to make a cash payment arrived at 2:10pm today , went to try to use the kiosk they have inside the store , warning don't use it it's... | 1 |
| 2 | 3 | Its. Very bad service And big lier people work there\nI buy new phone there after when i open the phone i see it's used \nI back to at&t we tell me this not... | 1 |
| 3 | 4 | I've been with AT&T since 2009. I went into the store to purchase another phone and add another phone line to my account. The first five minutes of the... | 1 |
| 4 | 5 | The most amazing customer service ever!!! Went to upgrade 4 lines. I was assisted by quasim, garrett, and Katie. These three employees are a dream team! I... | 3 |


In [14]:
# Remove neutral responses from this teaching example. 
neutral_responses = df[df['sentiment'] == 2]
df = df.drop(neutral_responses.index)

One of the main insights that I have seen is that <b> deep learning algorithms do not train well if the training data is skewed and if the deep learning "loss" function does not account for this imbalance. </b>

For simplicity, we will make sure that there is an equal number of positive and negative responses in our training data. 

In [15]:
# make sure that the split between negative and positive is equally balanced in order to get effective training.
df_positive = df[df['sentiment'] == 3]
df_negative = df[df['sentiment'] == 1]

df_train_positive = df_positive.sample(369)
df_positive_remaining = df_positive.drop(df_train_positive.index)
df_validate_positive = df_positive_remaining.sample(frac=0.5)
df_test_positive = df_positive_remaining.drop(df_validate_positive.index)

assert len(df_train_positive) + len(df_validate_positive) + len(df_test_positive) == len(df[df['sentiment'] == 3])

df_train_negative = df_negative.sample(369)
df_negative_remaining = df_negative.drop(df_train_negative.index)
df_validate_negative = df_negative_remaining.sample(frac=0.5)
df_test_negative = df_negative_remaining.drop(df_validate_negative.index)

assert len(df_train_negative) + len(df_validate_negative) + len(df_test_negative) == len(df[df['sentiment'] == 1])

assert len(df_train_positive) == len(df_train_negative)

train_df = pd.concat([df_train_positive, df_train_negative]).sample(frac=1)
validate_df = pd.concat([df_validate_positive, df_validate_negative]).sample(frac=1)
test_df = pd.concat([df_test_positive, df_test_negative]).sample(frac=1)
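The cell above handles imbalance by sampling the same number of positive and negative reviews. The other option mentioned earlier is to let the loss function account for the imbalance, which is commonly done by weighting each class inversely to its frequency. The sketch below shows one widely used heuristic for computing such weights; it is not used anywhere in this notebook, and the function name and label counts are hypothetical.

```python
from collections import Counter


def balanced_class_weights(labels):
    """Weight each class by n_samples / (n_classes * class_count)."""
    counts = Counter(labels)
    n_samples = len(labels)
    n_classes = len(counts)
    return {label: n_samples / (n_classes * count)
            for label, count in counts.items()}


# Hypothetical skewed data: 600 negative (1) and 200 positive (3) reviews.
labels = [1] * 600 + [3] * 200
weights = balanced_class_weights(labels)

print(weights[3])  # 2.0
```

With these weights, each example from the rarer class contributes more to the loss, so both classes carry the same total weight during training.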


Up to this point, the data has been split and examined using a popular Python package called Pandas. From here on, we use only a NumPy representation of the values. 

In [16]:
train_csv = train_df.values
validate_csv = validate_df.values
test_csv = test_df.values


### Step 4: Build the TFRecords

Our example takes data (the CSV values that were extracted from Pandas dataframes) and converts each row into a serialized string that is written out to a TFRecord. Although our example fits within memory, it is easy to see how the following for loop could be extended to work with streaming social media data. 

Because the TFRecord could be generated from streams (a series of files representing potentially infinite data), we eventually need to tell the model how many steps to process. When setting up "in memory" examples in order to set up a pipeline, I have found it useful to capture the size of the train, validation, and test datasets. This is captured in the following generate_json_info function and in the corresponding "data/yelp_info.json" file. 


In [17]:
import time
import json

In [18]:
def create_tf_example(features, label):
    tf_example = tf.train.Example(features=tf.train.Features(feature={
        'idx': tf.train.Feature(int64_list=tf.train.Int64List(value=[features[0]])),
        'sentence': tf.train.Feature(bytes_list=tf.train.BytesList(value=[features[1].encode('utf-8')])),
        'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[label]))
    }))

    return tf_example

In [19]:
def convert_csv_to_tfrecord(csv, file_name):
    start_time = time.time()
    writer = TFRecordWriter(file_name)
    for index, row in enumerate(csv):
        try:
            if row is None:
                raise Exception('Row Missing')
            if row[0] is None or row[1] is None or row[2] is None:
                raise Exception('Value Missing')
            if row[1].strip() == '':  # compare by value; 'is' tests object identity
                raise Exception('Utterance is empty')
            features, label = row[:-1], row[-1]
            example = create_tf_example(features, label)
            writer.write(example.SerializeToString())
        except Exception as inst:
            print(type(inst))
            print(inst.args)
            print(inst)
    writer.close()
    print(f"{file_name}: --- {(time.time() - start_time)} seconds ---")

def generate_json_info(local_file_name):
    info = {"train_length": len(train_df), "validation_length": len(validate_df),
            "test_length": len(test_df)}

    with open(local_file_name, 'w') as outfile:
        json.dump(info, outfile)
    
    
convert_csv_to_tfrecord(train_csv, "data/yelp_train.tfrecord")
convert_csv_to_tfrecord(validate_csv, "data/yelp_validate.tfrecord")
convert_csv_to_tfrecord(test_csv, "data/yelp_test.tfrecord")

generate_json_info("data/yelp_info.json")

data/yelp_train.tfrecord: --- 0.03516888618469238 seconds ---
data/yelp_validate.tfrecord: --- 0.004355669021606445 seconds ---
data/yelp_test.tfrecord: --- 0.004141807556152344 seconds ---
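The sizes saved in "data/yelp_info.json" are what later allow the number of steps to be computed for a given batch size. The following is a minimal sketch of that calculation. The key names match generate_json_info above, and the train length of 738 assumes that all 369 positive and 369 negative training rows were kept; the validation and test lengths and the batch size are hypothetical values chosen for illustration.

```python
import json
import math

# Stand-in for the contents of data/yelp_info.json (validation and test sizes are made up).
info_json = '{"train_length": 738, "validation_length": 80, "test_length": 80}'
info = json.loads(info_json)

batch_size = 32  # hypothetical batch size


def steps_for(num_examples, batch_size):
    """Number of batches needed to see every example once."""
    return math.ceil(num_examples / batch_size)


train_steps = steps_for(info["train_length"], batch_size)
validation_steps = steps_for(info["validation_length"], batch_size)

print(train_steps, validation_steps)  # 24 3
```

Rounding up means the last, partially filled batch is still counted as a step.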


### Step 5: Confirm that TFRecord has encoded correctly

After you generate the TFRecords and record the corresponding size of each dataset, it is good practice to make sure that everything encoded correctly. We can confirm this by setting up code to read from the TFRecords, specifying the Tensors that we want to extract, and setting up a Python iterator to briefly inspect the results. 

In [20]:
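tr_ds = tf.data.TFRecordDataset("data/yelp_train.tfrecord")        # The dataset for train information
val_ds = tf.data.TFRecordDataset("data/yelp_validate.tfrecord")    # The dataset for validation information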

If you know beforehand the number of items in a feature, you should use FixedLenFeature. If you do not, use a VarLenFeature. In our case, we know that we are taking in 1 index, 1 text utterance (the 'sentence'), and 1 label per example. As a result, each of these is a FixedLenFeature. 

In [21]:
feature_spec = {
    'idx': tf.io.FixedLenFeature([], tf.int64),
    'sentence': tf.io.FixedLenFeature([], tf.string),
    'label': tf.io.FixedLenFeature([], tf.int64)
}

In [22]:
def parse_example(example_proto):
  # Parse the input tf.Example proto using the dictionary above.
  return tf.io.parse_single_example(example_proto, feature_spec)

In [23]:
tr_parse_ds = tr_ds.map(parse_example)
val_parse_ds = val_ds.map(parse_example)

In [24]:
dataset_iterator = iter(tr_parse_ds)

In [25]:
dataset_iterator.get_next()

{'idx': <tf.Tensor: id=56, shape=(), dtype=int64, numpy=367>,
 'label': <tf.Tensor: id=57, shape=(), dtype=int64, numpy=3>,
 'sentence': <tf.Tensor: id=58, shape=(), dtype=string, numpy=b'This is a Metro PCS store that excels in customer service! The manager, all the way down to the staff are great at assisting customers. Almost everyone in...'>}
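Note that the 'sentence' comes back as bytes (the `b'...'` prefix in the output above), because create_tf_example stored it with `.encode('utf-8')`. Decoding with the same codec recovers the original text. The sketch below shows that round trip in plain Python; the sample utterance is hypothetical, and in the notebook the bytes would typically come from calling `.numpy()` on the 'sentence' tensor.

```python
# Hypothetical review text containing a non-ASCII character.
utterance = "Caf\u00e9 staff were great!"

# What gets stored in the TFRecord's bytes_list.
encoded = utterance.encode("utf-8")

# What to do with the bytes after reading them back.
decoded = encoded.decode("utf-8")

print(type(encoded).__name__)  # bytes
print(decoded == utterance)    # True
```

The encoded form can be longer than the text itself, since characters outside ASCII take more than one byte in UTF-8.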



Congratulations! At this point you have your data encoded as a TFRecord. You are now in a position to feed this information to a data pipeline (tf.data.Dataset), which makes it easy to then push that data to two types of TensorFlow modeling frameworks (the Keras model or the TensorFlow Estimator). 

When you are ready, take a look at the next tutorial in this repository [("2-Sentiment-Classification-with-BERT.ipynb")](https://github.com/ralphbrooks/tensorflow-tutorials/blob/master/2-Sentiment-Classification-with-BERT.ipynb).



Also, for the first time in 10 years, I am back on the job market looking for consulting opportunities or full time employment.  If you think I can be of help to you, feel free to reach out. I am on twitter at [@ralphbrooks](https://twitter.com/ralphbrooks) .