# Requirements

In [None]:
import boto3 
import botocore
from botocore.exceptions import BotoCoreError, ClientError, NoCredentialsError, PartialCredentialsError, ParamValidationError, WaiterError
import loguru
from loguru import logger
import os
import pandas as pd
import numpy as np
from datetime import timedelta
import statistics
import datetime
from datetime import timedelta
import plotly.express as px
import plotly.graph_objects as go

from scipy.stats import chi2_contingency


# AWS credentials are read from the AWS_ACCESS_KEY / AWS_ACCESS_SECRET
# environment variables. `setdefault` only fills in the placeholder when the
# variable is not already set, so credentials exported before launching the
# notebook are no longer overwritten with an empty string.
os.environ.setdefault('AWS_ACCESS_KEY', '')      # replace '' with AWS creds (or export the variable beforehand)
os.environ.setdefault('AWS_ACCESS_SECRET', '')   # replace '' with AWS creds (or export the variable beforehand)

AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY']
AWS_SECRET_ACCESS_KEY = os.environ['AWS_ACCESS_SECRET']

# Functions

In [None]:

# ---------------------------------------
# FUNCTIONS
# ---------------------------------------

# FUNCTION TO EXECUTE ATHENA QUERY AND RETURN RESULTS
# ----------

def _athena_rows_to_lists(rows):
    """Turn Athena result ``Rows`` entries into plain lists of cell values.

    A cell that has no ``VarCharValue`` key is represented as ``np.nan``.
    """
    return [
        [cell.get('VarCharValue', np.nan) for cell in row['Data']]
        for row in rows
    ]


def run_athena_query(query: str, database: str, region: str,
                     output_location: str = 's3://prymal-ops/athena_query_results/',
                     poll_interval: float = 1.0):
    """Run a query on Athena and return all result rows as a DataFrame.

    Args:
        query: SQL text passed to ``start_query_execution``.
        database: Database name used as the query execution context.
        region: AWS region for the Athena client.
        output_location: S3 location where Athena writes the query results.
        poll_interval: Seconds to sleep between query-status checks.

    Returns:
        A ``pandas.DataFrame`` with one column per result column (all cell
        values are the strings Athena returned, or ``np.nan`` when a cell has
        no value), or ``None`` when the query does not succeed or an error is
        raised; errors are logged rather than propagated.
    """
    import time  # local import keeps this cell runnable without touching the import cell

    # Initialize Athena client. Empty credential placeholders are passed as
    # None so boto3 falls back to its default credential lookup.
    athena_client = boto3.client('athena',
                                 region_name=region,
                                 aws_access_key_id=AWS_ACCESS_KEY_ID or None,
                                 aws_secret_access_key=AWS_SECRET_ACCESS_KEY or None)

    try:
        # Execute the query
        response = athena_client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={'Database': database},
            ResultConfiguration={'OutputLocation': output_location},
        )
        query_execution_id = response['QueryExecutionId']

        # Wait for the query to leave the RUNNING / QUEUED states, sleeping
        # between checks instead of polling in a tight loop.
        logger.info(f'Running query..')
        while True:
            response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
            status = response['QueryExecution']['Status']
            state = status['State']
            if state not in ('RUNNING', 'QUEUED'):
                break
            time.sleep(poll_interval)

        if state != 'SUCCEEDED':
            # Do not try to fetch results for a query that failed or was cancelled.
            if state == 'FAILED':
                logger.error('Query Failed!')
            reason = status.get('StateChangeReason', 'no reason reported')
            logger.error(f"Query ended in state {state}: {reason}")
            return None

        logger.info('Query Succeeded!')

        # First page of results
        query_results = athena_client.get_query_results(QueryExecutionId=query_execution_id,
                                                        MaxResults=1000)

        # Extract query result column names into a list
        cols = query_results['ResultSet']['ResultSetMetadata']['ColumnInfo']
        col_names = [col['Name'] for col in cols]

        # The first row of the first page is skipped as the column-header row
        query_results_data = _athena_rows_to_lists(query_results['ResultSet']['Rows'][1:])

        # Paginate results if necessary. Follow-up pages are kept in full:
        # skipping their first row would silently drop one data row per page.
        while 'NextToken' in query_results:
            query_results = athena_client.get_query_results(QueryExecutionId=query_execution_id,
                                                            NextToken=query_results['NextToken'],
                                                            MaxResults=1000)
            query_results_data.extend(_athena_rows_to_lists(query_results['ResultSet']['Rows']))

        return pd.DataFrame(query_results_data, columns=col_names)

    except ParamValidationError as e:
        # Invalid parameters in the request
        logger.error(f"Validation Error (potential SQL query issue): {e}")

    except WaiterError as e:
        logger.error(f"Waiter Error: {e}")

    except ClientError as e:
        error_code = e.response['Error']['Code']
        error_message = e.response['Error']['Message']

        if error_code == 'InvalidRequestException':
            logger.error(f"Invalid Request Exception: {error_message}")
        elif error_code == 'ResourceNotFoundException':
            logger.error(f"Resource Not Found Exception: {error_message}")
        elif error_code == 'AccessDeniedException':
            logger.error(f"Access Denied Exception: {error_message}")
        else:
            logger.error(f"Athena Error: {error_code} - {error_message}")

    except Exception as e:
        # Top-level boundary: log any other unexpected exception
        logger.error(f"Other Exception: {str(e)}")

    # Reached only after a logged error
    return None





# Query shopify line item data from data lake

In [None]:

DATABASE = 'prymal-analytics'
REGION = 'us-east-1'

# QUERY ATHENA
# ------------------------------------------------------------------------------

# All Shopify line items, joined to the SKU lookup table for the SKU name,
# product category and product type. Plain string: there are no placeholders
# to interpolate, so no f-prefix is needed.
QUERY = """SELECT a.*
            , b.sku_name
            , b.product_category
            , b.product_type
            FROM "prymal"."shopify_line_items"  a
            LEFT JOIN "prymal"."skus_shopify" b
            ON a.sku = b.sku 
            

            """

# Query the data lake; the query has no date filter, so every line item is returned
# ----

result_df = run_athena_query(query=QUERY, database=DATABASE, region=REGION)

# run_athena_query logs errors and returns None instead of raising; stop here
# with a clear message rather than failing later on `result_df.copy()`.
if result_df is None:
    raise RuntimeError('Athena query returned no result - see the logged error for details.')


# Format Data & Calculate Retention Rate by Cohort Month

#### Set Hero SKU(s)

In [None]:
# Hero SKU(s): customers are split by whether their first order did / did not
# include one of these values. Add further entries to the list as needed.
hero_sku = [
    'Variety Pack - Kickstart',
]

#### Group by whether first order contained hero SKU(s), calculate retention rate by first order month cohort

In [None]:
# Copy of athena query results
data = result_df.copy()

# Convert 'order_date' to datetime, keeping only the calendar date: the first
# 10 characters (YYYY-MM-DD) of each value. The vectorized .str accessor lets
# missing values pass through (-> NaT) instead of raising on a non-string.
data['order_date'] = pd.to_datetime(data['order_date'].str[:10])


# Identify First Orders (earliest order date per customer email)
first_order = data.groupby('email', as_index=False)['order_date'].min()
first_order.columns = ['email', 'first_order_date']

first_order['cohort_month'] = first_order['first_order_date'].dt.strftime('%Y-%m')

print(f'Length of first_order: {len(first_order)} ')

# Merge this information back to the main data
merged_data = pd.merge(data, first_order, on='email')

# Classify Customers Based on Hero SKU.
# A line item counts when it was bought on the customer's first order date and
# its product_type is in `hero_sku`. The flag is then lifted to the customer
# level (True on every row of a customer with at least one such line) so that
# de-duplicating (email, flag) yields exactly one row per customer. With a
# line-level flag, hero customers also carried False rows and were counted in
# both groups.
# NOTE(review): `hero_sku` is matched against `product_type`, not `sku` - confirm this is intended.
hero_line_in_first_order = (
    merged_data['product_type'].isin(hero_sku)
    & (merged_data['order_date'] == merged_data['first_order_date'])
)
merged_data['first_order_contains_hero_sku'] = (
    hero_line_in_first_order.groupby(merged_data['email']).transform('any')
)


# Identify Repeat Purchases: any order dated after the first order date.
# NOTE(review): dates are truncated to the day, so a second order placed on
# the same day as the first is not counted as a repeat purchase.
customer_purchases = (
    (merged_data['order_date'] > merged_data['first_order_date'])
    .groupby(merged_data['email'])
    .any()
    .reset_index()
)
customer_purchases.columns = ['email', 'made_subsequent_purchase']


# One row per customer: first order date, cohort month, repeat flag, hero flag
customer_cohorts = (
    first_order
    .merge(customer_purchases, on='email')
    .merge(merged_data[['email', 'first_order_contains_hero_sku']].drop_duplicates(), on='email')
)

# Calculate retention rate for each cohort's customers. Rows are collected in
# a list and turned into a DataFrame once (DataFrame.append was removed in
# pandas 2.0). A group with no customers in a cohort gets a NaN rate.
retention_records = []
for month, cohort_customers in customer_cohorts.groupby('cohort_month'):
    has_hero = cohort_customers['first_order_contains_hero_sku']

    # Retention rate for customers who had hero sku in first order
    retention_records.append({
        'Cohort': month,
        'Retention Rate': cohort_customers.loc[has_hero, 'made_subsequent_purchase'].mean(),
        'Group': 'Hero SKU',
    })

    # Retention rate for customers who did not have hero sku in first order
    retention_records.append({
        'Cohort': month,
        'Retention Rate': cohort_customers.loc[~has_hero, 'made_subsequent_purchase'].mean(),
        'Group': 'No Hero SKU',
    })

# 'Cohort' already holds 'YYYY-MM' strings (from cohort_month), as needed for Plotly
retention_rate_by_cohort_df = pd.DataFrame(retention_records, columns=['Cohort', 'Retention Rate', 'Group'])

# Plot Retention Rate by Cohort Month

#### For the Kickstart Bundle specifically, split into the old kickstarter bundle and the new (sachet) kickstarter bundle based on order date - the new sachet kickstarter bundle replaced the old kickstarter bundle on 2022-12-01

In [None]:
# Cohort months before 2022-12 are the old kickstarter bundle; 2022-12 and
# later are the sachet kickstarter bundle. 'Cohort' holds 'YYYY-MM' strings,
# which pd.to_datetime parses as the first day of that month.
SACHET_BUNDLE_LAUNCH = pd.to_datetime('2022-12-01')

is_hero_group = retention_rate_by_cohort_df['Group'] == 'Hero SKU'
is_before_launch = pd.to_datetime(retention_rate_by_cohort_df['Cohort']) < SACHET_BUNDLE_LAUNCH

retention_rate_by_cohort_df.loc[is_hero_group & is_before_launch, 'Group'] = 'Hero SKU - Old Kickstarter Bundle'
retention_rate_by_cohort_df.loc[is_hero_group & ~is_before_launch, 'Group'] = 'Hero SKU - Sachet Kickstarter Bundle'

#### Plot observed retention rate by cohort over time

In [None]:
# Order the rows by cohort month so each line is drawn left-to-right in time
cohort_rates_sorted = retention_rate_by_cohort_df.sort_values('Cohort', ascending=True)

# One line per group: retention rate against cohort month
fig = px.line(
    cohort_rates_sorted,
    x='Cohort',
    y='Retention Rate',
    color='Group',
    title='First Time Customer Retention Rate Over Time',
)

# Treat cohort months as categories and label the axis
fig.update_xaxes(type='category', title_text='Cohort Month')

# Show the y-axis values as whole-number percentages
fig.update_layout(yaxis_tickformat='.0%')

fig.show()

# Check for statistical significance

#### Format data for chi-square test

In [None]:
# Compile customer-level classification.
# The hero flag is reduced to one value per customer (True if any of the
# customer's rows is flagged) so each customer appears exactly once after the
# merge and cannot be counted in both groups.
customer_hero_flag = (
    merged_data.groupby('email')['first_order_contains_hero_sku'].any().reset_index()
)
customer_classification = pd.merge(first_order, customer_purchases, on='email')
customer_classification = pd.merge(customer_classification, customer_hero_flag, on='email')

# Subset to customers whose first order was in 2023, Jan 1 inclusive. 2024
# cohorts are excluded because those customers have had little time to repeat.
first_order_dates = pd.to_datetime(customer_classification['first_order_date'])
in_2023 = (
    (first_order_dates >= pd.to_datetime('2023-01-01'))
    & (first_order_dates < pd.to_datetime('2024-01-01'))
)
customer_classification_2023 = customer_classification.loc[in_2023].copy()

# First orders before 2022-12-01 are the old kickstarter bundle; on or after
# that date they are the sachet kickstarter bundle.
SACHET_BUNDLE_LAUNCH = pd.to_datetime('2022-12-01')
has_hero = customer_classification_2023['first_order_contains_hero_sku'].astype(bool)
before_launch = pd.to_datetime(customer_classification_2023['first_order_date']) < SACHET_BUNDLE_LAUNCH

customer_classification_2023.loc[~has_hero, 'Group'] = 'No Hero SKU'
customer_classification_2023.loc[has_hero & before_launch, 'Group'] = 'Hero SKU - Old Kickstarter Bundle'
customer_classification_2023.loc[has_hero & ~before_launch, 'Group'] = 'Hero SKU - Sachet Kickstarter Bundle'

# Aggregate results for each group (distinct customers per group / outcome)
results_by_group = customer_classification_2023.groupby(['Group', 'made_subsequent_purchase'], as_index=False)['email'].nunique()

# Pivot the data to create a contingency table: one row per group, columns
# False (no repeat purchase) and True (repeat purchase). Missing combinations
# are filled with 0 and both columns are always present, in that order.
contingency_table = (
    results_by_group
    .pivot_table(index='Group', columns='made_subsequent_purchase', values='email', aggfunc='sum', fill_value=0)
    .reindex(columns=[False, True], fill_value=0)
    .reset_index()
)

# Drop the 'Group' column to match the format needed for chi2_contingency
contingency_matrix = contingency_table.drop('Group', axis=1).values

#### Chi-square test for independence

In [None]:



# Each row of contingency_matrix is one group; column 0 counts customers with
# no repeat purchase and column 1 counts customers with a repeat purchase.
# Retention rate = retained / all customers in the group. Dividing by the
# not-retained count alone would give the odds, not the rate.
# NOTE(review): assumes row 0 is the Hero SKU group and row 1 the No Hero SKU
# group (alphabetical group order, exactly two groups) - confirm against
# contingency_table['Group'].
hero_sku_counts = contingency_matrix[0]
no_hero_sku_counts = contingency_matrix[1]

hero_sku_group_rate = hero_sku_counts[1] / hero_sku_counts.sum()
no_hero_sku_group_rate = no_hero_sku_counts[1] / no_hero_sku_counts.sum()

print(f"Hero SKU population observed retention rate: {hero_sku_group_rate}")
print(f"No-Hero SKU population observed retention rate: {no_hero_sku_group_rate}")

# Perform the Chi-square test of independence between group and retention status
chi2, p, dof, expected = chi2_contingency(contingency_matrix)

print(f"Chi-square value: {chi2}")
print(f"P-value: {p}")
print(f"Degrees of freedom: {dof}")
print(f"Expected frequencies: \n{expected}")

# Interpret the p-value
alpha = 0.05  # significance level
if p < alpha:
    print("There is a significant association between the group and retention status (reject H0).")
else:
    print("There is no significant association between the group and retention status (fail to reject H0).")