In [28]:
##nodejs:  https://docs.aws.amazon.com/sdk-for-javascript/v2/developer-guide/setting-up-node-on-ec2-instance.html

# !pip install "jupyterlab>=3" "ipywidgets>=7.6"
# !pip install jupyter-dash
# !jupyter lab build


# !pip install snowflake --user
# !pip install snowflake-connector-python --user
# !pip install category_encoders
# !pip install xgboost
# !pip install lightgbm --user
import os
import sys
# IPython shell capture: `path` is the list of output lines of `pwd` (the notebook's working directory).
path=!pwd
# Make the parent directory and the shared notebooks directory importable (needed for `utils` below).
sys.path.append(os.path.join(path[0], '..'))
# NOTE(review): absolute SageMaker path — this cell only works on that instance layout.
sys.path.append('/home/ec2-user/SageMaker/jupyter-notebooks/')
# NOTE(review): star import hides where names come from. Code in this cell uses `pd`, `io`,
# `StringIO` and `pickle` without a visible import; presumably some come from `utils` or from
# other cells — confirm, or import them explicitly.
from utils import *
import snowflake.connector
from datetime import timedelta
import json
from abc import ABCMeta, abstractmethod
import boto3
import logging 

# Root logger (getLogger() with no name); INFO level makes the snowflake connector's
# INFO messages appear in the cell output.
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO)

from category_encoders import OneHotEncoder
import xgboost as xgb
# import lightgbm as lgbm
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error as MSE
from sklearn.metrics import mean_absolute_percentage_error as MAPE
from sklearn.metrics import mean_absolute_error as MAE
from sklearn.metrics import r2_score as r2_score
import sklearn.model_selection

class Credentials(metaclass=ABCMeta):
    """Base type for credential providers handed to ``SnowflakeConnector``.

    Declares no abstract methods. Concrete providers are expected to offer
    ``get_keys()``, which ``SnowflakeConnector.__init__`` calls.
    """
    
    
class SSMPSCredentials(Credentials):
    """Credential provider backed by AWS Secrets Manager (``secretsmanager`` client).

    Args:
        secretid: Secrets Manager ``SecretId`` to fetch.
        region_name: AWS region of the secret. Defaults to ``'us-east-1'``,
            the region that was previously hard-coded.
    """

    def __init__(self, secretid: str, region_name: str = 'us-east-1'):
        self._secretid = secretid
        self._region_name = region_name
        # Kept for compatibility; this class never reads or writes it afterwards.
        self._secrets = {}

    def get_keys(self):
        """Return the raw ``get_secret_value`` response for the configured secret.

        The response is returned unchanged; ``SnowflakeConnector`` reads its
        ``'SecretString'`` entry.
        """
        secrets_client = boto3.client(service_name='secretsmanager',
                                      region_name=self._region_name)
        return secrets_client.get_secret_value(SecretId=self._secretid)
    
    
class BaseConnector(metaclass=ABCMeta):
    """Abstract interface: something that can open a database connection."""

    @abstractmethod
    def connect(self):
        """Open and return a connection. Concrete subclasses must override this."""
        raise NotImplementedError
        

class SnowflakeConnector(BaseConnector):
    """Open Snowflake connections with a login taken from a ``Credentials`` provider.

    The secret is fetched once, at construction. ``get_keys()`` must return a
    mapping whose ``'SecretString'`` is a JSON document containing
    ``login_name``, ``login_password``, ``account`` and ``warehouse``; a
    missing ``'SecretString'`` is treated as ``{}`` and surfaces as a
    ``KeyError`` when ``connect`` is called.
    """

    def __init__(self, credentials: Credentials):
        raw_secret = credentials.get_keys().get('SecretString', "{}")
        self._secrets = json.loads(raw_secret)

    def connect(self, dbname: str, schema: str = 'DEFAULT'):
        """Return a new ``snowflake.connector`` connection to ``dbname``/``schema``."""
        secret = self._secrets
        connection_args = dict(
            user=secret['login_name'],
            password=secret['login_password'],
            account=secret['account'],
            warehouse=secret['warehouse'],
            database=dbname,
            schema=schema,
        )
        return snowflake.connector.connect(**connection_args)


def run_query(query, dbname, schema,
              secret_id='datascience-max-dev-sagemaker-notebooks'):
    """Execute ``query`` on Snowflake and return the result set as a DataFrame.

    A dedicated connection is opened for the call; its cursor and the
    connection itself are always closed before returning, even if the query
    fails.

    Args:
        query: SQL text to execute.
        dbname: Snowflake database for the session.
        schema: Snowflake schema for the session.
        secret_id: Secrets Manager id holding the Snowflake login.

    Returns:
        pandas.DataFrame of the fetched rows with lower-cased column names.
    """
    connector = SnowflakeConnector(SSMPSCredentials(secret_id))
    ctx = connector.connect(dbname, schema)
    try:
        cursor = ctx.cursor()
        try:
            cursor.execute(query)
            rows = cursor.fetchall()
            columns = [desc[0] for desc in cursor.description]
        finally:
            cursor.close()
    finally:
        # Without this every call leaves an open Snowflake session behind.
        ctx.close()
    df = pd.DataFrame(rows, columns=columns)
    df.columns = df.columns.str.lower()
    return df
## Credentials
# Secrets Manager id of the Snowflake login (run_query uses the same id).
SF_CREDS = 'datascience-max-dev-sagemaker-notebooks'

## Snowflake connection 
# Opens a notebook-level session on MAX_PROD / DATASCIENCE_STAGE that is never closed here.
# NOTE(review): `conn`, `ctx` and `cur` are not used by the code in this cell (run_query opens
# its own connection per call) — confirm another cell needs them, otherwise drop this.
conn=SnowflakeConnector(SSMPSCredentials(SF_CREDS))
ctx=conn.connect("MAX_PROD","DATASCIENCE_STAGE")
cur = ctx.cursor()

def cvdf_to_snowflake(df, table_name, dbname='MAX_DEV', schema='workspace'):
    """Stage ``df`` as CSV on S3 and load it into a (re)created Snowflake table.

    Steps:
      1. Write ``df`` as CSV without the index to
         ``s3://hbo-outbound-datascience-content-dev/psi/<table_name>.csv``.
      2. ``create or replace`` the table ``<dbname>.<schema>.<table_name>`` with
         the fixed 8-column schema below, replacing any table of that name.
      3. Insert the staged file's first 8 columns into that table, reading the
         file through the ``@HBO_OUTBOUND_DATASCIENCE_CONTENT_DEV`` stage.

    Args:
        df: DataFrame whose first 8 columns are, in order: title_name, tier,
            season_number, category, effective_start_date, imdb_title_name,
            imdb_title_id, content_category.
        table_name: Target table name; also the stem of the staged CSV file.
        dbname: Snowflake database (default ``'MAX_DEV'``).
        schema: Snowflake schema (default ``'workspace'``).
    """
    # Imported here so the function does not depend on another cell having imported `io`.
    import io

    stage = '@HBO_OUTBOUND_DATASCIENCE_CONTENT_DEV'
    output_bucket = "hbo-outbound-datascience-content-dev"
    file_format = 'CSV_ED'
    # One S3 key for both the upload and the stage path, so the two cannot drift apart.
    # NOTE(review): assumes the stage points at the root of `output_bucket` — confirm.
    filename = 'psi/' + table_name + '.csv'
    # Fully-qualified target derived from dbname/schema rather than hard-coded per statement.
    target = '{}.{}.{}'.format(dbname, schema, table_name)

    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer, index=False)
    client = boto3.client('s3')
    client.put_object(Bucket=output_bucket, Key=filename, Body=csv_buffer.getvalue())

    print('Create Table: ' + table_name)
    run_query('''
    create or replace table {target}(
    title_name varchar,
    tier int,
    season_number int,
    category varchar,
    effective_start_date varchar,
    imdb_title_name varchar,
    imdb_title_id varchar,
    content_category varchar
    )
    '''.format(target=target), dbname, schema)

    print('Begin Uploading')
    # NOTE(review): to_csv writes a header row; this relies on the named file format
    # skipping it — confirm its SKIP_HEADER setting.
    run_query('''
    insert into {target}

    select
          $1, $2, $3, $4, $5, $6, $7, $8
    from {stage}/{file_key}

     (FILE_FORMAT => {file_format})

    '''.format(target=target, stage=stage, file_key=filename,
               file_format=file_format), dbname, schema)

    print('Finish Uploading')
    
    
class Utils:
    """Static helpers that move DataFrames and pickled objects to and from S3.

    Every method addresses the object ``s3://<bucket>/<key_path>/<filename>``.
    Keys are joined with ``posixpath`` because S3 keys always use '/', whatever
    the local OS.
    """

    @staticmethod
    def to_csv_s3(content, bucket, key_path, filename, index=True):
        """Write ``content`` as CSV to ``s3://bucket/key_path/filename``.

        Args:
            content: DataFrame (any object with ``to_csv``).
            bucket: Target S3 bucket.
            key_path: Key prefix inside the bucket.
            filename: Object name appended to ``key_path``.
            index: Forwarded to ``to_csv``. Defaults to True (the previously
                hard-wired behaviour); pass False so that ``read_csv_s3`` does
                not read the index back as an extra ``Unnamed: 0`` column.
        """
        import posixpath
        from io import StringIO  # local: not otherwise imported in this cell

        client = boto3.client('s3')
        key = posixpath.join(key_path, filename)
        csv_buffer = StringIO()
        content.to_csv(csv_buffer, index=index)
        client.put_object(Bucket=bucket, Key=key, Body=csv_buffer.getvalue())
        logger.info(f'Saved to {bucket}/{key}')

    @staticmethod
    def to_pkl_s3(content, bucket, key_path, filename):
        """Pickle ``content`` and write it to ``s3://bucket/key_path/filename``."""
        import pickle
        import posixpath

        client = boto3.client('s3')
        key = posixpath.join(key_path, filename)
        client.put_object(Bucket=bucket, Key=key, Body=pickle.dumps(content))
        # One log line per save, consistent with to_csv_s3.
        logger.info(f'Saved to {bucket}/{key}')

    @staticmethod
    def read_csv_s3(bucket, key_path, filename):
        """Read ``s3://bucket/key_path/filename`` as CSV and return a DataFrame.

        The two-character string ``\\N`` is parsed as missing (NaN).
        """
        import posixpath

        client = boto3.client('s3')
        key = posixpath.join(key_path, filename)
        logger.info(f'Reading from {bucket}/{key}')
        obj = client.get_object(Bucket=bucket, Key=key)
        return pd.read_csv(obj['Body'], na_values="\\N")

    @staticmethod
    def read_pkl_s3(bucket, key_path, filename):
        """Load and return the pickled object stored at ``s3://bucket/key_path/filename``.

        SECURITY: ``pickle.loads`` can execute arbitrary code while loading;
        only use this on objects written by trusted code (e.g. ``to_pkl_s3``).
        """
        import pickle
        import posixpath

        client = boto3.client('s3')
        key = posixpath.join(key_path, filename)
        logger.info(f'Reading from {bucket}/{key}')
        obj = client.get_object(Bucket=bucket, Key=key)
        return pickle.loads(obj['Body'].read())

    
# Re-upload the in-memory df_fp to the future_title_imdb_map table (the table is re-created).
# NOTE(review): df_fp is not built anywhere active in this notebook (the loading code in the
# In[14] cell is commented out) — it only exists in kernel state; confirm its source before re-running.
# df_fp.tier = df_fp.tier.astype('int')
cvdf_to_snowflake(df_fp, 'future_title_imdb_map')

INFO:snowflake.connector.connection:Snowflake Connector for Python Version: 2.7.4, Python Version: 3.6.13, Platform: Linux-4.14.299-152.520.amzn1.x86_64-x86_64-with-glibc2.9
INFO:snowflake.connector.connection:This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
INFO:snowflake.connector.connection:Snowflake Connector for Python Version: 2.7.4, Python Version: 3.6.13, Platform: Linux-4.14.299-152.520.amzn1.x86_64-x86_64-with-glibc2.9
INFO:snowflake.connector.connection:This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.


Create Table: future_title_imdb_map


INFO:snowflake.connector.cursor:query: [create or replace table future_title_imdb_map( title_name varchar, tier int, sea...]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.connection:Snowflake Connector for Python Version: 2.7.4, Python Version: 3.6.13, Platform: Linux-4.14.299-152.520.amzn1.x86_64-x86_64-with-glibc2.9
INFO:snowflake.connector.connection:This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.


Begin Uploading


INFO:snowflake.connector.cursor:query: [insert into max_dev.workspace.future_title_imdb_map  select $1, $2, $3, $4, $5, ...]
INFO:snowflake.connector.cursor:query execution done


Finish Uploading


In [16]:
# Spot-check the in-memory df_fp that the upload cells send to Snowflake.
df_fp.head()

Unnamed: 0,title_name,tier,season_number,category,effective_start_date,imdb_title_name,imdb_title_id,content_category
0,The Bridge,3,1,Unscripted Series,2021-02-11,,tt13027548,series
1,Westworld,1,4,Scripted Drama Series,2022-06-26,Westworld S4,tt0475784,series
2,Tig Notaro: Drawn,3,0,Specials,2021-07-24,,,special
3,Odo,3,3,Kids & Family,2022-04-07,,tt16538364,series
4,The First Year,3,0,Documentary Features,2022-07-05,,,movies


In [13]:
##nodejs:  https://docs.aws.amazon.com/sdk-for-javascript/v2/developer-guide/setting-up-node-on-ec2-instance.html

# !pip install "jupyterlab>=3" "ipywidgets>=7.6"
# !pip install jupyter-dash
# !jupyter lab build


# !pip install snowflake --user
# !pip install snowflake-connector-python --user
# !pip install category_encoders
# !pip install xgboost
# !pip install lightgbm --user
import os
import sys
# IPython shell capture: `path` is the list of output lines of `pwd` (the notebook's working directory).
path=!pwd
# Make the parent directory and the shared notebooks directory importable (needed for `utils` below).
sys.path.append(os.path.join(path[0], '..'))
# NOTE(review): absolute SageMaker path — this cell only works on that instance layout.
sys.path.append('/home/ec2-user/SageMaker/jupyter-notebooks/')
# NOTE(review): star import hides where names come from. Code in this cell uses `pd`, `io`,
# `StringIO` and `pickle` without a visible import; presumably some come from `utils` or from
# other cells — confirm, or import them explicitly.
from utils import *
import snowflake.connector
from datetime import timedelta
import json
from abc import ABCMeta, abstractmethod
import boto3
import logging 

# Root logger (getLogger() with no name); INFO level makes the snowflake connector's
# INFO messages appear in the cell output.
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO)

from category_encoders import OneHotEncoder
import xgboost as xgb
# import lightgbm as lgbm
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error as MSE
from sklearn.metrics import mean_absolute_percentage_error as MAPE
from sklearn.metrics import mean_absolute_error as MAE
from sklearn.metrics import r2_score as r2_score
import sklearn.model_selection

class Credentials(metaclass=ABCMeta):
    """Base type for credential providers handed to ``SnowflakeConnector``.

    Declares no abstract methods. Concrete providers are expected to offer
    ``get_keys()``, which ``SnowflakeConnector.__init__`` calls.
    """
    
    
class SSMPSCredentials(Credentials):
    """Credential provider backed by AWS Secrets Manager (``secretsmanager`` client).

    Args:
        secretid: Secrets Manager ``SecretId`` to fetch.
        region_name: AWS region of the secret. Defaults to ``'us-east-1'``,
            the region that was previously hard-coded.
    """

    def __init__(self, secretid: str, region_name: str = 'us-east-1'):
        self._secretid = secretid
        self._region_name = region_name
        # Kept for compatibility; this class never reads or writes it afterwards.
        self._secrets = {}

    def get_keys(self):
        """Return the raw ``get_secret_value`` response for the configured secret.

        The response is returned unchanged; ``SnowflakeConnector`` reads its
        ``'SecretString'`` entry.
        """
        secrets_client = boto3.client(service_name='secretsmanager',
                                      region_name=self._region_name)
        return secrets_client.get_secret_value(SecretId=self._secretid)
    
    
class BaseConnector(metaclass=ABCMeta):
    """Abstract interface: something that can open a database connection."""

    @abstractmethod
    def connect(self):
        """Open and return a connection. Concrete subclasses must override this."""
        raise NotImplementedError
        

class SnowflakeConnector(BaseConnector):
    """Open Snowflake connections with a login taken from a ``Credentials`` provider.

    The secret is fetched once, at construction. ``get_keys()`` must return a
    mapping whose ``'SecretString'`` is a JSON document containing
    ``login_name``, ``login_password``, ``account`` and ``warehouse``; a
    missing ``'SecretString'`` is treated as ``{}`` and surfaces as a
    ``KeyError`` when ``connect`` is called.
    """

    def __init__(self, credentials: Credentials):
        raw_secret = credentials.get_keys().get('SecretString', "{}")
        self._secrets = json.loads(raw_secret)

    def connect(self, dbname: str, schema: str = 'DEFAULT'):
        """Return a new ``snowflake.connector`` connection to ``dbname``/``schema``."""
        secret = self._secrets
        connection_args = dict(
            user=secret['login_name'],
            password=secret['login_password'],
            account=secret['account'],
            warehouse=secret['warehouse'],
            database=dbname,
            schema=schema,
        )
        return snowflake.connector.connect(**connection_args)


def run_query(query, dbname, schema,
              secret_id='datascience-max-dev-sagemaker-notebooks'):
    """Execute ``query`` on Snowflake and return the result set as a DataFrame.

    A dedicated connection is opened for the call; its cursor and the
    connection itself are always closed before returning, even if the query
    fails.

    Args:
        query: SQL text to execute.
        dbname: Snowflake database for the session.
        schema: Snowflake schema for the session.
        secret_id: Secrets Manager id holding the Snowflake login.

    Returns:
        pandas.DataFrame of the fetched rows with lower-cased column names.
    """
    connector = SnowflakeConnector(SSMPSCredentials(secret_id))
    ctx = connector.connect(dbname, schema)
    try:
        cursor = ctx.cursor()
        try:
            cursor.execute(query)
            rows = cursor.fetchall()
            columns = [desc[0] for desc in cursor.description]
        finally:
            cursor.close()
    finally:
        # Without this every call leaves an open Snowflake session behind.
        ctx.close()
    df = pd.DataFrame(rows, columns=columns)
    df.columns = df.columns.str.lower()
    return df
## Credentials
# Secrets Manager id of the Snowflake login (run_query uses the same id).
SF_CREDS = 'datascience-max-dev-sagemaker-notebooks'

## Snowflake connection 
# Opens a notebook-level session on MAX_PROD / DATASCIENCE_STAGE that is never closed here.
# NOTE(review): `conn`, `ctx` and `cur` are not used by the code in this cell (run_query opens
# its own connection per call) — confirm another cell needs them, otherwise drop this.
conn=SnowflakeConnector(SSMPSCredentials(SF_CREDS))
ctx=conn.connect("MAX_PROD","DATASCIENCE_STAGE")
cur = ctx.cursor()

def cvdf_to_snowflake(df, table_name, dbname='MAX_DEV', schema='CONTENT_DATASCIENCE'):
    """Stage ``df`` as CSV on S3 and load it into a (re)created Snowflake table.

    Steps:
      1. Write ``df`` as CSV without the index to
         ``s3://hbo-outbound-datascience-content-dev/psi/<table_name>.csv``.
      2. ``create or replace`` the table ``<dbname>.<schema>.<table_name>`` with
         the fixed 8-column schema below, replacing any table of that name.
      3. Insert the staged file's first 8 columns into that table, reading the
         file through the ``@HBO_OUTBOUND_DATASCIENCE_CONTENT_DEV`` stage.

    Args:
        df: DataFrame whose first 8 columns are, in order: title_name, tier,
            season_number, category, effective_start_date, imdb_title_name,
            imdb_title_id, content_category.
        table_name: Target table name; also the stem of the staged CSV file.
        dbname: Snowflake database (default ``'MAX_DEV'``).
        schema: Snowflake schema (default ``'CONTENT_DATASCIENCE'``).
    """
    # Imported here so the function does not depend on another cell having imported `io`.
    import io

    stage = '@HBO_OUTBOUND_DATASCIENCE_CONTENT_DEV'
    output_bucket = "hbo-outbound-datascience-content-dev"
    file_format = 'csv_v2'
    # One S3 key for both the upload and the stage path, so the two cannot drift apart.
    # NOTE(review): assumes the stage points at the root of `output_bucket` — confirm.
    filename = 'psi/' + table_name + '.csv'
    # Fully-qualified target derived from dbname/schema rather than hard-coded per statement.
    target = '{}.{}.{}'.format(dbname, schema, table_name)

    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer, index=False)
    client = boto3.client('s3')
    client.put_object(Bucket=output_bucket, Key=filename, Body=csv_buffer.getvalue())

    print('Create Table: ' + table_name)
    run_query('''
    create or replace table {target}(
    title_name varchar,
    tier int,
    season_number int,
    category varchar,
    effective_start_date varchar,
    imdb_title_name varchar,
    imdb_title_id varchar,
    content_category varchar
    )
    '''.format(target=target), dbname, schema)

    print('Begin Uploading')
    # NOTE(review): to_csv writes a header row; this relies on the named file format
    # skipping it — confirm its SKIP_HEADER setting.
    run_query('''
    insert into {target}

    select
          $1, $2, $3, $4, $5, $6, $7, $8
    from {stage}/{file_key}

     (FILE_FORMAT => {file_format})

    '''.format(target=target, stage=stage, file_key=filename,
               file_format=file_format), dbname, schema)

    print('Finish Uploading')
    
    
class Utils:
    """Static helpers that move DataFrames and pickled objects to and from S3.

    Every method addresses the object ``s3://<bucket>/<key_path>/<filename>``.
    Keys are joined with ``posixpath`` because S3 keys always use '/', whatever
    the local OS.
    """

    @staticmethod
    def to_csv_s3(content, bucket, key_path, filename, index=True):
        """Write ``content`` as CSV to ``s3://bucket/key_path/filename``.

        Args:
            content: DataFrame (any object with ``to_csv``).
            bucket: Target S3 bucket.
            key_path: Key prefix inside the bucket.
            filename: Object name appended to ``key_path``.
            index: Forwarded to ``to_csv``. Defaults to True (the previously
                hard-wired behaviour); pass False so that ``read_csv_s3`` does
                not read the index back as an extra ``Unnamed: 0`` column.
        """
        import posixpath
        from io import StringIO  # local: not otherwise imported in this cell

        client = boto3.client('s3')
        key = posixpath.join(key_path, filename)
        csv_buffer = StringIO()
        content.to_csv(csv_buffer, index=index)
        client.put_object(Bucket=bucket, Key=key, Body=csv_buffer.getvalue())
        logger.info(f'Saved to {bucket}/{key}')

    @staticmethod
    def to_pkl_s3(content, bucket, key_path, filename):
        """Pickle ``content`` and write it to ``s3://bucket/key_path/filename``."""
        import pickle
        import posixpath

        client = boto3.client('s3')
        key = posixpath.join(key_path, filename)
        client.put_object(Bucket=bucket, Key=key, Body=pickle.dumps(content))
        # One log line per save, consistent with to_csv_s3.
        logger.info(f'Saved to {bucket}/{key}')

    @staticmethod
    def read_csv_s3(bucket, key_path, filename):
        """Read ``s3://bucket/key_path/filename`` as CSV and return a DataFrame.

        The two-character string ``\\N`` is parsed as missing (NaN).
        """
        import posixpath

        client = boto3.client('s3')
        key = posixpath.join(key_path, filename)
        logger.info(f'Reading from {bucket}/{key}')
        obj = client.get_object(Bucket=bucket, Key=key)
        return pd.read_csv(obj['Body'], na_values="\\N")

    @staticmethod
    def read_pkl_s3(bucket, key_path, filename):
        """Load and return the pickled object stored at ``s3://bucket/key_path/filename``.

        SECURITY: ``pickle.loads`` can execute arbitrary code while loading;
        only use this on objects written by trusted code (e.g. ``to_pkl_s3``).
        """
        import pickle
        import posixpath

        client = boto3.client('s3')
        key = posixpath.join(key_path, filename)
        logger.info(f'Reading from {bucket}/{key}')
        obj = client.get_object(Bucket=bucket, Key=key)
        return pickle.loads(obj['Body'].read())


INFO:snowflake.connector.connection:Snowflake Connector for Python Version: 2.7.4, Python Version: 3.6.13, Platform: Linux-4.14.299-152.520.amzn1.x86_64-x86_64-with-glibc2.9
INFO:snowflake.connector.connection:This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.


In [14]:
## Upload manually entered data to snowflake 
import io
from io import StringIO
import pandas as pd


# df_fp = Utils.read_csv_s3('hbo-ingest-datascience-content-dev', 'psi_first_views/dev','future_program_imdb_id_full_202204.csv')
# df_fp = df_fp.rename(columns={'premiere_date':'effective_start_date',
#                              'title_name_imdb':'imdb_title_name',
#                              'imdb_id':'imdb_title_id',
#                              'program_type':'content_category'})

# df_fp.loc[df_fp.content_category=='movie','content_category'] = 'movies'
# df_fp['title_id'] = 0
# df_fp['first_views'] = 0
# display(df_fp.head())
# df_fp.loc[df_fp.title_name=='Aquaman and the Lost Kingdom', 'imdb_title_id'] = 'tt9663764'

# Utils.to_csv_s3(df_fp, 'hbo-ingest-datascience-content-dev', 'psi_first_views/dev','future_program_imdb_id_full_202204.csv')
# df_fp = df_fp[['title_name', 'tier', 'season_number', 'category',
#        'effective_start_date', 'imdb_title_name','imdb_title_id', 'content_category']]

# Upload the in-memory df_fp to the future_title_imdb_map table (the table is re-created).
# NOTE(review): the saved error output of this cell names psi/future_program_imdb_map.csv, which is
# not the key cvdf_to_snowflake builds for this table name ('psi/' + table_name + '.csv'); that output
# appears to come from an older definition — re-run before trusting it.
cvdf_to_snowflake(df_fp, 'future_title_imdb_map')


INFO:snowflake.connector.connection:Snowflake Connector for Python Version: 2.7.4, Python Version: 3.6.13, Platform: Linux-4.14.299-152.520.amzn1.x86_64-x86_64-with-glibc2.9
INFO:snowflake.connector.connection:This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.


Create Table: future_title_imdb_map


INFO:snowflake.connector.cursor:query: [create or replace table future_title_imdb_map( title_name varchar, tier int, sea...]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.connection:Snowflake Connector for Python Version: 2.7.4, Python Version: 3.6.13, Platform: Linux-4.14.299-152.520.amzn1.x86_64-x86_64-with-glibc2.9
INFO:snowflake.connector.connection:This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.


Begin Uploading


INFO:snowflake.connector.cursor:query: [insert into max_dev.content_datascience.future_title_imdb_map  select $1, $2, $3...]
INFO:snowflake.connector.cursor:query execution done
INFO:snowflake.connector.connection:closed
INFO:snowflake.connector.connection:No async queries seem to be running, deleting session


ProgrammingError: 100431 (22000): 01aa3f06-0606-bec5-00f8-4103e83371f3: Failed to access remote file: object in invalid storage class. The file may be in Glacier or Deep Archive. Change the storage class before accessing it. File: psi/future_program_imdb_map.csv

In [20]:
# Rows whose title contains "Gilded". Literal (non-regex) match; rows with a missing
# title_name are excluded instead of making the boolean mask raise on NaN.
df_fp[df_fp.title_name.str.contains('Gilded', na=False, regex=False)]

Unnamed: 0,title_name,tier,season_number,category,effective_start_date,imdb_title_name,imdb_title_id,content_category
149,The Gilded Age,1,2,Scripted Drama Series,2023-04-17,The Gilded Age S1,tt4406178,series
534,The Gilded Age / Say Her Name,1,3,Scripted Drama Series,2024-04-15,,,series
768,The Gilded Age,1,1,Scripted Drama Series,2022-01-24,The Gilded Age S1,tt4406178,series
