## (Bonus) Streaming data prediction using Cloud ML Engine 

This notebook illustrates:

1. Create a PubSub Topic and Subscription.
2. Create a Dataflow Streaming pipeline to consume messages.
3. Use the deployed Cloud ML Engine API to make predictions.
4. Store the data and the predictions in BigQuery.
5. Run a stream data simulator.

In [None]:
#%%bash

#pip install -U protobuf
#pip install -U apache_beam
#pip install six==1.10
#pip freeze

In [1]:
import time
import datetime
from google.cloud import pubsub
import json
import apache_beam as beam
import os
# Show the installed Apache Beam SDK version in the cell output, so a reader
# can tell which SDK the pipeline code below was run against.
print(beam.__version__)

2.2.0


No handlers could be found for logger "oauth2client.contrib.multistore_file"


In [None]:
# strftime pattern used when stamping simulated messages with `source_timestamp`.
TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

# Beam runner name and the GCP project that owns the resources below.
RUNNER, PROJECT = "Dataflow", 'ksalama-gcp-playground'

# BigQuery destination (dataset, table) for the enriched records.
DATASET, TABLE = 'playground_ds', 'babyweight_estimates'

# GCS bucket used for Dataflow staging/temp files, and the GCP region setting.
STG_BUCKET, REGION = 'stagging-ksalama-gcs-cloudml', 'europe-west1'

# Pub/Sub topic the simulator publishes to and the subscription the job reads.
TOPIC, SUBSCRIPTION = 'babyweights', 'babyweights-sub'

## Create PubSub Topic and Subscription

In [None]:
def _ensure_exists(resource, create, creating_msg, ready_msg):
    """Call `create()` unless `resource.exists()`, then report that it is ready.

    `creating_msg` is printed only when creation is attempted; `ready_msg`
    and a blank line are always printed afterwards.
    """
    if not resource.exists():
        print(creating_msg)
        create()
    print(ready_msg)
    print("")


client = pubsub.Client()

# Topic first: the subscription object below is obtained from it.
topic = client.topic(TOPIC)
_ensure_exists(
    topic,
    topic.create,
    'Creating pub/sub topic {}...'.format(TOPIC),
    'Pub/sub topic {} is up and running'.format(TOPIC),
)

subscription = topic.subscription(name=SUBSCRIPTION)
_ensure_exists(
    subscription,
    lambda: subscription.create(client=client),
    'Creating pub/sub subscription {}...'.format(SUBSCRIPTION),
    'Pub/sub subscription {} is up and running'.format(SUBSCRIPTION),
)

## Submit Dataflow Stream Processing Job

In [None]:
# Fully-qualified resource path of the subscription the Dataflow job reads from.
_SUBSCRIPTION_PATH_TEMPLATE = "projects/{}/subscriptions/{}"
pubsub_subscription = _SUBSCRIPTION_PATH_TEMPLATE.format(PROJECT, SUBSCRIPTION)

from collections import OrderedDict

# Column name -> BigQuery type for the output table. An ordered mapping keeps
# the generated schema string (and therefore the column order of a table
# created with CREATE_IF_NEEDED) deterministic; a plain dict literal has
# arbitrary iteration order on Python 2.
schema_definition = OrderedDict([
    ('source_id', 'INTEGER'),
    ('source_timestamp', 'TIMESTAMP'),
    ('estimated_weight_kg', 'FLOAT'),
    ('is_male', 'STRING'),
    ('mother_age', 'FLOAT'),
    ('mother_race', 'STRING'),
    ('plurality', 'FLOAT'),
    ('gestation_weeks', 'INTEGER'),
    ('mother_married', 'BOOLEAN'),
    ('cigarette_use', 'BOOLEAN'),
    ('alcohol_use', 'BOOLEAN'),
])

# Compact 'name:TYPE,name:TYPE,...' schema string handed to WriteToBigQuery.
# Built explicitly instead of by stripping characters from the dict's repr.
schema = ','.join(
    '{}:{}'.format(column, column_type)
    for column, column_type in schema_definition.items()
)

# Echo the source subscription, the BigQuery destination and its schema so
# they can be checked before the job is launched.
print('Pub/Sub Subscription Link: {}'.format(pubsub_subscription))
print('')
print('BigQuery Dataset: {}'.format(DATASET))
print('BigQuery Table: {}'.format(TABLE))
print('')
print('BigQuery Table Schema: {}'.format(schema))


def estimate_weight(json_message, api=None):
    """Enrich one Pub/Sub message with a baby-weight estimate from Cloud ML Engine.

    The function keeps its own imports and constants so that it does not
    depend on the notebook's global state when it runs on Dataflow workers.

    Args:
        json_message: JSON string holding the model's input features plus
            `source_id` and `source_timestamp`. Those two fields are removed
            before the prediction request and restored on the result.
        api: Optional pre-built ML Engine client exposing
            `projects().predict(body=..., name=...).execute()`. When None
            (the default, as when called by `beam.Map`), a client is built
            from application-default credentials.

    Returns:
        dict: the input features plus `source_id`, `source_timestamp` and
        `estimated_weight_kg`. The last is the first prediction's `scores`
        value multiplied by 0.453592 (pounds -> kilograms), rounded to 2
        decimals.

    Raises:
        KeyError: if the message lacks `source_id` or `source_timestamp`, or
            the response lacks `predictions`/`scores`.
        IndexError: if the response contains no predictions.
    """
    PROJECT = 'ksalama-gcp-playground'
    MODEL_NAME = 'babyweight_estimator'
    VERSION = 'v1'
    POUNDS_TO_KG = 0.453592

    import json

    if api is None:
        from googleapiclient import discovery
        from oauth2client.client import GoogleCredentials

        credentials = GoogleCredentials.get_application_default()
        api = discovery.build('ml', 'v1', credentials=credentials,
                              discoveryServiceUrl='https://storage.googleapis.com/cloud-ml/discovery/ml_v1_discovery.json')

    instance = json.loads(json_message)
    # Bookkeeping fields are not model inputs; strip them for the request.
    source_id = instance.pop('source_id')
    source_timestamp = instance.pop('source_timestamp')

    request_data = {'instances': [instance]}
    model_url = 'projects/{}/models/{}/versions/{}'.format(PROJECT, MODEL_NAME, VERSION)
    response = api.projects().predict(body=request_data, name=model_url).execute()

    # Keep the full-precision score: truncating it with int() before the unit
    # conversion would drop up to ~0.45 kg from every estimate.
    predicted_pounds = float(response["predictions"][0]["scores"])

    instance['estimated_weight_kg'] = round(predicted_pounds * POUNDS_TO_KG, 2)
    instance['source_id'] = source_id
    instance['source_timestamp'] = source_timestamp

    return instance
  
def run_babyweight_estimates_streaming_pipeline():
    """Launch the streaming Dataflow job: Pub/Sub -> model estimate -> BigQuery.

    Reads JSON strings from `pubsub_subscription`, enriches each one with
    `estimate_weight`, and writes the result to PROJECT:DATASET.TABLE,
    creating the table with `schema` if it does not exist yet.

    Returns:
        Whatever `pipeline.run()` returns (the runner's pipeline result), so
        the caller can inspect or cancel the job. Callers that ignore the
        return value behave as before.
    """
    job_name = 'ingest-babyweight-estimates-{}'.format(
        datetime.datetime.now().strftime('%y%m%d-%H%M%S'))
    # Function-call print form works on both the Python 2 kernel and Python 3.
    print('Launching Dataflow job {}'.format(job_name))
    print('Check the Dataflow jobs on Google Cloud Console...')

    # GCS URIs always use '/', so build them with string formatting rather
    # than os.path.join (which uses the local OS separator).
    stg_dir = 'gs://{}/babyweight'.format(STG_BUCKET)

    options = {
        'staging_location': '{}/tmp/staging'.format(stg_dir),
        'temp_location': '{}/tmp'.format(stg_dir),
        'job_name': job_name,
        'project': PROJECT,
        'streaming': True,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        # NOTE(review): Beam's documented flag is `save_main_session`; confirm
        # this key is recognised. estimate_weight is written to be
        # self-contained either way.
        'no_save_main_session': True,
    }

    opts = beam.pipeline.PipelineOptions(flags=[], **options)
    pipeline = beam.Pipeline(RUNNER, options=opts)

    (
        pipeline
        | 'Read data from PubSub' >> beam.io.ReadStringsFromPubSub(subscription=pubsub_subscription)
        | 'Process message' >> beam.Map(estimate_weight)
        | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            project=PROJECT, dataset=DATASET, table=TABLE,
            schema=schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        )
    )

    return pipeline.run()

In [None]:
# Submit the streaming Dataflow job. Streaming jobs keep running (and incurring
# cost) until they are drained or cancelled, e.g. from the Dataflow page of the
# Cloud Console.
run_babyweight_estimates_streaming_pipeline()

In [None]:
# Feature names, in the order the values appear in each row below.
_INSTANCE_FIELDS = (
    'is_male', 'mother_age', 'mother_race', 'plurality',
    'gestation_weeks', 'mother_married', 'cigarette_use', 'alcohol_use',
)

# Sample data points used by the stream simulator (one tuple per instance).
_INSTANCE_ROWS = [
    ('True', 26.0, 'Asian Indian', 1.0, 39, 'True', 'False', 'False'),
    ('False', 29.0, 'Asian Indian', 1.0, 38, 'True', 'False', 'False'),
    ('True', 26.0, 'White', 1.0, 39, 'True', 'False', 'False'),
    ('True', 26.0, 'White', 2.0, 37, 'True', 'False', 'True'),
]

instances = [dict(zip(_INSTANCE_FIELDS, row)) for row in _INSTANCE_ROWS]

## Send Data Points to PubSub

In [None]:
from random import shuffle
import uuid

# Simulator settings: number of batches and pause (seconds) between batches.
iterations = 10000
sleep_time = 1

# Defined up front so the batch summary never hits a NameError (e.g. when
# `instances` is empty).
message = None

for i in range(iterations):

    # Reorders the shared list in place; the dicts themselves are left intact.
    shuffle(instances)

    for data_point in instances:

        source_timestamp = datetime.datetime.now().strftime(TIME_FORMAT)
        # Random 10-digit id, independent of any previously published message.
        source_id = str(uuid.uuid4().int % (10 ** 10))

        # Publish a copy so the shared `instances` dicts keep only the model
        # features instead of accumulating stale ids/timestamps.
        payload = dict(data_point, source_id=source_id, source_timestamp=source_timestamp)

        message = json.dumps(payload)
        topic.publish(message=message, source_id=source_id, source_timestamp=source_timestamp)

    print("Batch {} was sent. Last Message was: {}".format(i, message))
    print("")

    time.sleep(sleep_time)

print("Done!")

## Pull a Message from the PubSub Subscription

In [None]:
client = pubsub.Client()
topic = client.topic(TOPIC)
subscription = topic.subscription(SUBSCRIPTION)

# Items are indexed below as (ack_id, message) pairs. Nothing is acknowledged
# here, so Pub/Sub should redeliver the message (e.g. to the Dataflow job that
# reads the same subscription) after the ack deadline.
message = subscription.pull()

if not message:
    # pull() can return an empty list; indexing [0] would raise IndexError.
    print("No messages available on subscription {}".format(SUBSCRIPTION))
else:
    pulled = message[0][1]
    data = pulled.data
    if isinstance(data, bytes):
        data = data.decode('utf-8')
    # Single-argument, pre-formatted prints: under the Python 2 kernel,
    # print("a", b) would display a tuple.
    print("source_id: {}".format(pulled.attributes["source_id"]))
    print("source_timestamp: {}".format(pulled.attributes["source_timestamp"]))
    print("")
    print(data)