In [1]:
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# How to use this notebook
[Google Cloud AI Platform Notebooks](https://cloud.google.com/ml-engine/docs/notebooks/overview) is a hosted JupyterLab environment that comes optimized for machine learning.  
   
Machine Learning in Google Cloud can be deployed from notebooks in GCP, local environments, or...
  
### Instructions for deploying Notebook on GCP:
1. [Set up your Google Cloud Platform (GCP) project](https://console.cloud.google.com/cloud-resource-manager?_ga=2.150499254.-1267767919.1550615012).
2. [Enable billing for the GCP project.](https://cloud.google.com/billing/docs/how-to/modify-project)
3. [Enable the Compute Engine API.](https://console.cloud.google.com/flows/enableapi?apiid=compute.googleapis.com&_ga=2.150499254.-1267767919.1550615012)
4. [Create a new AI Platform Notebooks instance.](https://cloud.google.com/ml-engine/docs/notebooks/create-new)
    - Select "TensorFlow 1.x" as the instance type, or ML framework.
    - Including a GPU for this tutorial is not necessary. However, it may be helpful for future (or existing SageMaker) models that do require GPUs. [TODO: add support for using GPU]
    - Select Python 3 if you are asked which kernel type to use.
5. Select "Open Jupyterlab" for the new notebook. You will be redirected to a URL for your notebook instance.
6. Clone this GitHub repository with the "Git clone" button in the notebook. [TODO: Find better link](https://cloud.google.com/ml-engine/docs/notebooks/save-to-github)  
   
Make sure to [shut down the Notebook](https://cloud.google.com/ml-engine/docs/notebooks/shut-down) when you're done with this tutorial to avoid any unnecessary charges. 

### Instructions for deploying Notebook locally:  
If you're running this notebook outside of GCP, open it as you would any other Jupyter notebook.   
   
In order to access GCP services from a local Jupyter Notebook, you'll need to set up GCP authentication, so that your API requests can be authorized. This can be done by setting the Application Default Credentials:
```
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="/path/to/file.json"
```
Setting the Application Default Credentials is not necessary when using AI Platform Notebooks unless you would like to authenticate using a different service account than the default Compute Engine service account.

# Preprocess MNIST dataset
Convert the MNIST images into TFRecords and upload the TFRecords to Google Cloud Storage (GCS).
  
## Install libraries
AI Platform Notebooks comes pre-installed with TensorFlow and Keras (if TensorFlow 1.x is set as the ML framework at creation time). [TODO]: determine if this section is necessary

In [5]:
import os
import numpy as np
from tensorflow.keras.datasets import mnist
import tensorflow as tf

Cloud AI Platform may come preinstalled with a different version of TensorFlow than SageMaker.

In [6]:
print(tf.__version__)

1.14.0


# Set up Google Cloud Storage
When working with AI Platform, it is recommended to store training data in GCS. Reasons include:
* Training data must be accessible to the training service
* Storing data in GCS reduces latency

More information on working with GCS with AI Platform can be found [here.](https://cloud.google.com/ml-engine/docs/tensorflow/working-with-cloud-storage) 
   
Set BUCKET_NAME to the name of your existing (or new) GCS bucket. The value must be prefixed with "gs://", and the bucket name must be unique across all buckets in Cloud Storage.
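
The cells below build GCS paths with `os.path.join`, which works on Linux and macOS but would insert backslashes on Windows. If you run this notebook locally on Windows, a small helper along these lines may be safer. This is a minimal sketch: the name `gcs_path` and the bucket `gs://my-bucket` are hypothetical and not part of the original notebook.

```python
import posixpath


def gcs_path(bucket, *parts):
    """Join a bucket URI and path parts using forward slashes only.

    `gcs_path` is a hypothetical helper name used for illustration.
    """
    if not bucket.startswith('gs://'):
        raise ValueError('bucket must start with "gs://": {!r}'.format(bucket))
    # Drop empty parts and stray slashes so that no part resets the join.
    cleaned = [p.strip('/') for p in parts if p.strip('/')]
    return posixpath.join(bucket.rstrip('/'), *cleaned)


# 'gs://my-bucket' is a placeholder bucket name.
train_file = gcs_path('gs://my-bucket', 'train', 'tfrecord.tfrecords')
print(train_file)
```

`posixpath.join` always uses `/` as the separator, so the result is the same on every operating system.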

In [7]:
BUCKET_NAME='gs://ml-model-migration'
PROJECT='ml-model-migrations'
REGION='us-central1'

### Create a new Storage Bucket
A GCS bucket can be created either through the Cloud Console or from the command line. If the bucket does not exist yet, run the following command to create it. More instructions on creating a Google Cloud Storage bucket can be found [here.](https://cloud.google.com/storage/docs/creating-buckets)

In [8]:
!gsutil mb -l {REGION} {BUCKET_NAME}

Creating gs://ml-model-migration/...
ServiceException: 409 Bucket ml-model-migration already exists.


### Authentication and Authorization
The AI Platform notebook is authenticated as the default Compute Engine service account (unless otherwise specified at the time of notebook creation). This means that it should already have authorization to create new buckets and read/write from existing buckets. 
  
If you are getting authorization errors, review the relevant service account's IAM permissions. If the storage bucket is not part of the same project as this Notebook, the Compute Engine service account may need to be granted access to the Cloud Storage bucket.  
  
To check which service account should be granted access, verify which service account is authenticated for this notebook. The service account appears in the "email" field of the access token's info:

In [4]:
# This does not work when using ADC
import subprocess

def access_token():
    return subprocess.check_output(
        'gcloud auth application-default print-access-token',
        shell=True,
    ).decode().strip()

!curl https://www.googleapis.com/oauth2/v1/tokeninfo?access_token={access_token()}

{
  "issued_to": "111616252376478783342",
  "audience": "111616252376478783342",
  "scope": "https://www.googleapis.com/auth/userinfo.email https://www.googleapis.com/auth/cloud-platform",
  "expires_in": 3584,
  "email": "946556229441-compute@developer.gserviceaccount.com",
  "verified_email": true,
  "access_type": "offline"
}


If using Application Default Credentials:
```
!cat "path/to/file.json"
```
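
Printing the whole key file with `cat` also prints the private key. As an alternative, the sketch below reads only the `client_email` field, which is where a service account key file stores the account's email address. The function name `service_account_email` and the throwaway demo file are assumptions made for illustration; in practice you would pass the path stored in `GOOGLE_APPLICATION_CREDENTIALS`.

```python
import json
import os
import tempfile


def service_account_email(key_path):
    """Return the client_email field of a service account key file.

    `service_account_email` is a hypothetical helper name.
    """
    with open(key_path) as f:
        key = json.load(f)
    return key.get('client_email')


# Demonstration with a throwaway file that mimics the layout of a key file.
# The email address below is made up.
with tempfile.TemporaryDirectory() as tmp:
    demo_key = os.path.join(tmp, 'key.json')
    with open(demo_key, 'w') as f:
        json.dump({'type': 'service_account',
                   'client_email': 'demo@example.iam.gserviceaccount.com'}, f)
    email = service_account_email(demo_key)

print(email)
```

With a real key file, the call would be `service_account_email(os.environ["GOOGLE_APPLICATION_CREDENTIALS"])`.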

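Alternatively, `google.auth.default()` from the `google-auth` library returns the credentials and project ID that Application Default Credentials resolve to.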

# Write and Upload TFRecords

In addition to the [Google Cloud Storage Python Client](https://github.com/googleapis/google-cloud-python/tree/master/storage), some Python modules support reading/writing files locally and with GCS interchangeably. The module will read/write from GCS if the "gs://" prefix for the file or directory is specified.   
  
Options include:
- [tf.io.gfile](https://www.tensorflow.org/api_docs/python/tf/io/gfile) for file I/O wrappers without thread locking
- [tf.io.TFRecordWriter](https://www.tensorflow.org/api_docs/python/tf/io/TFRecordWriter) for writing records to a TFRecords file in GCS
- [pandas 0.24.0 or later](https://pandas.pydata.org/)  
  
Pandas also supports reading and writing files to S3. However, Pandas does not support creating TFRecords. 

In SageMaker, writing TFRecords and then uploading them to cloud storage requires two separate operations: TFRecords must first be written locally and then uploaded to S3. [TODO: confirm statement] In GCP, these actions can be done in a single step: TFRecords can be written directly to Google Cloud Storage using the aforementioned modules. 


In [9]:
def load_mnist_data():   
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train = np.reshape(x_train, [-1, 28, 28, 1])
    x_test = np.reshape(x_test, [-1, 28, 28, 1])
    train_data = {'images':x_train, 'labels':y_train}
    test_data = {'images':x_test, 'labels':y_test}
    return train_data, test_data

In [17]:
def export_tfrecords(data_set, name, directory):
    """Converts MNIST dataset to tfrecords.
    
    Args:
        data_set: Dictionary containing a numpy array of images and labels.
        name: Name given to the exported tfrecord dataset.
        directory: Directory that the tfrecord files will be saved in.
    """
    def _int64_feature(value):
        return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))


    def _bytes_feature(value):
        return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
    
    images = data_set['images']
    labels = data_set['labels']
    num_examples = images.shape[0]  
    rows = images.shape[1]
    cols = images.shape[2]
    depth = images.shape[3]

    filename = os.path.join(directory, name + '.tfrecords')
    print('Writing', filename)
   
    # Writing through the context manager closes the file even if a write fails.
    with tf.io.TFRecordWriter(filename) as writer:
        for index in range(num_examples):
            # Serialize the raw pixel values of one image as bytes.
            image_raw = images[index].tobytes()
            example = tf.train.Example(features=tf.train.Features(feature={
                'height': _int64_feature(rows),
                'width': _int64_feature(cols),
                'depth': _int64_feature(depth),
                'label': _int64_feature(int(labels[index])),
                'image_raw': _bytes_feature(image_raw)}))
            writer.write(example.SerializeToString())

In [10]:
train_data, test_data = load_mnist_data()

In [None]:
export_tfrecords(train_data, 'tfrecord', os.path.join(BUCKET_NAME, 'train'))
export_tfrecords(test_data, 'tfrecord', os.path.join(BUCKET_NAME, 'test'))


# Creating TFRecords with Apache Beam

In [None]:
# Only run this the first time! 
# !pip3 install --quiet apache-beam[gcp]
# !pip3 install --quiet tensorflow-transform

In [18]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions
import tensorflow_transform as tft
from tensorflow_transform.beam import impl as tft_beam
from tensorflow_transform.beam import tft_beam_io
from tensorflow_transform.tf_metadata import dataset_metadata, dataset_schema
import datetime

W0718 02:09:19.508379 140616460195584 deprecation_wrapper.py:119] From /home/jupyter/.local/lib/python3.5/site-packages/tensorflow_transform/beam/common.py:51: The name tf.ConfigProto is deprecated. Please use tf.compat.v1.ConfigProto instead.

W0718 02:09:19.521399 140616460195584 deprecation_wrapper.py:119] From /home/jupyter/.local/lib/python3.5/site-packages/tensorflow_transform/beam/impl.py:283: The name tf.SparseTensorValue is deprecated. Please use tf.compat.v1.SparseTensorValue instead.



[Apache Beam](https://beam.apache.org/documentation/programming-guide/)....
A Beam pipeline represents a Directed Acyclic Graph of steps. It can have multiple input sources or multiple output sinks; a Beam operation (PTransform) can both read and output multiple PCollections.

Beam pipelines can be used with different runners, or engines.
[Cloud Dataflow](https://cloud.google.com/dataflow/) is a fully-managed service which can be used to parallelize data preprocessing.   
   
A PCollection represents a distributed data set that your Beam pipeline operates on. Your pipeline typically [creates an initial PCollection](https://beam.apache.org/documentation/programming-guide/#creating-a-pcollection) by reading data from an external dataset or in-memory data. In order to distribute preprocessing, each data sample must be on its own row.  
   
In order for in-memory data to be read into a PCollection, each individual sample must be on its own row. So, we'll need to convert our MNIST data from a dict of lists (dictionary contains two keys, 'images' and 'labels', each with a list of values) to a list of dicts (list contains a dictionary for each sample, each dictionary containing 'image' and 'label').   
 
Our samples use the Beam Python SDK, because most data scientists are familiar with Python.
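
The reshaping described above can be sketched on toy data with the standard library alone. The strings below stand in for image arrays, and the variable names `toy` and `samples` are made up for this illustration.

```python
# Dict of lists: one key per field, each holding the values for all samples.
toy = {
    'images': ['img0', 'img1', 'img2'],  # placeholders for image arrays
    'labels': [5, 0, 4],
}

# List of dicts: one dictionary per sample, so each can become one
# element of a PCollection.
samples = [
    {'image': image, 'label': label}
    for image, label in zip(toy['images'], toy['labels'])
]

for sample in samples:
    print(sample)
```

`zip` pairs the i-th image with the i-th label, which is the same pairing the index-based loop in the next cell produces.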

In [11]:
def convert_to_list_dicts(data_dict):
    """Convert dict of lists to list of dicts.
    
    Necessary to convert MNIST data so that each element of Beam PCollection
    represents an individual sample.
    """
    data_list = []
    for i in range(len(data_dict['images'])):
        element = {
            'image': data_dict['images'][i],
            'label': data_dict['labels'][i]
        }
        data_list.append(element)
    return data_list

In [12]:
train_data_list = convert_to_list_dicts(train_data)
test_data_list = convert_to_list_dicts(test_data)


## Create and run Beam pipeline locally

Apache Beam is a distributed batch and stream processing framework...   
   
Machine Learning can require lots of data. Preprocessing this data and generating TFRecords can be extremely compute intensive and take a lot of time. Apache Beam on Dataflow can drastically reduce the amount of time it will take to preprocess your data by distributing the work across multiple workers. 

Beam does not guarantee the order in which elements are processed.

The same code works for batch and for stream preprocessing.
  
For our toy dataset, it might take longer to generate TFRecords using Beam than using TFRecordWriter in the prior example. This is due to the additional overhead. As the preprocessing becomes more complex and training data scales, Beam will significantly reduce processing time.
[TODO]: show time to run each (try with additional data)
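
One way to compare the two approaches is to wrap each one in a simple wall-clock timer. The sketch below uses only the standard library. The `timed` helper is a hypothetical name, and the summation stands in for the real work, such as a call to `export_tfrecords` or a run of the Beam pipeline.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label):
    """Measure and print the wall-clock time spent inside the with-block."""
    result = {'label': label, 'seconds': None}
    start = time.perf_counter()
    try:
        yield result
    finally:
        result['seconds'] = time.perf_counter() - start
        print('{}: {:.2f} s'.format(label, result['seconds']))


# Placeholder workload; replace the body with the code you want to time.
with timed('toy workload') as t:
    total = sum(i * i for i in range(100000))
```

Wall-clock time for a Dataflow job also includes worker startup, so a fair comparison needs a dataset large enough for that fixed cost to matter less.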

Beam pipelines require a source of data to process. You can either use an existing I/O connector or create your own:
* [Built-in I/O Connectors](https://beam.apache.org/documentation/io/built-in/): to connect to Apache HDFS, Google Cloud Storage, local filesystems, BigQuery, etc.
* [Create your own Beam source](https://beam.apache.org/documentation/io/developing-io-overview/): to connect to a data store that isn't supported by Beam's existing I/O connectors

In [23]:
options = PipelineOptions()
temp_dir = os.path.join(BUCKET_NAME, 'temp')
runner = 'DirectRunner' # Use DirectRunner to run pipeline locally

In [20]:
def preprocessing_fn(inputs, rows, cols, depth):
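    """Converts one MNIST sample into a feature dictionary.
    
    Args:
        inputs: Dictionary with an 'image' numpy array and a 'label'.
        rows, cols, depth: Image dimensions, passed through beam.Map as
            extra arguments (side inputs).
    """
    image_raw = inputs['image'].tobytes()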
    label = int(inputs['label'])
    return {
        'height':rows,
        'width': cols,
        'depth': depth,
        'image_raw': image_raw,
        'label': label
    }

In [21]:
raw_data_metadata = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec({
        'height': tf.io.FixedLenFeature([], tf.int64),
        'width': tf.io.FixedLenFeature([], tf.int64),
        'depth': tf.io.FixedLenFeature([], tf.int64),
        'label': tf.io.FixedLenFeature([], tf.int64),
        'image_raw': tf.io.FixedLenFeature([], tf.string)
    })
)

W0718 02:09:31.726813 140616460195584 deprecation_wrapper.py:119] From /home/jupyter/.local/lib/python3.5/site-packages/tensorflow_transform/tf_metadata/schema_utils.py:63: The name tf.SparseFeature is deprecated. Please use tf.io.SparseFeature instead.

W0718 02:09:31.728561 140616460195584 deprecation_wrapper.py:119] From /home/jupyter/.local/lib/python3.5/site-packages/tensorflow_transform/tf_metadata/schema_utils.py:110: The name tf.FixedLenFeature is deprecated. Please use tf.io.FixedLenFeature instead.



In [26]:
sample_image = train_data_list[0]['image']
rows = sample_image.shape[0]
cols = sample_image.shape[1]
depth = sample_image.shape[2]

output_dir = os.path.join(BUCKET_NAME, 'data', datetime.datetime.now().strftime("%m%d%Y%H%M%S"))

with beam.Pipeline(runner, options=options) as p:
    with tft_beam.Context(temp_dir=temp_dir):
        for dataset_type, dataset in [('Train', train_data_list),
                                      ('Test', test_data_list)]:
            input_data = (p 
                          | 'Create{}Data'.format(dataset_type) >> beam.Create(dataset)
                          | 'Preprocess{}Data'.format(dataset_type) >> beam.Map(
                              preprocessing_fn, rows, cols, depth))
            
            input_data | 'Write{}Data'.format(
                dataset_type) >> beam.io.tfrecordio.WriteToTFRecord(
                    os.path.join(output_dir, dataset_type),
                    coder=tft.coders.ExampleProtoCoder(raw_data_metadata.schema))
        

W0718 01:53:59.761785 140481446565632 tfrecordio.py:57] Couldn't find python-snappy so the implementation of _TFRecordUtil._masked_crc32c is not as fast as it could be.


## Create Beam pipeline on Google Cloud Dataflow
The same pipeline that you created to run Beam locally can run in the cloud, where Dataflow distributes the processing over multiple workers.

The only differences between running locally and on the cloud 

Enable the Dataflow API for your project before running the pipeline.

You create a PCollection either by reading data from an external source using Beam’s Source API or from data stored in an in-memory collection class in your driver program. 

In [13]:
runner = 'DataflowRunner'

In [14]:
def read_data(dataset_type):
    from tensorflow.keras.datasets import mnist # import mnist in the package locally to ensure it's imported on workers
    # alternatively, use full path for tf.keras.datasets.mnist.load_data()

    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    (x, y) = (x_train, y_train) if dataset_type == 'Train' else (x_test, y_test)
    for image, label in zip(x, y):
        yield {'image': image, 'label': label}
            

In [15]:
from apache_beam.options.pipeline_options import SetupOptions

In [16]:
%%writefile setup.py

from setuptools import find_packages
from setuptools import setup

REQUIRED_PACKAGES = [
    'tensorflow-transform'
]

setup(
    name='preprocessing',
    version='0.1',
    author='Kim Milam',
    install_requires=REQUIRED_PACKAGES,
    packages=find_packages()
)

Overwriting setup.py


If you stop the cell after the job shows up in the Dataflow console, the job will continue running.   
   
To make a package available on the workers, import it inside the function that the ParDo (or Map/FlatMap) runs, as `read_data` does above.

You should be able to easily replace the source and transformations in the Beam pipeline below so that it works with your data source and desired preprocessing steps:
1. Create raw data and apply Python transformations
2. Transform data into TFRecord format and apply TF transformations
3. Serialize the TFRecord data and write to GCS

In [26]:
sample_image = train_data_list[0]['image']
rows = sample_image.shape[0]
cols = sample_image.shape[1]
depth = sample_image.shape[2]

output_dir = os.path.join(BUCKET_NAME, 'data', datetime.datetime.now().strftime("%m%d%Y%H%M%S"))
options = PipelineOptions()
options.view_as(SetupOptions).setup_file = './setup.py'
options.view_as(GoogleCloudOptions).project = PROJECT
options.view_as(GoogleCloudOptions).job_name = 'job'+datetime.datetime.now().strftime("%m%d%Y%H%M%S")
options.view_as(GoogleCloudOptions).staging_location = os.path.join(BUCKET_NAME, 'staging')
temp_dir = os.path.join(BUCKET_NAME, 'temp')
options.view_as(GoogleCloudOptions).temp_location = temp_dir

with beam.Pipeline(runner, options=options) as p:
    with tft_beam.Context(temp_dir=temp_dir):
        for dataset_type in ['Train', 'Test']: # iterate through dataset_types so PCollections stay separate
            input_data = (p 
                          | 'Create{}'.format(dataset_type) >> beam.Create([dataset_type])
                          | 'Read{}Data'.format(dataset_type) >> beam.FlatMap(read_data)
                          | 'Preprocess{}Data'.format(dataset_type) >> beam.Map(preprocessing_fn, rows, cols, depth)
                         )
            input_data | 'Write{}Data'.format(
                dataset_type) >> beam.io.tfrecordio.WriteToTFRecord(
                    os.path.join(output_dir, dataset_type),
                    coder=tft.coders.ExampleProtoCoder(raw_data_metadata.schema))

W0718 02:20:10.741082 140616460195584 pipeline_options.py:261] Discarding unparseable args: ['-f', '/home/jupyter/.local/share/jupyter/runtime/kernel-6ec692cb-8757-44b0-8999-19b2bd5db10c.json']
W0718 02:20:10.745774 140616460195584 pipeline_options.py:261] Discarding unparseable args: ['-f', '/home/jupyter/.local/share/jupyter/runtime/kernel-6ec692cb-8757-44b0-8999-19b2bd5db10c.json']


All required packages must be installed on the worker nodes, and any main-session state the pipeline relies on must be pickled and shipped to them.
  
  
From Beam documentation:
When you run your pipeline locally, the packages that your pipeline depends on are available because they are installed on your local machine. However, when you want to run your pipeline remotely, you must make sure these dependencies are available on the remote machines.

By default, global imports, functions, and variables defined in the main session are not saved during the serialization of a Cloud Dataflow job. If, for example, your DoFns are defined in the main file and reference imports and functions in the global namespace, you can set the --save_main_session pipeline option to True. This will cause the state of the global namespace to be pickled and loaded on the Cloud Dataflow worker.

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

Note: imports, functions and other variables defined in the global context of your __main__ file of your Dataflow pipeline are, by default, not available in the worker execution environment, and such references will cause a NameError, unless the --save_main_session pipeline option is set to True. Please see https://cloud.google.com/dataflow/faq#how-do-i-handle-nameerrors for additional documentation on configuring your worker execution environment.


<b>Beam Pipeline</b>: Graph of transformations   
<b>PTransform</b>: Transform performing massively parallel computation   
<b>PCollection</b>: Data flowing in the pipeline   

# Migrate data from AWS to GCP
