In [None]:
import sys
import os

sys.path.append('./')
sys.path.append('../')

import yaml
import pathlib
import json
from src.predictions.profiles_mlcorelib.utils.logger import logger

In [None]:
# Best-effort import: the connector module may fail to import (e.g. missing
# optional dependencies). The warning now includes the underlying error; later
# cells that reference BigQueryConnector will raise NameError if this failed.
try:
    from src.predictions.profiles_mlcorelib.connectors.BigQueryConnector import BigQueryConnector
except Exception as e:
    logger.warning(f"Could not import BigQueryConnector: {e}")

In [None]:
homedir = os.path.expanduser("~")
site_config_path = pathlib.Path(homedir) / ".pb" / "siteconfig.yaml"

# Pull the "dev" output of the "test-bq" connection from the pb site config.
with site_config_path.open("r") as site_config:
    creds = yaml.safe_load(site_config)["connections"]["test-bq"]["outputs"]["dev"]

# Announce which warehouse/schema is in use. Each entry is
# (warehouse type, label for the message, creds key naming the account/host/project);
# a type matching none of them is rejected.
for warehouse, label, target_key in (
    ("snowflake", "snowflake account", "account"),
    ("redshift", "Redshift account", "host"),
    ("bigquery", "BigQuery project", "project_id"),
):
    if creds["type"] == warehouse:
        print(f"Using {creds['schema']} schema in {label}: {creds[target_key]}")
        break
else:
    raise Exception(f"Unknown database type: {creds['type']}")

In [None]:
# Schema (BigQuery dataset) that contains the input event tables read and
# upsampled by the cells below. Replace the placeholder before running.
INPUT_SCHEMA = "<NAME OF SCHEMA HAVING INPUT TABLES>"

# Fail fast if the placeholder was left in place; otherwise every later query
# runs against a non-existent schema and fails with a less direct error.
if INPUT_SCHEMA.startswith("<") and INPUT_SCHEMA.endswith(">"):
    raise ValueError(
        "Set INPUT_SCHEMA to the name of the schema containing the input tables before running the notebook."
    )

creds["schema"] = INPUT_SCHEMA

In [None]:
# The credentials cell accepts snowflake/redshift connections, but everything
# below is BigQuery-only: it uses BigQueryConnector, backtick-quoted names,
# `project_id`-scoped INFORMATION_SCHEMA and TO_HEX/SHA1. Reject other
# warehouses here instead of failing inside the first query.
if creds["type"] != "bigquery":
    raise ValueError(
        f"This notebook only supports BigQuery connections, got: {creds['type']}"
    )

connector = BigQueryConnector(creds, "./")
session = connector.build_session(creds)

In [None]:
# Sanity check: display the class of the object returned by build_session.
session_type = type(session)
session_type

In [None]:
# Names of the source event tables to upsample.
pages = "pages"
tracks = "tracks"
identifies = "identifies"
order_completed = "order_completed"

# Columns that the per-table copy queries handle explicitly, so they are excluded
# from the generic pass-through column list. ANONYMOUS_ID/USER_ID/TIMESTAMP are
# re-selected for every table; EMAIL is re-selected (and hashed) only for identifies.
strings_to_remove = ['ANONYMOUS_ID', 'USER_ID', 'TIMESTAMP', 'EMAIL']


def uppercase_list(features):
    """Return a new list containing every string in ``features`` upper-cased.

    Used to normalise column names read from INFORMATION_SCHEMA so they can be
    compared against ``strings_to_remove``.
    """
    return [feature.upper() for feature in features]

# For Pages Table:

## Fetch all pages table column names as strings

In [None]:
# Read the pages table's column names from INFORMATION_SCHEMA and upper-case them
# so they can be matched against `strings_to_remove`.
fetch_column_names_query = f"select column_name from `{creds['project_id']}.{creds['schema']}.INFORMATION_SCHEMA.COLUMNS` where table_name = '{pages}'"
column_names_list = [
    row.column_name.upper()
    for row in connector.run_query(fetch_column_names_query)
]

In [None]:
# Keep only the pass-through columns: everything except the entries of
# `strings_to_remove`. ANONYMOUS_ID, USER_ID and TIMESTAMP are selected explicitly
# by the copy queries below.
# NOTE(review): EMAIL is excluded too but not re-selected for this table, so an
# EMAIL column (if pages has one) is dropped from the copy — confirm intended.
column_names_list = [name for name in column_names_list if name not in strings_to_remove]

# Comma-separated SELECT list spliced into the queries below.
column_name_string = ', '.join(column_names_list)

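## Creating pages table copy

The copy (`pages_1`) is a scratch table. The following cells repeatedly append re-keyed copies of its rows until it reaches the target number of distinct `ANONYMOUS_ID`s, snapshot it, and finally drop it.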

In [None]:
# Copy the source pages table into the scratch table `<pages>_1` that the next
# cells grow. The ID/timestamp columns are listed explicitly; the remaining
# columns come from `column_name_string`. CREATE OR REPLACE keeps the notebook
# re-runnable when a scratch table is left over from an earlier, interrupted run
# (plain CREATE TABLE would fail because the table already exists).
query_create_temp = f"""
    CREATE OR REPLACE TABLE `{creds['schema']}.{pages}_1`
    AS (
        SELECT
            ANONYMOUS_ID,
            USER_ID,
            TIMESTAMP,
            {column_name_string}
        FROM `{creds['schema']}.{pages}`
    );
"""
connector.run_query(query_create_temp, response=False)

print("Created temp tables successfully.")

In [None]:
# Upsampling targets for pages: output-table suffix -> minimum number of distinct
# ANONYMOUS_IDs. Larger targets ("500k": 500_000, "1mn": 1_000_000) can be added here.
data = {"100k": 100_000}

# Pass counter mixed into the re-hashed IDs by the next cell.
# NOTE: this shadows the builtin `iter`; the next cell relies on this name.
iter = 1

query_count_row = f"select count(distinct ANONYMOUS_ID) as ROW_COUNT from `{creds['schema']}.{pages}_1`"

In [None]:
for postfix, limit in data.items():
    # Grow the scratch table `<pages>_1` until it holds at least `limit` distinct
    # ANONYMOUS_IDs. Each pass appends a copy of every row, with these changes:
    #   - ANONYMOUS_ID and USER_ID are SHA1-hashed together with the pass counter
    #     `iter`, which yields new IDs.
    #   - TIMESTAMP is shifted forward by a random number of days.
    # The distinct-ID count therefore roughly doubles per pass and can overshoot
    # `limit`.
    # NOTE: `iter` shadows the Python builtin; the previous cell initialises it
    # under that name.
    distinct_ids = int(connector.run_query(query_count_row)[0].ROW_COUNT)
    if distinct_ids == 0:
        # Doubling zero stays zero: without this guard the loop never terminates.
        raise ValueError(
            f"`{creds['schema']}.{pages}_1` has no non-null ANONYMOUS_ID values; cannot upsample it."
        )
    while distinct_ids < limit:
        query_extend_tmp = f"""
            INSERT INTO `{creds['schema']}.{pages}_1` (
                (select
                    LOWER(TO_HEX(SHA1(CONCAT(CAST(ANONYMOUS_ID AS STRING), {iter})))) as ANONYMOUS_ID,
                    LOWER(TO_HEX(SHA1(CONCAT(CAST(USER_ID AS STRING), {iter})))) as USER_ID,
                    TIMESTAMP_ADD(T.TIMESTAMP, INTERVAL CAST(RAND() * 28 AS INT64) DAY) AS TIMESTAMP,
                    {column_name_string}
                from `{creds['schema']}.{pages}_1` T)
            );
        """
        connector.run_query(query_extend_tmp, response=False)
        iter += 1
        previous_ids = distinct_ids
        distinct_ids = int(connector.run_query(query_count_row)[0].ROW_COUNT)
        if distinct_ids <= previous_ids:
            # Safety net: a pass that adds no new IDs would otherwise loop forever.
            raise RuntimeError(
                f"Distinct ANONYMOUS_ID count stuck at {distinct_ids} in `{creds['schema']}.{pages}_1`; aborting."
            )

    # Snapshot the scratch table as `<pages>_<postfix>`. OR REPLACE keeps the cell
    # re-runnable when the output table already exists.
    query_save_limit = f"""
        CREATE OR REPLACE TABLE `{creds['schema']}.{pages}_{postfix}`
        AS (
            SELECT
                ANONYMOUS_ID,
                USER_ID,
                TIMESTAMP,
                {column_name_string}
            FROM `{creds['schema']}.{pages}_1`
        );
    """
    connector.run_query(query_save_limit, response=False)
    # Report the actual distinct-ID count, which can exceed the target `limit`.
    print(f"Saved {distinct_ids} distinct ANONYMOUS_IDs (target {limit}) in table {pages}_{postfix}")

## Dropping the temporary table

In [None]:
# Remove the pages scratch table `<pages>_1`. IF EXISTS keeps this cell safe to
# re-run after the table has already been dropped.
query_drop_temp = f"DROP TABLE IF EXISTS `{creds['schema']}.{pages}_1`"
connector.run_query(query_drop_temp, response=False)

print("Dropped temp tables successfully.")

# For Tracks Table:

In [None]:
# Read the tracks table's column names from INFORMATION_SCHEMA and upper-case them
# so they can be matched against `strings_to_remove`.
fetch_column_names_query = f"select column_name from `{creds['project_id']}.{creds['schema']}.INFORMATION_SCHEMA.COLUMNS` where table_name = '{tracks}'"
column_names_list = [
    row.column_name.upper()
    for row in connector.run_query(fetch_column_names_query)
]

In [None]:
# Keep only the pass-through columns: everything except the entries of
# `strings_to_remove`. ANONYMOUS_ID, USER_ID and TIMESTAMP are selected explicitly
# by the copy queries below.
# NOTE(review): EMAIL is excluded too but not re-selected for this table, so an
# EMAIL column (if tracks has one) is dropped from the copy — confirm intended.
column_names_list = [name for name in column_names_list if name not in strings_to_remove]

# Comma-separated SELECT list spliced into the queries below.
column_name_string = ', '.join(column_names_list)

## Creating tracks table copy

In [None]:
# Copy the source tracks table into the scratch table `<tracks>_1` that the next
# cells grow. The ID/timestamp columns are listed explicitly; the remaining
# columns come from `column_name_string`. CREATE OR REPLACE keeps the notebook
# re-runnable when a scratch table is left over from an earlier, interrupted run
# (plain CREATE TABLE would fail because the table already exists).
query_create_temp = f"""
    CREATE OR REPLACE TABLE `{creds['schema']}.{tracks}_1`
    AS (
        SELECT
            ANONYMOUS_ID,
            USER_ID,
            TIMESTAMP,
            {column_name_string}
        FROM `{creds['schema']}.{tracks}`
    );
"""
connector.run_query(query_create_temp, response=False)

print("Created temp tables successfully.")

In [None]:
# Upsampling targets for tracks: output-table suffix -> minimum number of distinct
# ANONYMOUS_IDs. Larger targets ("500k": 500_000, "1mn": 1_000_000) can be added here.
data = {"100k": 100_000}

# Pass counter mixed into the re-hashed IDs by the next cell.
# NOTE: this shadows the builtin `iter`; the next cell relies on this name.
iter = 1

query_count_row = f"select count(distinct ANONYMOUS_ID) as ROW_COUNT from `{creds['schema']}.{tracks}_1`"

In [None]:
for postfix, limit in data.items():
    # Grow the scratch table `<tracks>_1` until it holds at least `limit` distinct
    # ANONYMOUS_IDs. Each pass appends a copy of every row, with these changes:
    #   - ANONYMOUS_ID and USER_ID are SHA1-hashed together with the pass counter
    #     `iter`, which yields new IDs.
    #   - TIMESTAMP is shifted forward by a random number of days.
    # The distinct-ID count therefore roughly doubles per pass and can overshoot
    # `limit`.
    # NOTE: `iter` shadows the Python builtin; the previous cell initialises it
    # under that name.
    distinct_ids = int(connector.run_query(query_count_row)[0].ROW_COUNT)
    if distinct_ids == 0:
        # Doubling zero stays zero: without this guard the loop never terminates.
        raise ValueError(
            f"`{creds['schema']}.{tracks}_1` has no non-null ANONYMOUS_ID values; cannot upsample it."
        )
    while distinct_ids < limit:
        query_extend_tmp = f"""
            INSERT INTO `{creds['schema']}.{tracks}_1` (
                (select
                    LOWER(TO_HEX(SHA1(CONCAT(CAST(ANONYMOUS_ID AS STRING), {iter})))) as ANONYMOUS_ID,
                    LOWER(TO_HEX(SHA1(CONCAT(CAST(USER_ID AS STRING), {iter})))) as USER_ID,
                    TIMESTAMP_ADD(T.TIMESTAMP, INTERVAL CAST(RAND() * 28 AS INT64) DAY) AS TIMESTAMP,
                    {column_name_string}
                from `{creds['schema']}.{tracks}_1` T)
            );
        """
        connector.run_query(query_extend_tmp, response=False)
        iter += 1
        previous_ids = distinct_ids
        distinct_ids = int(connector.run_query(query_count_row)[0].ROW_COUNT)
        if distinct_ids <= previous_ids:
            # Safety net: a pass that adds no new IDs would otherwise loop forever.
            raise RuntimeError(
                f"Distinct ANONYMOUS_ID count stuck at {distinct_ids} in `{creds['schema']}.{tracks}_1`; aborting."
            )

    # Snapshot the scratch table as `<tracks>_<postfix>`. OR REPLACE keeps the cell
    # re-runnable when the output table already exists.
    query_save_limit = f"""
        CREATE OR REPLACE TABLE `{creds['schema']}.{tracks}_{postfix}`
        AS (
            SELECT
                ANONYMOUS_ID,
                USER_ID,
                TIMESTAMP,
                {column_name_string}
            FROM `{creds['schema']}.{tracks}_1`
        );
    """
    connector.run_query(query_save_limit, response=False)
    # Report the actual distinct-ID count, which can exceed the target `limit`.
    print(f"Saved {distinct_ids} distinct ANONYMOUS_IDs (target {limit}) in table {tracks}_{postfix}")

## Dropping the temporary table

In [None]:
# Remove the tracks scratch table `<tracks>_1`. IF EXISTS keeps this cell safe to
# re-run after the table has already been dropped.
query_drop_temp = f"DROP TABLE IF EXISTS `{creds['schema']}.{tracks}_1`"
connector.run_query(query_drop_temp, response=False)

print("Dropped temp tables successfully.")

# For Identifies Table:

In [None]:
# Read the identifies table's column names from INFORMATION_SCHEMA and upper-case
# them so they can be matched against `strings_to_remove`.
fetch_column_names_query = f"select column_name from `{creds['project_id']}.{creds['schema']}.INFORMATION_SCHEMA.COLUMNS` where table_name = '{identifies}'"
column_names_list = [
    row.column_name.upper()
    for row in connector.run_query(fetch_column_names_query)
]

In [None]:
# Keep only the pass-through columns: everything except the entries of
# `strings_to_remove`. For identifies, ANONYMOUS_ID, USER_ID, EMAIL and TIMESTAMP
# are all selected explicitly by the copy queries below.
column_names_list = [name for name in column_names_list if name not in strings_to_remove]

# Comma-separated SELECT list spliced into the queries below.
column_name_string = ', '.join(column_names_list)

## Creating identifies table copy

In [None]:
# Copy the source identifies table into the scratch table `<identifies>_1` that
# the next cells grow. The ID/EMAIL/timestamp columns are listed explicitly; the
# remaining columns come from `column_name_string`. CREATE OR REPLACE keeps the
# notebook re-runnable when a scratch table is left over from an earlier,
# interrupted run (plain CREATE TABLE would fail because the table already exists).
query_create_temp = f"""
    CREATE OR REPLACE TABLE `{creds['schema']}.{identifies}_1`
    AS (
        SELECT
            ANONYMOUS_ID,
            USER_ID,
            EMAIL,
            TIMESTAMP,
            {column_name_string}
        FROM `{creds['schema']}.{identifies}`
    );
"""
connector.run_query(query_create_temp, response=False)

print("Created temp tables successfully.")

In [None]:
# Upsampling targets for identifies: output-table suffix -> minimum number of
# distinct ANONYMOUS_IDs. Larger targets ("500k": 500_000, "1mn": 1_000_000) can be
# added here.
data = {"100k": 100_000}

# Pass counter mixed into the re-hashed IDs by the next cell.
# NOTE: this shadows the builtin `iter`; the next cell relies on this name.
iter = 1

query_count_row = f"select count(distinct ANONYMOUS_ID) as ROW_COUNT from `{creds['schema']}.{identifies}_1`"

In [None]:
for postfix, limit in data.items():
    # Grow the scratch table `<identifies>_1` until it holds at least `limit`
    # distinct ANONYMOUS_IDs. Each pass appends a copy of every row, with these
    # changes:
    #   - ANONYMOUS_ID, USER_ID and EMAIL are SHA1-hashed together with the pass
    #     counter `iter`, which yields new values.
    #   - TIMESTAMP is shifted forward by a random number of days.
    # The distinct-ID count therefore roughly doubles per pass and can overshoot
    # `limit`.
    # NOTE: `iter` shadows the Python builtin; the previous cell initialises it
    # under that name.
    distinct_ids = int(connector.run_query(query_count_row)[0].ROW_COUNT)
    if distinct_ids == 0:
        # Doubling zero stays zero: without this guard the loop never terminates.
        raise ValueError(
            f"`{creds['schema']}.{identifies}_1` has no non-null ANONYMOUS_ID values; cannot upsample it."
        )
    while distinct_ids < limit:
        query_extend_tmp = f"""
            INSERT INTO `{creds['schema']}.{identifies}_1` (
                (select
                    LOWER(TO_HEX(SHA1(CONCAT(CAST(ANONYMOUS_ID AS STRING), {iter})))) as ANONYMOUS_ID,
                    LOWER(TO_HEX(SHA1(CONCAT(CAST(USER_ID AS STRING), {iter})))) as USER_ID,
                    LOWER(TO_HEX(SHA1(CONCAT(CAST(EMAIL AS STRING), {iter})))) as EMAIL,
                    TIMESTAMP_ADD(T.TIMESTAMP, INTERVAL CAST(RAND() * 28 AS INT64) DAY) AS TIMESTAMP,
                    {column_name_string}
                from `{creds['schema']}.{identifies}_1` T)
            );
        """
        connector.run_query(query_extend_tmp, response=False)
        iter += 1
        previous_ids = distinct_ids
        distinct_ids = int(connector.run_query(query_count_row)[0].ROW_COUNT)
        if distinct_ids <= previous_ids:
            # Safety net: a pass that adds no new IDs would otherwise loop forever.
            raise RuntimeError(
                f"Distinct ANONYMOUS_ID count stuck at {distinct_ids} in `{creds['schema']}.{identifies}_1`; aborting."
            )

    # Snapshot the scratch table as `<identifies>_<postfix>`. OR REPLACE keeps the
    # cell re-runnable when the output table already exists.
    query_save_limit = f"""
        CREATE OR REPLACE TABLE `{creds['schema']}.{identifies}_{postfix}`
        AS (
            SELECT
                ANONYMOUS_ID,
                USER_ID,
                EMAIL,
                TIMESTAMP,
                {column_name_string}
            FROM `{creds['schema']}.{identifies}_1`
        );
    """
    connector.run_query(query_save_limit, response=False)
    # Report the actual distinct-ID count, which can exceed the target `limit`.
    print(f"Saved {distinct_ids} distinct ANONYMOUS_IDs (target {limit}) in table {identifies}_{postfix}")

## Dropping the temporary table

In [None]:
# Remove the identifies scratch table `<identifies>_1`. IF EXISTS keeps this cell
# safe to re-run after the table has already been dropped.
query_drop_temp = f"DROP TABLE IF EXISTS `{creds['schema']}.{identifies}_1`"
connector.run_query(query_drop_temp, response=False)

print("Dropped temp tables successfully.")

# For order_completed Table:

In [None]:
# Read the order_completed table's column names from INFORMATION_SCHEMA and
# upper-case them so they can be matched against `strings_to_remove`.
fetch_column_names_query = f"select column_name from `{creds['project_id']}.{creds['schema']}.INFORMATION_SCHEMA.COLUMNS` where table_name = '{order_completed}'"
column_names_list = [
    row.column_name.upper()
    for row in connector.run_query(fetch_column_names_query)
]

In [None]:
# Keep only the pass-through columns: everything except the entries of
# `strings_to_remove`. ANONYMOUS_ID, USER_ID and TIMESTAMP are selected explicitly
# by the copy queries below.
# NOTE(review): EMAIL is excluded too but not re-selected for this table, so an
# EMAIL column (if order_completed has one) is dropped from the copy — confirm
# intended.
column_names_list = [name for name in column_names_list if name not in strings_to_remove]

# Comma-separated SELECT list spliced into the queries below.
column_name_string = ', '.join(column_names_list)

## Creating order_completed table copy

In [None]:
# Copy the source order_completed table into the scratch table
# `<order_completed>_1` that the next cells grow. The ID/timestamp columns are
# listed explicitly; the remaining columns come from `column_name_string`.
# CREATE OR REPLACE keeps the notebook re-runnable when a scratch table is left
# over from an earlier, interrupted run (plain CREATE TABLE would fail because the
# table already exists).
query_create_temp = f"""
    CREATE OR REPLACE TABLE `{creds['schema']}.{order_completed}_1`
    AS (
        SELECT
            ANONYMOUS_ID,
            USER_ID,
            TIMESTAMP,
            {column_name_string}
        FROM `{creds['schema']}.{order_completed}`
    );
"""
connector.run_query(query_create_temp, response=False)

print("Created temp tables successfully.")

In [None]:
# Upsampling targets for order_completed: output-table suffix -> minimum number
# of distinct ANONYMOUS_IDs. Targets are reached in insertion order, growing the
# same scratch table.
data = {"100k": 100_000, "500k": 500_000, "1mn": 1_000_000}

# Pass counter mixed into the re-hashed IDs by the next cell.
# NOTE: this shadows the builtin `iter`; the next cell relies on this name.
iter = 1

query_count_row = f"select count(distinct ANONYMOUS_ID) as ROW_COUNT from `{creds['schema']}.{order_completed}_1`"

In [None]:
for postfix, limit in data.items():
    # Grow the scratch table `<order_completed>_1` until it holds at least `limit`
    # distinct ANONYMOUS_IDs. Each pass appends a copy of every row, with these
    # changes:
    #   - ANONYMOUS_ID and USER_ID are SHA1-hashed together with the pass counter
    #     `iter`, which yields new IDs.
    #   - TIMESTAMP is shifted forward by a random number of days.
    # The distinct-ID count therefore roughly doubles per pass and can overshoot
    # `limit`.
    # NOTE: `iter` shadows the Python builtin; the previous cell initialises it
    # under that name.
    distinct_ids = int(connector.run_query(query_count_row)[0].ROW_COUNT)
    if distinct_ids == 0:
        # Doubling zero stays zero: without this guard the loop never terminates.
        raise ValueError(
            f"`{creds['schema']}.{order_completed}_1` has no non-null ANONYMOUS_ID values; cannot upsample it."
        )
    while distinct_ids < limit:
        query_extend_tmp = f"""
            INSERT INTO `{creds['schema']}.{order_completed}_1` (
                (select
                    LOWER(TO_HEX(SHA1(CONCAT(CAST(ANONYMOUS_ID AS STRING), {iter})))) as ANONYMOUS_ID,
                    LOWER(TO_HEX(SHA1(CONCAT(CAST(USER_ID AS STRING), {iter})))) as USER_ID,
                    TIMESTAMP_ADD(T.TIMESTAMP, INTERVAL CAST(RAND() * 28 AS INT64) DAY) AS TIMESTAMP,
                    {column_name_string}
                from `{creds['schema']}.{order_completed}_1` T)
            );
        """
        connector.run_query(query_extend_tmp, response=False)
        iter += 1
        previous_ids = distinct_ids
        distinct_ids = int(connector.run_query(query_count_row)[0].ROW_COUNT)
        if distinct_ids <= previous_ids:
            # Safety net: a pass that adds no new IDs would otherwise loop forever.
            raise RuntimeError(
                f"Distinct ANONYMOUS_ID count stuck at {distinct_ids} in `{creds['schema']}.{order_completed}_1`; aborting."
            )

    # Snapshot the scratch table as `<order_completed>_<postfix>`. OR REPLACE keeps
    # the cell re-runnable when the output table already exists.
    query_save_limit = f"""
        CREATE OR REPLACE TABLE `{creds['schema']}.{order_completed}_{postfix}`
        AS (
            SELECT
                ANONYMOUS_ID,
                USER_ID,
                TIMESTAMP,
                {column_name_string}
            FROM `{creds['schema']}.{order_completed}_1`
        );
    """
    connector.run_query(query_save_limit, response=False)
    # Report the actual distinct-ID count, which can exceed the target `limit`.
    print(f"Saved {distinct_ids} distinct ANONYMOUS_IDs (target {limit}) in table {order_completed}_{postfix}")

## Dropping the temporary table

In [None]:
# Remove the order_completed scratch table `<order_completed>_1`. IF EXISTS keeps
# this cell safe to re-run after the table has already been dropped.
query_drop_temp = f"DROP TABLE IF EXISTS `{creds['schema']}.{order_completed}_1`"
connector.run_query(query_drop_temp, response=False)

print("Dropped temp tables successfully.")