In [0]:
import re

from delta.tables import DeltaTable
from pyspark.sql.functions import (
    array,
    coalesce,
    col,
    expr,
    from_json,
    regexp_extract,
    size,
    struct,
    to_timestamp,
    transform,
    when,
)
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType

In [0]:
def save_to_delta(df, table_name, dt=None):
    """Overwrite one `dt` partition of a Delta table with the contents of `df`.

    The table is stored at
    s3://treenod-analytic-line-cleansed-data/delta/pkpk_line_improved/<table_name>,
    registered as line.pkpk_line_improved.<table_name>, and partitioned by `dt`.
    Only rows matching `dt = '<dt>'` are replaced (`replaceWhere`); other
    partitions are left untouched. New columns are merged into the table schema.

    Args:
        df: DataFrame to write; it must contain a `dt` column (used for partitioning).
        table_name: Table name under the line.pkpk_line_improved schema.
        dt: Partition value ("YYYY-MM-DD") to replace. Defaults to the
            notebook-level `date_path` widget value ("YYYY/MM/DD") with "/" -> "-".

    NOTE(review): Delta is expected to reject rows whose `dt` does not match the
    replaceWhere predicate (e.g. events whose `ts` falls on a neighboring day).
    Confirm how such rows should be handled.
    """
    if dt is None:
        # Fall back to the notebook-level widget value so existing callers keep working.
        dt = date_path.replace("/", "-")

    base_delta_path = f"s3://treenod-analytic-line-cleansed-data/delta/pkpk_line_improved/{table_name}"

    (
        df.write.format("delta")
        .partitionBy("dt")
        .mode("overwrite")
        .option("mergeSchema", "true")
        .option("optimizeWrite", "true")
        # Replace only this day's partition instead of the whole table.
        .option("replaceWhere", f"dt = '{dt}'")
        .option("path", base_delta_path)
        .saveAsTable(f"line.pkpk_line_improved.{table_name}")
    )

In [0]:
def restructure_schema(df, json_schema):
    col_expr = []

    for column in df.columns:
        match column:
            case "params":
                col_expr.append(from_json(col(column), json_schema).alias(column))
            case "ts": 
                col_expr.append(to_timestamp(col(column)).alias(column))
            case _: 
                col_expr.append(col(column))

    temp_df = df.select(col_expr)

    final_struct_exprs = []
    
    for field in json_schema.fields:
        field_name = field.name
        new_field_name = field_name[2:].lower() if field_name.startswith("L_") else field_name.lower()
        
        if isinstance(field.dataType, ArrayType) and isinstance(field.dataType.elementType, StructType):
            struct_field_exprs = []
            
            for struct_field in field.dataType.elementType.fields:
                orig_name = struct_field.name
                new_name = orig_name[2:].lower() if orig_name.startswith("L_") else orig_name.lower()
                struct_field_exprs.append(f"'{new_name}', element.{orig_name}")
            
            array_transform_expr = f"transform(params.{field_name}, element -> named_struct({', '.join(struct_field_exprs)})) as {new_field_name}"
            final_struct_exprs.append(array_transform_expr)
        else:
            final_struct_exprs.append(f"params.{field_name} as {new_field_name}")
    
    final_expr = [col for col in df.columns if col != "params"]
    final_expr.append(expr(f"struct({', '.join(final_struct_exprs)}) as params"))
    
    return temp_df.select(final_expr)


In [0]:
def run_achievement(df_origin):
    """Write ACHIEVEMENT events to the `achievement` Delta table.

    Keeps rows whose event_name is "ACHIEVEMENT", parses and renames `params`
    via restructure_schema, drops rows with a NULL `dt`, and hands the result
    to save_to_delta.
    """
    # Every ACHIEVEMENT param is read as a string.
    achievement_params = StructType([
        StructField(field_name, StringType())
        for field_name in ("L_TAG", "L_ULV", "L_ANM", "L_ARLT", "L_CDT", "L_STR1", "L_STR2")
    ])

    achievements = df_origin.filter(col("event_name") == "ACHIEVEMENT")
    cleaned = restructure_schema(achievements, achievement_params).filter(col("dt").isNotNull())
    save_to_delta(cleaned, "achievement")

In [0]:
def run_advertisement(df_origin):
    """Write ADVERTISEMENT events to the `advertisement` Delta table.

    Keeps rows whose event_name is "ADVERTISEMENT", parses and renames `params`
    via restructure_schema, drops rows with a NULL `dt`, and passes the result
    to save_to_delta.
    """
    param_names = ("L_TAG", "L_ULV", "L_PLACE", "L_ADUNIT", "L_ACTION", "L_CDT")
    advertisement_params = StructType([StructField(name, StringType()) for name in param_names])

    ad_events = df_origin.filter(col("event_name") == "ADVERTISEMENT")
    restructured = restructure_schema(ad_events, advertisement_params)
    save_to_delta(restructured.filter(col("dt").isNotNull()), "advertisement")

In [0]:
def run_item(df_origin):
    """Write ITEM events to the `item` Delta table.

    Keeps rows whose event_name is "ITEM", parses and renames `params` via
    restructure_schema, drops rows with a NULL `dt`, and passes the result to
    save_to_delta.
    """
    # Element type of L_STR7: a struct of L_NAME / L_VALUE strings.
    name_value = StructType([
        StructField("L_NAME", StringType()),
        StructField("L_VALUE", StringType()),
    ])
    typed_fields = [
        ("L_LOGTYPE", StringType()),
        ("L_TAG", StringType()),
        ("L_MID", StringType()),
        ("L_ULV", StringType()),
        ("L_ITEM", StringType()),
        ("L_ILV", StringType()),
        ("L_ITEM_C", IntegerType()),
        ("L_EVT", StringType()),
        ("L_CDT", StringType()),
        # L_STR1 .. L_STR6 are plain strings.
        *[(f"L_STR{i}", StringType()) for i in range(1, 7)],
        ("L_STR7", ArrayType(name_value)),
        ("L_NUM1", StringType()),
    ]
    item_params = StructType([StructField(name, data_type) for name, data_type in typed_fields])

    items = df_origin.filter(col("event_name") == "ITEM")
    save_to_delta(
        restructure_schema(items, item_params).filter(col("dt").isNotNull()),
        "item",
    )

In [0]:
def run_money(df_origin):
    """Write MONEY events to the `money` Delta table.

    Keeps rows whose event_name is "MONEY", parses and renames `params` via
    restructure_schema, drops rows with a NULL `dt`, and passes the result to
    save_to_delta.
    """
    def strings(*names):
        return [StructField(name, StringType()) for name in names]

    def ints(*names):
        return [StructField(name, IntegerType()) for name in names]

    money_params = StructType(
        strings("L_LOGTYPE", "L_TAG", "L_MID", "L_ULV", "L_MRSN", "L_MRSN_DTL", "L_EVT")
        + ints("L_PMONEY_C", "L_FMONEY_C", "L_PMONEY_A", "L_FMONEY_A")
        + strings("L_CDT", "L_STR1", "L_STR2")
    )

    money_events = df_origin.filter(col("event_name") == "MONEY")
    with_dt = restructure_schema(money_events, money_params).filter(col("dt").isNotNull())
    save_to_delta(with_dt, "money")

In [0]:
def run_playend(df_origin):
    """Write PLAYEND events to the `playend` Delta table.

    Keeps rows whose event_name is "PLAYEND", parses and renames `params` with
    restructure_schema using the schema below, drops rows with a NULL `dt`, and
    passes the result to save_to_delta.

    NOTE(review): L_LOGTYPE / L_TAG / L_ULV are declared non-nullable (False).
    from_json is believed to treat its schema as nullable, so this may not be
    enforced. Confirm before relying on non-null values downstream.
    """
    df = df_origin.filter(col("event_name") == "PLAYEND")

    params_schema = StructType([
        StructField("L_LOGTYPE", StringType(), False),
        StructField("L_TAG", StringType(), False),
        StructField("L_ULV", StringType(), False),
        StructField("L_ULV_S", StringType()),
        StructField("L_GMOD", StringType()),
        StructField("L_SCR", IntegerType()),
        StructField("L_CASH_G", IntegerType()),
        StructField("L_GOLD_G", IntegerType()),
        StructField("L_EXPR_G", IntegerType()),
        StructField("L_PTM", IntegerType()),
        StructField("L_STG", StringType()),
        StructField("L_PWIN", StringType()),
        StructField("L_CDT", StringType()),
        StructField("L_REP", StringType()),
        StructField("L_IREP", StringType()),
        StructField("L_RESTRTBUT", StringType()),
        StructField("L_CNTNE_ITEM", ArrayType(
            StructType([
                StructField("L_CIID", StringType()),
                StructField("L_PRE_CLEAR", StringType()),
                StructField("L_1ST", IntegerType()),
                StructField("L_2ND", IntegerType())
            ])
        )),
        StructField("L_REPRSLT", IntegerType()),
        StructField("L_CHAR", ArrayType(
            StructType([
                StructField("L_CID", StringType()),
                StructField("L_CLV", StringType()),
                StructField("L_HP", IntegerType()),
                StructField("L_CAGE", IntegerType())
            ])
        )),
        StructField("L_ITEM", ArrayType(
            StructType([
                StructField("L_IID", StringType()),
                StructField("L_FREE", StringType()),
                StructField("L_CNT", IntegerType()),
                StructField("L_CONTINUE", IntegerType())
            ])
        )),
        StructField("L_SITEM", ArrayType(
            StructType([
                StructField("L_SIID", StringType()),
                StructField("L_FREE", StringType()),
                StructField("L_TIME", StringType()),
                StructField("L_BOOSTING", StringType()),
                StructField("L_CNT", IntegerType())
            ])
        )),
        # 24.10.24) Documented as a number, but the observed values are "Y" / "N".
        StructField("L_IS_BEST_SCORE", StringType()),
        StructField("L_STAGE_PCNT", IntegerType()),
        StructField("L_STAGE_CLEARCNT", IntegerType()),
        StructField("L_USER_GP", ArrayType(
            StructType([
                StructField("L_GID", StringType())
            ])
        )),
        # L_STR1 actually arrives as a list. L_ITEM: item type / L_ITEM_C: item quantity change / etc.
        # Its layout differs from the documentation and its keys cannot be predicted, so it could
        # only be parsed with a UDF, which risks excessive resource use:
        # 1. PLAYEND data arrives daily, and each record may carry different keys.
        # 2. L_STR1 would have to be parsed for every record to extract its KEY/VALUE pairs.
        # 3. A dynamic schema would have to be built from the extracted keys.
        # 4. At read time, thanks to schema-on-read, queries on keys that do not exist still run and return NULL.
        # 24.11.28) Decided to keep it as StringType().
        StructField("L_STR1", StringType()),
        StructField("L_STR2", StringType()),
        StructField("L_STR3", StringType()),
        StructField("L_STR4", StringType()),
        StructField("L_STR5", StringType()),
        # Needed to confirm whether this is L_STR6 or L_GOAL -> per Norton's check, in this case the
        # column name listed on the left in the Excel spec is the more accurate one.
        StructField("L_STR6", ArrayType( 
            StructType([
                StructField("L_NAME", StringType()),
                StructField("L_INITIAL", IntegerType()),
                StructField("L_END", IntegerType())
            ])
        )),
        StructField("L_STR7", StringType()),
        StructField("L_STR8", StringType()),
        StructField("L_STR9", ArrayType(
            StructType([
                StructField("L_NAME", StringType()),
                # 24.10.24) Documented as Integer, but both strings and integers arrive -> typed as String;
                # cast to integer where a number is needed.
                StructField("L_VALUE", StringType())
            ])
        )),
        StructField("L_NUM1", IntegerType()),
        StructField("L_NUM2", IntegerType()),
        StructField("L_NUM3", IntegerType()),
        StructField("L_NUM4", IntegerType()),
        StructField("L_NUM5", IntegerType()),
        StructField("L_CREATE_BOMB", StringType())
    ])

    # Drop rows whose `dt` could not be derived before writing the partition.
    df = restructure_schema(df, params_schema) \
        .filter(col("dt").isNotNull())

    save_to_delta(df, "playend")

In [0]:
def run_playstart(df_origin):
    """Write PLAYSTART events to the `playstart` Delta table.

    Keeps rows whose event_name is "PLAYSTART", parses and renames `params` with
    restructure_schema using the schema below, drops rows with a NULL `dt`, and
    passes the result to save_to_delta.

    NOTE(review): L_LOGTYPE / L_TAG / L_ULV are declared non-nullable (False).
    from_json is believed to treat its schema as nullable, so this may not be
    enforced. Confirm before relying on non-null values downstream.
    """
    df = df_origin.filter(col("event_name") == "PLAYSTART")

    params_schema = StructType([
        StructField("L_LOGTYPE", StringType(), False),
        StructField("L_TAG", StringType(), False),
        StructField("L_ULV", StringType(), False),
        StructField("L_GMOD", StringType()),
        StructField("L_SCR", IntegerType()),
        StructField("L_STG", StringType()),
        StructField("L_PWIN", StringType()),
        StructField("L_REP", StringType()),
        StructField("L_RESTRTBUT", StringType()),
        # 24.10.24) Documented as a number, but it arrives as strings such as "0", "1".
        # In PLAYEND events the same field arrives as a number.
        StructField("L_REPRSLT", StringType()),
        StructField("L_CHAR", ArrayType(
            StructType([
                StructField("L_CID", StringType()),
                StructField("L_CLV", StringType()),
                StructField("L_HP", IntegerType()),
                StructField("L_CAGE", IntegerType())
            ])
        )),
        StructField("L_SITEM", ArrayType(
            StructType([
                StructField("L_SIID", StringType()),
                StructField("L_FREE", StringType()),
                StructField("L_TIME", StringType()),
                StructField("L_BOOSTING", StringType()),
                StructField("L_CNT", IntegerType())
            ])
        )),
        StructField("L_STAGE_PCNT", IntegerType()),
        StructField("L_STAGE_CLEARCNT", IntegerType()),
        StructField("L_CHPMISS_CLEARYN", StringType()),
        StructField("L_USER_GP", ArrayType(
            StructType([
                StructField("L_GID", StringType())
            ])
        )),
        StructField("L_CDT", StringType()),
        StructField("L_STR3", StringType()),
        StructField("L_STR4", StringType()),
        StructField("L_STR5", StringType()),
        StructField("L_STR6", StringType()),
        StructField("L_STR7", StringType()),
        StructField("L_STR8", ArrayType(
            StructType([
                StructField("L_NAME", StringType()),
                StructField("L_VALUE", StringType())
            ])
        ))
    ])

    # Drop rows whose `dt` could not be derived before writing the partition.
    df = restructure_schema(df, params_schema) \
        .filter(col("dt").isNotNull())

    save_to_delta(df, "playstart")

In [0]:
def run_social(df_origin):
    """Write SOCIAL events to the `social` Delta table.

    Keeps rows whose event_name is "SOCIAL", parses and renames `params` via
    restructure_schema, drops rows with a NULL `dt`, and passes the result to
    save_to_delta.
    """
    social_params = StructType([
        StructField(name, StringType())
        for name in ["L_LOGTYPE", "L_TAG", "L_MID", "L_ULV", "L_FRN_MID", "L_CDT", "L_STR1"]
    ])

    social_events = df_origin.filter(col("event_name") == "SOCIAL")
    result = restructure_schema(social_events, social_params).filter(col("dt").isNotNull())
    save_to_delta(result, "social")

In [0]:
def run_specific_gacha(df_origin):
    """Write SPECIFIC events whose logtype is "GACHA" to the `specific_gacha` Delta table.

    The logtype check runs after restructure_schema, so it uses the renamed
    field `params.logtype` (originally L_LOGTYPE). Rows with a NULL `dt` are
    dropped before the result is passed to save_to_delta.
    """
    gacha_params = StructType(
        [StructField(name, StringType()) for name in ("L_LOGTYPE", "L_TAG", "L_MID", "L_ULV", "L_CDT", "L_STR1")]
    )

    specific_events = df_origin.filter(col("event_name") == "SPECIFIC")
    is_gacha = col("params.logtype") == "GACHA"
    has_dt = col("dt").isNotNull()
    gacha_rows = restructure_schema(specific_events, gacha_params).filter(is_gacha & has_dt)
    save_to_delta(gacha_rows, "specific_gacha")


In [0]:
def run_specific_level(df_origin):
    """Write SPECIFIC events whose logtype is "LEVEL" to the `specific_level` Delta table.

    The logtype check runs after restructure_schema, so it uses the renamed
    field `params.logtype` (originally L_LOGTYPE). Rows with a NULL `dt` are
    dropped before the result is passed to save_to_delta.
    """
    # Element type of L_STR1: a struct of L_IID / L_FREE / L_CNT strings.
    str1_element = StructType([
        StructField("L_IID", StringType()),
        StructField("L_FREE", StringType()),
        StructField("L_CNT", StringType()),
    ])
    level_params = StructType(
        [StructField(name, StringType()) for name in ("L_LOGTYPE", "L_TAG", "L_MID", "L_ULV", "L_CDT")]
        + [StructField("L_STR1", ArrayType(str1_element))]
        + [StructField(name, StringType()) for name in ("L_STR2", "L_STR3")]
    )

    specific_events = df_origin.filter(col("event_name") == "SPECIFIC")
    level_rows = (
        restructure_schema(specific_events, level_params)
        .filter((col("params.logtype") == "LEVEL") & col("dt").isNotNull())
    )
    save_to_delta(level_rows, "specific_level")

In [0]:
# Day to ingest, formatted "YYYY/MM/DD" to match the S3 folder layout under EVENT/.
date_path = dbutils.widgets.get("date_path")
# date_path = "2025/03/01"  # example value for interactive runs

# Fail fast on an empty or malformed widget value. An empty value would leave input_path
# at the EVENT/ root, and with recursiveFileLookup the job would try to read every day.
# save_to_delta also derives the `dt` partition value from this string.
if not re.fullmatch(r"\d{4}/\d{2}/\d{2}", date_path):
    raise ValueError(f"date_path must look like 'YYYY/MM/DD', got {date_path!r}")

input_path = f"s3a://treenod-line-game-data/LGPKPK/EVENT/{date_path}/"

# NOTE(review): inferSchema and compression look like CSV/write-side options. The JSON reader
# always infers a schema and picks the codec from the file extension. Confirm they can be dropped.
df = (
    spark.read.format("json")
    .options(
        inferSchema="true",
        compression="snappy",
        pathGlobFilter="*.snappy",
        recursiveFileLookup="true",
    )
    .load(input_path)
    # Drop rows that are exact duplicates across every column.
    .distinct()
)

In [0]:
# Derive the partition column `dt` ("YYYY-MM-DD") from the event timestamp string `ts`.
# regexp_extract returns "" (not NULL) when the pattern does not match, so map "" to NULL.
# The per-event writers drop rows with `dt.isNotNull()`, and a "" partition value would not
# match the writer's replaceWhere predicate.
# withColumn (rather than select("*", ...)) keeps this cell idempotent: re-running it
# replaces `dt` instead of adding a second, ambiguous `dt` column.
dt_extracted = regexp_extract(col("ts"), r"(\d{4}-\d{2}-\d{2})T", 1)
df = df.withColumn("dt", when(dt_extracted != "", dt_extracted))

# Cached because every run_* call below re-filters this same frame.
# Does not work on serverless compute.
df.cache()

In [0]:
# Each run_* call below filters the cached `df` to one event type and writes one Delta table.
run_achievement(df)

In [0]:
run_advertisement(df)

In [0]:
run_item(df)

In [0]:
run_money(df)

In [0]:
run_playend(df)

In [0]:
run_playstart(df)

In [0]:
run_social(df)

In [0]:
# SPECIFIC events are split by params.logtype into GACHA and LEVEL tables.
run_specific_gacha(df)

In [0]:
run_specific_level(df)

In [0]:
# Release the df.cache() taken earlier now that every event table has been written.
df.unpersist()