In [1]:
import boto3
import time
import psycopg2
from dotenv import load_dotenv
import os

# Load variables from a .env file (if one is present) into the process environment
load_dotenv()

# AWS credentials are read from the environment so they never appear in the notebook;
# either value is None when the corresponding variable is not set.
AWS_ACCESS_KEY = os.environ.get('ACCESS_KEY')
AWS_SECRET_KEY = os.environ.get('SECRET_ACCESS_KEY')

# Extract

In [11]:
# Set up the Athena client
athena_client = boto3.client(
    'athena',
    region_name='eu-west-2',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY)

# Write the SQL query: visits per country code for January 2018
# (half-open range: 2018-01-01 inclusive, 2018-02-01 exclusive).
sql_query = """
    SELECT server_request_country_code, COUNT(*) as total_visits
    FROM vod_clickstream
    WHERE datetime >= CAST('2018-01-01' AS timestamp) AND datetime < CAST('2018-02-01' AS timestamp)
    GROUP BY server_request_country_code;
"""


# Execute the Athena query
query_execution = athena_client.start_query_execution(
    QueryString=sql_query,
    QueryExecutionContext={
        "Database": "jack-athena-parquet"
    },
    ResultConfiguration={
        "OutputLocation": "s3://athena-learners-etl-bite05/jack" # <= This will be different for you, refer to the Amazon Athena pill for more information.
    }
)

# Poll the query status until it reaches a terminal state.
# Only sleep while the query is still pending, so a finished query is not delayed.
query_execution_id = query_execution["QueryExecutionId"]

while True:
    query_execution = athena_client.get_query_execution(
        QueryExecutionId=query_execution_id
    )
    status_info = query_execution["QueryExecution"]["Status"]
    query_status = status_info["State"]
    if query_status not in ("QUEUED", "RUNNING"):
        break
    time.sleep(1)

# Any terminal state other than SUCCEEDED (FAILED, CANCELLED) has no usable results;
# surface Athena's own explanation instead of failing later in get_query_results.
if query_status != "SUCCEEDED":
    failure_reason = status_info.get("StateChangeReason", "no reason reported")
    raise RuntimeError(
        f"Athena query {query_execution_id} ended in state {query_status}: {failure_reason}"
    )

# Retrieve the query results.
# A single get_query_results call returns only one page, so walk every page.
# The first row of the first page is the column-header row; later pages carry data only.
results_paginator = athena_client.get_paginator("get_query_results")
results = [
    row
    for page in results_paginator.paginate(QueryExecutionId=query_execution_id)
    for row in page["ResultSet"]["Rows"]
]

In [12]:
# Inspect the raw Athena rows; as the output shows, the first row holds the column names
results

[{'Data': [{'VarCharValue': 'server_request_country_code'},
   {'VarCharValue': 'total_visits'}]},
 {'Data': [{'VarCharValue': 'EG'}, {'VarCharValue': '30524'}]},
 {'Data': [{'VarCharValue': 'GM'}, {'VarCharValue': '61'}]},
 {'Data': [{'VarCharValue': 'DJ'}, {'VarCharValue': '41'}]},
 {'Data': [{'VarCharValue': 'TW'}, {'VarCharValue': '63454'}]},
 {'Data': [{'VarCharValue': 'ZW'}, {'VarCharValue': '2566'}]},
 {'Data': [{'VarCharValue': 'IT'}, {'VarCharValue': '146365'}]},
 {'Data': [{'VarCharValue': 'LR'}, {'VarCharValue': '489'}]},
 {'Data': [{'VarCharValue': 'YT'}, {'VarCharValue': '339'}]},
 {'Data': [{'VarCharValue': 'TZ'}, {'VarCharValue': '2474'}]},
 {'Data': [{'VarCharValue': 'GN'}, {'VarCharValue': '337'}]},
 {'Data': [{'VarCharValue': 'SM'}, {'VarCharValue': '291'}]},
 {'Data': [{'VarCharValue': 'DO'}, {'VarCharValue': '116842'}]},
 {'Data': [{'VarCharValue': 'PE'}, {'VarCharValue': '182824'}]},
 {'Data': [{'VarCharValue': 'KY'}, {'VarCharValue': '2236'}]},
 {'Data': [{'VarCha

# Transform and Load

In [15]:
# Connect to the local Postgres database
conn = psycopg2.connect(database="etl_bites", user="jackdench", host="localhost", port="5432")

# Upsert statement: insert a country's count, or overwrite the stored count
# when the country code already exists. Built once, reused for every row.
insert_query = """
    INSERT INTO visits_per_country (country_code, total_visits)
    VALUES (%s, %s)
    ON CONFLICT (country_code)
    DO UPDATE SET total_visits = EXCLUDED.total_visits;
"""

try:
    with conn.cursor() as cursor:
        # Create the table if it doesn't exist
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS visits_per_country (
                country_code VARCHAR(2) PRIMARY KEY,
                total_visits INTEGER
            );
        """)
        # Persist the DDL right away so a later per-row failure cannot undo it.
        conn.commit()

        # Process the query results (results[0] is the header row, so skip it)
        for row in results[1:]:
            country_cell, visits_cell = row["Data"][0], row["Data"][1]
            # Athena returns an empty dict for a NULL cell; such rows cannot be loaded.
            if not country_cell or not visits_cell:
                # You could also print what a `row` has if you are curious!
                print(f"Skipping row: {row}")
                continue

            country_code = country_cell["VarCharValue"]
            total_visits = int(visits_cell["VarCharValue"])

            # Insert the data into the local PostgreSQL database.
            # The savepoint confines a failure to this row: rolling back to it keeps
            # every earlier upsert in the transaction, whereas a full conn.rollback()
            # would silently discard all of them.
            cursor.execute("SAVEPOINT visits_row")
            try:
                cursor.execute(insert_query, (country_code, total_visits))
            except psycopg2.Error as e:
                print("Error occurred inserting into analytical DB: %s"% e)
                cursor.execute("ROLLBACK TO SAVEPOINT visits_row")
            else:
                cursor.execute("RELEASE SAVEPOINT visits_row")

    # Commit the successfully upserted rows in one go
    conn.commit()
finally:
    # Always release the connection, even if an unexpected error aborted the load;
    # closing discards any uncommitted work.
    conn.close()

Skipping row: {'Data': [{}, {'VarCharValue': '243775'}]}
