In [1]:
import tensorflow as tf

# import json
# from oauth2client.client import GoogleCredentials
# import requests

# from google.cloud import pubsub_v1

# import apache_beam as beam
# from apache_beam.options.pipeline_options import PipelineOptions

Look at list of image-files which will be sent via pub/sub messaging, for inferencing/prediction..

In [2]:
filenames = tf.io.gfile.glob("gs://fire_detection_anurag/test_images/*")

filenames

['gs://fire_detection_anurag/test_images/fire1.jpg',
 'gs://fire_detection_anurag/test_images/fire2.jpg',
 'gs://fire_detection_anurag/test_images/fire3.jpg',
 'gs://fire_detection_anurag/test_images/fire4.jpg',
 'gs://fire_detection_anurag/test_images/fire5.jpg',
 'gs://fire_detection_anurag/test_images/no_fire1.jpg',
 'gs://fire_detection_anurag/test_images/no_fire2.jpg',
 'gs://fire_detection_anurag/test_images/no_fire3.jpg',
 'gs://fire_detection_anurag/test_images/no_fire4.jpg',
 'gs://fire_detection_anurag/test_images/no_fire5.jpg']

## Subscribe to the pub/sub topic (created in other notebook)

In [3]:
project_id = "kubeflow-1-0-2"

topic_id = "inference_images"  # created in other notebook
pub_sub_topic = "projects/{}/topics/{}".format(project_id, topic_id)

subscription_id = "my_subscription"
subscription_id = "projects/{}/subscriptions/{}".format(project_id, subscription_id)

pub_sub_topic, subscription_id

('projects/kubeflow-1-0-2/topics/inference_images',
 'projects/kubeflow-1-0-2/subscriptions/my_subscription')

In [4]:
!gcloud pubsub subscriptions create $subscription_id --topic $topic_id

Created subscription [projects/kubeflow-1-0-2/subscriptions/my_subscription].


## Connect pub/sub topic to Apache Beam pipeline

In [6]:
%%writefile pub_sub_beam.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import json
from oauth2client.client import GoogleCredentials
import requests

pub_sub_topic = 'projects/kubeflow-1-0-2/topics/inference_images'
subscription_id = 'projects/kubeflow-1-0-2/subscriptions/my_subscription'
PROJECT = "kubeflow-1-0-2"
REGION = "us-central1"
ENDPOINT_ID = "6569494015031377920"

class ModelPredict:
    def __init__(self, project, region, endpoint_id):
        self._api = "https://{}-aiplatform.googleapis.com/v1/projects/{}/locations/{}/endpoints/{}:predict".format(
                                                                                                                   region,
                                                                                                                   project,
                                                                                                                   region,
                                                                                                                   endpoint_id
                                                                                                                  )   
        
    def __call__(self, filenames):        
        token = GoogleCredentials.get_application_default().get_access_token().access_token
        if isinstance(filenames, str):
            # Only one element, put it into a batch of 1.
            data = {
                    "instances": [
                                  {"filenames": filenames}
                                 ]
                   }
        else:
            data = {
                    "instances": []
                   }
            for f in filenames:
                data["instances"].append({
                                          "filenames" : f
                                        })
        # print(data)
        headers = {"Authorization": "Bearer " + token }
        response = requests.post(self._api,
                                 json=data,
                                 headers=headers)
        response = json.loads(response.content.decode("utf-8"))
        
        if isinstance(filenames, str):
            result = response["predictions"][0]
            result["filename"] = filenames
            yield result
        else:
            for (a,b) in zip(filenames, response["predictions"]):
                result = b
                result["filename"] = a
                yield result

pipeline_options = PipelineOptions(
                                   streaming=True,  # required for Beam connector with pub/sub
                                  )
                
with beam.Pipeline(options=pipeline_options) as p:
    (p 
     | "getinput" >> beam.io.ReadFromPubSub(subscription=subscription_id)
     | "batch" >> beam.BatchElements(min_batch_size=2,
                                     max_batch_size=3)
     | "getpred" >> beam.FlatMap(ModelPredict(PROJECT,
                                              REGION,
                                              ENDPOINT_ID))
     | "write" >> beam.Map(print)
    )

Overwriting pub_sub_beam.py


## Inferencing on streaming (or batch of) images

In [7]:
!python pub_sub_beam.py

{'probability': 0.994521737, 'image_type_str': 'Fire', 'image_type_int': 0, 'filename': b'gs://fire_detection_anurag/test_images/fire1.jpg'}
{'image_type_int': 0, 'probability': 0.711659372, 'image_type_str': 'Fire', 'filename': b'gs://fire_detection_anurag/test_images/fire2.jpg'}
{'probability': 0.987360537, 'image_type_str': 'Fire', 'image_type_int': 0, 'filename': b'gs://fire_detection_anurag/test_images/fire3.jpg'}
{'image_type_str': 'Fire', 'image_type_int': 0, 'probability': 0.983968496, 'filename': b'gs://fire_detection_anurag/test_images/fire4.jpg'}
{'image_type_str': 'Fire', 'image_type_int': 0, 'probability': 0.995875299, 'filename': b'gs://fire_detection_anurag/test_images/fire5.jpg'}
{'image_type_int': 1, 'probability': 0.827532887, 'image_type_str': 'No-Fire', 'filename': b'gs://fire_detection_anurag/test_images/no_fire1.jpg'}
{'image_type_int': 1, 'probability': 0.990039051, 'image_type_str': 'No-Fire', 'filename': b'gs://fire_detection_anurag/test_images/no_fire2.jpg'}
{

## Clean-up

In [8]:
!gcloud pubsub subscriptions delete $subscription_id

Deleted subscription [projects/kubeflow-1-0-2/subscriptions/my_subscription].
