# PP S3 Query Demo
* StellarAlgo Data Science
* Peter Morrison
* June 27, 2022

This notebook demonstrates three ways to query the product propensity scores stored in S3 and get them into Redshift.

To read these from the Legacy environment, you will need to assume the Explore-US role. See an example of assuming the Legacy role here: https://github.com/stellaralgo/data-sci-retention/blob/main/lambdas/inference/post_process_scores/post_process_scores.py#L43

In [2]:
import datetime
import time

import boto3
import pandas as pd
import psycopg2

# Reading s3:// paths with pandas also requires s3fs and a parquet engine (pyarrow or fastparquet)

In [None]:
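# ARN of an IAM role attached to the Redshift cluster that can read the scores bucket (used by COPY)
IAM_ROLE = ""

# One entry per team; "name" matches the team's parquet file name in S3
configs = [
    {
        "name": "galaxy"
    },
]

# Today's date, used to build the date=YYYY-MM-DD partition path
date = datetime.date.today()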

In [None]:
# This lets you assume the Explore-US Role

role_arn = "" # will need to add role_arn for Explore-US

client_sts = boto3.client("sts")

sts_response = client_sts.assume_role(
    RoleArn=role_arn,
    RoleSessionName="ds_session",
)

ds_session_id = sts_response["Credentials"]["AccessKeyId"]
ds_session_key = sts_response["Credentials"]["SecretAccessKey"]
ds_session_token = sts_response["Credentials"]["SessionToken"]

boto3.setup_default_session(
    region_name="us-east-1",
    aws_access_key_id=ds_session_id,
    aws_secret_access_key=ds_session_key,
    aws_session_token=ds_session_token,
)


## 1. Querying the S3 Bucket to get a DataFrame of the Scores
Pros
* Allows you to manipulate data before writing to Redshift

Cons
* Have to handle querying the folder structure by date and team name
* Have to handle and maintain the list of team names (e.g. when a team is added)
* Poor performance: every file is downloaded and parsed in the notebook's own Python process

In [None]:

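frames = []
for config in configs:
    url = f"s3://explore-us-curated-data-sci-product-propensity-us-east-1-u8gldf/product-propensity-scores/date={date}/{config['name']}.parquet"
    # s3fs does not use boto3's default session, so pass the assumed-role credentials explicitly
    storage_options = {"key": ds_session_id, "secret": ds_session_key, "token": ds_session_token}
    frames.append(pd.read_parquet(url, storage_options=storage_options))

# Combine every team's scores; the result still has to be written to Redshift
scores = pd.concat(frames, ignore_index=True)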
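Instead of hard-coding `configs`, the team files for a given day could be discovered by listing the day's `date=` prefix. This is a minimal sketch that assumes every team file sits directly under `product-propensity-scores/date=YYYY-MM-DD/` as `<team>.parquet`, as the URLs above suggest; the helper names `date_prefix`, `team_names_from_keys` and `list_score_keys` are hypothetical.

```python
import datetime
from typing import Iterable, List

BUCKET = "explore-us-curated-data-sci-product-propensity-us-east-1-u8gldf"
PREFIX = "product-propensity-scores"


def date_prefix(day: datetime.date) -> str:
    """Key prefix for one day's partition folder."""
    return f"{PREFIX}/date={day.isoformat()}/"


def team_names_from_keys(keys: Iterable[str], day: datetime.date) -> List[str]:
    """Extract team names from keys shaped like <prefix>/date=<day>/<team>.parquet."""
    prefix = date_prefix(day)
    teams = []
    for key in keys:
        # Ignore other days and non-parquet objects (e.g. marker files)
        if not key.startswith(prefix) or not key.endswith(".parquet"):
            continue
        name = key[len(prefix):-len(".parquet")]
        # Skip files nested deeper than the partition folder (assumed not to be team files)
        if name and "/" not in name:
            teams.append(name)
    return sorted(teams)


def list_score_keys(s3_client, day: datetime.date) -> List[str]:
    """List every object key under the day's prefix, following list_objects_v2 pagination."""
    paginator = s3_client.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=BUCKET, Prefix=date_prefix(day)):
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
    return keys
```

After the assume-role cell, something like `configs = [{"name": t} for t in team_names_from_keys(list_score_keys(boto3.client("s3"), date), date)]` would rebuild the team list from whatever files actually exist for that day.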


## 2. Redshift Copy
Pros
* Writes directly to Redshift
* Good performance

Cons
* Have to handle querying the folder structure by date and team name
* Have to handle and maintain the list of team names (like if a team is added)
* Can't do data manipulations before pushing to Redshift

In [None]:
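dbname = ""
host = ""
user = ""
password = ""

conn = psycopg2.connect(dbname=dbname, host=host, port='5439', user=user, password=password)
cur = conn.cursor()

# psycopg2 opens a transaction implicitly on the first execute, so no explicit "begin;" is needed
try:
    for config in configs:
        s3_path = f"s3://explore-us-curated-data-sci-product-propensity-us-east-1-u8gldf/product-propensity-scores/date={date}/{config['name']}.parquet"
        # IAM_ROLE must be a role attached to the Redshift cluster with read access to the bucket
        cur.execute(f"COPY score_data FROM '{s3_path}' IAM_ROLE '{IAM_ROLE}' FORMAT AS PARQUET;")

    # Commit all team loads together so a failure leaves score_data unchanged
    conn.commit()
    print("Copy executed fine!")
except Exception:
    conn.rollback()
    raise
finally:
    cur.close()
    conn.close()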
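Redshift's COPY loads every object whose key begins with the given S3 prefix, so a single statement per day could replace the per-team loop. This is a sketch that assumes every file under the day's partition folder is a Parquet file whose columns match `score_data`; the function name `build_day_copy` is hypothetical.

```python
import datetime


def build_day_copy(table: str, bucket: str, prefix: str, day: datetime.date, iam_role: str) -> str:
    """Build one Redshift COPY statement that loads every Parquet file in the day's partition."""
    # The trailing slash restricts the match to keys inside this partition folder
    s3_prefix = f"s3://{bucket}/{prefix}/date={day.isoformat()}/"
    return f"COPY {table} FROM '{s3_prefix}' IAM_ROLE '{iam_role}' FORMAT AS PARQUET;"
```

With this, `cur.execute(build_day_copy("score_data", "explore-us-curated-data-sci-product-propensity-us-east-1-u8gldf", "product-propensity-scores", date, IAM_ROLE))` followed by `conn.commit()` would pick up a newly added team without any change to `configs`.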

## 3. Athena Interface
Pros
* Don't have to handle querying the folder structure by date and team name
* Don't have to handle and maintain the list of team names (like if a team is added)
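  * Athena handles both of the above
* Better performance than option 1
* Allows you to manipulate data like option 1

Cons
* Results still have to be written to Redshift separately
* `get_query_results` returns at most 1,000 rows per call, so large result sets need many paged requests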


In [None]:


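client = boto3.client("athena")

# Start the query. The workgroup must have a query result location configured
# (otherwise pass ResultConfiguration={"OutputLocation": "s3://..."}).
query_start = client.start_query_execution(
    QueryString="SELECT * FROM product_propensity_scores",
    QueryExecutionContext={"Database": "data-sci"},
)
query_id = query_start["QueryExecutionId"]

# Re-check the status until the query reaches a terminal state
while True:
    query_execution = client.get_query_execution(QueryExecutionId=query_id)
    state = query_execution["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(15)

if state != "SUCCEEDED":
    reason = query_execution["QueryExecution"]["Status"].get("StateChangeReason", "")
    raise RuntimeError(f"Athena query {state}: {reason}")

# Page through the results; MaxResults is capped at 1000 rows per page
rows = []
kwargs = {"QueryExecutionId": query_id, "MaxResults": 1000}
while True:
    results = client.get_query_results(**kwargs)
    rows.extend(results["ResultSet"]["Rows"])
    next_token = results.get("NextToken")  # absent on the last page
    if not next_token:
        break
    kwargs["NextToken"] = next_token

# For a SELECT, the first row holds the column names; every value comes back as a string
header = [cell.get("VarCharValue") for cell in rows[0]["Data"]]
df = pd.DataFrame([[cell.get("VarCharValue") for cell in row["Data"]] for row in rows[1:]], columns=header)
# Have to write df to Redshift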
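Because Athena returns every value as a string, numeric score columns need converting before any manipulation. The sketch below uses the column types Athena reports in `ResultSetMetadata.ColumnInfo` of the first response; the function name `athena_pages_to_frame` and the set of types treated as numeric are assumptions, not part of the original notebook.

```python
from typing import Any, Dict, List

import pandas as pd

# Athena column types assumed to convert cleanly to numeric pandas columns
_NUMERIC_TYPES = {"tinyint", "smallint", "integer", "bigint", "float", "real", "double", "decimal"}


def athena_pages_to_frame(pages: List[Dict[str, Any]]) -> pd.DataFrame:
    """Combine get_query_results responses for a SELECT into one typed DataFrame."""
    column_info = pages[0]["ResultSet"]["ResultSetMetadata"]["ColumnInfo"]
    names = [col["Name"] for col in column_info]

    records = []
    for page in pages:
        for row in page["ResultSet"]["Rows"]:
            # A cell without VarCharValue represents NULL
            records.append([cell.get("VarCharValue") for cell in row["Data"]])

    # The first row of the first page repeats the column names for SELECT queries
    df = pd.DataFrame(records[1:], columns=names)

    # Convert string values to numbers where Athena reports a numeric type (NULL becomes NaN)
    for col in column_info:
        if col["Type"] in _NUMERIC_TYPES:
            df[col["Name"]] = pd.to_numeric(df[col["Name"]])
    return df
```

Appending each `get_query_results` response from the paging loop above to a list and passing that list in would give a DataFrame whose score columns can be filtered or aggregated directly.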