In [None]:
# https://cloud.google.com/dataproc/docs/tutorials/python-library-example
# pip install --upgrade google-api-python-client

import time
from datetime import datetime, timedelta

import googleapiclient.discovery
import pandas as pd
from google.cloud import storage
# Dataproc v1 API client; execute_hive_query and wait_for_job read this module-level name.
dataproc = googleapiclient.discovery.build('dataproc', 'v1')
# Cloud Storage client and a handle on the bucket named by `bucket_name`.
storage_client = storage.Client(project='manymoons-215635')
bucket_name = 'raw-events-prod'
bucket = storage_client.get_bucket(bucket_name)

## DataProc Functions to Access Hive Cluster

In [None]:
def execute_hive_query(queries, projectId='manymoons-215635', region='us-central1',
                       cluster_name='hive-cluster'):
    """
    Submit one or more Hive queries to Dataproc as a single Hive job.

    Each query is prefixed with two ``SET`` statements
    (``hive.mapred.supports.subdirectories`` and ``mapred.input.dir.recursive``)
    before submission.

    Args:
      queries -- iterable of Hive query strings; a single query string is
                 also accepted and treated as a one-element list
      projectId -- project passed to the Dataproc API
      region -- region passed to the Dataproc API
      cluster_name -- cluster the job is placed on

    Return:
      res -- the job resource returned by ``jobs().submit(...).execute()``;
             ``res['reference']['jobId']`` identifies the submitted job
    """
    # A bare string would otherwise be iterated character by character and
    # submitted as one bogus query per character.
    if isinstance(queries, str):
        queries = [queries]

    # Whitespace is spelled out explicitly so the submitted text is exactly
    # what the previous triple-quoted template produced.
    settings_prefix = (
        "\n"
        "        SET hive.mapred.supports.subdirectories=TRUE; \n"
        "        SET mapred.input.dir.recursive=TRUE;\n"
    )
    formatted_queries = [
        f"{settings_prefix}        {query}\n        " for query in queries
    ]

    job_details = {
        'projectId': projectId,
        'job': {
            'hiveJob': {'queryList': {'queries': formatted_queries}},
            'placement': {'clusterName': cluster_name},
        },
    }

    return (
        dataproc.projects().regions().jobs()
        .submit(body=job_details, projectId=projectId, region=region)
        .execute()
    )

In [None]:
def wait_for_job(job_id, project='manymoons-215635', region='us-central1', poll_interval=1.0):
    """
    Poll a Dataproc job until it reaches a terminal state.

    Args:
      job_id -- id of the job to poll
      project -- project passed to the Dataproc API
      region -- region passed to the Dataproc API
      poll_interval -- seconds to sleep between two polls

    Return:
      result -- the job resource from the last ``jobs().get`` call, returned
                once its state is 'DONE'

    Raises:
      Exception -- when the job ends in state 'ERROR' or 'CANCELLED'; the
                   message is the status details when the API provides them
    """
    print('Waiting for job to finish...')
    while True:
        result = dataproc.projects().regions().jobs().get(
            projectId=project,
            region=region,
            jobId=job_id).execute()
        status = result['status']
        state = status['state']
        if state == 'DONE':
            print('Job finished.')
            return result
        # CANCELLED is terminal too; without this branch the loop never exits.
        if state in ('ERROR', 'CANCELLED'):
            # 'details' is optional, so fall back to a generic message
            # instead of masking the failure with a KeyError.
            raise Exception(status.get('details') or f'Job {job_id} ended in state {state}')
        # Back off between polls instead of hammering the API in a tight loop.
        time.sleep(poll_interval)

In [None]:
def download_output(job_id, output_bucket, project_id='manymoons-215635', cluster_id='9a01738d-b251-419d-b63e-d14dd8771999'):
    """
    Download a job's ``driveroutput.000000000`` object from Cloud Storage.

    Args:
      job_id -- Dataproc job id, used in the object path
      output_bucket -- name of the bucket that holds the driver output
      project_id -- project used to create the storage client
      cluster_id -- cluster id segment of the object path

    Return:
      the value of ``download_as_string()`` for that object
    """
    print('Downloading output file')
    gcs_bucket = storage.Client(project=project_id).get_bucket(output_bucket)
    blob_path = f'google-cloud-dataproc-metainfo/{cluster_id}/jobs/{job_id}/driveroutput.000000000'
    driver_output = gcs_bucket.blob(blob_path)
    return driver_output.download_as_string()

In [None]:
# Creates another Cloud Storage client for the same project and rebinds the
# notebook-level `bucket` to the 'raw-events-prod' bucket.
client = storage.Client(project='manymoons-215635')
bucket = client.get_bucket('raw-events-prod')

## Main Query Function

In [None]:
def query_hive(hive_query):
    """
    Run Hive queries on the cluster and return the job's raw driver output.

    Args:
      hive_query -- a single query string, or a list/tuple of query strings

    Return:
      out -- the driver output as returned by ``download_output``
    """
    # isinstance also covers list subclasses and tuples, which the previous
    # exact-type comparison wrapped into a nested list.
    if isinstance(hive_query, (list, tuple)):
        queries = list(hive_query)
    else:
        queries = [hive_query]

    job_id = execute_hive_query(queries)['reference']['jobId']
    try:
        finished_job = wait_for_job(job_id)
        # 'gs://<bucket>/<path>' -> '<bucket>'
        output_bucket = finished_job['driverOutputResourceUri'].split('//')[1].split('/')[0]
    except Exception as exc:
        # Deliberate best effort: still try to download the driver output from
        # the default bucket (presumably so a failed job's log can be inspected).
        # Catching Exception rather than everything lets KeyboardInterrupt and
        # SystemExit stop a long wait, and the error text is no longer hidden.
        print(f'Failed: {exc}')
        output_bucket = 'dataproc-eac45f7a-3b5a-4fa4-91ea-e1dd2252fd56-us-central1'
    return download_output(job_id, output_bucket)

In [None]:
def data_query(query, header_line=13, data_start=15, footer_lines=5):
    """
    Run a data-returning Hive query and parse the driver output into a DataFrame.

    The driver output is treated as a '|'-delimited text table: one line holds
    the column names and a contiguous run of lines holds the rows.

    Args:
      query -- query string (or list of query strings) passed to ``query_hive``
      header_line -- index of the output line that holds the column names
      data_start -- index of the first data row
      footer_lines -- number of trailing output lines that are not data rows

    The defaults are the offsets that used to be hard-coded here; they
    presumably match this cluster's driver-log layout -- verify them if the
    output preamble changes.

    Return:
      pandas.DataFrame of stripped string cells; an 'events.' prefix is
      removed from column names

    Raises:
      IndexError -- if the output has no line at ``header_line``
    """
    out = query_hive(query)
    # Decode the bytes instead of parsing their repr: the repr turns non-ASCII
    # characters into \xNN escapes, and a literal backslash-n inside a value
    # was mistaken for a line break.
    if isinstance(out, (bytes, bytearray)):
        text = bytes(out).decode('utf-8', errors='replace')
    else:
        text = str(out)
    lines = text.split('\n')

    if len(lines) <= header_line:
        raise IndexError(
            f'driver output has {len(lines)} line(s); expected a header at line {header_line}')

    columns = [
        cell.strip().replace('events.', '')
        for cell in lines[header_line].split('|')
        if len(cell) > 0
    ]
    # Clamp so footer_lines=0 (or very short output) does not wrap around.
    data_end = max(len(lines) - footer_lines, 0)
    output_data = [
        [cell.strip() for cell in line.split('|')[1:-1]]
        for line in lines[data_start:data_end]
    ]
    return pd.DataFrame(output_data, columns=columns)

In [None]:
# Smoke test of data_query: run DESCRIBE on `events` and parse the output into a DataFrame.
example_output = data_query("""
    DESCRIBE events;
""")

In [None]:
# example_output

In [None]:
# Fetch two rows with schema >= 10 from the d1='2017-09-17-13' partition.
query2 = """
    SELECT * FROM events WHERE schema >= 10 AND d1='2017-09-17-13' LIMIT 2;
"""

# data_query returns a DataFrame, so `out` is a DataFrame here (not raw driver output).
out = data_query(query2)

In [None]:
# Display the current value of `out`.
out

In [None]:
# To Add a Partition
# query = """
# ALTER TABLE events
#     ADD IF NOT EXISTS PARTITION (d1='2017-09-17-13')
#     LOCATION 'gs://raw-events-prod/data/2017/09/17/13'
# """

# out = query_hive(query)

# Scratch Work Below

# Note: Partitions created up to 2018/4/6/11

In [None]:
# Build one ALTER TABLE ... ADD IF NOT EXISTS PARTITION statement per distinct entry of `blobs`.
# NOTE(review): `blobs` is not defined anywhere in the visible notebook, so this cell
# raises NameError on a fresh kernel. Judging by the string handling below, each entry
# is presumably a path like '<bucket>/data/YYYY/MM/DD/HH' -- confirm and restore the
# cell that builds it.
queries = []
for blob in list(set(blobs)):
    # d1 is the part of the path after 'data/' with '/' replaced by '-';
    # the full entry is used as the gs:// location.
    query = """
        ALTER TABLE events
        ADD IF NOT EXISTS PARTITION (d1='{}')
        LOCATION 'gs://{}'
    """.format(blob.split('data/')[-1].replace('/','-'), blob)
    queries.append(query)
#     print(query)

In [None]:
# For each entry of `blobs`, parse the '/'-separated components after 'data/' as
# integers and pass them positionally to datetime(...).
# NOTE(review): depends on `blobs`, which is not defined in the visible notebook.
dts = []
for i in blobs:
    # The comprehension variable `i` shadows the loop variable; the iterable
    # i.split(...) is evaluated with the outer `i` before the shadowing starts.
    dts.append(datetime(*[int(i) for i in i.split('data/')[1].split('/')]))

In [None]:
# Bulk Partition Creator for Historical data (Scratch Work - Don't Run this)
# i = 1
# # int(len(queries)/200)
# for i in range(1, int(len(queries)/200) + 1):
# #     print(200*i, 200*(i+1))
# #     print(i)
#     input_queries = queries[200*i: 200*(i+1)]
# #     print(input_queries[0])
#     out = execute_hive_query(input_queries, projectId='manymoons-215635', region='us-central1')
# #     print(len(input_queries))

In [None]:
# List the partitions of `events`; query_hive returns the raw driver output (not a DataFrame).
q = "SHOW partitions events"
out = query_hive(q)

In [None]:
# Split the string form of `out` on the literal two-character backslash-n sequence
# and skip the first 11 pieces (presumably log preamble -- confirm against real output).
str(out).split('\\n')[11:]

In [None]:
# Drop the d1='all' partition of `events2`, then add it again with the given location.
# Both statements travel in ONE query string: query_hive wraps a non-list argument
# in a single-element list.
query = """
ALTER TABLE events2 DROP PARTITION (d1='all');

ALTER TABLE events2
    ADD PARTITION (d1='all')
    LOCATION 'gs://raw-events-prod/180337_2018-01-28_18/'
"""

out = query_hive(query)

In [None]:
# Display the raw driver output of the query_hive call above.
out

In [None]:
# Fetch two rows through query_hive (not data_query), so `out` holds the raw
# driver output; later cells parse `out` by hand.
query2 = """
    SELECT * FROM events LIMIT 2;
"""

out = query_hive(query2)

In [None]:
# out

In [None]:
# Same kind of sample as before, but with a range filter on d1 (>= '2017-09-17-13').
# Rebinds `query2`; `out` is left untouched.
query2 = """
    SELECT * FROM events WHERE schema >= 10 AND d1 >= '2017-09-17-13' LIMIT 2;
"""

# Parsed result as a DataFrame.
df = data_query(query2)

In [None]:
# Display the `event_type` column of the parsed result.
df.event_type

In [None]:
# List the column names that data_query extracted.
df.columns.tolist()

In [None]:
# Manual copy of the header parsing in data_query, applied to the raw `out`:
# take piece 13 of the backslash-n split, split it on '|', drop empty cells,
# strip whitespace and remove the 'events.' prefix.
columns = [i.strip().replace('events.', '') for i in str(out).split('\\n')[13].split('|') if len(i) > 0]

In [None]:
# Manual copy of the row parsing in data_query: pieces 15 up to (but excluding)
# the last 5 are treated as data rows; each row drops the cells outside the
# outer '|' delimiters and strips the rest.
output_data = []
for i in str(out).split('\\n')[15:-5]:
    # The comprehension's `i` shadows the loop variable; i.split('|') is
    # evaluated with the outer `i` first.
    output_data.append([i.strip() for i in i.split('|')[1:-1]])
# [i.strip() for i in str(out).split('\\n')[15:-5][0].split('|')[1:-1]]

In [None]:
# Sanity check: number of parsed column names and number of parsed rows.
len(columns), len(output_data)

In [None]:
# Same DataFrame construction as the last line of data_query, from the manually parsed pieces.
pd.DataFrame(output_data, columns=columns)

In [None]:
# Display the parsed rows (a list of lists of strings).
output_data

In [None]:
# DROP + CREATE EXTERNAL TABLE statement kept as a single escaped string and printed for inspection.
# NOTE(review): the column list starts with '(tion_time STRING', which looks like a
# truncated paste; this string is only printed here, never executed -- confirm it can be removed.
q_in = "\nDROP TABLE IF EXISTS events;\nCREATE EXTERNAL TABLE events\n      (tion_time STRING, user_id STRING, user_properties STRING, uuid STRING, version_name STRING, amplitude_attribution_ids STRING, amplitude_id STRING, app STRING, event_id STRING, session_id STRING, is_attribution_event STRING)\n      PARTITIONED BY (d1 STRING) \n      STORED AS PARQUET\n      LOCATION 'gs://raw-events-prod/data';\n"
print(q_in)

In [None]:
# Column definitions ('name TYPE' pairs) for the `events` table, as one comma-separated string.
table_string = 'insert_id STRING, schema INT, adid STRING, amplitude_event_type STRING, amplitude_id INT, app INT, city STRING, client_event_time STRING, client_upload_time STRING, country STRING, data STRING, device_brand STRING, device_carrier STRING, device_family STRING, device_id STRING, device_manufacturer STRING, device_model STRING, device_type STRING, dma STRING, event_id INT, event_properties STRING, event_time STRING, event_type STRING, group_properties STRING, groups STRING, idfa STRING, ip_address STRING, language STRING, library STRING, location_lat STRING, location_lng STRING, os_name STRING, os_version STRING, paying STRING, platform STRING, processed_time STRING, region STRING, sample_rate STRING, server_upload_time STRING, session_id INT, start_version STRING, user_creation_time STRING, user_id STRING, user_properties STRING, uuid STRING, version_name STRING'
# Drops `events` if it exists and recreates it as an external Parquet table,
# partitioned by d1, located at gs://raw-events-prod/data.
que = f"""
DROP TABLE IF EXISTS events;
CREATE EXTERNAL TABLE events
      ({table_string})
      PARTITIONED BY (d1 STRING) 
      STORED AS PARQUET
      LOCATION 'gs://raw-events-prod/data';
"""
# Show the final statement text before running it.
que

In [None]:
# Executes the DROP + CREATE statement built in `que` on the cluster; `out` is the raw driver output.
out = query_hive(que)

In [None]:
# Display the raw driver output of the table (re)creation job.
out