In [2]:
# !pip install google-cloud
# !pip install google-cloud-pubsub
# !pip install google-cloud-bigquery
# !pip install google-cloud-storage

In [51]:
from google.oauth2 import service_account #For GCP Account connection
from google.cloud import pubsub_v1 # For PubSub Client
from google.cloud import bigquery # For BigQuery Client
from google.cloud import storage # For Cloud Storage Client
import json # For Message syntax
import os

In [4]:
# Load the service-account key file and build the credentials object that the
# GCP clients in this notebook are constructed with.
# The path is relative, so it resolves against the current working directory.
service_account_key_path = '../Configuration/INFO7250-b2d76e3086d3.json'
cred = service_account.Credentials.from_service_account_file(service_account_key_path)

In [54]:
# Configuration variables, grouped by the service they belong to.

# Project (project_id and project_name hold the same value)
project_name = "info7250"
project_id = "info7250"

# Cloud Storage
bucket_name = "aws-review-data"

# Pub/Sub
# NOTE(review): the subscription prefix ("info7374") differs from the topic
# prefix ("info7250") -- confirm this is intentional.
subscription_name = "info7374-subscription"
topic_name = "info7250-topic"

# BigQuery
table_name = "Reviews"
dataset_name = "AWS_Product_Review"

# Dataflow
job_name = "INFO7250_DataFlow"

In [11]:

#Creating a Cloud Storage Bucket:
# The bucket is looked up first so this cell can be re-run: create_bucket()
# fails once the bucket exists, which would break "Restart & Run All".
storage_client = storage.Client(project=project_id,credentials=cred)
bucket = storage_client.lookup_bucket(bucket_name)  # None when the bucket is not found
if bucket is None:
    bucket = storage_client.create_bucket(bucket_name)
    print('Bucket {} created.'.format(bucket.name))
else:
    print('Bucket {} already exists.'.format(bucket.name))


Bucket aws-review-data created.


In [13]:

#Creating PubSub Topic:
publisher = pubsub_v1.PublisherClient(credentials=cred)
# Fully-qualified topic resource name built from the project id and topic name.
topic_path = publisher.topic_path(project_id, topic_name)
# Pass the topic name by keyword: in google-cloud-pubsub 2.x the first
# positional parameter is `request`, so the old positional call breaks there.
# The `name=` keyword is accepted by both 1.x and 2.x.
topic = publisher.create_topic(name=topic_path)
print('Topic created: {}'.format(topic))


Topic created: name: "projects/info7250/topics/info7250-topic"



In [14]:

#Creating PubSub Subscription:
subscriber = pubsub_v1.SubscriberClient(credentials = cred)
# Fully-qualified resource names for the topic and for the new subscription.
topic_path = subscriber.topic_path(project_id, topic_name)
subscription_path = subscriber.subscription_path(project_id, subscription_name)
# Pass name/topic by keyword: in google-cloud-pubsub 2.x the first positional
# parameter is `request`, so the old positional call breaks there. The
# keywords are accepted by both 1.x and 2.x.
subscription = subscriber.create_subscription(name=subscription_path, topic=topic_path)
print('Subscription created: {}'.format(subscription))


Subscription created: name: "projects/info7250/subscriptions/info7374-subscription"
topic: "projects/info7250/topics/info7250-topic"
push_config {
}
ack_deadline_seconds: 10
message_retention_duration {
  seconds: 604800
}
expiration_policy {
  ttl {
    seconds: 2678400
  }
}



In [17]:

#Creating a BigQuery Dataset:
client = bigquery.Client(project=project_id, credentials=cred)
# Fully-qualified dataset id in "<project>.<dataset>" form.
dataset_id = client.project+"."+dataset_name
dataset = bigquery.Dataset(dataset_id)
dataset.location = "US"
# exists_ok=True makes the cell safe to re-run: an already-existing dataset is
# returned instead of raising a conflict error.
dataset = client.create_dataset(dataset, exists_ok=True)
print("Created dataset {}.{}".format(client.project, dataset.dataset_id))


Created dataset info7250.AWS_Product_Review


In [18]:
#Import Table schema from a JSON file:
# Each entry of the parsed JSON is read for its 'name', 'type', 'mode' and
# 'description' keys and turned into one BigQuery column definition.
# The `with` block closes the file handle (the bare open().read() never did).
with open('../Configuration/Reviews') as schema_file:
    schema = json.load(schema_file)
table_schema = [
    bigquery.SchemaField(
        name=line['name'],
        field_type=line['type'],
        mode=line['mode'],
        description=line['description'],
    )
    for line in schema
]

In [19]:

#Creating a BigQuery Table under the Dataset created:
# Fully-qualified table id in "<project>.<dataset>.<table>" form.
table_id = project_id+"."+dataset_name+"."+table_name
table = bigquery.Table(table_id, schema=table_schema)
# exists_ok=True makes the cell safe to re-run: an already-existing table is
# returned instead of raising a conflict error.
table = client.create_table(table, exists_ok=True)
print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))


Created table info7250.AWS_Product_Review.Reviews


gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
    --parameters \
inputSubscription=projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME,\
outputTableSpec=YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME

In [67]:
#Launching a Dataflow job from the PubSub-Subscription-to-BigQuery template:
# The template parameters are comma-separated; the command-line pieces are
# space-separated. Joining them yields a single gcloud command line.
template_parameters = ",".join([
    "outputTableSpec={}:{}.{}".format(project_id, dataset_name, table_name),
    "inputSubscription=projects/{}/subscriptions/{}".format(project_name, subscription_name),
])
command = " ".join([
    "gcloud dataflow jobs run {}".format(job_name),
    "--gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery",
    "--parameters",
    template_parameters,
])

# Echo the exact command before running it.
print(command)

# Runs through the shell; the value returned by os.system is shown as the cell output.
os.system(command)

gcloud dataflow jobs run INFO7250_DataFlow --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery --parameters outputTableSpec=info7250:AWS_Product_Review.Reviews,inputSubscription=projects/info7250/subscriptions/info7374-subscription


0

In [69]:
# Assumes a streaming Dataflow job from the PubSub Subscription to the BigQuery
# table has already been created (e.g. manually in the GCP Console).
# Sample real-time event/message to publish on the PubSub Topic.

message_data = dict(
    marketplace="Sunil",
    customer_id=18778586,
    review_id="RDIJS7QYB6XNR",
    product_id="B00EDBY7X8",
    product_parent=122952789,
    product_title="Pramod",
    product_category="Toys",
    star_rating=5,
    helpful_votes=0,
    total_votes=0,
    vine=False,
    verified_purchase=True,
    review_headline="Five Stars",
    review_body="Excellent!!!",
    review_date="2015-08-31",
)

In [70]:
#Formatting message data before publishing:
# Serialise into a separate name so `message_data` stays a dict and this cell
# can be re-run; rebinding it to bytes made a second run fail inside
# json.dumps with a TypeError.
message_bytes = json.dumps(message_data).encode('utf-8')

#Publishing a message on the PubSub Topic Created:
# Extra keyword arguments (here `origin`) are sent as message attributes.
response = publisher.publish(topic_path, message_bytes, origin='python-sample')
# publish() is asynchronous and returns a future; result() blocks until the
# publish completes and raises if it failed, so errors are not silently lost.
print('Published message ID: {}'.format(response.result()))