# GCP Big Data Analytics Pipeline

We are going to develop a GCP data analytics pipeline
with Batch Processing
and Real-Time data streaming

<img src="images/pipeline.png">

In [1]:
# !pip install google-cloud
# !pip install google-cloud-pubsub
# !pip install google.cloud_bigquery
# !pip install google.cloud_storage

from google.oauth2 import service_account #For GCP Account connection
from google.cloud import pubsub_v1 # For PubSub Client
from google.cloud import bigquery # For BigQuery Client
from google.cloud import storage # For Cloud Storage Client

import json # For Message syntax
import os
import glob
import time
import concurrent.futures
import subprocess

## Setup Configuration

In [2]:
def load_configuration(file):
    """Load the pipeline configuration from a JSON file.

    Parameters
    ----------
    file : str
        Path to a JSON file whose top-level value is an object.

    Returns
    -------
    dict or None
        The parsed configuration. ``None`` is returned, after printing a
        message, when the file cannot be read, is not valid JSON, or does not
        hold a JSON object. A warning is printed (and the configuration is
        still returned) when any value is ``null``.
    """
    try:
        # The context manager closes the handle even when parsing fails.
        with open(file) as handle:
            conf = json.loads(handle.read())
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: malformed JSON or
        # undecodable bytes. Anything else (e.g. KeyboardInterrupt) propagates.
        print("Errro Occurred! Please check if file is available!")
        return None

    if not isinstance(conf, dict):
        # A JSON array/scalar has no named fields to look up.
        print("Errro Occurred! Please check if file is available!")
        return None

    if None in conf.values():
        print("Please confirm all fields are mentioned in the credential files! Try again!")
    return conf

In [3]:
# Load the pipeline settings once; the cells below read these module-level names.
conf = load_configuration("./Configuration/conf.json")
if conf is None:
    # load_configuration signals failure by printing a message and returning
    # None. Stop here with a clear error instead of failing on the next line
    # with "'NoneType' object is not subscriptable".
    raise RuntimeError(
        "Could not load ./Configuration/conf.json; fix the configuration file and re-run this cell."
    )

# Service-account credentials shared by every GCP client created below.
cred = service_account.Credentials.from_service_account_file(conf["cred_file"])
project_id = conf["project_id"]
project_name = conf["project_name"]
topic_name = conf["topic_name"]
bucket_name = conf["bucket_name"]
subscription_name = conf["subscription_name"]
dataset_name = conf["dataset_name"]
table_name = conf["table_name"]
schema = conf["schema"]  # path to the JSON file describing the table schema
job_name = conf["job_name"]

# Building Pipeline components

## Storage Bucket

In [4]:

# Create the Cloud Storage bucket named in the configuration.
storage_client = storage.Client(credentials=cred, project=project_id)
bucket = storage_client.create_bucket(bucket_name)
print('Bucket {name} created.'.format(name=bucket.name))


Bucket aws-review-data created.


<img src="images/storage.png">

## PubSub Topic

In [5]:

# Create the Pub/Sub topic.
publisher = pubsub_v1.PublisherClient(credentials=cred)
topic_path = publisher.topic_path(project_id, topic_name)
# Pass the resource name by keyword: in google-cloud-pubsub 2.x the first
# positional parameter is the request object, so a positional path fails there,
# whereas `name=` is accepted by both the 1.x and 2.x clients.
topic = publisher.create_topic(name=topic_path)
print('Topic created: {}'.format(topic))


Topic created: name: "projects/gcp-bigdata-analytics-pipeline/topics/gcp-topic"



<img src="images/topic.png">

## PubSub Subscription

In [6]:

# Create the Pub/Sub subscription attached to the topic.
subscriber = pubsub_v1.SubscriberClient(credentials = cred)
topic_path = subscriber.topic_path(project_id, topic_name)
subscription_path = subscriber.subscription_path(project_id, subscription_name)
# Keyword arguments keep this call valid on google-cloud-pubsub 2.x, where the
# first positional parameter is the request object rather than the name.
subscription = subscriber.create_subscription(name=subscription_path, topic=topic_path)
print('Subscription created: {}'.format(subscription))


Subscription created: name: "projects/gcp-bigdata-analytics-pipeline/subscriptions/gcp-subscription"
topic: "projects/gcp-bigdata-analytics-pipeline/topics/gcp-topic"
push_config {
}
ack_deadline_seconds: 10
message_retention_duration {
  seconds: 604800
}
expiration_policy {
  ttl {
    seconds: 2678400
  }
}



<img src="images/subscription.png">

## Big Query Dataset

In [7]:
# Create the BigQuery dataset in the US location.
client = bigquery.Client(credentials=cred, project=project_id)
# Fully-qualified id: "<project>.<dataset>"; reused by the teardown section.
dataset_id = ".".join([client.project, dataset_name])
dataset = bigquery.Dataset(dataset_id)
dataset.location = "US"
dataset = client.create_dataset(dataset)
print("Created dataset {proj}.{ds}".format(proj=client.project, ds=dataset.dataset_id))

Created dataset gcp-bigdata-analytics-pipeline.AWS_Product_Review


## Load Schema

In [8]:
# Import the table schema from the JSON file named in the configuration.
# Reading inside a context manager closes the file handle promptly.
with open(schema) as schema_file:
    table_schema_data = json.load(schema_file)

# One SchemaField per entry. "mode" and "description" are optional in BigQuery
# schema files, so fall back to the SchemaField defaults when they are absent.
table_schema = [
    bigquery.SchemaField(
        name=field['name'],
        field_type=field['type'],
        mode=field.get('mode', 'NULLABLE'),
        description=field.get('description'),
    )
    for field in table_schema_data
]

## Create Big Query Table

In [9]:

# Create the BigQuery table inside the dataset, using the loaded schema.
# Fully-qualified id: "<project>.<dataset>.<table>"; reused by the teardown section.
table_id = ".".join([project_id, dataset_name, table_name])
table = client.create_table(bigquery.Table(table_id, schema=table_schema))
print("Created table {proj}.{ds}.{tbl}".format(proj=table.project, ds=table.dataset_id, tbl=table.table_id))


Created table gcp-bigdata-analytics-pipeline.AWS_Product_Review.Reviews


<img src="images/big query table.png">

# Real-Time streaming data

We will now simulate real-time data capture into the BigQuery table using GCP Pub/Sub and Dataflow.

In [10]:
def getJobId(result):
    """Extract the job id from the output of ``gcloud ... --format=json``.

    Parameters
    ----------
    result : bytes or str
        Raw command output (as returned by ``subprocess.check_output``) or the
        same content as text.

    Returns
    -------
    str
        The value of the ``"id"`` field when the output is a JSON object that
        contains one. Otherwise the original positional heuristic is applied
        (third comma-separated field, text after the first colon).

    Raises
    ------
    IndexError
        If the output is not JSON and the fallback heuristic cannot find a
        third ``key:value`` field.
    """
    if isinstance(result, (bytes, bytearray)):
        text = result.decode("utf-8", errors="replace")
    else:
        text = str(result)

    # Preferred path: read the key by name so the answer does not depend on
    # the order in which the fields are printed.
    try:
        payload = json.loads(text)
    except ValueError:
        payload = None
    if isinstance(payload, dict) and "id" in payload:
        return str(payload["id"])

    # Fallback: legacy heuristic, kept for inputs that are not valid JSON.
    data = result
    for item in ["\\n", "\\r", "\"", " ", "{", "}"]:
        data = str(data).replace(item, "")
    return data.split(",")[2].split(":")[1]

In [11]:

# Start a Dataflow job from the Google-provided "Pub/Sub Subscription to BigQuery" template.
# The subscription path is built from project_id (not project_name) because the
# subscription was created under project_id, and Pub/Sub resource paths are
# keyed by project id.
command = (
    "gcloud dataflow jobs run {job} "
    "--gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery "
    "--parameters "
    "outputTableSpec={project}:{dataset}.{table},"
    "inputSubscription=projects/{project}/subscriptions/{subscription} "
    "--format={fmt}"
).format(
    job=job_name,
    project=project_id,
    dataset=dataset_name,
    table=table_name,
    subscription=subscription_name,
    fmt="json",
)

# check_output raises CalledProcessError when gcloud exits with a non-zero status.
result = subprocess.check_output(command, shell=True)
job_id = getJobId(result)  # kept for the teardown section, which cancels this job

print("Job: {} is created successfully!".format(job_id))

Job: 2019-12-23_16_31_24-8684177415575336791 is created successfully!


In [12]:
# Sample review event to publish on the topic.
# Keys presumably mirror the BigQuery table columns; verify against the schema file.
message_data = {
    "marketplace": "GCP-Pipeline",
    "customer_id": 18778586,
    "review_id": "RDIJS7QYB6XNR",
    "product_id": "B00EDBY7X8",
    "product_parent": 122952789,
    "product_title": "Sample Message",
    "product_category": "Toys",
    "star_rating": 5,
    "helpful_votes": 0,
    "total_votes": 0,
    "vine": "False",
    "verified_purchase": "True",
    "review_headline": "Five Stars",
    "review_body": "Excellent!!!",
    "review_date": "2015-08-31",
}

In [13]:
# Serialise the event to UTF-8 encoded JSON bytes for publishing.
# A separate name is used so message_data stays a dict and this cell can be
# re-run without trying to JSON-encode bytes.
message_bytes = json.dumps(message_data).encode('utf-8')

# Publish the message on the Pub/Sub topic; `origin` is sent as a message attribute.
response = publisher.publish(topic_path, message_bytes, origin='python-sample')

# publish() returns a future; block on it so a failed publish raises here
# instead of going unnoticed. The result is the server-assigned message id.
message_id = response.result(timeout=60)

<img src="images/running job.png">

<img src="images/real-time-data.png">

# Batch Processing

Suppose we already have historical data for our application that we need to analyse.
To do that, we first have to load it into the BigQuery table.

The data is located on the local machine, so we can use the Google Cloud SDK `bq` command to load it directly into the BigQuery table.

In [15]:
# Location and extension of the historical data files to bulk-load.
input_folder = r"./Data/"
file_type = ".tsv.gz"

# NUMBER_OF_PROCESSORS is only defined on Windows; fall back to os.cpu_count()
# elsewhere (it may return None, hence the final `or 1`).
NUM_WORKERS = int(os.environ.get('NUMBER_OF_PROCESSORS') or os.cpu_count() or 1)
print("There are {} number of processors".format(NUM_WORKERS))

# Build the pattern from file_type so the two settings cannot drift apart.
files = glob.glob(input_folder + "*" + file_type)
print("Total Number of file to upload: {}".format(len(files)))

There are 8 number of processors
Total Number of file to upload: 2


In [16]:
def uploadFileToBigQueryTable(filename):
    """Load one local delimited file into the BigQuery table with the ``bq`` CLI.

    Reads the module-level ``project_id``, ``dataset_name`` and ``table_name``.

    Parameters
    ----------
    filename : str
        Path of the file to load (tab-separated, one header row).

    Returns
    -------
    None

    Raises
    ------
    subprocess.CalledProcessError
        If ``gcloud`` or ``bq`` exits with a non-zero status.
    """
    print("Uploading file {} on big query \n".format(os.path.basename(filename)))

    # "tab" is used instead of a literal \t: an unquoted backslash-t is reduced
    # to "t" by POSIX shells, while bq accepts the word "tab" on every platform.
    # The filename is double-quoted so paths containing spaces survive the shell.
    command = "bq --location=US load --null_marker=NULL --skip_leading_rows 1 --quote \"\" "
    command += "-E UTF-8 --source_format=CSV --field_delimiter tab {}.{} \"{}\"".format(
        dataset_name, table_name, filename
    )

    # Point the SDK at the right project before loading.
    setup_command = "gcloud config set project {}".format(project_id)
    subprocess.check_output(setup_command, shell=True)
    subprocess.check_output(command, shell=True)

In [17]:
start = time.time()

# Upload the files in parallel. Leaving the `with` block waits for every worker
# and shuts the executor down, so no explicit shutdown() call is needed.
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
    # Map each future back to its file so failures can be attributed.
    futures = {executor.submit(uploadFileToBigQueryTable, file): file for file in files}
    for future in concurrent.futures.as_completed(futures):
        # wait()/as_completed() never raise worker errors by themselves; ask
        # each future explicitly so a failed upload is not reported as success.
        error = future.exception()
        if error is not None:
            print("Upload of {} failed: {}".format(futures[future], error))

end = time.time()
print("Total Time for uploading of files is {} secs!".format(end-start))

Uploading file amazon_reviews_us_Gift_Card_v1_00.tsv.gz on big query 
Uploading file amazon_reviews_us_Personal_Care_Appliances_v1_00.tsv.gz on big query 


Total Time for uploading of files is 41.91423273086548 secs!


<img src="images/bulk-data.png">

# Data Analytics

As our data is available in the Big Query table, we can connect to it through any analytics tool like GCP Data Explorer, Tableau, Power BI and so on.

If the connection to BigQuery is live, then every time new data is captured through the real-time streaming Dataflow job, the related analytics appear on the dashboard almost instantly.

# Teardown the Infrastructure

In [18]:
# Drop the BigQuery table; not_found_ok=True makes a missing table a no-op.
client = bigquery.Client(credentials=cred, project=project_id)
client.delete_table(table_id, not_found_ok=True)
print("Deleted table '{tbl}'.".format(tbl=table_id))

Deleted table 'gcp-bigdata-analytics-pipeline.AWS_Product_Review.Reviews'.


In [19]:
# Drop the BigQuery dataset together with any tables it still contains;
# not_found_ok=True makes a missing dataset a no-op.
client = bigquery.Client(credentials=cred, project=project_id)
client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)
print("Deleted dataset '{ds}'.".format(ds=dataset_id))

Deleted dataset 'gcp-bigdata-analytics-pipeline.AWS_Product_Review'.


In [20]:
# Cancel the running Dataflow job started earlier (identified by job_id).
command = "gcloud dataflow jobs cancel {job} --project {project}".format(
    job=job_id, project=project_id
)
subprocess.check_output(command, shell=True)

b''

In [21]:
# Remove the Cloud Storage bucket and everything in it (`gsutil rm -r`).
command = "gsutil rm -r gs://{bucket}".format(bucket=bucket_name)
response = subprocess.check_output(command, shell=True)
print(response)

b''


In [22]:
# Delete the Pub/Sub subscription.
subscriber = pubsub_v1.SubscriberClient(credentials = cred)
topic_path = subscriber.topic_path(project_id, topic_name)
subscription_path = subscriber.subscription_path(project_id, subscription_name)

# Keyword argument keeps the call valid on google-cloud-pubsub 2.x, where the
# first positional parameter is the request object.
subscription = subscriber.delete_subscription(subscription=subscription_path)
print('Subscription deleted: {}'.format(subscription_path))

Subscription deleted: projects/gcp-bigdata-analytics-pipeline/subscriptions/gcp-subscription


In [23]:
# Delete the Pub/Sub topic.
publisher = pubsub_v1.PublisherClient(credentials=cred)
topic_path = publisher.topic_path(project_id, topic_name)
# Keyword argument keeps the call valid on google-cloud-pubsub 2.x, where the
# first positional parameter is the request object.
publisher.delete_topic(topic=topic_path)
print("Topic deleted: {}".format(topic_path))

Topic deleted: projects/gcp-bigdata-analytics-pipeline/topics/gcp-topic
