# Query Athena using standard SQL syntax
Amazon Athena is an interactive query service that makes it easy to analyze data directly in Amazon Simple Storage Service (Amazon S3) using standard SQL. With a few actions in the AWS Management Console, you can point Athena at your data stored in Amazon S3 and begin using standard SQL to run ad-hoc queries and get results in seconds.

Athena is serverless, so there is no infrastructure to set up or manage, and you pay only for the queries you run. Athena scales automatically—running queries in parallel—so results are fast, even with large datasets and complex queries.



In [None]:
# Needed modules
import pandas as pd
import json
import boto3
import time
import io

In [None]:
# Verify AWS Credentials: ask STS which identity this session is running as,
# then keep the part of the caller's ARN that follows the first '/'.
session = boto3.Session()
sts = session.client('sts')
response = sts.get_caller_identity()
caller_arn = response['Arn']
arn_parts = caller_arn.split('/')
my_username = arn_parts[1]
print(my_username)

In [None]:
# We are going to use 3 AWS services: glue, athena and s3, so create their clients.
# All three are created from the same `session` whose identity was verified above,
# so every client is built from the same session configuration (the original mixed
# the default boto3 session for glue/athena with `session` for s3).
glue = session.client('glue')
athena = session.client('athena')
s3c = session.client('s3')

## Query an Athena table step-by-step

### Select the right database

In [None]:
# List every database in the Glue Data Catalog. get_databases() is paginated,
# so walk all pages instead of reading only the first response.
db_lst = [
    database['Name']
    for page in glue.get_paginator('get_databases').paginate()
    for database in page['DatabaseList']
]
print('Existing databases in Athena:', db_lst)
# Our data is in the 'wb-indicators' database (the same name query_athena() uses
# below). Select it by name rather than by list position, which shifts whenever
# databases are added or removed; fall back to the old position [2] only if that
# name is not in the catalog.
db_name = 'wb-indicators'
db = db_name if db_name in db_lst else db_lst[2]
print('Our selected database:', db)

### See what tables are available in the database
I have already used an AWS Glue crawler to search the files and find any common structures and create tables. This resulted in two tables:
- gdp: The World Bank indicator, GDP, which has a row for each country's gross domestic product in the years 1960-2020
- pop: The World Bank indicator, POP, which has a row for each country's population in the years 1960-2020

In [None]:
# Ask Glue for the table definitions stored in the selected database
tables = glue.get_tables(DatabaseName=db)
# Print out all the tables in this database, one name per line
for table in tables['TableList']:
    print(table['Name'])

### Set up the parameters needed to connect to Athena and run a SQL query.

In [None]:
# Athena writes every query's results to S3, so it needs an output location.
# This is just a plain string holding that S3 URI.
output = "s3://gse580/athena/results"

### Set up and run the query.

In [None]:
# The query is ordinary standard SQL, passed to Athena as a Python string.
# Here: small-population countries in 2020.
query = ("SELECT * FROM pop "
         "WHERE value < 1000000 and date = 2020")

In [None]:
# Start the query with start_query_execution(). The keyword arguments are
# collected first: the SQL text, the database it runs against, and the S3
# location where Athena should send the results.
query_args = {
    'QueryString': query,
    'QueryExecutionContext': {'Database': db},
    'ResultConfiguration': {'OutputLocation': output},
}
athenaQuery = athena.start_query_execution(**query_args)
# Athena reports back details about this query as a dictionary (JSON-like object)
athenaQuery

In [None]:
# The QueryExecutionId identifies this query; later calls use it to look up
# the query's status and results.
qid = athenaQuery['QueryExecutionId']
print(f'Query ID: {qid}')

In [None]:
# The query may take some time, so make sure it has 'SUCCEEDED' before moving on:
# the next cells read the result file the query writes to S3.
#
# Request the query's current status with get_query_execution(), once a second,
# until it reaches a final state.
while True:
    response = athena.get_query_execution(QueryExecutionId=qid)
    state = response['QueryExecution']['Status']['State']
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(1)
if state != 'SUCCEEDED':
    # StateChangeReason, when present, describes why the query stopped
    print('Query did not succeed:', response['QueryExecution']['Status'].get('StateChangeReason'))
# Show the final status
state

### Loading the results of a query into a pandas DataFrame

In [None]:
# Athena stored the result of the query in a .csv file under our S3 output
# location 's3://gse580/athena/results'. The full S3 URI of that file is in the
# query's ResultConfiguration; split it into the bucket and key get_object() needs.
result_uri = response['QueryExecution']['ResultConfiguration']['OutputLocation']
# 's3://<bucket>/<key>': drop the scheme, then split once on the first '/'.
# (Splitting on the text 'gse580/' would break if it also appeared inside the key.)
bucket, key = result_uri[len('s3://'):].split('/', 1)
print('Bucket:', bucket)
print('Key:', key)

In [None]:
# Call the 'get_object' function from boto3. We did this earlier in the term.
# The S3 reply is stored in its own variable so `response` (the Athena query
# details used by the cells above) is not overwritten and those cells can be re-run.
s3_response = s3c.get_object(Bucket=bucket, Key=key)
#
# Get the HTTPStatusCode from the response
status = s3_response.get("ResponseMetadata", {}).get("HTTPStatusCode")
if status != 200:
    # Stop here instead of falling through to df.head() with no df (NameError)
    # or a stale df left over from an earlier run.
    raise RuntimeError(f"Unsuccessful S3 get_object response. Status - {status}")
print(f"Successful S3 get_object response. Status - {status}")
# Create the df from the get_object call above by reading only the 'Body' (the data)
df = pd.read_csv(s3_response['Body'])
df.head()

### While that was a lot of steps, we can write a Python function to do it all for us.
Let's set up the function to work like this:
1. Define a query string
2. Call the function with the query as a parameter and return a pandas DataFrame

You will call the function like this:
- query = "Write a SQL query here"
- df = query_athena(query)

In [None]:
# A small sample query to try the function on: all GDP rows for the USA.
query = ("SELECT * FROM gdp "
         "WHERE countryiso3code = 'USA'")
print(query)

In [None]:
# Define a function that takes the query as a parameter and returns a DataFrame.
# This is the same sequence of steps used above, wrapped in one function, plus two
# small helpers: one waits for the query to finish, one parses the result's S3 URI.


def _split_s3_uri(uri):
    """Split an 's3://bucket/key' URI into its (bucket, key) parts."""
    if not uri.startswith('s3://'):
        raise ValueError(f"Not an S3 URI: {uri!r}")
    # Split only on the first '/' after the scheme so slashes inside the key survive.
    bucket, _, key = uri[len('s3://'):].partition('/')
    return bucket, key


def _wait_for_query(qid, poll_seconds=1):
    """Poll Athena until query *qid* finishes and return its final status response.

    Raises RuntimeError if the query ends in the FAILED or CANCELLED state.
    """
    while True:
        # Wait between checks so we don't hammer the Athena API.
        time.sleep(poll_seconds)
        response = athena.get_query_execution(QueryExecutionId=qid)
        status = response['QueryExecution']['Status']
        state = status['State']
        if state == 'SUCCEEDED':
            return response
        if state in ('FAILED', 'CANCELLED'):
            # StateChangeReason, when present, describes why the query stopped.
            reason = status.get('StateChangeReason', 'no reason given')
            raise RuntimeError(f"Athena query {qid} {state}: {reason}")
        # Any other state (e.g. still queued or running): keep waiting.
        print('Still waiting for the query:', state)


def query_athena(query, db='wb-indicators', output='s3://gse580/athena/results'):
    """Run *query* on Athena and return its result set as a pandas DataFrame.

    Parameters
    ----------
    query : str
        The SQL statement to run.
    db : str, optional
        Database the query runs against (default 'wb-indicators').
    output : str, optional
        S3 URI where Athena writes the query results
        (default 's3://gse580/athena/results').

    Returns
    -------
    pandas.DataFrame
        The query results, parsed from the .csv file Athena wrote to S3.

    Raises
    ------
    RuntimeError
        If the query ends FAILED or CANCELLED, or if the S3 download does not
        return HTTP status 200.
    """
    # Start the query
    athenaQuery = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': db},
        ResultConfiguration={'OutputLocation': output},
    )
    qid = athenaQuery['QueryExecutionId']
    response = _wait_for_query(qid)
    print('Successful query')
    # Locate the result file from the URI Athena reports (the bucket comes from
    # the URI itself, so it always matches `output`).
    bucket, key = _split_s3_uri(
        response['QueryExecution']['ResultConfiguration']['OutputLocation']
    )
    # Call the S3 'get_object' function from boto3.
    s3_response = s3c.get_object(Bucket=bucket, Key=key)
    # Get the HTTPStatusCode from the response
    status = s3_response.get("ResponseMetadata", {}).get("HTTPStatusCode")
    if status != 200:
        raise RuntimeError(f"Unsuccessful S3 get_object response. Status - {status}")
    # Create the df by reading only the 'Body' (which is the data)
    return pd.read_csv(s3_response['Body'])

### Use the function

In [None]:
# Now a single call runs the query and hands back the results as a DataFrame.
query_df = query_athena(query=query)
# Peek at the first rows
query_df.head()

### Let's combine the two tables into a single table with both POP and GDP for all years.

In [None]:
# Pull every row of the population table, renaming the columns to
# country / date / pop so they line up with the GDP table for the merge below.
query = "SELECT pop.countryiso3code AS country,pop.date,pop.value AS pop FROM pop"
print(query)
pop_df = query_athena(query=query)
print(pop_df.head(n=2))
# Dimensions of the result as (rows, columns)
pop_df.shape

In [None]:
# Pull every row of the GDP table, renaming the columns to
# country / date / gdp so they line up with the population table.
query = "SELECT gdp.countryiso3code AS country,gdp.date,gdp.value AS gdp FROM gdp"
print(query)
gdp_df = query_athena(query=query)
print(gdp_df.head(n=2))
# Dimensions of the result as (rows, columns)
gdp_df.shape

In [None]:
# As usual, the data is not very clean: show every row with at least one NaN.
rows_with_nan = pop_df.isna().any(axis=1)
pop_df[rows_with_nan]

In [None]:
# Clean it up: drop any rows containing NaNs, then renumber the index 0, 1, 2, ...
# reset_index() returns a new DataFrame, so its result must be assigned back
# (the original calls discarded it). drop=True throws the old index away instead
# of adding it to the frame as an extra 'index' column.
pop_df = pop_df.dropna().reset_index(drop=True)
gdp_df = gdp_df.dropna().reset_index(drop=True)
# See the size of each df
print(pop_df.shape)
print(gdp_df.shape)

In [None]:
# Join the population and GDP frames on matching (country, date) pairs
# (pandas' default inner join keeps only pairs present in both).
merged_df = pd.merge(pop_df, gdp_df, on=['country', 'date'])
print(merged_df.head(5))
# Dimensions of the merged result
merged_df.shape

## Assignment:
- Create your own, different query of the tables, creating a new dataframe.  The query can be whatever you want.
- Save that dataframe to a new .csv on the 'gse580' bucket at the location:<BR>
        /gse580/yourusername/data/athena_results.csv
