In [1]:
import boto3
import botocore
import os
import psycopg2
from dotenv import load_dotenv #pip install python-dotenv
from psycopg2 import connect, sql
from os import environ as env
import pandas as pd

load_dotenv()
conn_string = os.getenv('conn_string')
AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY')
AWS_SECRET_KEY = os.getenv('AWS_SECRET_KEY')



if 'conn_string' in env:
    print(env['conn_string'][:35])

dbname='etl_bites' user='joemiller'


## 05 Challenge

Find the most visited URL per country per day during a week of your choice (e.g. 2018-04-01 until 2018-04-08).

For the data about the browser, we are going to assume the right column is event_url


Connect to Amazon S3 Bucket using boto3

In [47]:
import boto3
import time
import psycopg2


# Set up the Athena client
athena_client = boto3.client(
    'athena',
    region_name='eu-west-2',
    aws_access_key_id= AWS_ACCESS_KEY,
    aws_secret_access_key= AWS_SECRET_KEY,
    )

# Write the SQL query
sql_query = """
SELECT
  t.server_request_country_code as country_code,
  t.event_url,
  t.day,
  t.num_visits
FROM
  (
    SELECT
      event_url,
      server_request_country_code,
      day,
      COUNT(event_url) AS num_visits,
      ROW_NUMBER() OVER (PARTITION BY day, server_request_country_code ORDER BY COUNT(event_url) DESC) AS rn
    FROM
      vod_clickstream
    WHERE
      datetime >= CAST('2018-04-01' AS timestamp) AND datetime < CAST('2018-04-08' AS timestamp)
    GROUP BY
      event_url, server_request_country_code, day
  ) t
WHERE
  t.rn = 1
ORDER BY
  t.num_visits DESC;
"""

# Execute the Athena query
query_execution = athena_client.start_query_execution(
    QueryString=sql_query,
    QueryExecutionContext={
        "Database": "joe-athena_parquet"
    },
    ResultConfiguration={
        "OutputLocation": "s3://athena-learners-etl-bite05/joe" # <= This will be different for you, refer to the Amazon Athena pill for more information.
    }
)

# Poll the query status until it is either successful or failed
query_status = "QUEUED"
query_execution_id = query_execution["QueryExecutionId"]

while query_status in ["QUEUED", "RUNNING"]:
    query_execution = athena_client.get_query_execution(
        QueryExecutionId=query_execution_id
    )
    query_status = query_execution["QueryExecution"]["Status"]["State"]
    if query_status == "FAILED":
        raise Exception("Athena query failed!")
    time.sleep(1)



# Get Results

results = []
# Geti initial set of results (first page)
query_results = athena_client.get_query_results(QueryExecutionId=query_execution_id)

# add results to the results list
results.extend(query_results["ResultSet"]["Rows"])

# Check for pagination, if NextToken in query_results then there is another page of results
if "NextToken" in query_results:
        next_token = query_results["NextToken"]

while "NextToken" in query_results:
    # Retrieve the query results with pagination
    query_results = athena_client.get_query_results(
        QueryExecutionId=query_execution_id,
        NextToken=next_token
        )

    # Add the current set of results to the overall results
    results.extend(query_results["ResultSet"]["Rows"])

    # Check if there are more results to retrieve
    if "NextToken" in query_results:
        next_token = query_results["NextToken"]
    else:
        break



In [13]:
len(results)

1387

Convert the results to a Pandas DataFrame

In [39]:
# Process the query results
results_df = pd.DataFrame(columns=["country_code", "event_url", "day", "num_visits"])

for row in results[1:]:
    
    if not row["Data"][0]:
        # Check for missing country codes and if missing skip row
        print(f"Skipping row: {row}")
        continue
    country_code = row["Data"][0]["VarCharValue"]
    event_url = row["Data"][1]["VarCharValue"]
    day = int(row["Data"][2]["VarCharValue"])
    num_visits = int(row["Data"][3]["VarCharValue"])

    values = pd.DataFrame({
        "country_code" : [country_code], 
        "event_url": [event_url], 
        "day": [day],
        "num_visits": [num_visits]
              })
    
    results_df = pd.concat([results_df, values], ignore_index=True)

results_df


Skipping row: {'Data': [{}, {'VarCharValue': 'https://www.netflix.com/browse'}, {'VarCharValue': '02'}, {'VarCharValue': '1311'}]}
Skipping row: {'Data': [{}, {'VarCharValue': 'https://www.netflix.com/browse'}, {'VarCharValue': '01'}, {'VarCharValue': '1226'}]}
Skipping row: {'Data': [{}, {'VarCharValue': 'https://www.netflix.com/browse'}, {'VarCharValue': '03'}, {'VarCharValue': '1096'}]}
Skipping row: {'Data': [{}, {'VarCharValue': 'https://www.netflix.com/browse'}, {'VarCharValue': '04'}, {'VarCharValue': '970'}]}
Skipping row: {'Data': [{}, {'VarCharValue': 'https://www.netflix.com/browse'}, {'VarCharValue': '06'}, {'VarCharValue': '820'}]}
Skipping row: {'Data': [{}, {'VarCharValue': 'https://www.netflix.com/browse'}, {'VarCharValue': '05'}, {'VarCharValue': '815'}]}
Skipping row: {'Data': [{}, {'VarCharValue': 'https://www.netflix.com/browse'}, {'VarCharValue': '07'}, {'VarCharValue': '770'}]}


Unnamed: 0,country_code,event_url,day,num_visits
0,US,https://www.netflix.com/browse,7,39973
1,US,https://www.netflix.com/browse,2,39847
2,US,https://www.netflix.com/browse,1,38516
3,US,https://www.netflix.com/browse,6,38056
4,US,https://www.netflix.com/browse,3,37753
...,...,...,...,...
1374,BQ,https://www.netflix.com/watch/70196273?trackId...,1,1
1375,SC,https://www.netflix.com/ca/,6,1
1376,NE,https://www.netflix.com/watch/70213488?trackId...,5,1
1377,AS,https://www.netflix.com/,3,1


In [55]:
sorted_df = results_df.sort_values(['day', 'num_visits'], ascending=[True, False])
sorted_df.head(20)

Unnamed: 0,country_code,event_url,day,num_visits
2,US,https://www.netflix.com/browse,1,38516
7,CA,https://www.netflix.com/browse,1,10237
16,GB,https://www.netflix.com/browse,1,5570
17,BR,https://www.netflix.com/browse,1,5500
32,AU,https://www.netflix.com/browse,1,3898
35,MX,https://www.netflix.com/browse,1,3787
47,DE,https://www.netflix.com/browse,1,2544
48,AR,https://www.netflix.com/browse,1,2542
56,IN,https://www.netflix.com/browse,1,2137
61,SE,https://www.netflix.com/browse,1,1864


In [57]:
results_df.loc[results_df['day']==1].head(20)

Unnamed: 0,country_code,event_url,day,num_visits
2,US,https://www.netflix.com/browse,1,38516
7,CA,https://www.netflix.com/browse,1,10237
16,GB,https://www.netflix.com/browse,1,5570
17,BR,https://www.netflix.com/browse,1,5500
32,AU,https://www.netflix.com/browse,1,3898
35,MX,https://www.netflix.com/browse,1,3787
47,DE,https://www.netflix.com/browse,1,2544
48,AR,https://www.netflix.com/browse,1,2542
56,IN,https://www.netflix.com/browse,1,2137
61,SE,https://www.netflix.com/browse,1,1864


### LOAD

In [62]:
# Connect to the local Postgres database
conn = psycopg2.connect(conn_string)
cursor = conn.cursor()

# Create the table if it doesn't exist
cursor.execute("""
    DROP TABLE IF EXISTS netflix_url_visits CASCADE;
    CREATE TABLE IF NOT EXISTS netflix_url_visits (
        country_code VARCHAR(2),
        event_url VARCHAR,
        day INTEGER,
        num_visits INTEGER
    );
""")

conn.commit()
            
# Process the query results
for row in results[1:]:
    if not row["Data"][0] or not row["Data"][1]:
        # You could also print what a `row` has if you are curious!
        print(f"Skipping row: {row}")
        continue

    country_code = row["Data"][0]["VarCharValue"]
    event_url = row["Data"][1]["VarCharValue"]
    day = int(row["Data"][2]["VarCharValue"])
    num_visits = int(row["Data"][3]["VarCharValue"])

    # Insert the data into the local PostgreSQL database
    insert_query = """
        INSERT INTO netflix_url_visits (country_code, event_url, day, num_visits)
        VALUES (%s, %s, %s, %s);
    """

    try:
        cursor.execute(insert_query, (country_code, event_url, day, num_visits))
        
    except Exception as e:
        print("Error occurred inserting into analytical DB: %s"% e)
        conn.rollback()  # Rollback the transaction if there's an error

# Commit the changes and close the cursor and connection outside the loop
conn.commit()
cursor.close()
conn.close()

Skipping row: {'Data': [{}, {'VarCharValue': 'https://www.netflix.com/browse'}, {'VarCharValue': '02'}, {'VarCharValue': '1311'}]}
Skipping row: {'Data': [{}, {'VarCharValue': 'https://www.netflix.com/browse'}, {'VarCharValue': '01'}, {'VarCharValue': '1226'}]}
Skipping row: {'Data': [{}, {'VarCharValue': 'https://www.netflix.com/browse'}, {'VarCharValue': '03'}, {'VarCharValue': '1096'}]}
Skipping row: {'Data': [{}, {'VarCharValue': 'https://www.netflix.com/browse'}, {'VarCharValue': '04'}, {'VarCharValue': '970'}]}
Skipping row: {'Data': [{}, {'VarCharValue': 'https://www.netflix.com/browse'}, {'VarCharValue': '06'}, {'VarCharValue': '820'}]}
Skipping row: {'Data': [{}, {'VarCharValue': 'https://www.netflix.com/browse'}, {'VarCharValue': '05'}, {'VarCharValue': '815'}]}
Skipping row: {'Data': [{}, {'VarCharValue': 'https://www.netflix.com/browse'}, {'VarCharValue': '07'}, {'VarCharValue': '770'}]}


In [63]:
%load_ext sql

In [64]:
%sql postgresql+psycopg2://joemiller:@localhost:5432/etl_bites

In [67]:
%%sql

SELECT *
FROM netflix_url_visits
LIMIT 30;

 * postgresql+psycopg2://joemiller:***@localhost:5432/etl_bites
30 rows affected.


country_code,event_url,day,num_visits
US,https://www.netflix.com/browse,7,39973
US,https://www.netflix.com/browse,2,39847
US,https://www.netflix.com/browse,1,38516
US,https://www.netflix.com/browse,6,38056
US,https://www.netflix.com/browse,3,37753
US,https://www.netflix.com/browse,5,37124
US,https://www.netflix.com/browse,4,37041
CA,https://www.netflix.com/browse,1,10237
CA,https://www.netflix.com/browse,7,10085
CA,https://www.netflix.com/browse,2,10045
