# Create Cloud Functions on GCP using Python

## Services needed

- Cloud Functions
- Stackdriver Logging
- Pub/Sub

## Requirements.txt
cachetools==3.0.0
certifi==2018.10.15
chardet==3.0.4
Click==7.0
Flask==1.0.2
google-api-core==1.6.0
google-api-python-client==1.7.4
google-auth==1.6.1
google-auth-httplib2==0.0.3
google-auth-oauthlib==0.2.0
google-cloud-core==1.0.3
google-cloud-datastore==1.7.1
google-cloud-storage==1.13.0
google-cloud-bigquery==1.19.0 
google-resumable-media==0.3.1
googleapis-common-protos==1.5.5
grpcio==1.16.1
httplib2==0.12.0
idna==2.7
itsdangerous==1.1.0
Jinja2==2.10
MarkupSafe==1.1.0
numpy==1.15.4
oauthlib==2.1.0
pandas==0.23.4
protobuf==3.6.1
pyasn1==0.4.4
pyasn1-modules==0.2.2
python-dateutil==2.7.5
pytz==2019.1
requests==2.20.1
requests-oauthlib==1.0.0
rsa==4.0
six==1.11.0
uritemplate==3.0.0
urllib3==1.24.1
Werkzeug==0.14.1

# stackdriver sink filter
resource.type="bigquery_resource" protoPayload.methodName = "jobservice.jobcompleted" protoPayload.serviceData.jobCompletedEvent.eventName="load_job_completed" protoPayload.authenticationInfo.principalEmail="analytics-processing-dev@system.gserviceaccount.com" protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.tableId:"ga_sessions_" NOT protoPayload.serviceData.jobCompletedEvent.job.jobConfiguration.load.destinationTable.tableId:"ga_sessions_intraday"

Sink name: BQ_export
Sink Destination : BQ_export

# Create Pub/Sub subscription
name: BQ_export

The function below runs a query in BigQuery (BQ), saves the query results to a new BQ table, and then exports those results to a Cloud Storage bucket. It runs once a day, because the tables that Google Analytics (GA) exports to BQ are created once a day.

In [None]:
import datetime as dt
import io
import json
import logging
import os

import pytz
from google.cloud import storage, bigquery
from google.oauth2 import service_account

# SECURITY: the service-account key used to be hard-coded here. A private key
# must never live in source control; the key that was committed should be
# treated as compromised and revoked/rotated in the GCP console.
def load_service_account_info(env_var="SERVICE_ACCOUNT_INFO"):
    """Return the service-account key as a dict, read from the environment.

    The environment variable is expected to hold the full JSON document of a
    service-account key file (the same structure that was previously embedded
    in this file).

    Args:
        env_var: Name of the environment variable holding the key JSON.

    Returns:
        The parsed key as a dict, or an empty dict when the variable is unset
        or empty (a warning is logged in that case).

    Raises:
        ValueError: If the variable is set but does not contain valid JSON
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    raw_key = os.environ.get(env_var)
    if not raw_key:
        # Do not fail at import time; building credentials from an empty dict
        # fails later, at the point where the credentials are actually needed.
        logging.warning(
            "Environment variable %s is not set; no service-account key loaded.",
            env_var)
        return {}
    return json.loads(raw_key)


service_account_info = load_service_account_info()

def save_to_bq_table(data, context):
    """Query yesterday's GA sessions in BigQuery and export the result to GCS.

    Steps:
      1. Run the "My Account" event query over the ``ga_sessions_*`` table
         whose suffix is yesterday's date in Pacific/Auckland.
      2. Save the result to the table ``eiq_interactions.daily_export_<YYYYMMDD>``.
      3. Extract that table to
         ``gs://genesis-energy-eiq_interactions/eiq_export_<max date>.csv``,
         where ``<max date>`` is the latest ``date_time`` found in the result.
      4. Delete the ``daily_export_<YYYYMMDD>`` table, whether or not the
         export succeeded.

    Args:
        data: First argument of the function signature. Not used.
        context: Second argument of the function signature. Not used.
    """
    # Credentials are built from a service-account key, see
    # https://cloud.google.com/docs/authentication/getting-started#setting_the_environment_variable
    credentials = service_account.Credentials.from_service_account_info(
        service_account_info)
    project_id = 'genesis-energy'
    client = bigquery.Client(credentials=credentials, project=project_id)

    # Perform a query.
    query = "SELECT fullvisitorid, \
        clientid, \
        (SELECT value FROM unnest(t.customDimensions) WHERE index = 3) as EIQ_username, \
        (SELECT value FROM unnest(hits.customDimensions) WHERE index = 46) as customer_number, \
        (SELECT value FROM unnest(t.customDimensions) WHERE index = 5) as account_type, \
        hits.eventInfo.eventAction as eventLabel, \
        FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S', TIMESTAMP_SECONDS(SAFE_CAST(visitStartTime+hits.time/1000 AS INT64)), 'Pacific/Auckland') as date_time, \
        device.deviceCategory as deviceCategory, \
        'web' as platform \
        FROM `genesis-energy.82716702.ga_sessions_*` t \
        left join unnest(hits) as hits \
        WHERE _TABLE_SUFFIX = FORMAT_DATETIME('%Y%m%d', DATETIME_ADD(CURRENT_DATETIME('Pacific/Auckland'),INTERVAL -1 DAY)) \
        and hits.eventInfo.eventCategory = 'Custom Dimension' \
        and hits.eventInfo.eventLabel = 'My Account' \
        GROUP BY fullvisitorid,clientid , EIQ_username, customer_number, account_type, eventLabel, date_time, deviceCategory, platform"

    # Destination table is named after yesterday's date in Pacific/Auckland,
    # matching the _TABLE_SUFFIX filter used in the query above.
    dataset_id = 'eiq_interactions'
    timezone = pytz.timezone("Pacific/Auckland")
    current_date = (dt.datetime.now(timezone) - dt.timedelta(days=1)).strftime("%Y%m%d")
    table_id = "daily_export_" + current_date
    table_ref = client.dataset(dataset_id, project=project_id).table(table_id)

    # Saving query to a new BQ table
    job_config = bigquery.QueryJobConfig()
    job_config.destination = table_ref
    job_config.allow_large_results = True
    # Overwrite a table left behind by an earlier, interrupted run; with the
    # default disposition (WRITE_EMPTY) every retry for the same date would fail.
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    query_job = client.query(query, location='US', job_config=job_config)
    query_job.result()  # Waits for the query to finish.

    try:
        # Save to Cloud Storage Bucket
        bucketname = 'genesis-energy-eiq_interactions'
        full_table_id = "`" + project_id + "." + dataset_id + "." + table_id + "`"

        max_date_query = "SELECT format_timestamp('%Y%m%d', timestamp(max(date_time))) max_date from " + full_table_id
        rows = list(client.query(max_date_query).result())
        # max() over an empty table yields NULL, which arrives here as None.
        max_date = rows[0]["max_date"] if rows else None
        if max_date is None:
            logging.warning(
                "Table %s contains no rows; skipping export to bucket %s.",
                table_id, bucketname)
            return

        gcs_filename = "eiq_export_" + max_date
        destination_uri = 'gs://{}/{}'.format(bucketname, gcs_filename + '.csv')
        # Location must match that of the source table.
        extract_job = client.extract_table(table_ref, destination_uri, location='US')  # API request
        extract_job.result()  # Waits for job to complete.
    finally:
        # Always drop the intermediate table, even when the export failed.
        client.delete_table(table_ref)  # API request