In [1]:
import boto3
import datetime as dt
import json
import numpy as np
import pandas as pd
import snowflake.connector
pd.set_option('display.max_rows', 1000)
pd.set_option('display.max_columns', 1000)
pd.options.display.float_format = '{:,.2f}'.format

In [2]:
from abc import ABCMeta, abstractmethod

class Credentials(metaclass=ABCMeta):
    """Abstract source of connection secrets.

    ``SnowflakeConnector`` calls ``get_keys()`` on the credentials object it
    is given, so that method is declared here as the contract every concrete
    credentials class has to implement.
    """

    @abstractmethod
    def get_keys(self):
        """Return the raw secret payload for this credential set.

        Returns:
            A mapping. ``SnowflakeConnector`` reads its ``'SecretString'``
            entry and parses that value as JSON.
        """
        raise NotImplementedError
    
    
class SSMPSCredentials(Credentials):
    """Credentials read from an AWS Secrets Manager secret."""

    def __init__(self, secretid: str, region_name: str = 'us-east-1'):
        """Remember which secret to fetch; no network call happens here.

        Args:
            secretid: Value sent as ``SecretId`` to Secrets Manager.
            region_name: AWS region of the Secrets Manager endpoint.
                Defaults to ``'us-east-1'`` so existing callers are unaffected.
        """
        self._secretid = secretid
        self._region_name = region_name
        # Kept for compatibility; nothing in this class populates it.
        self._secrets = {}

    def get_keys(self):
        """Fetch the secret from AWS Secrets Manager.

        A new boto3 client is created on every call.

        Returns:
            The response of ``get_secret_value`` for the configured secret id.
        """
        secrets_client = boto3.client(
            service_name='secretsmanager',
            region_name=self._region_name,
        )
        return secrets_client.get_secret_value(SecretId=self._secretid)
    
    
class BaseConnector(metaclass=ABCMeta):
    """Abstract base for connector classes; subclasses must supply ``connect``."""

    @abstractmethod
    def connect(self):
        """Open a connection. Concrete subclasses override this method."""
        raise NotImplementedError()
    

class SnowflakeConnector(BaseConnector):
    """Opens Snowflake connections using secrets obtained from a credentials object."""

    def __init__(self, credentials: Credentials):
        """Fetch the secret payload once and keep the decoded JSON on the instance.

        Args:
            credentials: Object whose ``get_keys()`` returns a mapping with an
                optional ``'SecretString'`` entry holding a JSON document.
        """
        secret_response = credentials.get_keys()
        # A response without 'SecretString' decodes to an empty dict.
        raw_secret = secret_response.get('SecretString', "{}")
        self._secrets = json.loads(raw_secret)

    def connect(self, dbname: str, schema: str = 'DEFAULT'):
        """Return a new Snowflake connection for the given database and schema.

        Login name, password, account and warehouse come from the stored
        secret; a missing entry raises ``KeyError``.

        Args:
            dbname: Database to connect to.
            schema: Schema to use; defaults to ``'DEFAULT'``.
        """
        secrets = self._secrets
        connection_args = {
            'user': secrets['login_name'],
            'password': secrets['login_password'],
            'account': secrets['account'],
            'warehouse': secrets['warehouse'],
            'database': dbname,
            'schema': schema,
        }
        return snowflake.connector.connect(**connection_args)
    
# Secret id handed to SSMPSCredentials to look up the Snowflake login.
SF_CREDS = 'datascience-max-dev-sagemaker-notebooks'

# Snowflake connection; `ctx` is the connection object used by run_query below.
conn = SnowflakeConnector(SSMPSCredentials(SF_CREDS))
ctx = conn.connect(dbname="MAX_DEV", schema="WORKSPACE")

def run_query(query):
    """Execute ``query`` on the notebook's Snowflake connection and return the rows.

    Args:
        query: SQL text passed unchanged to ``cursor.execute``.

    Returns:
        pandas.DataFrame with one row per fetched record. Column names are
        taken from the cursor description and lower-cased.
    """
    cursor = ctx.cursor()
    try:
        cursor.execute(query)
        column_names = [desc[0].lower() for desc in cursor.description]
        return pd.DataFrame(cursor.fetchall(), columns=column_names)
    finally:
        # Release the cursor even when execute/fetch raises, so repeated
        # calls in a long-lived kernel do not accumulate open cursors.
        cursor.close()



In [3]:
## WRITE OR READ FROM S3 ####
import boto3
import io

# Bucket names used by the S3 helper functions defined below.
input_bucket = "hbo-ingest-datascience-content-dev"
output_bucket = "hbo-outbound-datascience-content-dev"

# S3 resource handle and the Bucket object for the ingest bucket.
s3 = boto3.resource("s3")
bucket = s3.Bucket(input_bucket)

def _put_csv_to_s3(df, file_name, bucket_name):
    """Serialize ``df`` as CSV (without the index) and upload it to ``bucket_name``.

    The object key is ``title_hours_viewed_retention/<file_name>.csv``.
    """
    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer, index=False)
    key = 'title_hours_viewed_retention/{}.csv'.format(file_name)
    boto3.client('s3').put_object(Bucket=bucket_name, Key=key, Body=csv_buffer.getvalue())


def write_to_sf(df, file_name):
    """Upload ``df`` as a CSV object to the bucket named by ``output_bucket``.

    Args:
        df: DataFrame to serialize; the index is not written.
        file_name: Object name without extension. ``.csv`` is always
            appended, so a name that already ends in ``.csv`` yields a key
            ending in ``.csv.csv``.
    """
    _put_csv_to_s3(df, file_name, output_bucket)


def write_to_input(df, file_name):
    """Upload ``df`` as a CSV object to the bucket named by ``input_bucket``.

    Args:
        df: DataFrame to serialize; the index is not written.
        file_name: Object name without extension. ``.csv`` is always
            appended, so a name that already ends in ``.csv`` yields a key
            ending in ``.csv.csv``.
    """
    _put_csv_to_s3(df, file_name, input_bucket)
    
def read_from_s3(filename, input_bucket = input_bucket):
    """Read a CSV stored under ``title_hours_viewed_retention/`` into a DataFrame.

    Args:
        filename: Key prefix (after the folder name) to look for. Every
            object whose key starts with it is read.
        input_bucket: Name of the bucket to read from. Defaults to the
            module-level ``input_bucket``.

    Returns:
        pandas.DataFrame parsed from the last matching object. When several
        objects match, each is read in listing order and only the last
        result is kept.

    Raises:
        FileNotFoundError: If no object matches the prefix.
    """
    prefix = 'title_hours_viewed_retention/' + filename
    # Build the Bucket from the argument so a caller-supplied bucket name is
    # honored rather than always reading the module-level `bucket`.
    source_bucket = s3.Bucket(input_bucket)

    df = None
    for obj in source_bucket.objects.filter(Prefix=prefix): #churn_metric_0811
        key = obj.key
        body = obj.get()['Body']
        print('Reading {0} features'.format(key))
        # NOTE(review): this marks the literal text of four backslashes
        # followed by N as missing; confirm that is how NULLs appear in
        # these files.
        df = pd.read_csv(body, na_values = [r'\\\\N'])

    if df is None:
        # Without this guard an empty listing surfaced as UnboundLocalError.
        raise FileNotFoundError(
            "No objects in bucket '{0}' with prefix '{1}'".format(input_bucket, prefix)
        )
    return df



In [6]:
# Per-user viewing summary for subscribers whose cycle_expire_date falls in
# January 2024: hours viewed and distinct series count, grouped by program
# type, content category, genre and a library/current flag.
#
# Review notes (the query text itself is unchanged):
# - NOTE(review): hours_viewed is CONTENT_MINUTES_WATCHED/3600. If that column
#   is in minutes, as its name and the ">= 15" filter suggest, hours would be
#   /60 -- confirm the column's unit.
# - NOTE(review): DATEDIFF(...) <= 60 has no lower bound, so streams dated
#   after cycle_expire_date (negative difference) also pass -- confirm intended.
# - LIMIT 3000000 is applied without ORDER BY, so the query does not specify
#   which rows are kept when the cohort exceeds the limit.
# - All three joins are written as LEFT joins, but the WHERE clauses filter on
#   columns of the right-hand table, which discards the unmatched rows.
# - NOTE(review): the library/current split measures air_date against the
#   fixed date '2023-01-01' while the cohort window is January 2024 -- confirm
#   the reference date.
user_stream_60d_genpop = run_query('''
with subs as(
select c.user_id, c.hurley_user_id, up.profile_id
, is_cancel, sub_month, is_voluntary
, cycle_start_date
, cycle_expire_date
from max_dev.workspace.user_retain_churn_list_test_wbd_max c
LEFT join MAX_PROD.SUBSCRIPTION_GOLD.MAX_PROFILE_DIM_CURRENT up
    on c.user_id = up.USER_ID
where up.default_profile_ind = True
and cycle_expire_date between '2024-01-01' and '2024-01-31'
LIMIT 3000000
)

, streaming_subset as
(
select
      ss.user_id
    , ss.hurley_user_id
    , ss.profile_id
    , ss.is_cancel
    , is_voluntary
    , ss.sub_month
    , hb.PROGRAM_ID_OR_VIEWABLE_ID as ckg_program_id
    , hb.CONTENT_MINUTES_WATCHED/3600 as hours_viewed
from subs ss
left join max_prod.content_analytics.combined_video_stream hb
    on hb.WBD_MAX_PROFILE_ID = ss.profile_id
where DATEDIFF('day', hb.request_date_pst, ss.cycle_expire_date) <= 60
    and hb.request_date_pst between '2023-11-01' and '2024-01-31'
    and hb.PROGRAM_ID_OR_VIEWABLE_ID IS NOT NULL 
    and hb.CONTENT_MINUTES_WATCHED >= 15
    and hb.video_type = 'main'
    and hb.territory = 'HBO MAX DOMESTIC'
)

select
      s.user_id
    , s.profile_id
    , s.is_cancel
    , is_voluntary
    , s.sub_month
    , rad.program_type as program_type
    , rad.content_category as content_category
    , rad.REPORTING_PRIMARY_GENRE as genre
    , case when datediff('day', rad.air_date, '2023-01-01') >=365 
        then 'library' else 'current' end as old_new
    , sum(s.hours_viewed) as hours_viewed
    , count(distinct rad.ckg_series_id) as titles_viewed
from streaming_subset s
left join INT_DAI_PROD_SHARE.CONTENT_METADATA_GOLD.REPORTING_ASSET_DIM_COMBINED rad
     on s.ckg_program_id = rad.ckg_program_id
where rad.asset_type!='PROMOTION'
group by 1,2,3,4,5,6,7,8,9                                                                 
''')

In [7]:
# write_to_sf appends '.csv' to the name it receives, so pass the bare name;
# passing a name that already ends in '.csv' produces a key ending in '.csv.csv'.
write_to_sf(user_stream_60d_genpop, 'churn_user_stream60d_genpop_20240101')