## Experiment with different thresholds (eg.45,60,75,90) and observe the effects
Join 'dimensional.f_aa_hits_detail_engagement’ to ‘dimensional.f_aa_engagement_by_site’ accurately in order to add information about if the visit has been marked as engaged so I can then recreate this column using the time on site column to check it matches up, before creating new has engagement columns using 45, 60, 75 and 90 seconds to observe the effect this has.
- dimensional.f_aa_hits_detail_engagement is per hit
- dimensional.f_aa_engagement_by_site is per visit - made up of multiple hits
- so need to group dimensional.f_aa_hits_detail_engagement into visits - by visit id, site_url and date
- then for each visit add post_evar_link_type (if none - null, if mutliple then a list), has_engagement (sum (as engagament sum) and then has engagement if 0,-> 0 and if >0 then 1), same for has_clickout

In [1]:
pip install snowflake-snowpark-python --quiet

Note: you may need to restart the kernel to use updated packages.


In [2]:
#connecting and reading to and from snowflake
import boto3
import snowflake.connector
from pathlib import Path
from snowflake.connector.pandas_tools import write_pandas
from sqlalchemy import create_engine
from snowflake.connector.pandas_tools import pd_writer
from snowflake.connector import DictCursor
import pandas as pd
from snowflake.snowpark import functions as fn
from snowflake.snowpark import types as type
from snowflake.snowpark import DataFrame as df

   
def get_snowflake_credentials():
    """ iTech credentials service for Sagemaker
   
    iTech has a service account for Sagemaker notebooks that has access permission to snowflake.
    We need to use this service account every time we need to build a connection, to obtain the right credentials.
    """
    params = [
        '/sagemaker/snowflake/user_id',
        '/sagemaker/snowflake/password',
        '/sagemaker/snowflake/account_id'
    ]
 
    sm = boto3.client('secretsmanager', "eu-west-1")
 
    param_values = {}
    for param in params:
        param_values[param.replace('/sagemaker/snowflake/','').replace('_id','')] = sm.get_secret_value(SecretId = param)['SecretString']
    return param_values

def create_snowflake_connection(schema='DIMENSIONAL'):
    """
    With the created connection object `conn` you can connect to Snowflake and obtain data with the following way:
 
    ```python
    with conn.cursor() as c:
        df = c.execute("SQL_QUERY_STRING_HERE").fetch_pandas_all()
    ```
    """
    credentials = get_snowflake_credentials()
    return snowflake.connector.connect(
        user=credentials['user'],
        password=credentials['password'],
        account=credentials['account'],
        role="PRD_ANALYST",
        warehouse='PRD_WH',
        database='PRD_DWH',
        schema=schema
    )

def download_from_snowflake(sql_query):
    with create_snowflake_connection().cursor(DictCursor) as cur:
            data = cur.execute(sql_query).fetchall()
            data = pd.DataFrame(data).rename(columns=str.lower)
    return data

def read_sql_query(sql_file):
    return sql_file.read_text()
 

def write_to_table(df, table_name):
    drop_sql = f'DROP TABLE IF EXISTS {table_name.upper()}'
    with create_snowflake_connection(schema='SANDBOX') as conn:
        conn.cursor().execute(drop_sql)
        write_pandas(conn, df, table_name.upper())

  warn_incompatible_dep(


In [3]:
import numpy as np

def is_outlier(points, thresh=3.5):
    """
    Returns a boolean array with True if points are outliers and False 
    otherwise.

    Parameters:
    -----------
        points : An numobservations by numdimensions array of observations
        thresh : The modified z-score to use as a threshold. Observations with
            a modified z-score (based on the median absolute deviation) greater
            than this value will be classified as outliers.

    Returns:
    --------
        mask : A numobservations-length boolean array.

    References:
    ----------
        Boris Iglewicz and David Hoaglin (1993), "Volume 16: How to Detect and
        Handle Outliers", The ASQC Basic References in Quality Control:
        Statistical Techniques, Edward F. Mykytka, Ph.D., Editor. 
    """
    if len(points.shape) == 1:
        points = points[:,None]
    median = np.median(points, axis=0)
    diff = np.sum((points - median)**2, axis=-1)
    diff = np.sqrt(diff)
    med_abs_deviation = np.median(diff)

    modified_z_score = 0.6745 * diff / med_abs_deviation

    return modified_z_score > thresh

In [4]:
from snowflake.snowpark import Session

credentials = get_snowflake_credentials()
connection_parameters = {
    "account": str(credentials['account']),
    "user": str(credentials['user']),
    "password" : str(credentials['password']),
    "role": "PRD_ANALYST",
    "warehouse": "PRD_WH",
    "database": "PRD_DWH",
    "schema": "SANDBOX"
}

test_session = Session.builder.configs(connection_parameters).create()
#test_session.sql('select current_date, current_warehouse(), current_database(), current_schema()').collect()
#test_session.close()

In [5]:
f_aa_engagement_by_site = test_session.table('dimensional.f_aa_engagement_by_site')

In [6]:
f_aa_hits_detail_engagement = test_session.table('dimensional.f_aa_hits_detail_engagement')

In [7]:
f_aa_hits_detail_engagement.columns

['VISIT_DATE_UTC',
 'D_SITE_HIST_SK',
 'SITE_URL',
 'VISIT_DATE_TIME_UTC',
 'VISITOR_ID',
 'VISIT_NUM',
 'VISIT_ID',
 'DEVICE_TYPE',
 'CONNECTION_TYPE',
 'D_USER_LANGUAGE_SK',
 'D_COUNTRY_HIST_SK',
 'STATE_CODE',
 'VISIT_REFERRER_TYPE',
 'VISIT_SEARCH_ENGINE',
 'VISIT_START_PAGE_URL',
 'VISIT_START_D_PAGE_HIST_SK',
 'HIT_TIME_UTC',
 'PAGE_URL',
 'D_PAGE_HIST_SK',
 'VISIT_PAGE_NUM',
 'IS_LANDING_PAGE',
 'HAS_CLICKOUT',
 'HAS_ENGAGEMENT',
 'OUTBOUND_LINK',
 'PAGE_VERTICAL',
 'CHANNEL_NAME',
 'TRANSACTION_ID']

In [23]:
grouped_hits_detail = f_aa_hits_detail_engagement.groupBy('VISIT_ID', 'VISIT_START_PAGE_URL', 'VISIT_DATE_UTC').agg(
    fn.array_agg("OUTBOUND_LINK").alias("OUTBOUND_LINK_values"),
    fn.array_agg("HAS_ENGAGEMENT").alias("HAS_ENGAGEMENT_values"),
    fn.array_agg("HAS_CLICKOUT").alias("HAS_CLICKOUT_values"))


In [24]:
grouped_hits_detail.columns

['VISIT_ID',
 'VISIT_START_PAGE_URL',
 'VISIT_DATE_UTC',
 'OUTBOUND_LINK_VALUES',
 'HAS_ENGAGEMENT_VALUES',
 'HAS_CLICKOUT_VALUES']

In [None]:
grouped_hits_detail.show(5)

In [18]:
grouped_engagement_by_site = f_aa_engagement_by_site.groupBy('VISIT_ID', 'VISIT_START_PAGE_URL', 'VISIT_DATE_UTC').agg(
    fn.array_agg("DEVICE_TYPE").alias("DEVICE_TYPE_values"),
    fn.array_agg("D_COUNTRY_HIST_SK").alias("D_COUNTRY_HIST_SK_values"),
    fn.array_agg("TIME_ON_SITE_SECONDS").alias("TIME_ON_SITE_SECONDS_values"),
    fn.array_agg("CHANNEL_NAME").alias("CHANNEL_NAME_values"),
    fn.array_agg("HAS_PAGE_VIEW_POKER").alias("HAS_PAGE_VIEW_POKER_values"),
    fn.array_agg("HAS_PAGE_VIEW_CASINO").alias("HAS_PAGE_VIEW_CASINO_values"),
    fn.array_agg("HAS_PAGE_VIEW_SPORTS").alias("HAS_PAGE_VIEW_SPORTS_values"),
    )

In [19]:
grouped_engagement_by_site.columns

['VISIT_ID',
 'VISIT_START_PAGE_URL',
 'VISIT_DATE_UTC',
 'DEVICE_TYPE_VALUES',
 'D_COUNTRY_HIST_SK_VALUES',
 'TIME_ON_SITE_SECONDS_VALUES',
 'CHANNEL_NAME_VALUES',
 'HAS_PAGE_VIEW_POKER_VALUES',
 'HAS_PAGE_VIEW_CASINO_VALUES',
 'HAS_PAGE_VIEW_SPORTS_VALUES']

In [None]:
grouped_engagement_by_site.show(5)

In [30]:
result = grouped_engagement_by_site.join(
    grouped_hits_detail,
    (
        (grouped_engagement_by_site.VISIT_ID == grouped_hits_detail.VISIT_ID) &
        (grouped_engagement_by_site.VISIT_START_PAGE_URL == grouped_hits_detail.VISIT_START_PAGE_URL)&
        (grouped_engagement_by_site.VISIT_DATE_UTC == grouped_hits_detail.VISIT_DATE_UTC)
    ),
    how = 'left'
)

In [31]:
result.columns

['"l_x1z0_VISIT_ID"',
 '"l_x1z0_VISIT_START_PAGE_URL"',
 '"l_x1z0_VISIT_DATE_UTC"',
 'DEVICE_TYPE_VALUES',
 'D_COUNTRY_HIST_SK_VALUES',
 'TIME_ON_SITE_SECONDS_VALUES',
 'CHANNEL_NAME_VALUES',
 'HAS_PAGE_VIEW_POKER_VALUES',
 'HAS_PAGE_VIEW_CASINO_VALUES',
 'HAS_PAGE_VIEW_SPORTS_VALUES',
 '"r_3kfy_VISIT_ID"',
 '"r_3kfy_VISIT_START_PAGE_URL"',
 '"r_3kfy_VISIT_DATE_UTC"',
 'OUTBOUND_LINK_VALUES',
 'HAS_ENGAGEMENT_VALUES',
 'HAS_CLICKOUT_VALUES']

In [32]:
result.count()

347787470

In [28]:
grouped_engagement_by_site.count()

347787470

In [29]:
grouped_hits_detail.count()

347849791

In [33]:
#filter unmatched rows
unmatched_rows = result.filter(result["TIME_ON_SITE_SECONDS_VALUES"].isNull())

# count the number of unmatched rows
num_unmatched_rows = unmatched_rows.count()

# print the result
print(f"There were {num_unmatched_rows} rows that didn't match.")


There were 0 rows that didn't match.


In [None]:
result.show(5)

In [7]:
# has engagement calculation 
#CASE
        #WHEN has_clickout OR h.post_evar_link_type = 'Internal' OR NVL(TRY_TO_NUMBER(REPLACE(h.post_prop_time_on_page, '+', '')), 0) >= 30
          #THEN 1
        #ELSE 0
      #END AS has_engagement

In [8]:
#join the table and its columns below to the exisitng snowpark table

#post_evar_link_type
#has_engagement
#has_clickout
#post_prop_time_on_page

#join by url/site, visit id and date?

In [9]:
f_aa_hits_detail_engagement = test_session.table('dimensional.f_aa_hits_detail_engagement')

In [10]:
f_aa_hits_detail_engagement.columns

['VISIT_DATE_UTC',
 'D_SITE_HIST_SK',
 'SITE_URL',
 'VISIT_DATE_TIME_UTC',
 'VISITOR_ID',
 'VISIT_NUM',
 'VISIT_ID',
 'DEVICE_TYPE',
 'CONNECTION_TYPE',
 'D_USER_LANGUAGE_SK',
 'D_COUNTRY_HIST_SK',
 'STATE_CODE',
 'VISIT_REFERRER_TYPE',
 'VISIT_SEARCH_ENGINE',
 'VISIT_START_PAGE_URL',
 'VISIT_START_D_PAGE_HIST_SK',
 'HIT_TIME_UTC',
 'PAGE_URL',
 'D_PAGE_HIST_SK',
 'VISIT_PAGE_NUM',
 'IS_LANDING_PAGE',
 'HAS_CLICKOUT',
 'HAS_ENGAGEMENT',
 'OUTBOUND_LINK',
 'PAGE_VERTICAL',
 'CHANNEL_NAME',
 'TRANSACTION_ID']

In [11]:
f_aa_hits_detail_engagement_cols = f_aa_hits_detail_engagement.select('visit_id','VISIT_DATE_TIME_UTC','VISIT_START_PAGE_URL','OUTBOUND_LINK','has_engagement','has_clickout')

In [12]:
f_aa_hits_detail_engagement_cols.columns

['VISIT_ID',
 'VISIT_DATE_TIME_UTC',
 'VISIT_START_PAGE_URL',
 'OUTBOUND_LINK',
 'HAS_ENGAGEMENT',
 'HAS_CLICKOUT']

In [13]:
f_aa_engagement_by_site.columns

['VISIT_DATE_UTC',
 'D_SITE_HIST_SK',
 'VISIT_DATE_TIME_UTC',
 'VISIT_ID',
 'DEVICE_TYPE',
 'CONNECTION_TYPE',
 'D_COUNTRY_HIST_SK',
 'STATE_CODE',
 'D_USER_LANGUAGE_SK',
 'VISIT_REFERRER_TYPE',
 'VISIT_SEARCH_ENGINE',
 'TIME_ON_SITE_SECONDS',
 'SITE_VISIT_CONVERSION',
 'SITE_VISIT_UNQIUE_CLICKOUT',
 'SITE_VISIT_TOTAL_CLICKOUT',
 'SITE_VISIT_ENGAGEMENT',
 'SITE_VISIT_BOUNCE',
 'HAS_PAGE_VIEW_POKER',
 'SITE_VISIT_CONVERSION_POKER',
 'SITE_VISIT_UNIQUE_CLICKOUT_POKER',
 'SITE_VISIT_TOTAL_CLICKOUT_POKER',
 'SITE_VISIT_ENGAGEMENT_POKER',
 'SITE_VISIT_BOUNCE_POKER',
 'HAS_PAGE_VIEW_CASINO',
 'SITE_VISIT_CONVERSION_CASINO',
 'SITE_VISIT_UNIQUE_CLICKOUT_CASINO',
 'SITE_VISIT_TOTAL_CLICKOUT_CASINO',
 'SITE_VISIT_ENGAGEMENT_CASINO',
 'SITE_VISIT_BOUNCE_CASINO',
 'HAS_PAGE_VIEW_SPORTS',
 'SITE_VISIT_CONVERSION_SPORTS',
 'SITE_VISIT_UNIQUE_CLICKOUT_SPORTS',
 'SITE_VISIT_TOTAL_CLICKOUT_SPORTS',
 'SITE_VISIT_ENGAGEMENT_SPORTS',
 'SITE_VISIT_BOUNCE_SPORTS',
 'ETL_LOAD_TIME',
 'ETL_DAG_ID',
 'ETL_TAS

In [14]:
result = f_aa_hits_detail_engagement_cols.join(
    f_aa_engagement_by_site,
    (
        (f_aa_hits_detail_engagement_cols.VISIT_ID == f_aa_engagement_by_site.VISIT_ID) &
        (f_aa_hits_detail_engagement_cols.VISIT_START_PAGE_URL == f_aa_engagement_by_site.VISIT_START_PAGE_URL)&
        (f_aa_hits_detail_engagement_cols.VISIT_DATE_TIME_UTC == f_aa_engagement_by_site.VISIT_DATE_TIME_UTC)
    ),
    how = 'inner'
)


In [15]:
result.columns

['"l_4c3i_VISIT_ID"',
 '"l_4c3i_VISIT_DATE_TIME_UTC"',
 '"l_4c3i_VISIT_START_PAGE_URL"',
 'OUTBOUND_LINK',
 'HAS_ENGAGEMENT',
 'HAS_CLICKOUT',
 'VISIT_DATE_UTC',
 'D_SITE_HIST_SK',
 '"r_b7ig_VISIT_DATE_TIME_UTC"',
 '"r_b7ig_VISIT_ID"',
 'DEVICE_TYPE',
 'CONNECTION_TYPE',
 'D_COUNTRY_HIST_SK',
 'STATE_CODE',
 'D_USER_LANGUAGE_SK',
 'VISIT_REFERRER_TYPE',
 'VISIT_SEARCH_ENGINE',
 'TIME_ON_SITE_SECONDS',
 'SITE_VISIT_CONVERSION',
 'SITE_VISIT_UNQIUE_CLICKOUT',
 'SITE_VISIT_TOTAL_CLICKOUT',
 'SITE_VISIT_ENGAGEMENT',
 'SITE_VISIT_BOUNCE',
 'HAS_PAGE_VIEW_POKER',
 'SITE_VISIT_CONVERSION_POKER',
 'SITE_VISIT_UNIQUE_CLICKOUT_POKER',
 'SITE_VISIT_TOTAL_CLICKOUT_POKER',
 'SITE_VISIT_ENGAGEMENT_POKER',
 'SITE_VISIT_BOUNCE_POKER',
 'HAS_PAGE_VIEW_CASINO',
 'SITE_VISIT_CONVERSION_CASINO',
 'SITE_VISIT_UNIQUE_CLICKOUT_CASINO',
 'SITE_VISIT_TOTAL_CLICKOUT_CASINO',
 'SITE_VISIT_ENGAGEMENT_CASINO',
 'SITE_VISIT_BOUNCE_CASINO',
 'HAS_PAGE_VIEW_SPORTS',
 'SITE_VISIT_CONVERSION_SPORTS',
 'SITE_VISIT_UNIQU

In [16]:
f_aa_hits_detail_engagement_cols.count()

8822887396

In [17]:
f_aa_engagement_by_site.count()

345038879

In [18]:
result.count()

3802264787

In [None]:
#3802264787 without inner

In [33]:
#filter unmatched rows
unmatched_rows = result.filter(result["HAS_CLICKOUT"].isNull())

# count the number of unmatched rows
num_unmatched_rows = unmatched_rows.count()

# print the result
print(f"There were {num_unmatched_rows} rows that didn't match.")


There were 0 rows that didn't match.


In [41]:
result.select(f_aa_engagement_by_site['VISIT_ID'],f_aa_hits_detail_engagement_cols['VISIT_ID'],f_aa_engagement_by_site['VISIT_DATE_TIME_UTC'],f_aa_hits_detail_engagement_cols['VISIT_DATE_TIME_UTC'],f_aa_engagement_by_site['VISIT_START_PAGE_URL'],f_aa_hits_detail_engagement_cols['VISIT_START_PAGE_URL'],'OUTBOUND_LINK','has_engagement','has_clickout','TIME_ON_SITE_SECONDS').show(25)

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"r_6z27_VISIT_ID"                                   |"l_sjb8_VISIT_ID"                                   |"r_6z27_VISIT_DATE_TIME_UTC"  |"l_sjb8_VISIT_DATE_TIME_UTC"  |"r_6z27_VISIT_START_PAGE_URL"                       |"l_sjb8_VISIT_START_PAGE_URL"                       |"OUTBOUND_LINK"   |"HAS_ENGAGEMENT"  |"HAS_CLICKOUT"  |"TIME_ON_SITE_SECONDS"  |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [42]:
result.count()

3802264787

In [None]:
345038879

In [21]:
notequal = result.filter(f_aa_engagement_by_site['VISIT_ID']!=f_aa_hits_detail_engagement_cols['VISIT_ID'])
notequal.count()

0