# Package Imports

In [1]:
import os

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Spark Configuration

In [2]:
# Directory holding extra JARs for the Spark classpath (overridable via environment).
SPARK_JAR_DIR = os.getenv("SPARK_JAR_DIR", "/opt/spark/jars")

# Standalone-cluster master URL; overridable via environment, defaulting to the original value.
SPARK_MASTER_URL = os.getenv("SPARK_MASTER_URL", "spark://spark-master:7077")

# MySQL JDBC driver jar file name, resolved relative to SPARK_JAR_DIR.
MYSQL_CONNECTOR_JAR = os.getenv("MYSQL_CONNECTOR_JAR", "mysql-connector-java-8.0.28.jar")

# Maximum total cores this application may claim from the standalone cluster.
SPARK_STAND_ALONE_CORES = {
    "spark.cores.max": 3
}

SPARK_DRIVER_CONFIGS = {
    "spark.driver.memory": "1g"
}

# NOTE(review): on a standalone cluster the executor count is usually derived from
# spark.cores.max / spark.executor.cores; confirm spark.executor.instances has the intended effect.
SPARK_EXECUTOR_CONFIGS = {
    "spark.executor.cores": 1,
    "spark.executor.instances": 1,
    "spark.executor.memory": "1g",
}

# Build (or reuse) the session; `config(map=...)` applies each settings dict in one call.
spark = (
    SparkSession.builder
    .appName("etl_snapshot")
    .master(SPARK_MASTER_URL)
    .config("spark.jars", f"{SPARK_JAR_DIR}/{MYSQL_CONNECTOR_JAR}")
    .config("spark.ui.port", "4040")
    .config(map=SPARK_DRIVER_CONFIGS)
    .config(map=SPARK_EXECUTOR_CONFIGS)
    .config(map=SPARK_STAND_ALONE_CORES)
    .getOrCreate()
)

# Tag every job launched from this notebook so they are grouped together in the Spark UI.
spark.sparkContext.setJobGroup(
    "Monthly User Movements Snapshot",
    "Monthly Active Users (MAU) composed of new users, returning users, and existing users",
    interruptOnCancel=False,
)

# RDB Connection Info

In [3]:
# JDBC connection settings shared by every read and write in this notebook.
# Credentials and URL are taken from the environment so secrets are not committed with
# the notebook; the defaults keep the previous local-development values.
DB_CONN_INFO = {
    "user": os.getenv("MYSQL_USER", "root"),
    "password": os.getenv("MYSQL_PASSWORD", "root"),
    "url": os.getenv("MYSQL_JDBC_URL", "jdbc:mysql://mysql:3306/mysql"),
    "driver": "com.mysql.cj.jdbc.Driver",
}

# Data Sourcing

### 1. Set the order-date search window (delta load; the default start date 1970-01-01 effectively loads the full history)

In [4]:
# Order-date window (YYYY-MM-DD) used as the BETWEEN bounds on dm.dm_f_ordr.ORDR_DE.
# The LAG() in the partitioned read only sees orders inside this window, so moving the
# start date later would make customers with earlier orders appear as "new" users.
start_de, end_de = "1970-01-01", "2022-07-31"

### 2. Find the largest CUST_ID among orders in the date window (used as the upper bound for the partitioned read)

In [5]:
# Largest CUST_ID among orders in the date window. This is NOT a row count: it is used
# as `upperBound` for JDBC range partitioning on CUST_ID in the next step.
count_sql = f"""
    SELECT MAX(CUST_ID) AS MAX_CUST_ID
      FROM dm.dm_f_ordr
     WHERE ORDR_DE BETWEEN '{start_de}' AND '{end_de}'
"""

row_count_sql = spark.read.format("jdbc") \
    .options(**DB_CONN_INFO) \
    .option("query", count_sql) \
    .load()

# Name kept as `row_count` because the partitioned read below references it as upperBound.
row_count = row_count_sql.first()[0]

# MAX() over an empty window returns NULL; fail early with a clear message rather than
# handing upperBound=None to the JDBC reader.
if row_count is None:
    raise ValueError(f"No orders found in dm.dm_f_ordr between {start_de} and {end_de}")

### 3. Read the data in parallel, splitting the CUST_ID range into equal-width partitions

In [6]:
# Per-customer monthly activity, computed inside MySQL (one row per customer per active month):
#   ORDR_YM_01      first day of the active month, as a DATE
#   ORDR_YM         'YYYY-MM' label of the active month
#   PRIOR_ORDR_YM   'YYYY-MM-01' string of the customer's previous active month (NULL for the first)
#   ORDR_MONTH_DIFF months elapsed since that previous active month (NULL for the first)
dbtable = f"""(
SELECT CUST_ID
     , DATE(YM01)                                                                          AS ORDR_YM_01
     , DATE_FORMAT(YM01, '%Y-%m')                                                          AS ORDR_YM
     , LAG(YM01, 1) OVER (PARTITION BY CUST_ID ORDER BY YM01)                              AS PRIOR_ORDR_YM
     , TIMESTAMPDIFF(MONTH, LAG(YM01, 1) OVER (PARTITION BY CUST_ID ORDER BY YM01), YM01)  AS ORDR_MONTH_DIFF
FROM (    
    SELECT DISTINCT CUST_ID
                  , DATE_FORMAT(ORDR_DE, '%Y-%m-01') AS YM01
      FROM dm.dm_f_ordr
     WHERE ORDR_DE BETWEEN '{start_de}' AND '{end_de}'
    ) AS user_monthly_order
) as subquery"""

# Range-partition the read on CUST_ID. Spark issues one query per partition, each covering an
# equal-width slice of [lowerBound, upperBound]. The bounds only set the stride and do not filter rows.
partitioned_read_options = {
    "dbtable": dbtable,
    "partitionColumn": "CUST_ID",
    "lowerBound": 1,
    "upperBound": row_count,
    "numPartitions": 3,
}

sdf = (
    spark.read
    .format("jdbc")
    .options(**DB_CONN_INFO)
    .options(**partitioned_read_options)
    .load()
)

# Cache: the segment aggregations below each scan this DataFrame.
sdf.cache()

DataFrame[CUST_ID: int, ORDR_YM_01: date, ORDR_YM: string, PRIOR_ORDR_YM: string, ORDR_MONTH_DIFF: bigint]

In [7]:
# Number of partitions produced by the JDBC read (the read requested numPartitions=3).
sdf.rdd.getNumPartitions()

3

In [8]:
def _partition_row_count(partition_index, rows):
    """Emit a single (partition index, number of rows) pair for one partition."""
    yield partition_index, sum(1 for _ in rows)


# Rows per partition, showing how evenly the CUST_ID range split spread the data
# (each partition holds a contiguous CUST_ID range).
sdf.rdd.mapPartitionsWithIndex(_partition_row_count).collect()

[(0, 190486), (1, 187891), (2, 187164)]

In [9]:
# Inspect the DataFrame schema (column names, Spark types, nullability).
sdf.printSchema()

root
 |-- CUST_ID: integer (nullable = true)
 |-- ORDR_YM_01: date (nullable = true)
 |-- ORDR_YM: string (nullable = true)
 |-- PRIOR_ORDR_YM: string (nullable = true)
 |-- ORDR_MONTH_DIFF: long (nullable = true)



In [10]:
# Classify each (customer, month) row by the gap since the customer's previous active month.
# NULL / >= threshold / < threshold cover every row exactly once, so the three monthly counts
# add up to the number of distinct active customers in that month (MAU).
RETURNING_GAP_MONTHS = 3  # a gap of at least this many months marks a "returning" user

# Case 1: ORDR_MONTH_DIFF is NULL -> first active month within the window: new user.
new_user_sdf = (
    sdf.filter(F.col("ORDR_MONTH_DIFF").isNull())
    .groupBy("ORDR_YM")
    .agg(F.count("ORDR_YM").alias("new_user"))
)

# Case 2: gap >= RETURNING_GAP_MONTHS -> returning user.
old_user_sdf = (
    sdf.filter(F.col("ORDR_MONTH_DIFF") >= RETURNING_GAP_MONTHS)
    .groupBy("ORDR_YM")
    .agg(F.count("ORDR_YM").alias("returning_user"))
)

# Case 3: gap < RETURNING_GAP_MONTHS -> existing (retained) user.
exist_user_sdf = (
    sdf.filter(F.col("ORDR_MONTH_DIFF") < RETURNING_GAP_MONTHS)
    .groupBy("ORDR_YM")
    .agg(F.count("ORDR_YM").alias("existing_user"))
)

In [11]:
# Combine new and returning counts per month; the full outer join keeps months present in only one side.
new_old = new_user_sdf.join(old_user_sdf, "ORDR_YM", "full_outer")

In [12]:
# Add existing-user counts, treat months missing from a segment as 0, and sum the segments into MAU.
mau_users_sdf = (
    new_old
    .join(exist_user_sdf, on="ORDR_YM", how="full_outer")
    .na.fill(0)
    .withColumn("mau", F.col("new_user") + F.col("returning_user") + F.col("existing_user"))
)

In [13]:
# Sort explicitly: join output order is not guaranteed, so without orderBy the months could print out of order.
mau_users_sdf.orderBy("ORDR_YM").show(200)

+-------+--------+--------------+-------------+-----+
|ORDR_YM|new_user|returning_user|existing_user|  mau|
+-------+--------+--------------+-------------+-----+
|2016-06|       1|             0|            0|    1|
|2016-07|     268|             0|            0|  268|
|2016-08|     423|             0|           43|  466|
|2016-09|     435|             0|          150|  585|
|2016-10|     512|            10|          252|  774|
|2016-11|     488|            42|          372|  902|
|2016-12|     280|            54|          494|  828|
|2017-01|     498|           112|          532| 1142|
|2017-02|     471|           138|          620| 1229|
|2017-03|     516|           144|          746| 1406|
|2017-04|     498|           171|          886| 1555|
|2017-05|     463|           221|         1008| 1692|
|2017-06|     277|           258|         1105| 1640|
|2017-07|     981|           303|         1183| 2467|
|2017-08|     590|           346|         1393| 2329|
|2017-09|     592|          

In [14]:
# Print the formatted physical plan for the MAU result (aggregations, joins, cached JDBC scan).
mau_users_sdf.explain(mode="formatted")

== Physical Plan ==
AdaptiveSparkPlan (35)
+- Project (34)
   +- Project (33)
      +- SortMergeJoin FullOuter (32)
         :- Sort (22)
         :  +- Exchange (21)
         :     +- Project (20)
         :        +- SortMergeJoin FullOuter (19)
         :           :- Sort (9)
         :           :  +- HashAggregate (8)
         :           :     +- Exchange (7)
         :           :        +- HashAggregate (6)
         :           :           +- Project (5)
         :           :              +- Filter (4)
         :           :                 +- InMemoryTableScan (1)
         :           :                       +- InMemoryRelation (2)
         :           :                             +- * Scan JDBCRelation((
SELECT CUST_ID
     , DATE(YM01)                                                                          AS ORDR_YM_01
     , DATE_FORMAT(YM01, '%Y-%m')                                                          AS ORDR_YM
     , LAG(YM01, 1) OVER (PARTITION BY CUST_ID ORDE

# Data Sinks

In [15]:
# Persist the monthly MAU snapshot to MySQL over JDBC.
# With mode "overwrite" and no `truncate` option, Spark's JDBC writer drops and recreates the
# target table, so any indexes or grants on it are not preserved.
# NOTE(review): "snap_user_movenet" looks like a typo of "movement"; confirm before renaming,
# since downstream consumers may depend on the current name.
snapshot_writer = (
    mau_users_sdf
    .repartition(4)
    .write
    .mode("overwrite")
    .format("jdbc")
    .options(**DB_CONN_INFO)
    .option("dbtable", "dw.snap_user_movenet")
)
snapshot_writer.save()

# Stop SparkSession

In [16]:
# Stop the SparkSession (and its SparkContext) so this application's cluster resources are released.
spark.stop()