In [None]:
#QUESTION 1 Data Ingestion

# The solutions below are written as if the data is STREAMED rather than batched; the README comments on how to swap to batch ingestion.

# INFRASTRUCTURE REQUIREMENTS AND DETAILS

#Bigquery dataset and table setup

from google.cloud import bigquery

# Fully-qualified ID ("<project>.<dataset>") of the dataset that will hold the raw travel tables.
dataset_id = "project-test-nw.RAW_TRAVEL"

# No explicit project or credentials are passed to the client here.
client = bigquery.Client()

# Describe the dataset locally first; the location is set before the create call.
dataset = bigquery.Dataset(dataset_id)
dataset.location = "us-central1"

# API request. `dataset` is rebound to the object returned by the service.
dataset = client.create_dataset(dataset, timeout=30)
print(f"Created dataset {dataset_id}")

# Create the input table with clustering and partitioning (as required)

table_id = "project-test-nw.RAW_TRAVEL.RAW_INPUT_TRAVEL"

# Column name / BigQuery type pairs, in table order.
schema = [
    bigquery.SchemaField(column_name, column_type)
    for column_name, column_type in (
        ("region", "STRING"),
        ("origin_coord", "GEOGRAPHY"),
        ("destination_coord", "GEOGRAPHY"),
        ("datetime", "TIMESTAMP"),
        ("datasource", "STRING"),
    )
]

table = bigquery.Table(table_id, schema=schema)

# Daily partitions on the trip timestamp; each partition expires after
# 90 days (90 * 24 * 60 * 60 * 1000 ms == 7776000000 ms).
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="datetime",
    expiration_ms=90 * 24 * 60 * 60 * 1000,
)
table.clustering_fields = ["datetime", "origin_coord", "destination_coord"]

# API request. `table` is rebound to the object returned by the service.
table = client.create_table(table)
print(f"Created clustered and partitioned table {table_id}")

# Subscription creation: Pub/Sub offers a direct Pub/Sub-to-BigQuery subscription that does not need Dataflow.
# I opt to use it because it is the simplest setup, and also because it can handle large volumes of data.
# Source: https://cloud.google.com/pubsub/docs/subscriber

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import DeadLetterPolicy

# Identifiers of the resources involved in the main ingestion subscription.
project_id = "project-test-nw"
topic_id = "input-topic"
subscription_id = "data-raw-travel"
dead_letter_topic_id = "deadletter-input-topic"
max_delivery_attempts = 5
bigquery_table_id = f"{project_id}.RAW_TRAVEL.RAW_INPUT_TRAVEL"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

# Resource paths for the input topic, its dead-letter topic and the subscription.
topic_path = publisher.topic_path(project_id, topic_id)
dead_letter_topic_path = publisher.topic_path(project_id, dead_letter_topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Messages are forwarded to the dead-letter topic after `max_delivery_attempts`.
dead_letter_policy = DeadLetterPolicy(
    dead_letter_topic=dead_letter_topic_path,
    max_delivery_attempts=max_delivery_attempts,
)

# NOTE(review): neither `use_topic_schema` nor `use_table_schema` is set here.
# Confirm the target table has the columns Pub/Sub expects for this
# configuration (e.g. a `data` column and the metadata columns that
# `write_metadata=True` requires) - TODO verify against the Pub/Sub docs.
bigquery_config = pubsub_v1.types.BigQueryConfig(
    table=bigquery_table_id,
    write_metadata=True,
)

subscription_request = {
    "name": subscription_path,
    "topic": topic_path,
    "bigquery_config": bigquery_config,
    "dead_letter_policy": dead_letter_policy,
}

# The 'with' block calls close() on the subscriber when done, which closes
# the underlying gRPC channel.
with subscriber:
    subscription = subscriber.create_subscription(request=subscription_request)

print(f"BigQuery subscription created: {subscription}.")
print(f"Table for subscription is: {bigquery_table_id}")


# Create the dead-letter subscription to its assigned table (GCP assumes the structure of the Pub/Sub message matches the BigQuery table schema)
from google.cloud import pubsub_v1

# Destination table for messages that were forwarded to the dead-letter topic.
# NOTE(review): this table is not created by the setup code visible in this
# notebook - confirm it exists (with a schema compatible with the BigQuery
# subscription settings below) before running this cell.
bigquery_dead_table_id = f"{project_id}.RAW_TRAVEL.RAW_INPUT_TRAVEL_DEADLETTER"

# The dead-letter subscription needs its own ID: reusing `subscription_id`
# would request a second subscription with the same name as the main one.
dead_letter_subscription_id = f"{subscription_id}-deadletter"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, dead_letter_topic_id)
subscription_path = subscriber.subscription_path(
    project_id, dead_letter_subscription_id
)

# Write to the dead-letter table, not to the main input table.
bigquery_config = pubsub_v1.types.BigQueryConfig(
    table=bigquery_dead_table_id, write_metadata=True
)

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "bigquery_config": bigquery_config,
        }
    )

print(f"BigQuery subscription created: {subscription}.")
print(f"Table for subscription is: {bigquery_dead_table_id}")






: 

In [None]:
# Weekly-average service (Cloud Function)

# Deploy this script to an HTTP Cloud Function so it can act as a service

import functions_framework
from google.cloud import bigquery

from google.oauth2 import service_account

def _parse_weekly_request(payload):
    """Validate the request payload and extract the region and bounding box.

    Args:
        payload: Decoded JSON body of the request. Expected to be a dict with
            a non-empty string ``region`` and numeric ``min_x``, ``min_y``,
            ``max_x`` and ``max_y`` entries.

    Returns:
        A ``(region, min_x, min_y, max_x, max_y)`` tuple; the four bounds
        are converted to ``float``.

    Raises:
        ValueError: If the payload is not a dict, ``region`` is missing or
            empty, or any bound is missing or not numeric.
    """
    if not isinstance(payload, dict):
        raise ValueError("request body must be a JSON object")

    region = payload.get("region")
    if not isinstance(region, str) or not region:
        raise ValueError("'region' must be a non-empty string")

    bounds = []
    for key in ("min_x", "min_y", "max_x", "max_y"):
        try:
            bounds.append(float(payload[key]))
        except (KeyError, TypeError, ValueError):
            raise ValueError(f"'{key}' must be a number") from None

    min_x, min_y, max_x, max_y = bounds
    return region, min_x, min_y, max_x, max_y


@functions_framework.http
def cors_enabled_function(request):
    """Serve the weekly average number of trips inside a bounding box.

    Only trips of the requested region from the last 56 days (8 weeks) whose
    origin AND destination both fall inside the box are counted. Trips are
    grouped per week and the average of the weekly counts is returned; weeks
    without any matching trip do not contribute to the average.

    Despite its name, this handler does not set any CORS headers.

    Example JSON request body (x is treated as longitude, y as latitude)::

        {
            "version": "1.0.0",
            "min_x": 9.5,
            "min_y": 53.3,
            "max_x": 10.4,
            "max_y": 53.8,
            "region": "Hamburg"
        }

    Args:
        request: The incoming HTTP request object; it must provide
            ``get_json()``.

    Returns:
        A ``(body, status[, headers])`` tuple:
        * 200 with ``{"weekly_average": <float or null>}`` on success
          (``null`` when no trip matched),
        * 400 with ``{"error": <message>}`` when the payload is invalid,
        * 500 with a plain-text message when anything else fails.
    """
    # Function-scope imports keep this cell self-contained when deployed.
    import json
    import logging

    json_headers = {"Content-Type": "application/json"}

    # The framework passes a request object, not a dict, so the body has to
    # be decoded explicitly before its fields can be read.
    try:
        region, min_x, min_y, max_x, max_y = _parse_weekly_request(
            request.get_json(force=True, silent=True)
        )
    except ValueError as err:
        return (json.dumps({"error": str(err)}), 400, json_headers)

    try:
        # NOTE(review): 'path/to/file.json' is a placeholder and must be
        # replaced by a real key file before deployment - TODO confirm
        # whether the runtime's default credentials could be used instead.
        credentials = service_account.Credentials.from_service_account_file('path/to/file.json')

        project_id = "project-test-nw"
        bigquery_table_id = f"{project_id}.RAW_TRAVEL.RAW_INPUT_TRAVEL"

        client = bigquery.Client(credentials=credentials, project=project_id)

        # Only the (constant) table name is formatted into the SQL text;
        # every user-supplied value is bound as a query parameter so it can
        # never be interpreted as SQL.
        query = '''
        WITH
            travels AS (
                SELECT t.`datetime`
                FROM `{table}` AS t
                WHERE t.region = @region
                AND t.`datetime` >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 56 DAY)
                AND ST_INTERSECTSBOX(t.origin_coord, @min_x, @min_y, @max_x, @max_y)
                AND ST_INTERSECTSBOX(t.destination_coord, @min_x, @min_y, @max_x, @max_y)
            ),
            weeks AS (
                SELECT
                    TIMESTAMP_TRUNC(`datetime`, WEEK) AS week_start,
                    COUNT(1) AS row_count
                FROM travels
                GROUP BY week_start
            )
        SELECT AVG(row_count) AS weekly_average FROM weeks
        '''.format(table=bigquery_table_id)

        job_config = bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("region", "STRING", region),
                bigquery.ScalarQueryParameter("min_x", "FLOAT64", min_x),
                bigquery.ScalarQueryParameter("min_y", "FLOAT64", min_y),
                bigquery.ScalarQueryParameter("max_x", "FLOAT64", max_x),
                bigquery.ScalarQueryParameter("max_y", "FLOAT64", max_y),
            ]
        )

        rows = list(client.query(query, job_config=job_config).result())
        # AVG over zero weeks yields NULL, which is reported as JSON null.
        weekly_average = rows[0]["weekly_average"] if rows else None

        # The row iterator itself is not a valid response body, so the
        # single value is serialized explicitly.
        return (
            json.dumps({"weekly_average": weekly_average}),
            200,
            json_headers,
        )
    except Exception:
        # Top-level boundary: log the full traceback instead of hiding it,
        # but keep the response generic for the caller.
        logging.exception("Weekly average query failed")
        return ("Something went wrong!", 500)



In [None]:
# Status of the ingestion data flow (Cloud Function)
# This service compares the good and bad entries of the last hour and serves the fraction of successful ones. If there are no bad messages in the last hour, the service returns 1 (meaning 100% of the messages in the last hour were OK).

import functions_framework
from google.cloud import bigquery
from google.oauth2 import service_account

@functions_framework.http
def cors_enabled_function(request):
    """Serve the fraction of successfully ingested messages in the last hour.

    Counts the rows of the last hour in the input table ("good") and in the
    dead-letter table ("bad") separately and returns
    ``good / (good + bad)``. When neither table has rows in that window the
    ratio is reported as 1.0, so "no bad messages" always yields 1.

    Despite its name, this handler does not set any CORS headers.

    Args:
        request: The incoming HTTP request object; it is not read.

    Returns:
        A ``(body, status[, headers])`` tuple:
        * 200 with ``{"success_ratio": <float between 0 and 1>}`` on success,
        * 500 with a plain-text message when anything fails.
    """
    # Function-scope imports keep this cell self-contained when deployed.
    import json
    import logging

    try:
        # NOTE(review): 'path/to/file.json' is a placeholder and must be
        # replaced by a real key file before deployment - TODO confirm
        # whether the runtime's default credentials could be used instead.
        credentials = service_account.Credentials.from_service_account_file('path/to/file.json')

        project_id = "project-test-nw"
        bigquery_table_id = f"{project_id}.RAW_TRAVEL.RAW_INPUT_TRAVEL"
        # Must point at the dead-letter table, not at the input table again.
        bigquery_table_deadletter_id = f"{project_id}.RAW_TRAVEL.RAW_INPUT_TRAVEL_DEADLETTER"

        client = bigquery.Client(credentials=credentials, project=project_id)

        # Each table is counted in its own CTE: joining the two tables
        # directly would multiply the counts and produce no rows at all when
        # one side is empty.
        # NOTE(review): assumes the dead-letter table also has a `datetime`
        # TIMESTAMP column, as the original query did - TODO confirm against
        # the actual dead-letter table schema.
        query = '''
        WITH
            good AS (
                SELECT COUNT(*) AS n
                FROM `{table_input}`
                WHERE `datetime` >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
            ),
            bad AS (
                SELECT COUNT(*) AS n
                FROM `{table_deadletter}`
                WHERE `datetime` >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
            )
        SELECT IF(good.n + bad.n = 0, 1.0, good.n / (good.n + bad.n)) AS success_ratio
        FROM good CROSS JOIN bad
        '''.format(
            table_input=bigquery_table_id,
            table_deadletter=bigquery_table_deadletter_id,
        )

        rows = list(client.query(query).result())
        success_ratio = rows[0]["success_ratio"] if rows else None

        # The row iterator itself is not a valid response body, so the
        # single value is serialized explicitly.
        return (
            json.dumps({"success_ratio": success_ratio}),
            200,
            {"Content-Type": "application/json"},
        )
    except Exception:
        # Top-level boundary: log the full traceback instead of hiding it,
        # but keep the response generic for the caller.
        logging.exception("Ingestion status query failed")
        return ("Something went wrong!", 500)
          