# Streaming

This notebook builds a custom Dataflow job that parses the timestamp in each message payload and writes the result to a separate BigQuery table.

In [1]:
import logging
import json
import time
import traceback
from datetime import datetime
from pytz import timezone

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options import pipeline_options
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.io import WriteToText

from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.runners import DataflowRunner

import google.auth

from utils.utils import publish_to_topic
from IPython.core.display import display, HTML

In [2]:
# Credentials come from the service account the notebook runs as; if this
# fails, grant that account the missing permissions in IAM.
project = google.auth.default()[1]

# Streaming-mode options used by the interactive pipelines in this notebook.
options = PipelineOptions(streaming=True, project=project)

Collect messages for one minute.

In [3]:
# Cap the Interactive Beam recording duration at one minute.
ib.options.recording_duration = '1m'

The interactive runner will execute the pipeline for the configured one-minute recording duration.

In [4]:
# Pipeline bound to the interactive runner, using the streaming options defined earlier.
p = beam.Pipeline(InteractiveRunner(), options=options)

# Fully qualified Pub/Sub topic path; later cells reuse this variable.
topic = "projects/data2-340001/topics/arduino-telemetry"

# Read from the topic and decode every message with json.loads.
pubsub = (p | "Read Topic" >> ReadFromPubSub(topic=topic)
            | beam.Map(json.loads))

# Display the decoded PCollection in the notebook.
ib.show(pubsub)

<IPython.core.display.Javascript object>

In [5]:
# Parse the timestamp into a date-time string, optionally in a chosen timezone.
def to_timezone(timestamp, tz=None):
    """Format a POSIX timestamp as a ``YYYY-MM-DD HH:MM:SS`` string.

    Args:
        timestamp: Seconds since the Unix epoch (int or float).
        tz: Optional ``tzinfo`` to convert into, for example
            ``pytz.timezone("Europe/London")`` or ``datetime.timezone.utc``.
            When omitted, the local timezone of the process running the code
            is used, which matches the previous behaviour.

    Returns:
        The wall-clock time as a string with no UTC offset.
    """
    date_format = '%Y-%m-%d %H:%M:%S'
    # With tz=None, fromtimestamp uses the process-local timezone, so the
    # result depends on where the code runs unless tz is supplied.
    return datetime.fromtimestamp(timestamp, tz).strftime(date_format)

In [6]:
# Rebuild the interactive pipeline with an extra step that formats the timestamp.
p = beam.Pipeline(InteractiveRunner(), options=options)

# Decode each message, then emit a dict with the same sensor fields, with the
# timestamp replaced by the string returned from to_timezone.
pubsub = (p | "Read Topic" >> ReadFromPubSub(topic=topic)
            | beam.Map(json.loads)
            | beam.Map(lambda x: {"timestamp":to_timezone(x['timestamp']), "temp":x['temp'], "humidity":x['humidity'], 
                                  "pressure":x['pressure'], "illuminance":x['illuminance']}))

# Display the transformed PCollection in the notebook.
ib.show(pubsub)

<IPython.core.display.Javascript object>

Now let's run this as a Dataflow job to process the messages.

In [7]:
# Set the root logger to INFO so INFO-level records appear in the cell output.
logging.getLogger().setLevel(logging.INFO)

def streaming_pipeline(project, region="us-central1", input_topic=None,
                       table="data2-340001:sensor_data.arduino_prepared",
                       staging_location="gs://pb-temp-gcs/files",
                       temp_location="gs://pb-temp-gcs/temp"):
    """Build the Pub/Sub -> BigQuery streaming pipeline and run it on Dataflow.

    Args:
        project: Project ID passed to the pipeline options.
        region: Region passed to the pipeline options.
        input_topic: Pub/Sub topic path to read from. When None, the
            notebook-level ``topic`` variable is used, as before.
        table: Destination BigQuery table in ``project:dataset.table`` form.
        staging_location: GCS path for staged files (change to your bucket).
        temp_location: GCS path for temporary files (change to your bucket).

    Returns:
        The object returned by ``p.run()``; the notebook later calls
        ``.cancel()`` on it.
    """
    # Imported inside the function, as in the original cell - presumably so the
    # name travels with the pickled closure to remote workers; confirm before hoisting.
    from datetime import datetime

    # Parse the timestamp into a date-time string.
    # NOTE(review): fromtimestamp without a tz uses the local timezone of the
    # process that executes it, which may differ from the notebook host - confirm.
    def to_timezone(timestamp):
        date = datetime.fromtimestamp(timestamp)
        date_format='%Y-%m-%d %H:%M:%S'
        date = date.strftime(date_format)
        return date

    # Fall back to the notebook-level topic so existing calls keep working.
    if input_topic is None:
        input_topic = topic

    schema = "timestamp:datetime,temp:float,humidity:float,pressure:float,illuminance:float"

    options = PipelineOptions(
        streaming=True,
        project=project,
        region=region,
        staging_location=staging_location,
        temp_location=temp_location
    )

    p = beam.Pipeline(DataflowRunner(), options=options)

    # Decode each message and keep the sensor fields, with the timestamp
    # converted to the string form that matches the schema above.
    pubsub = (p | "Read Topic" >> ReadFromPubSub(topic=input_topic)
                | beam.Map(json.loads)
                | beam.Map(lambda x: {"timestamp":to_timezone(x['timestamp']), "temp":x['temp'], "humidity":x['humidity'],
                                      "pressure":x['pressure'], "illuminance":x['illuminance']}))

    # Append rows to the table, creating it from the schema if it does not exist.
    pubsub | "Write To BigQuery" >> WriteToBigQuery(table=table, schema=schema,
                                  create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                                  write_disposition=BigQueryDisposition.WRITE_APPEND)

    return p.run()

In [8]:
# Submit the Dataflow job; on failure, print the traceback instead of
# aborting the notebook run.
try:
    pipeline = streaming_pipeline(project)
    print("\n PIPELINE RUNNING \n")
# `except Exception` does not catch KeyboardInterrupt or SystemExit, so they
# propagate without a separate re-raise clause or a bare `except:`.
except Exception:
    print("\n PIPELINE FAILED")
    traceback.print_exc()

INFO:apache_beam.runners.portability.stager:Downloading source distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/root/apache-beam-2.36.0/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/tmpre0jp0pg', 'apache-beam==2.36.0', '--no-deps', '--no-binary', ':all:']


INFO:apache_beam.runners.portability.stager:Staging SDK sources from PyPI: dataflow_python_sdk.tar
INFO:apache_beam.runners.portability.stager:Downloading binary distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/root/apache-beam-2.36.0/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/tmpre0jp0pg', 'apache-beam==2.36.0', '--no-deps', '--only-binary', ':all:', '--python-version', '37', '--implementation', 'cp', '--abi', 'cp37m', '--platform', 'manylinux1_x86_64']
INFO:apache_beam.runners.portability.stager:Staging binary distribution of the SDK from PyPI: apache_beam-2.36.0-cp37-cp37m-manylinux1_x86_64.whl
INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.36.0
INFO:root:Using provided Python SDK container image: gcr.io/cloud-dataflow/v1beta3/python37-fnapi:2.36.0
INFO:root:Python SDK container image set to "gcr.io/cloud-dataflow/v1beta3/python37-fnapi:2.36.0" for Docker environment
INFO:apache_beam.int


 PIPELINE RUNNING 



## Remember to cancel the running pipelines.

In [None]:
# Cancel the job through the handle returned by streaming_pipeline().
pipeline.cancel()