In [19]:
from google.cloud import pubsub_v1
import logging
from concurrent import futures
import base64


logging.basicConfig(level=logging.INFO)

In [2]:
project_id = "mlops-camp"
topic_id = "ride_events"

subscription_id = "gcf-prediction_handler-us-central1-ride_events"

In [3]:
def get_callback(future, message):
    def callback(future):
        try:
            logging.info("Published message %s.", future.result(timeout=1))
        except futures.TimeoutError as exc:
            print("Publishing %s timeout: %r", message, exc)
    return callback

In [4]:
# Initialize a Publisher client.
client = pubsub_v1.PublisherClient()
# Create a fully qualified identifier of form `projects/{project_id}/topics/{topic_id}`
topic_path = client.topic_path(project_id, topic_id)

In [32]:
data = b"Hello, World!"
api_future = client.publish(topic_path, data)

In [33]:
api_future.result()

'5010524709024951'

Received Message {
  data: b'Hello, World!'
  ordering_key: ''
  attributes: {}
}.
Acknowledged 5010524709024951.


In [34]:
data.decode()

'Hello, World!'

INFO:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Observed non-terminating stream error 404 Resource not found (resource=projects/mlops-camp/subscriptions/gcf-prediction_handler-us-central1-ride_events).
INFO:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Observed non-recoverable stream error 404 Resource not found (resource=projects/mlops-camp/subscriptions/gcf-prediction_handler-us-central1-ride_events).
INFO:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:RPC termination has signaled streaming pull manager shutdown.
INFO:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Observed non-terminating stream error 404 Resource not found (resource=projects/mlops-camp/subscriptions/gcf-prediction_handler-us-central1-ride_events).
INFO:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Observed non-recoverable stream error 404 Resource not found (resource=projects/mlops-camp/subscriptions/gcf-predict

In [21]:
subscriber_client = pubsub_v1.SubscriberClient()
subscription_path = subscriber_client.subscription_path(project_id, subscription_id)

In [22]:
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
        print(f"Received {message}.")
        # Acknowledge the message. Unack'ed messages will be redelivered.
        message.ack()
        print(f"Acknowledged {message.message_id}.")

In [23]:
streaming_pull_future = subscriber_client.subscribe(subscription_path, callback)

In [24]:
streaming_pull_future.result()

Received Message {
  data: b'Hello, World!'
  ordering_key: ''
  attributes: {}
}.
Acknowledged 5005877610612138.


KeyboardInterrupt: 

In [27]:
publish_futures = []

for i in range(1, 5):
    _message = f"message {i}"
    message = _message.encode("utf-8")
    
    future = client.publish(topic_path, message)
    future.add_done_callback(get_callback(future, message))
    
    publish_futures.append(future)

# Wait for all the futures to resolve before exiting.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

INFO:root:Published message 5007166147401026.
INFO:root:Published message 5007166147401027.
INFO:root:Published message 5007166147401028.
INFO:root:Published message 5007166147401029.


DoneAndNotDoneFutures(done={<Future at 0x10eddaa00 state=finished returned str>, <Future at 0x10edf8190 state=finished returned str>, <Future at 0x10edf8040 state=finished returned str>, <Future at 0x10edd44f0 state=finished returned str>}, not_done=set())

Received Message {
  data: b'message 1'
  ordering_key: ''
  attributes: {}
}.Received Message {
  data: b'message 2'
  ordering_key: ''
  attributes: {}
}.
Acknowledged 5007166147401027.
Received Message {
  data: b'message 3'
  ordering_key: ''
  attributes: {}
}.
Acknowledged 5007166147401028.

Acknowledged 5007166147401026.
Received Message {
  data: b'message 4'
  ordering_key: ''
  attributes: {}
}.
Acknowledged 5007166147401029.


INFO:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Observed non-terminating stream error 503 The service was unable to fulfill your request. Please try again. [code=8a75]
INFO:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Observed recoverable stream error 503 The service was unable to fulfill your request. Please try again. [code=8a75]
INFO:google.api_core.bidi:Re-established stream
INFO:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Observed non-terminating stream error 503 The service was unable to fulfill your request. Please try again. [code=8a75]
INFO:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Observed recoverable stream error 503 The service was unable to fulfill your request. Please try again. [code=8a75]
INFO:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Observed non-terminating stream error 503 Connection reset by peer
INFO:google.cloud.pubsub_v1.subscriber._protocol.stre

In [5]:
from utils.models import RideData

In [22]:
event = {
    "ride": {
        "PULOCATIONID": 130, 
        "DOLOCATIONID": 205, 
        "TRIP_DISTANCE": 3.66
        },
    "ride_id": 123
    }

data = {"data": base64.b64encode(event.decode("utf-8"))}




AttributeError: 'dict' object has no attribute 'decode'

TypeError: parse_raw() takes exactly 2 positional arguments (1 given)

In [42]:
import base64

import mock

import main


mock_context = mock.Mock()
mock_context.event_id = '617187464135194'
mock_context.timestamp = '2019-07-15T22:09:03.761Z'
mock_context.resource = {
    'name': 'projects/my-project/topics/my-topic',
    'service': 'pubsub.googleapis.com',
    'type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage',
}


In [43]:
mock_context

<Mock id='4551069600'>

In [24]:
from typing import get_type_hints, Optional
from base64 import b64decode

from pydantic import (  # pylint: disable=no-name-in-module
    BaseModel,
    Field,
    root_validator,
    ValidationError,
)

#import structlog  # type: ignore
#from flask import Request, jsonify
from google.cloud.functions.context import Context  # type:  ignore

In [25]:


def inject_pubsub_model(func):
    """ Wrap method with pydantic dependency injection for PubSub functions """

    def wrapper(event: dict, context: Context):
        kwargs = {}
        for arg_name, arg_type in get_type_hints(func).items():
            parse_raw = getattr(arg_type, "parse_raw", None)
            if callable(parse_raw):
                try:
                    kwargs[arg_name] = parse_raw(
                        b64decode(event["data"]).decode("utf-8")
                    )
                except ValidationError as err:
                    logger.error("Validation error", err=err)
                    raise err

                logger.info(
                    "Decoded model and injected",
                    model=arg_type.__name__,
                    func=func.__name__,
                )

        return func(**kwargs)

    return wrapper