## Select your AWS Profile

In [2]:
# Pick the AWS credentials profile; boto3 clients created later in this kernel read AWS_PROFILE.
%env AWS_PROFILE=staging

env: AWS_PROFILE=staging


In [3]:
import hashlib
import json
import time

import boto3
import pandas as pd

## Util functions

In [4]:
def transform_entry(var_char_value, col_type):
    """Convert one Athena result cell from its string form to a Python value.

    Athena result cells come back as strings (``VarCharValue``); the caller
    passes ``None`` when that key is absent (e.g. a SQL NULL).

    Args:
        var_char_value: The cell's ``VarCharValue`` string, or ``None``.
        col_type: The column's ``Type`` from ``ResultSetMetadata.ColumnInfo``.

    Returns:
        ``None`` for missing values; ``int`` for integer types, ``float`` for
        floating-point types, ``bool`` for ``boolean``; otherwise the raw string.

    Raises:
        NotImplementedError: If ``col_type`` is not a supported type.
    """
    # TODO: parse "date" into datetime.date? Kept as the raw string for now.
    passthrough_types = {"varchar", "char", "date", "json"}
    int_types = {"tinyint", "smallint", "integer", "bigint"}
    float_types = {"real", "float", "double"}
    known_types = passthrough_types | int_types | float_types | {"boolean"}

    # Validate the type first so unsupported types fail even for NULL cells.
    if col_type not in known_types:
        raise NotImplementedError(f"Don't know how to parse {col_type}")
    # NULL cells must not reach int()/float(), which would raise TypeError.
    if var_char_value is None or col_type in passthrough_types:
        return var_char_value
    if col_type in int_types:
        return int(var_char_value)
    if col_type in float_types:
        return float(var_char_value)
    return var_char_value.lower() == "true"
        
def extract_table(response, skip_header=True):
    """Turn an Athena ``get_query_results`` response into headers and typed rows.

    Args:
        response: Dict returned by ``athena.get_query_results``.
        skip_header: Drop the first entry of ``ResultSet.Rows``. For SELECT
            queries that row repeats the column names. Pass ``False`` for a
            response whose rows carry no header (presumably later pages of a
            paginated result — TODO confirm).

    Returns:
        ``(col_headers, rows)``: the column names from ``ResultSetMetadata`` and
        a list of rows, each a list of values converted by ``transform_entry``.
    """
    column_info_list = response['ResultSet']['ResultSetMetadata']['ColumnInfo']
    col_headers = [c['Name'] for c in column_info_list]
    col_types = [c['Type'] for c in column_info_list]

    raw_rows = response['ResultSet']['Rows']
    if skip_header:
        raw_rows = raw_rows[1:]

    # A missing 'VarCharValue' key is passed on as None (e.g. SQL NULL).
    rows = [
        [
            transform_entry(cell.get('VarCharValue'), col_type)
            for cell, col_type in zip(raw_row['Data'], col_types)
        ]
        for raw_row in raw_rows
    ]
    return col_headers, rows

def table_to_df(col_headers, rows):
    """Build a DataFrame from a list of column names and a list of row lists.

    Rows shorter than ``col_headers`` are padded with ``None``, and extra
    trailing values in a row are ignored. If a header name repeats, the last
    column with that name wins (at the position of the first occurrence).
    """
    def value_at(row, position):
        return row[position] if position < len(row) else None

    columns = {
        header: [value_at(row, position) for row in rows]
        for position, header in enumerate(col_headers)
    }
    return pd.DataFrame(columns)

# TODO: use a waiter instead?
def wait_for_query_to_complete(query_execution_id, client=None, poll_interval=1.0, timeout=None):
    """Block until an Athena query reaches a terminal state and return that state.

    Args:
        query_execution_id: ID returned by ``start_query_execution``.
        client: Athena client to poll. When omitted, a new
            ``boto3.client("athena")`` is created from the active AWS profile.
        poll_interval: Seconds to sleep between status checks.
        timeout: Maximum seconds to wait; ``None`` waits indefinitely.

    Returns:
        The terminal state: ``"SUCCEEDED"``, ``"FAILED"`` or ``"CANCELLED"``.

    Raises:
        TimeoutError: If ``timeout`` elapses before the query finishes.
    """
    if client is None:
        # No get_athena_client() helper is defined in this notebook; build one directly.
        client = boto3.client("athena")
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        response = client.get_query_execution(QueryExecutionId=query_execution_id)
        status = response["QueryExecution"]["Status"]["State"]
        if status in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return status
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Query {query_execution_id} still {status} after {timeout} seconds"
            )
        time.sleep(poll_interval)

def describe_query_execution_performance(query_execution_id, client=None):
    """Return engine run time and bytes scanned for a finished Athena query.

    Args:
        query_execution_id: ID returned by ``start_query_execution``.
        client: Athena client to use. When omitted, a new
            ``boto3.client("athena")`` is created from the active AWS profile.

    Returns:
        ``(EngineExecutionTimeInMillis, DataScannedInBytes)`` taken from the
        query's ``Statistics``.

    Raises:
        AssertionError: If the query's state is not ``"SUCCEEDED"``.
    """
    if client is None:
        # No get_athena_client() helper is defined in this notebook; build one directly.
        client = boto3.client("athena")
    execution = client.get_query_execution(QueryExecutionId=query_execution_id)["QueryExecution"]
    state = execution["Status"]["State"]
    # Explicit raise (not `assert`) so the check survives `python -O`; the
    # message reports the actual state string.
    if state != "SUCCEEDED":
        raise AssertionError(
            f"Query must have succeeded to get performance numbers. Query is currently {state}"
        )
    stats = execution["Statistics"]
    return stats["EngineExecutionTimeInMillis"], stats["DataScannedInBytes"]

## Create the Athena client

In [5]:
# Athena client shared by the interactive cells below; credentials come from AWS_PROFILE.
athena = boto3.client(service_name="athena")

## Your SQL

In [6]:
query = (
    """
    WITH
      hashes as (
        -- each bucket has its own manifests table, this is my staging bucket
        -- the first bit of code is just to exract the package hash
        SELECT element_at(slice(split("$path", '/'), -1, 1), 1) as hash, user_meta FROM manifests_bio_staging
        -- look for particular metadata facet value
        -- version='v0' is just to help the query optimizer; we only need the first line of the manifest
        WHERE json_extract_scalar(user_meta, '$.owner') LIKE 'Kevin%' and version='v0'
      ),
      named_hashes as (
        -- this table is created by quilt in the database BUCKET_analyticsbucket_GUID
        SELECT * FROM "quilt_t4_staging_analyticsbucket_1bmma65zlcfa1"."package_hashes"
      )

    SELECT *
    FROM hashes
    JOIN named_hashes ON hashes."hash"=named_hashes."hash"
    """
)

## Hash query for idempotent execution

In [7]:
# Deterministic ID for this SQL text: the same query always hashes to the same 64-char hex string.
qid = hashlib.new("sha256", query.encode("utf-8")).hexdigest()
qid

'641d534f8dc59b32231031f52918660e24df1f68933084e30fd374f426dfb0d1'

In [8]:
# ClientRequestToken=qid makes re-running this cell idempotent: per the Athena API,
# a repeated request with the same token returns the existing execution instead of
# starting a new query. Drop the token to force a fresh run (e.g. after the data changes).
xid = athena.start_query_execution(
    QueryString=query,
    ClientRequestToken=qid,
    # Default database used to resolve unqualified table names in the query.
    QueryExecutionContext={
        "Database": "default"
    },
    ResultConfiguration={
        "OutputLocation": "s3://aws-athena-query-results-712023778557-us-east-1/",
    },
)

In [9]:
# Full start_query_execution response; later cells only use xid['QueryExecutionId'].
xid

{'QueryExecutionId': '49583562-17d0-4270-bc47-e23e64ac5092',
 'ResponseMetadata': {'RequestId': '035b2ca4-98d0-4ab1-a760-314665d2447e',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'content-type': 'application/x-amz-json-1.1',
   'date': 'Tue, 01 Feb 2022 01:53:25 GMT',
   'x-amzn-requestid': '035b2ca4-98d0-4ab1-a760-314665d2447e',
   'content-length': '59',
   'connection': 'keep-alive'},
  'RetryAttempts': 0}}

## Check on async execution
* TODO: `get_query_results` returns at most 1,000 rows per call, so larger result sets are truncated unless you paginate.

In [10]:
# Poll until the query reaches a terminal state. A single check right after
# starting may still see QUEUED/RUNNING, and fetching results would then fail
# under "Run All".
while True:
    status = athena.get_query_execution(QueryExecutionId=xid['QueryExecutionId'])
    if status['QueryExecution']['Status']['State'] in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)
status['QueryExecution']['Status']['State']

'SUCCEEDED'

In [11]:
# get_query_results returns at most 1,000 rows per call, so page through all of them
# and append every later page's rows to the first page. extract_table can then parse
# one response dict. Assumes only the first page carries the header row that
# extract_table skips — verify for non-SELECT queries.
results = None
for page in athena.get_paginator("get_query_results").paginate(
    QueryExecutionId=xid['QueryExecutionId']
):
    if results is None:
        results = page
    else:
        results['ResultSet']['Rows'].extend(page['ResultSet']['Rows'])

In [12]:
# HTTP metadata of the get_query_results response: status code and payload size.
results['ResponseMetadata']

{'RequestId': '9cedaf20-e9ae-4549-8da7-b35c47af1aa1',
 'HTTPStatusCode': 200,
 'HTTPHeaders': {'content-type': 'application/x-amz-json-1.1',
  'date': 'Tue, 01 Feb 2022 01:53:39 GMT',
  'x-amzn-requestid': '9cedaf20-e9ae-4549-8da7-b35c47af1aa1',
  'content-length': '51028',
  'connection': 'keep-alive'},
 'RetryAttempts': 0}

## Results as dataframe

In [13]:
# Convert the raw Athena response to a DataFrame: header row dropped, each cell
# converted according to its column type by transform_entry.
df = table_to_df(*extract_table(results))
df

Unnamed: 0,hash,user_meta,bucket,name
0,091db004214d84593e6189610d46749d1505341bcc84ae...,"{""owner"":""Kevin Moore"",""date"":""2021-02-01"",""na...",quilt-bio-staging,aneeshtemp/virdemo
1,6bdc974c7e1cb706b075e18070b496ab7127c72633ebd3...,"{""owner"":""Kevin Moore"",""date"":""2048-12-12"",""st...",quilt-bio-staging,fiskus/test
2,ef2a874a79ae61db01ae7d568b48c6e2689bcce1d913a7...,"{""owner"":""Kevin Moore"",""date"":""2049-01-01"",""no...",quilt-bio-staging,fiskus/sandbox
3,dfb5862eae8497997b2764338e7fcc09f0daf6f95e1b61...,"{""owner"":""Kevin Moore"",""date"":""2021-11-04"",""na...",quilt-bio-staging,kevin/tome
4,d6e8795a3f37ae7b53a7c92ddb35a09653decfb93c6dc0...,"{""owner"":""Kevin Moore"",""date"":""2049-01-01"",""no...",quilt-bio-staging,fiskus/2020-12-01
...,...,...,...,...
56,a320436781b0d8bc48312870ca73d36ed03ea7c1a42b73...,"{""owner"":""Kevin Moore"",""date"":""2077-13-32"",""na...",quilt-bio-staging,akarve/crispr
57,3df066ed2becf2b572db2ef944c63679bd98d30e5d23a4...,"{""owner"":""Kevin Moore"",""date"":""2020-12-12"",""a""...",quilt-bio-staging,fiskus/2020-11-17
58,2bddfa73e6abf319ae24847fe84fbba5a73594e1ac1613...,"{""owner"":""Kevin Moore"",""date"":""2049-01-01"",""no...",quilt-bio-staging,fiskus/2020-12-04
59,e0378399e8bb7763fca3e4985406bd1ef4e11609b0feed...,"{""owner"":""Kevin Moore"",""date"":""2020-11-16"",""na...",quilt-bio-staging,akarve/rhtml


In [14]:
# Decode the first row's user_meta JSON string into a dict to inspect its keys.
# (json is already imported in the imports cell at the top.)
sdict = df['user_meta'].iloc[0]
ndict = json.loads(sdict)
ndict

{'owner': 'Kevin Moore',
 'date': '2021-02-01',
 'name': 'Aneesh Karve',
 'type': 'Flow Cytometry'}