In [None]:
import pandas as pd
import time
import datetime
from google.cloud import pubsub
import json
import apache_beam as beam
import os
# Record the installed Apache Beam SDK version alongside the notebook output.
print(beam.__version__)

In [None]:
# strftime format for the `source_timestamp` attribute attached to published messages.
TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

# Google Cloud project / location and Dataflow settings.
PROJECT = 'ksalama-gcp-playground'
REGION = 'europe-west1'
RUNNER = "Dataflow"
# Spelling presumably matches the real GCS bucket name -- do not "correct" it.
STG_BUCKET = 'stagging-ksalama-gcs-cloudml'

# BigQuery destination table.
DATASET = 'playground_ds'
TABLE = 'taxi_trips'

# Pub/Sub resources.
TOPIC = 'taxi-trips'
SUBSCRIPTION = 'taxi-trips-sub'

## Submit Dataflow Stream Processing Job

In [None]:
# Fully-qualified Pub/Sub resource names for the configured project.
pubsub_subscription = "projects/{}/subscriptions/{}".format(PROJECT, SUBSCRIPTION)
pubsub_topic = "projects/{}/topics/{}".format(PROJECT, TOPIC)

print(pubsub_subscription)
# Format explicitly: under Python 2, print(DATASET, TABLE) would print a tuple repr.
print("{} {}".format(DATASET, TABLE))

def process_events(message):
    """Convert one Pub/Sub message into a BigQuery row.

    Args:
        message: the element produced by the Pub/Sub source. ``ReadStringsFromPubSub``
            emits the payload itself as a string; an object exposing the payload
            as a ``.data`` attribute (e.g. a Pub/Sub message object) is also accepted.

    Returns:
        dict: the JSON payload decoded into a ``{column: value}`` mapping, the
        row format ``beam.io.WriteToBigQuery`` writes.
    """
    # Imported locally so the function does not depend on notebook globals being
    # shipped to Dataflow workers (the pipeline options set 'no_save_main_session').
    import json

    payload = getattr(message, 'data', message)
    if isinstance(payload, bytes):
        payload = payload.decode('utf-8')
    # Perform any additional processing here: transformation, enrichment,
    # calls to external APIs, etc.
    return json.loads(payload)
  
def run_taxi_trips_pipeline():
    """Launch the streaming Pub/Sub -> BigQuery ingestion job.

    Reads messages from ``pubsub_subscription``, converts each one with
    ``process_events`` and writes the rows to ``PROJECT:DATASET.TABLE`` using
    the runner named by ``RUNNER``.

    Returns:
        The object returned by ``pipeline.run()``; callers may use it to
        monitor or cancel the job (previously this was discarded).
    """
    job_name = 'ingest-taxi-trips-{}'.format(datetime.datetime.now().strftime('%y%m%d-%H%M%S'))
    # print() calls (not Python 2 print statements) keep this cell valid on Python 3.
    print('Launching Dataflow job {}'.format(job_name))
    print('Check the Dataflow jobs on Google Cloud Console...')

    # GCS URIs always use '/', so build them with string formatting rather than
    # os.path.join (which would insert '\\' on Windows).
    stg_dir = 'gs://{}/taxi-fare'.format(STG_BUCKET)

    options = {
        'staging_location': '{}/tmp/staging'.format(stg_dir),
        'temp_location': '{}/tmp'.format(stg_dir),
        'job_name': job_name,
        'project': PROJECT,
        # Without this the configured REGION was never used.
        'region': REGION,
        'streaming': True,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        'no_save_main_session': True
    }

    opts = beam.pipeline.PipelineOptions(flags=[], **options)
    pipeline = beam.Pipeline(RUNNER, options=opts)

    # NOTE(review): no schema is passed to WriteToBigQuery, so the target table
    # presumably has to exist already -- confirm before running against a new dataset.
    (
        pipeline
        | 'Read data from PubSub' >> beam.io.ReadStringsFromPubSub(subscription=pubsub_subscription)
        | 'Process message' >> beam.Map(process_events)  # filter, window, group, aggregate
        | 'Write to BigQuery' >> beam.io.WriteToBigQuery(project=PROJECT, dataset=DATASET, table=TABLE)
    )

    return pipeline.run()

In [None]:
run_taxi_trips_pipeline()

## Generate Data Points

In [None]:
# All columns of the raw CSV, in file order (the file has no header row).
csv_header_names = [
    'trip_datetime', 'pickup_dayofweek', 'pickup_hour',
    'pickup_lon', 'pickup_lat',
    'dropoff_lon', 'dropoff_lat',
    'passenger_count', 'fare_amount',
]
# Columns kept for publishing: every CSV column except pickup_dayofweek and pickup_hour.
header_names = [
    'trip_datetime',
    'pickup_lon', 'pickup_lat',
    'dropoff_lon', 'dropoff_lat',
    'passenger_count', 'fare_amount',
]

dataset = pd.read_csv('data/train-data.csv', header=None, names=csv_header_names).loc[:, header_names]
def get_data_points(count=10, frame=None):
    """Sample random rows and return them as ``{column: value}`` dicts.

    Args:
        count: number of rows to sample. Sampling is without replacement, so
            ``count`` must not exceed the number of rows in ``frame``.
        frame: DataFrame to sample from; defaults to the module-level
            ``dataset`` (whose columns are exactly ``header_names``).

    Returns:
        list of dict: one mapping per sampled row, in sampling order; empty
        when ``count`` is 0.
    """
    if frame is None:
        frame = dataset

    columns = list(frame.columns)
    # Build one dict per sampled row. Previously the inner loop and append sat
    # outside the row loop, so only the last row was returned.
    return [dict(zip(columns, row)) for row in frame.sample(n=count).values]

## Send Data Points to Pub/Sub

In [None]:
batch_size = 10
iterations = 10
sleep_time = 1

client = pubsub.Client()
topic = client.topic(TOPIC)

if not topic.exists():
    print('Creating pub/sub topic {}...'.format(TOPIC))
    topic.create()

print('Pub/sub topic {} is up and running'.format(TOPIC))
print("")

for i in range(iterations):

    data_points = get_data_points(batch_size)

    for data_point in data_points:

        # NOTE: Python's str hash is randomized per process on Python 3, so these
        # ids are not stable across kernel restarts.
        source_id = str(abs(hash(str(data_point))) % (10 ** 10))
        source_timestamp = datetime.datetime.now().strftime(TIME_FORMAT)
        message = json.dumps(data_point)
        # The Pub/Sub payload must be bytes; json.dumps returns text on Python 3.
        topic.publish(message=message.encode('utf-8'), source_id=source_id, source_timestamp=source_timestamp)

    print("Batch {} was sent. Last Message was: {}".format(i, message))
    print("")

    # No need to wait after the final batch.
    if i < iterations - 1:
        time.sleep(sleep_time)

print("Done!")

## Consume Messages from the Pub/Sub Subscription

In [None]:
client = pubsub.Client()
topic = client.topic(TOPIC)
subscription = topic.subscription(SUBSCRIPTION)

# NOTE: this is the same subscription the Dataflow job reads from, so pulling here
# competes with the pipeline for messages. Pulled messages are not acknowledged,
# so Pub/Sub will redeliver them once their ack deadline expires.
message = subscription.pull()  # sequence of pairs; the second element is the message

if not message:
    print("No messages available on subscription {}".format(SUBSCRIPTION))
else:
    received = message[0][1]
    print("source_id", received.attributes["source_id"])
    print("source_timestamp:", received.attributes["source_timestamp"])
    print("")
    print(received.data)