In [None]:
from skt.gcp import (
    PROJECT_ID,
    bq_insert_overwrite,
    bq_to_df,
    bq_to_pandas,
    get_bigquery_client,
    bq_table_exists,
    get_max_part,
    load_query_result_to_table,
    pandas_to_bq,
    pandas_to_bq_table,
    load_bigquery_ipython_magic,
    get_bigquery_client,
    _print_query_job_results,
    load_query_result_to_partitions
    
)

from skt.ye import (
    get_hdfs_conn,
    get_spark,
    hive_execute,
    hive_to_pandas,
    pandas_to_parquet,
    slack_send,
    get_secrets
)

In [None]:
from google.cloud.bigquery.job import QueryJobConfig

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.functions import (
    row_number, 
    col, 
    lit, 
    count, 
    log, 
    exp, 
    sum as spark_sum
)
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

In [None]:
import pandas as pd
from datetime import datetime, date, timedelta

In [None]:
# Derive the neighbouring dates around the externally supplied `current_dt`
# (a 'YYYY-MM-DD' string, as required by the strptime format below).
_DATE_FMT = '%Y-%m-%d'
_ONE_DAY = timedelta(days=1)

execution_dt = datetime.strptime(current_dt, _DATE_FMT)
execution_dt_next = execution_dt + _ONE_DAY
execution_dt_one_ago = (execution_dt - _ONE_DAY).strftime(_DATE_FMT)
current_dt_next = execution_dt_next.strftime(_DATE_FMT)

In [None]:
# Echo the run parameters so they are visible in the executed notebook output.
print(
    f'current_dt: {current_dt}',
    f'current_dt_next: {current_dt_next}',
    f'state: {state}',
    sep='\n',
)


In [None]:
# BigQuery dataset that the source and result tables are read from / written to.
db_name = "adot_reco_dev"
# BigQuery dataset used for this notebook's scratch tables.
temp_db = "temp_1d"

In [None]:
# Table that maps (profile_templates, source_domain) pairs to profile_id.
pivot_table_nm = 'adotServiceMultiProfilesPivotTable'

In [None]:
# Shared BigQuery client used by every query job in this notebook.
bq_client = get_bigquery_client()

In [None]:
# Whether the pivot table already exists; the history snapshot is only taken when it does.
table_exists = bq_table_exists(
    table=f"{db_name}.{pivot_table_nm}",
    project_id=PROJECT_ID,
)

In [None]:
if table_exists:
    # Snapshot every partition of the existing pivot table (up to its latest
    # one) into a scratch table, overwriting any previous snapshot.
    try:
        max_pivot_dt = get_max_part(f"{db_name}.{pivot_table_nm}")
        query = f"""
            SELECT *
            FROM {db_name}.adotServiceMultiProfilesPivotTable
            WHERE dt <='{max_pivot_dt}'
        """

        temp_history_pivot_table = f'{PROJECT_ID}.{temp_db}.temp_history_pivot'
        job_config = QueryJobConfig(
            destination=temp_history_pivot_table,
            write_disposition='WRITE_TRUNCATE',
        )
        query_job = bq_client.query(query, job_config=job_config)
        query_job.result()
    except ValueError:
        # NOTE(review): presumably get_max_part raises ValueError when the table
        # has no partitions yet -- confirm. Within this cell, `is_first` is
        # assigned only on this path.
        is_first = True

In [None]:
# Suffixes of the per-source `adotServiceProfile_templated_<source>` tables that
# are unioned into the current pivot.
PROFILE_SOURCES = ("xdr", "tdeal", "adot", "tmap", "tmbr")


def _source_pivot_sql(dataset: str, source: str, dt: str) -> str:
    """Return the SELECT that pivots one templated-profile source table.

    The statement collects, for each (profile_templates, source_domain) pair in
    partition ``dt`` of ``{dataset}.adotServiceProfile_templated_{source}``, the
    distinct ``svc_mgmt_num_luna_id_source_domain`` user keys into ``user_keys``.

    Args:
        dataset: BigQuery dataset holding the source table.
        source: Table-name suffix (one of ``PROFILE_SOURCES``).
        dt: Partition date as a 'YYYY-MM-DD' string.

    Returns:
        The SQL text of a single branch of the UNION ALL.
    """
    # ARRAY_TO_STRING silently drops NULL elements unless a null_text is given,
    # which would shift the positions later recovered with SPLIT(...)[OFFSET(n)].
    # Passing '' keeps every key at exactly three '_'-separated fields.
    return f"""
        SELECT  profile_templates,
                source_domain,
                ARRAY_AGG(user_key) AS user_keys
        FROM (
                SELECT  DISTINCT svc_mgmt_num,
                                luna_id,
                                profile_templates,
                                source_domain,
                                ARRAY_TO_STRING([svc_mgmt_num, luna_id, source_domain], '_', '') AS user_key
                FROM {dataset}.adotServiceProfile_templated_{source}
                WHERE dt = '{dt}'
                  AND luna_id IS NOT NULL
                  AND profile_templates IS NOT NULL
                  AND profile_templates != ''
        )
        GROUP BY profile_templates, source_domain
"""


# One branch per source table, glued together with UNION ALL.
_union_sql = "\n        UNION ALL\n".join(
    _source_pivot_sql(db_name, source, current_dt) for source in PROFILE_SOURCES
)

pivot_query = f"""
SELECT  profile_templates,
        source_domain,
        user_keys
FROM  (
{_union_sql}
)
"""

In [None]:
# Materialise today's pivot into a scratch table, replacing any previous contents.
current_pivot_table = f'{PROJECT_ID}.{temp_db}.temp_current_pivot_table'
job_config = QueryJobConfig(
    destination=current_pivot_table,
    write_disposition='WRITE_TRUNCATE',
)
query_job = bq_client.query(pivot_query, job_config=job_config)
query_job.result()

In [None]:
# `is_first` means "there is no usable history snapshot". Earlier in the
# notebook it is only assigned when the snapshot attempt raised ValueError, so
# on every other path the name may be undefined; in that case history exists
# exactly when the pivot table existed.
try:
    is_first = bool(is_first)
except NameError:
    is_first = not table_exists

if is_first:
    # No history: use an empty, correctly typed CTE so that MAX(profile_id)
    # downstream is NULL and new IDs start from 1.
    start_query = """WITH PREVIOUS_PIVOT_TALBE AS (
        SELECT
            CAST(NULL AS INT) AS profile_id,
            CAST(NULL AS STRING) AS profile_templates,
            CAST(NULL AS STRING) AS source_domain
        FROM UNNEST([]) AS dummy
        WHERE
            FALSE
        )
    """
else:
    # History available: read the snapshot taken from the existing pivot table.
    start_query = f"""WITH PREVIOUS_PIVOT_TALBE AS (
        SELECT *
        FROM  {PROJECT_ID}.{temp_db}.temp_history_pivot
        )
    """

In [None]:

# Incremental pivot: every (profile_templates, source_domain) pair that is in
# today's pivot but not in the history gets a new profile_id, numbered
# consecutively after the largest profile_id already in the history.
query = f"""
{start_query},

LastIndex AS (
        SELECT COALESCE(MAX(profile_id), 0) AS max_idx
        FROM   PREVIOUS_PIVOT_TALBE
),
CURRENT_PIVOT_TALBE AS (
        -- DISTINCT guards against the same pair arriving from more than one source table
        SELECT  DISTINCT profile_templates,
                source_domain
        FROM   {PROJECT_ID}.{temp_db}.temp_current_pivot_table
)

SELECT  profile_templates,
        source_domain,
        (new_idx + (SELECT max_idx FROM LastIndex)) AS profile_id, -- Add max_idx to each row's idx
        PARSE_DATE('%Y-%m-%d', '{current_dt_next}') as dt
FROM (
        SELECT  A.profile_templates,
                A.source_domain,
                ROW_NUMBER() OVER (ORDER BY A.profile_templates, A.source_domain) AS new_idx
        FROM CURRENT_PIVOT_TALBE AS A
        LEFT JOIN PREVIOUS_PIVOT_TALBE AS B
        ON A.profile_templates = B.profile_templates AND A.source_domain = B.source_domain
        -- anti-join: keep only pairs with no match in the history
        WHERE B.profile_templates IS NULL
)
"""

In [None]:
# Save the increment to the BigQuery table first.
bq_insert_overwrite(
    sql=query,
    destination=f'{PROJECT_ID}.{db_name}.adotServiceMultiProfilesPivotTable',
    partition='dt',
)

In [None]:
# HDFS layout: <root>/<state>/user_retriev/<next day>/data/profiles
hdfs_root_path = '/data/temp/ca_recsys'
current_root_path = f'{hdfs_root_path}/{state}/user_retriev/{current_dt_next}'
hdfs_data_path = f'{current_root_path}/data/profiles'

In [None]:
# Save to the Hadoop path used for inference.
profiles_table = bq_to_df(query)
_profiles_writer = profiles_table.write.mode("overwrite")
_profiles_writer.parquet(hdfs_data_path)

In [None]:
# Per-user view: explode each pivot row's user keys back into
# (svc_mgmt_num, luna_id) and collect the source domains and profile IDs that
# every user belongs to.
#
# NOTE(review): the scratch pivot table only has profile_templates,
# source_domain and user_keys, so profile_id is looked up here by joining the
# pivot table on (profile_templates, source_domain). This assumes the pivot
# table already contains today's increment -- confirm.
# NOTE(review): user keys are split on '_', which assumes svc_mgmt_num and
# luna_id contain no underscore; source_domain is taken from its own column to
# avoid that dependency.
query = f"""
WITH explodedByLuna AS (
        SELECT  SPLIT(user_key, '_')[OFFSET(0)] AS svc_mgmt_num,
                SPLIT(user_key, '_')[OFFSET(1)] AS luna_id,
                C.source_domain,
                C.profile_templates,
                P.profile_id
        FROM    {PROJECT_ID}.{temp_db}.temp_current_pivot_table AS C
        CROSS JOIN UNNEST(C.user_keys) AS user_key
        INNER JOIN {PROJECT_ID}.{db_name}.adotServiceMultiProfilesPivotTable AS P
        ON      C.profile_templates = P.profile_templates
                AND C.source_domain = P.source_domain
)
SELECT  svc_mgmt_num,
        luna_id,
        ARRAY_AGG(DISTINCT source_domain) AS source_domains,
        ARRAY_AGG(DISTINCT profile_id) AS profile_ids,
        PARSE_DATE('%Y-%m-%d', '{current_dt_next}') as dt 

FROM explodedByLuna
GROUP BY svc_mgmt_num, luna_id
"""

In [None]:
# Overwrite the `dt` partition of the per-user profile table with the query result.
bq_insert_overwrite(
    sql=query,
    destination=f'{PROJECT_ID}.{db_name}.adotServiceUnionUserProfiles',
    partition='dt',
)

# Add partition TTL

In [None]:
# Set the partition expiration (in days, from the externally supplied `ttl`)
# on the per-user profile table.
ttl_query = f"""
ALTER TABLE
  {PROJECT_ID}.{db_name}.adotServiceUnionUserProfiles
SET
  OPTIONS(partition_expiration_days={ttl})
"""
_ttl_job = bq_client.query(ttl_query)
_ttl_job.result()

# Drop temp tables

In [None]:
# Scratch tables created earlier in this notebook; drop each one if it exists.
temp_table_list = [
    f"{PROJECT_ID}.{temp_db}.temp_current_pivot_table",
    f'{PROJECT_ID}.{temp_db}.temp_history_pivot',
]
for temp_table in temp_table_list:
    drop_query = f"""Drop table if exists {temp_table}"""
    _drop_job = bq_client.query(drop_query)
    _drop_job.result()