In [1]:
from skt.ye import get_spark
from sklearn.model_selection import train_test_split
from matplotlib import pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd
import pyspark.sql.functions as F
import sys
from datetime import datetime, timedelta

In [2]:
from skt.gcp import load_bigquery_ipython_magic, \
                    bq_to_pandas, \
                    get_bigquery_client, \
                    bq_to_df

In [1]:
# Derive the run date strings used by the rest of the notebook.
# NOTE(review): `current_dt` is not defined in any visible cell — presumably it
# is injected as a 'YYYYMMDD' string (e.g. a notebook parameter); confirm.
execution_day = datetime.strptime(current_dt, '%Y%m%d')

# Previous day, formatted with slashes.
# NOTE(review): this uses '%Y/%m/%d' while execution_dt uses '%Y-%m-%d' —
# confirm the differing separators are intentional.
execution_date_ago = (execution_day - timedelta(days=1)).strftime('%Y/%m/%d')

# Keep the parsed datetime under its own name instead of rebinding
# `execution_dt` from datetime to str; `execution_dt` is the string that gets
# interpolated into the BigQuery query below.
execution_dt = execution_day.strftime('%Y-%m-%d')

print(f'execution_dt: {execution_dt}')
print(f'execution_dt_ago: {execution_date_ago}')

# QUERY FOR APP LOG MUSIC HOME

In [71]:
# BigQuery SQL that extracts one row per music.home click event for the run date.
#
# Stages:
#   1. raw_data: reads `skt-datahub.apollo_log.applog_prd`, keeps rows whose
#      timestamp falls on `execution_dt` and whose raw log text contains
#      'music.home', pulls BODY.USER_ID / BODY.SERVICE_CD / BODY.STAT /
#      BODY.STAT.CLICK out of the JSON log, and keeps only service_cd = 'apollo'
#      rows with a non-empty CLICK array.
#   2. get_parsed_data: unnests the CLICK array (one row per click), keeps
#      clicks whose PAGE_CD is 'music.home', and extracts the scalar click
#      fields plus the ADDITIONAL_DIMENSION array.
#   3. Final SELECT: unnests ADDITIONAL_DIMENSION and pivots the DIMENSION_CD
#      entries TRANSACTION_ID / SECTION_ID / SECTION / INDEX into columns by
#      taking MAX(...) per (user, click, timestamp) group; also derives `dt`
#      as the DATE of the log timestamp.
#
# `execution_dt` is interpolated directly into the SQL text by the f-string.
# NOTE(review): the CROSS JOIN UNNEST in the final stage yields no rows for a
# click whose ADDITIONAL_DIMENSION array is NULL/empty, so such clicks are
# dropped — confirm that is intended.
query = f"""
WITH raw_data AS
(
    SELECT  timestamp,
            user_id,
            stat,
            JSON_EXTRACT_ARRAY(click) as click,
            service_cd
            
    FROM
    (
        SELECT  timestamp,
                JSON_VALUE(log, '$.BODY.USER_ID') as user_id,
                JSON_EXTRACT(SAFE.PARSE_JSON(log), '$.BODY') as body,
                STRING(JSON_EXTRACT(JSON_EXTRACT(SAFE.PARSE_JSON(log), '$.BODY'), '$.SERVICE_CD')) as service_cd,
                JSON_EXTRACT(JSON_EXTRACT(SAFE.PARSE_JSON(log), '$.BODY'), '$.STAT') as stat,
                JSON_EXTRACT(JSON_EXTRACT(JSON_EXTRACT(SAFE.PARSE_JSON(log), '$.BODY'), '$.STAT'), '$.CLICK')  as click
        FROM `skt-datahub.apollo_log.applog_prd` 
        WHERE TIMESTAMP_TRUNC(timestamp, day) = TIMESTAMP('{execution_dt}') and 
        log like '%music.home%'
    )
    WHERE service_cd ='apollo' and ARRAY_LENGTH(JSON_EXTRACT_ARRAY(click)) > 0
),
get_parsed_data as (
    SELECT  *
    FROM(
        SELECT  timestamp,
                user_id,
                click,
                service_cd,
                JSON_EXTRACT_ARRAY(JSON_EXTRACT(click, '$.ADDITIONAL_DIMENSION')) as ADDITIONAL_DIMENSION,
                JSON_VALUE(click, '$.CLICK_ACTION_CD') as CLICK_ACTION_CD,
                JSON_VALUE(click, '$.CLICK_CD') as CLICK_CD,
                JSON_VALUE(click, '$.CLICK_TM') as CLICK_TM,
                JSON_VALUE(click, '$.CLIENT_SEQ') as CLIENT_SEQ,
                JSON_VALUE(click, '$.COUNTRY_CODE') as COUNTRY_CODE,
                JSON_VALUE(click, '$.PAGE_CD') as PAGE_CD
        
        FROM(
            SELECT 
                   user_id,
                   timestamp,
                   click,
                   service_cd
            FROM raw_data,
            UNNEST(click) click
        )
        WHERE STRING(JSON_EXTRACT(click, '$.PAGE_CD')) = 'music.home' -- music home 조건
    )
)

SELECT  user_id,
        service_cd,
        Max(transactionId_value) AS transactionId,
        MAX(sectionId_value) AS sectionId,
        MAX(section_value) AS section,
        MAX(index_value) AS index,
        CLICK_ACTION_CD AS click_action_cd,
        CLICK_CD AS click_cd,
        CLICK_TM AS click_tm,
        PAGE_CD AS page_cd,
        COUNTRY_CODE AS country_code,
        timestamp,
        DATE(TIMESTAMP_TRUNC(timestamp, day)) as dt
FROM 
(
    SELECT  *,
            IF(JSON_EXTRACT_SCALAR(info, '$.DIMENSION_CD') IN ('TRANSACTION_ID'), JSON_EXTRACT_SCALAR(info, '$.DIMENSION_VALUE'), NULL) AS transactionId_value, 
            IF(JSON_EXTRACT_SCALAR(info, '$.DIMENSION_CD') IN ('SECTION_ID'), JSON_EXTRACT_SCALAR(info, '$.DIMENSION_VALUE'), NULL) AS sectionId_value,
            IF(JSON_EXTRACT_SCALAR(info, '$.DIMENSION_CD') IN ('SECTION'), JSON_EXTRACT_SCALAR(info, '$.DIMENSION_VALUE'), NULL) AS section_value,
            IF(JSON_EXTRACT_SCALAR(info, '$.DIMENSION_CD') IN ('INDEX'), JSON_EXTRACT_SCALAR(info, '$.DIMENSION_VALUE'), NULL) AS index_value

    FROM get_parsed_data CROSS JOIN UNNEST(additional_dimension) AS info
)
GROUP BY user_id, service_cd,click_action_cd, click_cd, click_tm, page_cd, country_code, timestamp
"""


In [4]:
# Execute the music.home click query via the skt.gcp helper and keep the result
# as `music_home_log` (presumably a pandas DataFrame, given the helper's name
# and the column-style access in the next cell — confirm).
music_home_log = bq_to_pandas(query)

In [None]:
# Flag clicks on a "more" control and normalize their click code.
#
# `more_flag` is True when the original click_cd contains the substring 'more'.
# For those rows click_cd is reduced to the part before the first underscore;
# all other rows keep click_cd unchanged.
#
# Vectorized `.str` methods are used instead of per-row lambdas so that NULL
# click_cd values (JSON_VALUE can return NULL) are treated as "not more"
# rather than raising TypeError on `'more' in None`.
#
# NOTE: this cell rewrites click_cd in place, so re-running it without first
# re-running the load cell recomputes more_flag from already-truncated values.
click_cd_raw = music_home_log['click_cd']
has_more = click_cd_raw.str.contains('more', regex=False, na=False).astype(bool)

music_home_log['more_flag'] = has_more
music_home_log['click_cd'] = click_cd_raw.mask(
    has_more, click_cd_raw.str.split('_').str[0]
)

In [6]:
# Preview the first row to sanity-check the parsed columns.
# NOTE(review): raises IndexError when the query returned no rows, and the
# rendered output includes user_id — clear it before sharing the notebook.
music_home_log.iloc[0]        

In [2]:
from skt.gcp import df_to_bq_table
from pyspark.sql.types import DateType
import time

# Destination of the staging table (dataset and table name are referenced by
# the load and drop cells further down).
dest_dataset = "x1112436"
partitioned_dest_table = "adot_musichome_app_log_stg"

# DDL for the staging table: one STRING column per extracted click field, the
# boolean more_flag, the log timestamp, and the `dt` DATE used for partitioning.
# IF NOT EXISTS makes the statement a no-op when the table is already there.
create_staging_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {dest_dataset}.{partitioned_dest_table}
    (
        user_id STRING,
        service_cd STRING,
        transactionId STRING,
        sectionId STRING,
        section STRING,
        index STRING,
        click_action_cd STRING,
        click_cd STRING,
        click_tm STRING,
        page_cd STRING,
        more_flag Bool,
        country_code STRING,
        timestamp TIMESTAMP,
        dt DATE
    )
    PARTITION BY dt
"""

# Submit the DDL and block until the job has finished.
create_staging_table_job = get_bigquery_client().query(create_staging_table_sql)
create_staging_table_job.result()

print(f"생성된 테이블 : {dest_dataset}.{partitioned_dest_table}")

In [81]:
from skt.gcp import pandas_to_bq, get_bigquery_client

In [82]:
# Upload the transformed click log to the staging table, using `dt` as the
# partition column.
# NOTE(review): the exact effect of overwrite=True (whole table vs. matching
# partition) depends on the skt.gcp helper — confirm before relying on it.
staging_destination = f"skt-datahub.{dest_dataset}.{partitioned_dest_table}"

pandas_to_bq(
    pd_df=music_home_log,
    destination=staging_destination,
    partition='dt',
    overwrite=True,
)

In [3]:
# Clean-up: drop the staging table.
# `.result()` waits for the DROP job to finish and raises if it failed, matching
# how the CREATE TABLE cell runs its statement; without it the job is only
# submitted and any error goes unnoticed.
# NOTE(review): this removes the table that the pandas_to_bq cell just
# populated — confirm it is meant to run as part of a full top-to-bottom run.
get_bigquery_client().query(f"""
    DROP TABLE IF EXISTS {dest_dataset}.{partitioned_dest_table}"""
).result()