# Athena and Boto3

#### Import BOTO3

In [105]:
import boto3
import pandas as pd
import time

#### Create Session

In [106]:
session = boto3.Session(
    aws_access_key_id='',
    aws_secret_access_key='',
    region_name='')

In [107]:
athena_client = session.client('athena')

#### Define table properties and Query

In [108]:
database = 'dbname'
table_name = 'tblname'

In [109]:
create_table_query = f"""
    CREATE EXTERNAL TABLE IF NOT EXISTS {database}.{table_name} (
      `customerid` string,
      `customername` string,
      `segment` string,
      `age` int,
      `country` string,
      `city` string,
      `state` string,
      `postalcode` int,
      `region` string
    )
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
    WITH SERDEPROPERTIES ('field.delim' = ',')
    STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION 's3://your-bucket/path-csv/'
    TBLPROPERTIES (
      'classification' = 'csv',
      'skip.header.line.count' = '1'
    )
"""



#### Submit Request

In [110]:
output_location = 's3://your-bucket/output_path/'
response = athena_client.start_query_execution(
    QueryString=create_table_query,
    QueryExecutionContext={'Database': database},
    ResultConfiguration={'OutputLocation': output_location}
)

#### Sample Query from Table

In [111]:
query = "SELECT * FROM tblname LIMIT 5"

In [112]:
response = athena_client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={'Database': database},
    ResultConfiguration={'OutputLocation': output_location}
)

In [113]:
query_execution_id = response['QueryExecutionId']

#### Wait query completion

In [114]:
def wait_for_query_completion(query_execution_id):
    while True:
        response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
        state = response['QueryExecution']['Status']['State']
        
        if state in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
            break
        
        # Wait for a few seconds before checking the query status again
        time.sleep(2)

    return state

In [115]:
query_state = wait_for_query_completion(query_execution_id)

In [116]:
if query_state == 'SUCCEEDED':
    print("Query execution completed successfully.")
else:
    print("Query execution failed or was cancelled.")

Query execution completed successfully.


In [118]:
if query_state == 'SUCCEEDED':
    # Get the query results
    result = athena_client.get_query_results(QueryExecutionId=query_execution_id)

    # Extract column names
    columns = [col['Label'] for col in result['ResultSet']['ResultSetMetadata']['ColumnInfo']]

    # Extract row data
    rows = result['ResultSet']['Rows'][1:]  # Exclude the header row

    # Convert query result to Pandas DataFrame
    data = []
    for row in rows:
        data.append([val['VarCharValue'] for val in row['Data']])
    df = pd.DataFrame(data, columns=columns)
else:
    print(f"Query execution failed. Status: {status}")

In [119]:
df.head()

Unnamed: 0,customerid,customername,segment,age,country,city,state,postalcode,region
0,BH-11710,Brosina Hoffman,Consumer,20,United States,Los Angeles,California,90032,West
1,ZD-21925,Zuschuss Donatelli,Consumer,66,United States,San Francisco,California,94109,West
2,EH-13945,Eric Hoffmann,Consumer,21,United States,Los Angeles,California,90049,West
3,RA-19885,Ruben Ausman,Corporate,51,United States,Los Angeles,Cakifornia,90049,West
4,KM-16720,Kunst Miller,Consumer,69,United States,Los Angeles,California,90004,West
