Upgrade requests to version >= 2.18.0, because google-cloud-storage declares too low a lower bound for its requests dependency. After upgrading you may need to restart the interpreter by shutting down the session and opening the notebook again. The reason is that the old version of the requests package may still be cached in sys.modules, causing errors in google-cloud-storage.

In [7]:
#Check that the installed requests version is at least 2.18.0
import requests
requests.__version__

'2.18.4'

In [8]:
#!pip install 'requests>= 2.18.0'

In [19]:
import time
import logging
import os
import sys

from google.cloud import bigquery
from google.cloud.storage import Client as StorageClient

from pandas import read_csv

from apiclient.http import MediaFileUpload

from oauth2client.client import GoogleCredentials
from googleapiclient import discovery

# Resolve Application Default Credentials from the runtime environment.
credentials = GoogleCredentials.get_application_default()

# Module-level Google Analytics v3 service object; cs_to_ga() uses it for the
# data import upload.
analytics = discovery.build(
    serviceName='analytics',
    version='v3',
    credentials=credentials,
)

#Execute an asynchronous query job whose result is stored in a temporary table, then extract that table to Google Cloud Storage.
def bq_to_cs(_query, _project, _blob, _bucket, _destination_format='CSV', _print_header=True, _write_disposition='WRITE_TRUNCATE', _legacy_sql=False):
    """Run a BigQuery query and export its result table to Cloud Storage.

    Args:
        _query: SQL text of the query to execute.
        _project: GCP project id the BigQuery client is created for.
        _blob: Object name to write inside ``_bucket``.
        _bucket: Name of the destination Cloud Storage bucket.
        _destination_format: Export format passed to the extract job.
        _print_header: Whether the extract job writes a header row.
        _write_disposition: Accepted for interface compatibility but not
            applied: the query runs without an explicit destination table.
            NOTE(review): confirm whether this should ever be honoured.
        _legacy_sql: Run the query with legacy SQL instead of standard SQL.

    Raises:
        Whatever the BigQuery client raises when the query or the extract
        job fails, or when the extract does not finish within 100 seconds.
    """
    client = bigquery.Client(project=_project)

    # Previously the SQL dialect argument was silently ignored.
    query_config = bigquery.QueryJobConfig()
    query_config.use_legacy_sql = _legacy_sql
    query_job = client.query(_query, job_config=query_config)
    query_job.result()  # Block until the query has finished.

    # No destination was configured, so read back the table the job reports.
    table_ref = query_job.destination

    # Previously the export format and header arguments were silently ignored.
    extract_config = bigquery.ExtractJobConfig()
    extract_config.destination_format = _destination_format
    extract_config.print_header = _print_header

    destination_uri = 'gs://{}/{}'.format(_bucket, _blob)
    extract_job = client.extract_table(
        table_ref, destination_uri, job_config=extract_config)  # API request

    extract_job.result(timeout=100)  # Waits for job to complete.

#Download the file from Google Cloud Storage to the Datalab compute instance, prefix the column headers with 'ga:' and upload it to Google Analytics via the Data Import API.
def cs_to_ga(bucket_name, source_blob_name, destination_file_name, account_id, web_property_id, custom_data_source_id, project_id, prefix=True):
    """Copy a CSV object from Cloud Storage into a Google Analytics data set.

    The object is downloaded to ``destination_file_name``, optionally has
    every column header prefixed with ``ga:``, is uploaded through the
    module-level ``analytics`` service, and the local file is then removed.

    Args:
        bucket_name: Cloud Storage bucket holding the source object.
        source_blob_name: Name of the object to download.
        destination_file_name: Local path used as a scratch file.
        account_id: Google Analytics account id.
        web_property_id: Google Analytics web property id.
        custom_data_source_id: Id of the custom data source to upload to.
        project_id: GCP project id the storage client is created for.
        prefix: When true, prepend ``ga:`` to every column header.

    Raises:
        Any error from the download, the CSV rewrite or the upload is
        propagated unchanged.
    """
    # Download file from cloud storage
    storage_client = StorageClient(project=project_id)
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(destination_file_name)

    try:
        # Prefix column headers by reading into pandas and writing back to CSV
        df = read_csv(destination_file_name)
        if prefix:
            df = df.add_prefix('ga:')
        df.to_csv(destination_file_name, index=False)

        # Upload to GA data import as a resumable upload in 1 MiB chunks
        media = MediaFileUpload(destination_file_name, mimetype='application/octet-stream', chunksize=1024*1024, resumable=True)
        analytics.management().uploads().uploadData(accountId=account_id, webPropertyId=web_property_id, customDataSourceId=custom_data_source_id, media_body=media).execute()
    finally:
        # Delete the local file whether or not the upload succeeded, so a
        # failed run does not leave the scratch file behind.
        os.remove(destination_file_name)

def main(query, project_id, bucket, blob, account_id, web_property_id, data_source_id):
    """Export a query result to Cloud Storage, then import it into Analytics.

    ``blob`` is used both as the Cloud Storage object name and as the name
    of the local scratch file for the download.
    """
    bq_to_cs(_query=query, _project=project_id, _blob=blob, _bucket=bucket)
    cs_to_ga(
        bucket_name=bucket,
        source_blob_name=blob,
        destination_file_name=blob,
        account_id=account_id,
        web_property_id=web_property_id,
        custom_data_source_id=data_source_id,
        project_id=project_id,
    )

In [None]:
# Example query: two rows mapping a page path to a custom dimension value.
query = '''SELECT '/' AS pagePath, 'Author A' AS dimension1
UNION ALL
SELECT '/flatten-google-analytics-custom-dimensions-with-a-bigquery-udf/' AS pagePath, 'Author B' AS dimension1'''

# Replace every <...> placeholder with real identifiers before running.
main(
    query=query,
    project_id="<GCP project>",
    bucket="<GCS bucket>",
    blob="export.csv",
    account_id="<GA account>",
    web_property_id="<GA property>",
    data_source_id="<GA Data Set ID>",
)

Enable the Google Analytics API on the project.
Restart the interpreter after installing requests>=2.18.0; otherwise import finds the module already cached in sys.modules and returns the old version.