# In [None]:  (Jupyter notebook cell marker, kept as a comment so the file parses as Python)
import json
import os

from azure.eventhub import EventHubConsumerClient
from google.api_core.exceptions import NotFound
from google.cloud import bigquery

# Event Hub connection settings.
#
# The connection string embeds a shared access key, so it is read from the
# EVENTHUB_CONN_STR environment variable instead of being committed to the
# source file.
# NOTE(review): the key that used to be hard-coded here should be treated as
# compromised and rotated.
# When the variable is unset the value is an empty string; presumably the
# client factory rejects a blank connection string when the consumer is
# created -- confirm against the azure-eventhub documentation.
EVENTHUB_CONN_STR = os.environ.get("EVENTHUB_CONN_STR", "")
CONSUMER_GROUP = 'inteligencia_colectiva'
EVENTHUB_NAME = 'factored_datathon_amazon_reviews_5'

# BigQuery destination settings.
# review_keys lists the fields copied from each review into an uploaded row;
# the same names are used as column names when the table has to be created.
review_keys = [
    "asin",
    "overall",
    "reviewText",
    "reviewerID",
    "reviewerName",
    "style",
    "summary",
    "unixReviewTime",
    "verified",
    "vote",
]
bigquery_client = bigquery.Client()
project_id = "datathon-intel-colectiva"
dataset_id = "ml_data"
table = "reviews"
# Fully qualified table identifier: <project>.<dataset>.<table>
table_id = f"{project_id}.{dataset_id}.{table}"

# Batching state shared by the event callback and the uploader.
batch_size = 3000  # Number of events to save in one batch
streaming_data = []  # rows buffered since the last upload
# event_count: events in the current buffer
# batch_count: batches uploaded so far
# total: events uploaded so far
event_count = batch_count = total = 0

def normalize_json(parsed_json):
    """Project a parsed review onto the fields listed in ``review_keys``.

    Fields absent from ``parsed_json`` are filled with an empty string, and
    an empty ``vote`` value is replaced by ``1``. Keys of ``parsed_json``
    that are not listed in ``review_keys`` are dropped.

    Args:
        parsed_json: Parsed review; it is queried with ``in`` and indexed by
            field name.

    Returns:
        A new dict with exactly one entry per name in ``review_keys``.
    """
    record = {}
    for field in review_keys:
        if field in parsed_json:
            record[field] = parsed_json[field]
        else:
            record[field] = ""

    # A missing or blank vote is stored as 1.
    if record['vote'] == "":
        record['vote'] = 1
    return record

def load_bq(parsed_json_list):
    """Stream a batch of rows into the BigQuery table named by ``table_id``.

    The table is looked up first; when it does not exist it is created with
    one STRING column per name in ``review_keys``.

    Args:
        parsed_json_list: Rows to insert, passed unchanged to
            ``insert_rows_json``.

    Returns:
        A status message. When the insert call returns errors they are
        printed and an error message is returned instead of raising.
    """
    # A distinct local name avoids shadowing the module-level ``table`` string.
    try:
        destination = bigquery_client.get_table(table_id)
    except NotFound:
        # If the table does not exist, create it
        schema = [bigquery.SchemaField(name, 'STRING') for name in review_keys]
        destination = bigquery_client.create_table(
            bigquery.Table(table_id, schema=schema)
        )

    # Insert data into the table; a falsy result means no errors were reported.
    errors = bigquery_client.insert_rows_json(destination, parsed_json_list)
    if errors:
        print(errors)
        return "Errors occurred while uploading data to BigQuery"

    # table_id is already "<project>.<dataset>.<table>", so it is reported
    # as is rather than being prefixed with the dataset a second time.
    return f"Data uploaded successfully to {table_id}"

def upload_batch():
    """Upload the buffered events with ``load_bq`` and reset the buffer.

    Does nothing when the buffer is empty. ``total`` and ``batch_count`` are
    advanced only when ``load_bq`` reports success; the buffer and
    ``event_count`` are reset after every attempt.
    """
    global streaming_data, event_count, batch_count, total

    if not streaming_data:
        return

    status = load_bq(streaming_data)
    # load_bq signals the outcome only through its status message; its
    # success message starts with this prefix.
    succeeded = isinstance(status, str) and status.startswith(
        "Data uploaded successfully"
    )
    if succeeded:
        print('Batch uploaded')
        total += event_count
        batch_count += 1
    else:
        print(
            f"Batch upload reported errors; {event_count} buffered events "
            "were discarded"
        )

    # The buffer is cleared after every attempt (as before) so that a rejected
    # batch is not resent on each following event and memory stays bounded.
    event_count = 0
    streaming_data = []

def on_event(partition_context, event):
    """Buffer one received event and upload once the batch is full.

    The event body is parsed as JSON, normalized with ``normalize_json`` and
    appended to ``streaming_data``. Bodies that are not valid JSON, or that
    are valid JSON but not an object, are reported and skipped.

    Args:
        partition_context: Passed in by the consumer client; not used here.
        event: Received event; its body is read with ``body_as_str()``.
    """
    global event_count, streaming_data

    try:
        parsed_json = json.loads(event.body_as_str())
    except json.JSONDecodeError:
        print("Failed to parse JSON data from the event body.")
        return

    # json.loads also returns lists, strings, numbers and None for valid JSON
    # documents; normalize_json needs a mapping, so anything else is skipped
    # instead of raising inside the callback or producing a blank row.
    if not isinstance(parsed_json, dict):
        print("Event body is not a JSON object; skipping event.")
        return

    row = normalize_json(parsed_json)
    print(row)
    streaming_data.append(row)
    event_count += 1

    if event_count >= batch_size:
        upload_batch()

# Create the EventHubConsumerClient
client = EventHubConsumerClient.from_connection_string(
    EVENTHUB_CONN_STR, consumer_group=CONSUMER_GROUP, eventhub_name=EVENTHUB_NAME
)

try:
    with client:
        # This will start the consumer to receive events
        client.receive(on_event=on_event, starting_position="-1")
except KeyboardInterrupt:
    # Ctrl+C is the expected way to stop the consumer; the final flush
    # happens in the ``finally`` clause below.
    pass
finally:
    # Upload any remaining events before exiting. Doing this in ``finally``
    # means a partially filled batch is also flushed when receive() ends for
    # a reason other than a keyboard interrupt.
    upload_batch()
    print("Receiving has stopped.")
