<img src="https://github.com/thecodemancer/Creating-a-Streaming-Data-Pipeline-for-a-Real-Time-Dashboard-with-Dataflow/blob/35d23d0eefcc5e390b91952bd52047a5ed2c8b31/images/data-architecture.png?raw=true" />

# Streaming Data Pipeline for a Real-Time Dashboard with Dataflow

## Overview
You own a fleet of New York City taxi cabs and want to monitor how well your business is doing in real time. You build a streaming data pipeline that captures taxi revenue, passenger count, ride status, and more, and then visualize the results in a management dashboard.

## Objectives

- Create a Dataflow job from a template
- Subscribe to a Pub/Sub topic
- Stream a Dataflow pipeline into BigQuery
- Monitor a Dataflow pipeline in BigQuery
- Analyze results with SQL
- Visualize key metrics in Looker Studio

## Task 1. Create a Pub/Sub topic and a BigQuery dataset

We'll be using an extract of the [NYC Taxi & Limousine Commission’s open dataset](https://data.cityofnewyork.us/).

In **Cloud Shell**, run the following commands to download the source data, create the Pub/Sub topic, and publish each ride event to it as a message.

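```bash
cd ~

# The Google Cloud CLI is preinstalled in Cloud Shell, so this step is optional there
sudo apt-get install -y google-cloud-sdk

# Install jq, used below to split the JSON array into individual messages
sudo apt-get install -y jq

# Download the source data: a JSON array of taxi ride events
gsutil cp gs://cloud-training/bdml/taxisrcdata/realtime1000.json realtime1000.json

# Create the Pub/Sub topic and retain its messages for 7 days
gcloud pubsub topics create taxirides-realtime --message-retention-duration=7d

# Publish each array element as one message (one gcloud call per message, so this is slow)
jq -c '.[]' realtime1000.json | xargs -t -d '\n' -I {} gcloud pubsub topics publish taxirides-realtime --message {}
```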
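To check that the messages actually reached the topic, you can attach a throwaway subscription and pull a few of them. The sketch below uses a hypothetical subscription name, `taxirides-debug`. Because a new subscription only receives messages published after it exists, it seeks back one hour, which should replay the already-published messages thanks to the 7-day retention set on the topic. It assumes GNU `date`, as found in Cloud Shell.

```bash
#!/usr/bin/env bash
# Sketch: peek at a few messages in the taxirides-realtime topic.
# ASSUMPTION: "taxirides-debug" is a hypothetical, throwaway subscription name.
set -euo pipefail

TOPIC="taxirides-realtime"
DEBUG_SUB="taxirides-debug"

peek_topic() {
  local since
  # One hour ago in UTC, RFC 3339 format (GNU date, as in Cloud Shell).
  since="$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)"

  # A new subscription only receives messages published after it exists...
  gcloud pubsub subscriptions create "$DEBUG_SUB" --topic="$TOPIC"
  # ...so rewind it into the topic's retention window (7d, set above).
  gcloud pubsub subscriptions seek "$DEBUG_SUB" --time="$since"
  # Pull and acknowledge up to five messages; other subscriptions are unaffected.
  gcloud pubsub subscriptions pull "$DEBUG_SUB" --limit=5 --auto-ack
}

cleanup_peek() {
  # Remove the debug subscription so it does not keep accumulating a backlog.
  gcloud pubsub subscriptions delete "$DEBUG_SUB"
}
```

Run `peek_topic`, inspect the printed messages, then run `cleanup_peek`. Acknowledging messages on this subscription does not consume them for any other subscription on the topic.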

Next, create the `taxirides` BigQuery dataset in `us-west1`:

```bash
bq --location=us-west1 mk taxirides
```

Then create the `taxirides.realtime` table. It starts empty, is partitioned by day on the `timestamp` column, and is where Dataflow will stream rows later.

```bash
bq --location=us-west1 mk \
--time_partitioning_field timestamp \
--schema ride_id:string,point_idx:integer,latitude:float,longitude:float,\
timestamp:timestamp,meter_reading:float,meter_increment:float,ride_status:string,\
passenger_count:integer -t taxirides.realtime
```
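Before pointing Dataflow at the table, it can help to confirm that the schema and partitioning came out as intended. A minimal sketch using `bq show`; the function names are illustrative only.

```bash
#!/usr/bin/env bash
# Sketch: inspect the table created above before streaming into it.
set -euo pipefail

TABLE="taxirides.realtime"

show_schema() {
  # Column names, types and modes as JSON.
  bq show --schema --format=prettyjson "$TABLE"
}

show_table_metadata() {
  # Full metadata; the "timePartitioning" section should name the timestamp field.
  bq show --format=prettyjson "$TABLE"
}
```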

<img src="images/10.png?raw=true" />

## Task 2. Create a Cloud Storage bucket

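In Cloud Shell, run the following command to create the bucket that Dataflow will use as its staging location. The bucket name is the lab's project ID (`qwiklabs-gcp-01-62340484a253`); replace it with your own.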

```bash
gcloud storage buckets create gs://qwiklabs-gcp-01-62340484a253 --location=US
```
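Rather than editing the name by hand, a sketch can read the active project from the gcloud config and name the bucket after it. This assumes, as in the lab, that the bucket should carry the project ID as its name; `create_staging_bucket` is a hypothetical helper.

```bash
#!/usr/bin/env bash
# Sketch: create the staging bucket for whichever project gcloud is pointed at.
set -euo pipefail

project_id() {
  # Active project from the gcloud config (Cloud Shell sets this for you).
  gcloud config get-value project 2>/dev/null
}

create_staging_bucket() {
  # ASSUMPTION: the bucket is named after the project ID, as in the lab.
  local project="${1:-$(project_id)}"
  gcloud storage buckets create "gs://${project}" --location=US
}
```

Call `create_staging_bucket` with no argument to use the active project, or pass a project ID explicitly.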

<img src="https://github.com/thecodemancer/Creating-a-Streaming-Data-Pipeline-for-a-Real-Time-Dashboard-with-Dataflow/blob/35d23d0eefcc5e390b91952bd52047a5ed2c8b31/images/20.png?raw=true" />

## Task 3. Set up a Dataflow Pipeline

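In Cloud Shell, run the following command to launch a streaming Dataflow job from the Google-provided Pub/Sub to BigQuery template. The job reads from the `taxirides-realtime` topic and writes to `taxirides.realtime`, starting with one worker and autoscaling to at most two.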

```bash
gcloud dataflow jobs run streaming-taxi-pipeline \
  --gcs-location gs://dataflow-templates-us-central1/latest/PubSub_to_BigQuery \
  --region us-central1 \
  --max-workers 2 \
  --num-workers 1 \
  --staging-location gs://qwiklabs-gcp-01-62340484a253/tmp/ \
  --parameters inputTopic=projects/qwiklabs-gcp-01-62340484a253/topics/taxirides-realtime,outputTableSpec=qwiklabs-gcp-01-62340484a253:taxirides.realtime
```
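The project ID appears three times in the launch command. A sketch that takes it as an argument instead, keeping the same template, region, and worker settings; `launch_taxi_pipeline` and `list_active_jobs` are hypothetical helper names.

```bash
#!/usr/bin/env bash
# Sketch: the Task 3 launch with the project ID passed in, not hardcoded.
set -euo pipefail

REGION="us-central1"

launch_taxi_pipeline() {
  local project="${1:?usage: launch_taxi_pipeline PROJECT_ID}"
  # Same template, sizing and parameters as the command above.
  gcloud dataflow jobs run streaming-taxi-pipeline \
    --gcs-location "gs://dataflow-templates-${REGION}/latest/PubSub_to_BigQuery" \
    --region "$REGION" \
    --max-workers 2 \
    --num-workers 1 \
    --staging-location "gs://${project}/tmp/" \
    --parameters "inputTopic=projects/${project}/topics/taxirides-realtime,outputTableSpec=${project}:taxirides.realtime"
}

list_active_jobs() {
  # Running jobs in the region, to confirm the launch took effect.
  gcloud dataflow jobs list --region "$REGION" --status=active
}
```

For example: `launch_taxi_pipeline "$(gcloud config get-value project)"`, then `list_active_jobs`.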

<img src="https://github.com/thecodemancer/Creating-a-Streaming-Data-Pipeline-for-a-Real-Time-Dashboard-with-Dataflow/blob/35d23d0eefcc5e390b91952bd52047a5ed2c8b31/images/40.png?raw=true" />

## Task 4. Analyze the taxi data using BigQuery

In the Query Editor inside BigQuery, type the following, and then click Run:

```sql
SELECT * FROM taxirides.realtime LIMIT 10
```

**Note**: If no records are returned, wait another minute and re-run the query. Dataflow takes 3-5 minutes to set up the stream.
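Instead of re-running the query by hand, you can poll from Cloud Shell with the `bq` CLI. A minimal sketch; the defaults of 10 attempts spaced 60 seconds apart are arbitrary assumptions, and the function names are hypothetical.

```bash
#!/usr/bin/env bash
# Sketch: poll BigQuery until the Dataflow job has written at least one row.
set -euo pipefail

row_count() {
  # Standard SQL; CSV output is a header line ("n") followed by the count.
  bq query --use_legacy_sql=false --format=csv \
    'SELECT COUNT(*) AS n FROM taxirides.realtime' | tail -n 1
}

wait_for_rows() {
  # ASSUMPTION: 10 attempts, 60 s apart, are arbitrary defaults.
  local attempts="${1:-10}" interval="${2:-60}" n i
  for ((i = 1; i <= attempts; i++)); do
    # Treat a failed query as "no rows yet" rather than aborting the loop.
    n="$(row_count)" || n=""
    if [[ "$n" =~ ^[0-9]+$ ]] && ((n > 0)); then
      echo "rows: $n"
      return 0
    fi
    echo "attempt ${i}/${attempts}: no rows yet, retrying in ${interval}s" >&2
    sleep "$interval"
  done
  return 1
}
```

`wait_for_rows` prints the row count and exits successfully once data arrives, or returns a non-zero status after the last attempt.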

<img src="https://github.com/thecodemancer/Creating-a-Streaming-Data-Pipeline-for-a-Real-Time-Dashboard-with-Dataflow/blob/35d23d0eefcc5e390b91952bd52047a5ed2c8b31/images/50.png?raw=true" />

<img src="https://github.com/thecodemancer/Creating-a-Streaming-Data-Pipeline-for-a-Real-Time-Dashboard-with-Dataflow/blob/35d23d0eefcc5e390b91952bd52047a5ed2c8b31/images/60.png?raw=true" />

<img src="https://github.com/thecodemancer/Creating-a-Streaming-Data-Pipeline-for-a-Real-Time-Dashboard-with-Dataflow/blob/35d23d0eefcc5e390b91952bd52047a5ed2c8b31/images/70.png?raw=true" />

## Task 5. Perform aggregations on the stream for reporting

Copy and paste the following query, and then click Run.

```sql
WITH streaming_data AS (
SELECT
  timestamp,
  TIMESTAMP_TRUNC(timestamp, HOUR, 'UTC') AS hour,
  TIMESTAMP_TRUNC(timestamp, MINUTE, 'UTC') AS minute,
  TIMESTAMP_TRUNC(timestamp, SECOND, 'UTC') AS second,
  ride_id,
  latitude,
  longitude,
  meter_reading,
  ride_status,
  passenger_count
FROM
  taxirides.realtime
WHERE ride_status = 'dropoff'  -- drop-off events only, one per completed ride
ORDER BY timestamp DESC
LIMIT 1000
)
# calculate aggregations on stream for reporting:
SELECT
 ROW_NUMBER() OVER (ORDER BY minute DESC) AS dashboard_sort,
 minute,
 COUNT(DISTINCT ride_id) AS total_rides,
 SUM(meter_reading) AS total_revenue,
 SUM(passenger_count) AS total_passengers
FROM streaming_data
GROUP BY minute
```

**Note:** Ensure Dataflow is registering data in BigQuery before proceeding to the next task.

The result shows key metrics by the minute for every taxi drop-off.

Click **Save > Save query**.

In the Save query dialog, in the **Name** field, type **My Saved Query**.

Click **Save**.

<img src="https://github.com/thecodemancer/Creating-a-Streaming-Data-Pipeline-for-a-Real-Time-Dashboard-with-Dataflow/blob/35d23d0eefcc5e390b91952bd52047a5ed2c8b31/images/80.png?raw=true" />

## Task 6. Stop the Dataflow Job

In this task, you stop the Dataflow job to free up resources for your project.

In the Cloud console, in the **Navigation menu**, click **Dataflow**.

Click **streaming-taxi-pipeline** (or the name you gave the job).

Click **Stop**, and then select **Cancel > Stop Job**.
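The same stop can be done from Cloud Shell. The sketch below looks up the job ID by name among active jobs in `us-central1` (the region from Task 3), then cancels it as the console steps do; passing `drain` instead stops reading from Pub/Sub and lets buffered data finish writing. The helper names are hypothetical.

```bash
#!/usr/bin/env bash
# Sketch: stop the Task 3 job from Cloud Shell instead of the console.
set -euo pipefail

REGION="us-central1"
JOB_NAME="streaming-taxi-pipeline"

active_job_ids() {
  # IDs of running jobs whose name matches JOB_NAME, one per line.
  gcloud dataflow jobs list --region="$REGION" --status=active \
    --filter="name=${JOB_NAME}" --format="value(id)"
}

stop_taxi_pipeline() {
  # "cancel" (the default here, as in the console steps) stops right away;
  # "drain" stops reading from Pub/Sub and lets buffered data finish writing.
  local mode="${1:-cancel}" ids id
  ids="$(active_job_ids)"
  if [[ -z "$ids" ]]; then
    echo "no active job named ${JOB_NAME}" >&2
    return 1
  fi
  for id in $ids; do
    gcloud dataflow jobs "$mode" "$id" --region="$REGION"
  done
}
```

Run `stop_taxi_pipeline` to cancel, or `stop_taxi_pipeline drain` to drain.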

## Task 7. Create a real-time dashboard

<img src="https://github.com/thecodemancer/Creating-a-Streaming-Data-Pipeline-for-a-Real-Time-Dashboard-with-Dataflow/blob/35d23d0eefcc5e390b91952bd52047a5ed2c8b31/images/90.png?raw=true" />

## Task 8. Create a time series dashboard

<img src="https://github.com/thecodemancer/Creating-a-Streaming-Data-Pipeline-for-a-Real-Time-Dashboard-with-Dataflow/blob/35d23d0eefcc5e390b91952bd52047a5ed2c8b31/images/100.png?raw=true" />

<img src="https://github.com/thecodemancer/Creating-a-Streaming-Data-Pipeline-for-a-Real-Time-Dashboard-with-Dataflow/blob/35d23d0eefcc5e390b91952bd52047a5ed2c8b31/images/110.png?raw=true" />

<img src="https://github.com/thecodemancer/Creating-a-Streaming-Data-Pipeline-for-a-Real-Time-Dashboard-with-Dataflow/blob/35d23d0eefcc5e390b91952bd52047a5ed2c8b31/images/120.png?raw=true" />


## Further reading

- [jq](https://stedolan.github.io/jq/)
- [Pub/Sub](https://cloud.google.com/pubsub/)
- [Dataflow](https://cloud.google.com/dataflow/)
- [Cloud Storage](https://cloud.google.com/storage/)
- [BigQuery](https://cloud.google.com/bigquery/)
- [Looker Studio](https://cloud.google.com/looker-studio)


---
If you made it this far, follow [David Regalado](https://beacons.ai/davidregalado) for more code!