<a href="https://colab.research.google.com/github/pdeyhim/google-demo/blob/master/DatalakeDemo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Summary




The goal of this demo is to showcase a real customer use case: using BigQuery as the single source of truth for both streaming and batch data sources. In this use case, the customer's streaming and batch sources shared a common schema. To keep the pipeline simple and to show the power of Dataflow, the architecture converts batch files into streaming records. The overall architecture is depicted in the diagram after the Parameters cell.
[Notebook on Github](https://github.com/pdeyhim/google-demo/blob/master/DatalakeDemo.ipynb)



---



# Parameters

**Before getting started, let's fill in the variables that later cells in this notebook need. Fields showing "Insert Text Here" are required.**

**project_name: set this to your Qwiklabs project ID**

**[optional] dataset_name: your BigQuery dataset name. Please keep this name short and simple.**

**[optional] table_name: your BigQuery table name. Please keep this name short and simple.**

In [0]:
project_name = ''  #@param {type: "string"}
gcs_bucket_name = project_name + '_demo' 
dataset_name = 'demo'  #@param {type: "string"}
table_name = 'random_actors'  #@param {type: "string"}
data_source_topic_name = table_name+"_incoming_v1"
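
Because the bucket and topic names are derived from `project_name` and `table_name`, a bad value only surfaces later as an API error. The sketch below checks the derived names up front. The regular expressions are a simplified reading of the published naming rules (for example, they ignore dotted bucket names and the restrictions on "goog"/"google" in bucket names), and `check_names` is a hypothetical helper, not part of any Google library.

```python
import re

# Simplified naming rules (assumption: dotted bucket names, the "goog"/"google"
# bucket restrictions and other edge cases are deliberately ignored).
BUCKET_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{1,61}[a-z0-9]$")   # 3-63 chars
DATASET_RE = re.compile(r"^[A-Za-z0-9_]{1,1024}$")             # letters, digits, _
TOPIC_RE = re.compile(r"^[A-Za-z][A-Za-z0-9_.~+%-]{2,254}$")    # 3-255 chars


def check_names(project_name, dataset_name, table_name):
    """Hypothetical helper: list problems with the names the notebook derives."""
    problems = []
    if not project_name:
        problems.append("project_name is empty")
    bucket = project_name + "_demo"           # same rule as gcs_bucket_name
    topic = table_name + "_incoming_v1"       # same rule as data_source_topic_name
    if not BUCKET_RE.match(bucket):
        problems.append("invalid bucket name: " + bucket)
    if not DATASET_RE.match(dataset_name):
        problems.append("invalid dataset name: " + dataset_name)
    if not TOPIC_RE.match(topic) or topic.startswith("goog"):
        problems.append("invalid topic name: " + topic)
    return problems
```

In the notebook you would call `check_names(project_name, dataset_name, table_name)` right after the parameters cell; an empty list means the names look valid.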


![alt text](https://raw.githubusercontent.com/pdeyhim/google-demo/master/images/BQColabDiagrams/arch.jpg)

**A:** Two Dataflow data generator jobs produce sample data. The streaming job publishes records to a Pub/Sub topic; the batch job writes batch files to GCS.

**B:** A batch-to-streaming Dataflow job watches GCS for new batch files written by the batch generator and publishes each line as a record to the same Pub/Sub topic.

**C:** A BigQuery ingest Dataflow job reads the combined streaming and batch records from the Pub/Sub topic and writes them to BigQuery.
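
To make the data flow concrete, here is a toy, in-memory model of steps A to C. A `queue.Queue` stands in for the Pub/Sub topic and a plain list stands in for the raw BigQuery table; the function name and the dict-of-file-contents input are illustrative assumptions, not how the Dataflow jobs are implemented.

```python
import json
import queue


def simulate_pipeline(streaming_records, batch_files):
    """Toy model of steps A-C: every source funnels through one topic into one table.

    streaming_records: list of dicts (what the streaming generator emits)
    batch_files: dict mapping a file path to its newline-delimited JSON contents
    """
    topic = queue.Queue()  # stands in for the Pub/Sub topic
    table = []             # stands in for the raw BigQuery table

    # A: the streaming generator publishes records straight to the topic.
    for record in streaming_records:
        topic.put(json.dumps(record))

    # B: the batch-to-streaming job publishes each non-empty line of each batch file.
    for contents in batch_files.values():
        for line in contents.splitlines():
            if line.strip():
                topic.put(line)

    # C: the ingest job drains the topic and appends one row per message.
    while not topic.empty():
        table.append(json.loads(topic.get()))
    return table
```

The design choice the model highlights: because batch lines and streaming records end up on the same topic, a single ingest job (C) serves both sources.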


# Installing dependencies

We'll begin by installing our dependencies.

In [0]:
%%bash
pip install google-cloud-pubsub

In [0]:
%%bash
apt-get -y install jq

In [0]:
from google.colab import auth
from google.cloud import pubsub_v1
from google.cloud import bigquery
from google.cloud import storage

import os
import subprocess
import time



---


# Authenticate Yourself

Before executing any code, we need to authenticate with Google Cloud. Use the credentials Qwiklabs gave you. After you authenticate, you'll be shown an authentication code; copy and paste it into the text box below.

In [0]:
auth.authenticate_user()



---



# Create Google Cloud Resources

We need to create a few Google Cloud resources before starting our Dataflow jobs: a Google Cloud Storage bucket, a Pub/Sub topic, and a BigQuery dataset and table.


Creating the Pub/Sub topic

In [0]:
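publisher = pubsub_v1.PublisherClient()
# Fully qualified path: projects/<project_name>/topics/<data_source_topic_name>
topic_name = publisher.topic_path(project_name, data_source_topic_name)

try:
  # Passing the path by keyword works with both 1.x and 2.x of google-cloud-pubsub
  publisher.create_topic(name=topic_name)
  print("Created PubSub Topic: {}".format(data_source_topic_name))
except Exception as e:
  print(e)  # e.g. AlreadyExists when the cell is re-run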
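
Pub/Sub message payloads are raw bytes, so a JSON record has to be serialized and encoded before it is published. The sketch below shows that round trip for a hypothetical record shaped like the raw table defined in the next cells; the helper names and field values are made up for illustration. The publish call is left commented out, because sending the record would add a third `actor` value (`manual-test`) to the table.

```python
import json


def encode_record(record):
    """Serialize a dict to the UTF-8 JSON bytes a Pub/Sub message carries."""
    return json.dumps(record, separators=(",", ":")).encode("utf-8")


def decode_record(data):
    """Inverse of encode_record, as a subscriber would apply it."""
    return json.loads(data.decode("utf-8"))


# Hypothetical record shaped like the raw BigQuery table.
sample = {
    "ts": "2019-03-01T12:00:00Z",
    "user_id": 42,
    "actor": "manual-test",
    "location": {"zip": "94103", "longitude": "-122.41", "latitude": "37.77"},
    "v1": 1.0, "v2": 0.3, "v3": -1.2, "v4": 0.8, "v5": 0.01, "v6": 99.5,
}
payload = encode_record(sample)

# To actually publish it (not run here):
# future = publisher.publish(topic_name, payload)
# print(future.result())  # server-assigned message ID
```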

We're using BigQuery as our data lake. For this dataset, we create a table that holds the raw records.

In [0]:
bq_client = bigquery.Client(project=project_name)
# exists_ok avoids a Conflict error when the cell is re-run
bq_client.create_dataset(dataset_name, exists_ok=True)

In [0]:
bq_client.query('''
  CREATE OR REPLACE TABLE `{dataset}.{table}` (
    ts TIMESTAMP,
    user_id INT64,
    actor STRING,
    location STRUCT<zip STRING, longitude STRING, latitude STRING>,
    v1 FLOAT64, v2 FLOAT64, v3 FLOAT64, v4 FLOAT64, v5 FLOAT64, v6 FLOAT64
  )
  PARTITION BY DATE(ts)'''.format(dataset=dataset_name, table=table_name)).result()
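
The raw table is partitioned with `PARTITION BY DATE(ts)`. `DATE()` applied to a `TIMESTAMP` is evaluated in UTC, so a row's partition depends on its UTC date, not on the generator's local time. This small sketch (the `partition_id` helper is hypothetical) computes the daily partition ID in the `YYYYMMDD` form BigQuery uses for partition decorators such as `random_actors$20190302`.

```python
from datetime import datetime, timedelta, timezone


def partition_id(ts):
    """Daily partition ID (YYYYMMDD) that PARTITION BY DATE(ts) assigns to a row.

    DATE() of a TIMESTAMP is evaluated in UTC, so convert before formatting.
    """
    if ts.tzinfo is None:
        raise ValueError("use a timezone-aware datetime")
    return ts.astimezone(timezone.utc).strftime("%Y%m%d")


# 11:30 PM on March 1 in San Francisco (UTC-8) is already March 2 in UTC.
sf_evening = datetime(2019, 3, 1, 23, 30, tzinfo=timezone(timedelta(hours=-8)))
print(partition_id(sf_evening))  # 20190302
```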

Creating the GCS bucket that will hold the data generator schemas, Dataflow staging files, and incoming batch data

In [0]:
storage_client = storage.Client(project=project_name)

try: 
  storage_client.create_bucket(gcs_bucket_name)
  print("bucket created")
except Exception as e: print(e)



---




# Generating Data


In this section we'll use Dataflow jobs to generate random sample data as JSON records, in both streaming and batch form.



![alt text](https://raw.githubusercontent.com/pdeyhim/google-demo/master/images/BQColabDiagrams/datagen.jpg)

Defining the schemas of the generated streaming and batch data and uploading them to GCS

In [0]:
bucket = storage_client.get_bucket(gcs_bucket_name)
streaming_actor_schema = bucket.blob("datagen-schema/streaming-actor.json")
batch_actor_schema = bucket.blob("datagen-schema/batch-actor.json")

In [0]:
streaming_actor_schema.upload_from_string('''
[
  {"name": "user_id","class":"foreign-key","size": 10000,"skew":0},
  {"name": "actor", "class":"string", "dist":{"streaming-actor":1}},
  {"name": "ts", "class":"current-time"},
  {"name":"v1", "class":"int", "min":1, "max":2},
  {"name": "v2", "class": "random-walk"},
  {"name": "v3", "class": "random-walk","s": 1},
  {"name": "v4", "class": "random-walk", "precision": {"class": "gamma", "dof": 1}},
  {"name": "v5", "class": "gamma", "alpha": 0.1, "beta": 0.1},
  {"name": "v6", "class": "random-walk", "mean":100, "sd":5},
  {"name": "location","class": "zip","near": "37.77,-122.41","milesFrom": 1, "fields":"latitude, longitude, zip"}
]
''')

In [0]:
batch_actor_schema.upload_from_string('''
[
  {"name": "user_id","class":"foreign-key","size": 10000,"skew":0},
  {"name": "actor", "class":"string", "dist":{"batch-actor":1}},
  {"name": "ts", "class":"current-time"},
  {"name": "v1", "class":"sin"},
  {"name": "v2", "class": "random-walk"},
  {"name": "v3", "class": "random-walk","s": 1},
  {"name": "v4", "class": "random-walk", "precision": {"class": "gamma", "dof": 1}},
  {"name": "v5", "class": "gamma", "alpha": 0.1, "beta": 0.1},
  {"name": "v6", "class": "random-walk", "mean":100, "sd":5},
  {"name": "location","class": "zip","near": "37.77,-122.41","milesFrom": 1, "fields":"latitude, longitude, zip"}
]
''')
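
The log-synth schemas above have to produce the same field names as the raw table's columns, otherwise the ingest job cannot map records onto the table. The sketch below compares the two. `TABLE_COLUMNS` is copied by hand from the `CREATE TABLE` statement, only top-level names are checked (the `location` sub-fields are not), and the helpers are hypothetical.

```python
import json

# Column names copied from the CREATE TABLE statement for the raw table.
TABLE_COLUMNS = {"ts", "user_id", "actor", "location",
                 "v1", "v2", "v3", "v4", "v5", "v6"}


def schema_fields(schema_json):
    """Return the set of top-level field names a log-synth schema generates."""
    return {field["name"] for field in json.loads(schema_json)}


def compare(schema_json, columns=TABLE_COLUMNS):
    """Report columns the schema never fills and fields the table doesn't know."""
    fields = schema_fields(schema_json)
    return {"missing_in_schema": sorted(columns - fields),
            "unknown_to_table": sorted(fields - columns)}


# Usage in the notebook (reads the uploaded schema back from GCS):
# print(compare(streaming_actor_schema.download_as_string()))
# print(compare(batch_actor_schema.download_as_string()))
```

`Blob.download_as_string()` returns bytes, which `json.loads` accepts directly in Python 3.6 and later.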

Verifying that the data generator schemas were uploaded to GCS

In [0]:
for blob in bucket.list_blobs(prefix="datagen-schema"):
  print(blob)

Downloading Maven and the data generator code, and building the log-synth dependency

In [0]:
%%bash
# Older Maven releases are kept on the Apache archive server
wget https://archive.apache.org/dist/maven/maven-3/3.6.0/binaries/apache-maven-3.6.0-bin.tar.gz && tar -xzf apache-maven-3.6.0-bin.tar.gz

In [0]:
%%bash
git clone https://github.com/pdeyhim/log-synth.git /content/log-synth

In [0]:
%%bash
cd /content/log-synth

/content/apache-maven-3.6.0/bin/mvn  install -Dmaven.test.skip=true

In [0]:
%%bash
git clone https://github.com/pdeyhim/datagenerator.git /content/datagenerator/

In [0]:
%cd /content/datagenerator/

Starting streaming data generator Dataflow job

In [0]:
streaming_datagen_mvn_command = '''/content/apache-maven-3.6.0/bin/mvn compile exec:java \
 -Dexec.mainClass=com.google.datagenerator.SampleDataGeneratorStreaming \
 -Dexec.cleanupDaemonThreads=false \
 -Dexec.args=" \
 --project={project_name} \
 --stagingLocation=gs://{gcs_bucket_name}/dataflow/pipelines/sampler/staging \
 --tempLocation=gs://{gcs_bucket_name}/dataflow/pipelines/sampler/temp \
 --runner=DataflowRunner \
 --windowDuration=5s \
 --numShards=5 \
 --qps=100 \
 --schemaLocation=gs://{gcs_bucket_name}/datagen-schema/streaming-actor.json \
 --outputTopic=projects/{project_name}/topics/{data_source_topic_name} \
 --autoscalingAlgorithm=THROUGHPUT_BASED \
 --maxNumWorkers=3"
'''.format(gcs_bucket_name=gcs_bucket_name,project_name=project_name,data_source_topic_name=data_source_topic_name)

out = subprocess.run(streaming_datagen_mvn_command,shell=True,stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out.stdout.decode().split("\n")
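
The four `mvn compile exec:java` commands in this notebook differ only in the main class and the pipeline options. A small builder like the hypothetical `mvn_exec_command` below can remove that duplication. Note that it always adds `-Dexec.cleanupDaemonThreads=false`, which the two DataflowTemplates commands further down omit, and that it assumes no option value contains spaces or double quotes.

```python
def mvn_exec_command(main_class, options,
                     mvn="/content/apache-maven-3.6.0/bin/mvn"):
    """Build a `mvn compile exec:java` command line like the launch cells use.

    options maps pipeline option names (without the leading --) to values.
    Assumes no value contains spaces or double quotes, since the options are
    joined into a single -Dexec.args="..." string.
    """
    exec_args = " ".join("--{}={}".format(name, value)
                         for name, value in options.items())
    return ('{mvn} compile exec:java -Dexec.mainClass={main} '
            '-Dexec.cleanupDaemonThreads=false -Dexec.args="{args}"').format(
                mvn=mvn, main=main_class, args=exec_args)


# Example: part of the streaming generator launch expressed with the builder
# (remaining options as in the cell above).
# streaming_cmd = mvn_exec_command(
#     "com.google.datagenerator.SampleDataGeneratorStreaming",
#     {"project": project_name,
#      "runner": "DataflowRunner",
#      "schemaLocation": "gs://{}/datagen-schema/streaming-actor.json".format(gcs_bucket_name),
#      "outputTopic": "projects/{}/topics/{}".format(project_name, data_source_topic_name)})
# subprocess.run(streaming_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
```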

Starting batch data generator Dataflow job

In [0]:
batch_datagen_mvn_command = '''/content/apache-maven-3.6.0/bin/mvn compile exec:java \
 -Dexec.mainClass=com.google.datagenerator.SampleDataGeneratorBatch \
 -Dexec.cleanupDaemonThreads=false \
 -Dexec.args=" \
 --project={project_name} \
 --stagingLocation=gs://{gcs_bucket_name}/dataflow/pipelines/sampler/staging \
 --tempLocation=gs://{gcs_bucket_name}/dataflow/pipelines/sampler/temp \
 --runner=DataflowRunner \
 --windowDuration=5m \
 --numShards=5 \
 --qps=1000 \
 --schemaLocation=gs://{gcs_bucket_name}/datagen-schema/batch-actor.json \
 --outputDirectory=gs://{gcs_bucket_name}/dataflow-sampler/data/ \
 --outputFilenamePrefix=user-simulated-data- \
 --outputFilenameSuffix=.json \
 --autoscalingAlgorithm=THROUGHPUT_BASED \
 --maxNumWorkers=3"'''.format(gcs_bucket_name=gcs_bucket_name,project_name=project_name )

out = subprocess.run(batch_datagen_mvn_command,shell=True,stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out.stdout.decode().split("\n")

Once the commands above have executed successfully, you can follow the progress of each Dataflow job in the console:

In [0]:
print("CONSOLE LINK: ","https://console.cloud.google.com/dataflow?project={project}".format(project=project_name))

# Converting Batch to Streaming Data



![alt text](https://raw.githubusercontent.com/pdeyhim/google-demo/master/images/BQColabDiagrams/batch2stream.jpg)

In [0]:
%%bash 
git clone https://github.com/GoogleCloudPlatform/DataflowTemplates.git /content/DataflowTemplates

In [0]:
%cd /content/DataflowTemplates

In [0]:
gcs_to_pubsub = '''/content/apache-maven-3.6.0/bin/mvn compile exec:java \
-Dexec.mainClass=com.google.cloud.teleport.templates.TextToPubsubStream \
-Dexec.args=" \
--project={project_name} \
--stagingLocation=gs://{gcs_bucket_name}/dataflow/pipelines/gcstopubsub/staging \
--tempLocation=gs://{gcs_bucket_name}/dataflow/pipelines/gcstopubsub/temp2 \
--runner=DataflowRunner \
--inputFilePattern=gs://{gcs_bucket_name}/dataflow-sampler/data/user-simulated-data-* \
--outputTopic=projects/{project_name}/topics/{data_source_topic_name}"'''.format(gcs_bucket_name=gcs_bucket_name,project_name=project_name,data_source_topic_name=data_source_topic_name)

out = subprocess.run(gcs_to_pubsub,shell=True,stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out.stdout.decode().split("\n")
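
Conceptually, the `TextToPubsubStream` job repeats one step: find files matching `inputFilePattern` that it has not processed yet and publish each of their lines as a message. The pure-Python model below mimics one polling pass over an in-memory listing. It illustrates the idea only; it is not the template's actual code, and `new_lines` is a hypothetical name.

```python
import fnmatch


def new_lines(listing, pattern, seen):
    """One polling pass of a watch-and-publish loop (conceptual model only).

    listing: dict mapping object path -> file contents (stands in for GCS)
    pattern: glob such as 'gs://bucket/dataflow-sampler/data/user-simulated-data-*'
    seen:    set of paths already processed; updated in place
    Yields one message per non-empty line of each newly matched file.
    """
    for path in sorted(listing):
        if path in seen or not fnmatch.fnmatch(path, pattern):
            continue
        seen.add(path)
        for line in listing[path].splitlines():
            if line:
                yield line
```

Keeping the `seen` set is what makes the conversion incremental: files already published are skipped on the next pass, so each batch line becomes exactly one streaming record in this model.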

# Streaming Data to BigQuery

![alt text](https://raw.githubusercontent.com/pdeyhim/google-demo/master/images/BQColabDiagrams/pubsubtobq.jpg)

Once both the streaming and batch data are flowing into Pub/Sub, the final Dataflow job copies every record into our raw BigQuery table.

In [0]:
pubsub_to_bigquery= '''/content/apache-maven-3.6.0/bin/mvn compile exec:java \
-Dexec.mainClass=com.google.cloud.teleport.templates.PubSubToBigQuery \
-Dexec.args=" \
--project={project_name} \
--stagingLocation=gs://{gcs_bucket_name}/dataflow/pipelines/pubsubtobq/staging \
--tempLocation=gs://{gcs_bucket_name}/dataflow/pipelines/pubsubtobq/temp2 \
--runner=DataflowRunner \
--inputTopic=projects/{project_name}/topics/{data_source_topic_name} \
--outputTableSpec={bigquery_tableid}"'''.format(gcs_bucket_name=gcs_bucket_name,project_name=project_name,data_source_topic_name=data_source_topic_name,bigquery_tableid=project_name+":"+dataset_name+"."+table_name)

out = subprocess.run(pubsub_to_bigquery,shell=True,stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out.stdout.decode().split("\n")
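
The ingest job has to turn each Pub/Sub payload into a table row, and some payloads may not be valid. According to the template's documentation, `PubSubToBigQuery` writes records it cannot insert to a dead-letter table (by default `<outputTableSpec>_error_records`) instead of failing the pipeline. The sketch below models that split in plain Python; `route_messages` is hypothetical and much simpler than the template's real error handling.

```python
import json


def route_messages(messages):
    """Split raw Pub/Sub payloads into table rows and error records.

    Conceptual model only: valid JSON objects become rows; anything else is
    kept together with its error message, similar in spirit to a dead-letter table.
    """
    rows, errors = [], []
    for data in messages:
        try:
            record = json.loads(data.decode("utf-8"))
            if not isinstance(record, dict):
                raise ValueError("payload is not a JSON object")
            rows.append(record)
        except ValueError as exc:  # covers JSONDecodeError and UnicodeDecodeError
            errors.append({"payload": data, "error": str(exc)})
    return rows, errors
```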

# Exploring Data

The Dataflow jobs you've just launched are inserting data into the BigQuery table in real time. Run the cell below to query the data as it arrives. The query returns the distinct values of the "actor" column; you should see two: batch-actor and streaming-actor. The cell keeps polling until BigQuery reports both values.


In [0]:
df = bq_client.query("SELECT DISTINCT actor FROM `{dataset}.{table}`".format(dataset=dataset_name,table=table_name)).to_dataframe()

while (len(df) <= 1):
  df = bq_client.query("SELECT DISTINCT actor FROM `{dataset}.{table}`".format(dataset=dataset_name,table=table_name)).to_dataframe()
  time.sleep(10)
df.head(10)
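
The polling cell above loops forever if one of the jobs never delivers data. A bounded variant is sketched below; `wait_for` is a hypothetical helper, and the 10-minute default timeout is an arbitrary choice.

```python
import time


def wait_for(fetch, done, timeout_s=600, interval_s=10, sleep=time.sleep):
    """Call fetch() until done(result) is true or timeout_s seconds elapse.

    Returns the last result; raises TimeoutError if the condition never holds.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        result = fetch()
        if done(result):
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError("condition not met within {} s".format(timeout_s))
        sleep(interval_s)


# Usage with the query from the cell above:
# sql = "SELECT DISTINCT actor FROM `{}.{}`".format(dataset_name, table_name)
# df = wait_for(lambda: bq_client.query(sql).to_dataframe(), lambda d: len(d) > 1)
```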

# Data Transformation: Aggregation Dataflow



Next we'll launch another Dataflow job that reads the same Pub/Sub topic. Instead of inserting every record into BigQuery, this job aggregates the incoming records and writes the running result to a "v1_total" column. This demonstrates a real-time aggregate. Aggregation or summary tables are usually built by ETL jobs that run on a batch schedule (e.g., every 10 minutes), so the summary table always lags behind the raw data. This job keeps the summary table up to date as new records arrive.
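
In its simplest form, a real-time aggregation sums `v1` per `user_id` over fixed (tumbling) time windows and emits one row per user and window, which matches the `user_id`, `v1_total` and `ts` columns of the summary table created below. The source of `RealtimeAggregationExample` is not shown in this notebook, so the 60-second window and the use of `ts` as the window start are assumptions; `windowed_totals` is a hypothetical, in-memory illustration of the technique.

```python
from collections import defaultdict
from datetime import datetime, timezone


def windowed_totals(records, window_s=60):
    """Sum v1 per (user_id, window start) over fixed, non-overlapping windows.

    records: iterable of dicts with 'user_id', 'v1' and a timezone-aware 'ts'.
    The 60-second window is an assumption for illustration only.
    """
    totals = defaultdict(int)
    for r in records:
        epoch = int(r["ts"].timestamp())
        start = epoch - epoch % window_s  # align to the start of its window
        totals[(str(r["user_id"]), start)] += r["v1"]
    # Key each total by user_id (a string, as in the summary table) and window start.
    return {
        (user, datetime.fromtimestamp(start, tz=timezone.utc)): total
        for (user, start), total in totals.items()
    }
```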

We'll start by creating a summary table in BigQuery that our dataflow job will insert aggregated results to.

In [0]:
agg_table= table_name + "_agg"
bq_client.query('''CREATE OR REPLACE TABLE `{dataset}.{agg_table}` (user_id STRING, v1_total INT64, ts TIMESTAMP) 
  PARTITION BY DATE(ts)'''.format(dataset=dataset_name,agg_table=agg_table)).result()

In [0]:
%cd /content/datagenerator/

In [0]:
realtime_aggregation_job = '''/content/apache-maven-3.6.0/bin/mvn compile exec:java \
-Dexec.mainClass=com.google.datagenerator.RealtimeAggregationExample \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--project={project_name} \
--stagingLocation=gs://{gcs_bucket_name}/dataflow/pipelines/realtimeagg/staging \
--tempLocation=gs://{gcs_bucket_name}/dataflow/pipelines/realtimeagg/temp \
--runner=DataflowRunner \
--inputTopic=projects/{project_name}/topics/{data_source_topic_name} \
--bigQueryTable={dataset}.{agg_table}"'''.format(gcs_bucket_name=gcs_bucket_name,project_name=project_name,data_source_topic_name=data_source_topic_name,dataset=dataset_name,agg_table=agg_table)

out = subprocess.run(realtime_aggregation_job,shell=True,stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out.stdout.decode().split("\n")

Let's take a look at our summary table. The following cell keeps polling until BigQuery returns more than one row.

In [0]:
df_agg = bq_client.query("SELECT * FROM `{dataset}.{table}` ORDER BY v1_total DESC LIMIT 10".format(dataset=dataset_name,table=agg_table)).to_dataframe()
while (len(df_agg) <= 1):
  df_agg = bq_client.query("SELECT * FROM `{dataset}.{table}` ORDER BY v1_total DESC LIMIT 10".format(dataset=dataset_name,table=agg_table)).to_dataframe()
  time.sleep(10)
df_agg.head(10)

# Deleting Google Cloud Resources

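Set this to YES before executing the following cells. They delete everything this notebook created, and the Dataflow cell cancels every active Dataflow job in the project.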

In [0]:
DELETE = 'NO' #@param ["YES", "NO"] 
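
The cleanup cells should run only when `DELETE` is exactly `YES`. A substring test such as `DELETE in "YES"` is not enough: the empty string, `"Y"` and `"ES"` are all substrings of `"YES"`. The sketch below shows the difference; `confirmed` is a hypothetical helper.

```python
def confirmed(flag):
    """True only for the exact confirmation value."""
    return flag == "YES"


# Why not a substring test: each of these passes `in "YES"` but is not a confirmation.
for value in ["", "Y", "ES"]:
    assert value in "YES" and not confirmed(value)
```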

Stopping Dataflow jobs

In [0]:
%%bash -s "$project_name" "$DELETE"
# Cancel every active Dataflow job in the project (only when DELETE is YES)
if [ "$2" = "YES" ] ; then
  for i in $(gcloud --project "$1" --format json dataflow jobs list --status active | jq -r '.[] | .id') ; do
    gcloud --project "$1" dataflow jobs cancel "$i"
  done
fi
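
The `jq -r '.[] | .id'` filter in the cell above prints the `id` field of every job object in the JSON array that `gcloud ... --format json` returns. The same extraction in Python is sketched below; `active_job_ids` and `list_active_jobs` are hypothetical helpers, and the job IDs in the usage comment are made up.

```python
import json
import subprocess


def active_job_ids(gcloud_json):
    """Python equivalent of jq -r '.[] | .id': one ID per job object."""
    return [job["id"] for job in json.loads(gcloud_json)]


def list_active_jobs(project):
    """Call gcloud the same way as the bash cell and return the job IDs."""
    out = subprocess.run(
        ["gcloud", "--project", project, "--format", "json",
         "dataflow", "jobs", "list", "--status", "active"],
        stdout=subprocess.PIPE, check=True)
    return active_job_ids(out.stdout)


# active_job_ids('[{"id": "job-1"}, {"id": "job-2"}]')  # ['job-1', 'job-2']
```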

Deleting PubSub Topic

In [0]:
if DELETE == "YES":
  try:
    # Keyword argument works with both 1.x and 2.x of google-cloud-pubsub
    publisher.delete_topic(topic=topic_name)
  except Exception as e: print(e)

Delete BigQuery tables and dataset



In [0]:
if DELETE == "YES":
  try:
    bq_client.delete_table("{project}.{dataset}.{table}".format(project=project_name,dataset=dataset_name,table=table_name))
    bq_client.delete_table("{project}.{dataset}.{table}".format(project=project_name,dataset=dataset_name,table=agg_table))
    bq_client.delete_dataset(dataset=dataset_name)
  except Exception as e: print(e)

Delete GCS objects and bucket

In [0]:
if DELETE == "YES":
  try:
    print("Deleting blobs")
    for blob in bucket.list_blobs():
      blob.delete()
    print("Deleting bucket")
    bucket.delete(force=True)
  except Exception as e: print(e)