# 스파크를 이용한 기본 지표 생성 예제
> 기본 지표를 생성하는 데에 있어, 정해진 틀을 그대로 따라하기 보다, 가장 직관적인 방법을 지속적으로 개선하는 과정을 설명하기 위한 예제입니다. 
첫 번째 예제인 만큼 지표의 복잡도를 줄이기 위해 해당 서비스를 오픈 일자는 2020/10/25 이며, 지표를 집계하는 시점은 2020/10/26 일 입니다

* 원본 데이터를 그대로 읽는 방법
* dataframe api 를 이용하는 방법
* spark.sql 을 이용하는 방법
* 기본 지표 (DAU, PU)를 추출하는 예제 실습
* 날짜에 대한 필터를 넣는 방법
* 날짜에 대한 필터를 데이터 소스에 적용하는 방법
* 기본 지표 (ARPU, ARPPU)를 추출하는 예제 실습
* 스칼라 값을 가져와서 다음 질의문에 적용하는 방법
* 누적 금액을 구할 때에 단순한 방법
* 서비스 오픈 일자의 디멘젼 테이블을 생성하는 방법
* 널 값에 대한 처리하는 방법
* 생성된 데이터 프레임을 저장하는 방법
* 전 일자 데이터를 가져오는 방법
* 요약 지표를 생성할 때에 단순한 방법
* 팩트 테이블을 활용하는 방법

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession \
    .builder \
    .appName("Data Engineer Basic Day3") \
    .config("spark.dataengineer.basic.day3", "tutorial-1") \
    .getOrCreate()

In [10]:
spark.read.option("inferSchema", "true").option("header", "true").csv("data/log_access.csv").withColumn("a_time", expr("from_unixtime(a_time)")).show()

+-------------------+-----+------+
|             a_time|a_uid|  a_id|
+-------------------+-----+------+
|2020-10-25 17:00:00|    1| login|
|2020-10-25 17:33:20|    1|logout|
|2020-10-25 18:06:40|    2| login|
|2020-10-25 18:23:20|    2|logout|
|2020-10-25 19:13:20|    2| login|
|2020-10-25 20:20:00|    3| login|
|2020-10-25 20:53:20|    3|logout|
|2020-10-25 21:10:00|    4| login|
|2020-10-25 22:16:40|    4|logout|
|2020-10-25 22:21:40|    4| login|
|2020-10-25 22:55:00|    5| login|
|2020-10-25 23:45:00|    5|logout|
|2020-10-26 00:01:40|    6| login|
|2020-10-26 00:51:40|    7| login|
|2020-10-26 01:08:20|    8| login|
|2020-10-26 01:25:00|    9| login|
+-------------------+-----+------+



In [11]:
sc = spark.sparkContext
spark.read.option("inferSchema", "true").option("header", "true").csv("data/tbl_user.csv").createOrReplaceTempView("user")

pWhere=""
spark.read.option("inferSchema", "true").option("header", "true").csv("data/tbl_purchase.csv").withColumn("p_time", expr("from_unixtime(p_time)")).createOrReplaceTempView("purchase")

aWhere=""
spark.read.option("inferSchema", "true").option("header", "true").csv("data/log_access.csv").withColumn("a_time", expr("from_unixtime(a_time)")).createOrReplaceTempView("access")

In [12]:
spark.sql("desc user").show()
spark.sql("desc purchase").show()
spark.sql("desc access").show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|    u_id|      int|   null|
|  u_name|   string|   null|
|u_gender|   string|   null|
|u_signup|      int|   null|
+--------+---------+-------+

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|  p_time|   string|   null|
|   p_uid|      int|   null|
|    p_id|      int|   null|
|  p_name|   string|   null|
|p_amount|      int|   null|
+--------+---------+-------+

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|  a_time|   string|   null|
|   a_uid|      int|   null|
|    a_id|   string|   null|
+--------+---------+-------+



### 과제 1. 주어진 데이터를 이용하여 2020/10/25 기준의 DAU, PU 를 구하시오
* DAU : Daily Active User, 일 별 접속자 수
  - log_access 를 통해 unique 한 a_uid 값을 구합니다
* PU : Purchase User, 일 별 구매자 수
  - tbl_purchase 를 통해 unique 한 p_uid 값을 구합니다

> 값을 구하기 전에 Spark API 대신 Spark SQL 을 이용하기 위해 [createOrReplaceTempView](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=createorreplace#pyspark.sql.DataFrame.createOrReplaceTempView) 를 생성합니다

In [109]:
# DAU - access
spark.sql("select a_time as a_time, a_uid from access").show()
dau = spark.sql("select count(distinct a_uid) as DAU from access where a_time >= '2020-10-25 00:00:00' and a_time < '2020-10-26 00:00:00'")
dau.show()

+-------------------+-----+
|             a_time|a_uid|
+-------------------+-----+
|2020-10-25 17:00:00|    1|
|2020-10-25 17:33:20|    1|
|2020-10-25 18:06:40|    2|
|2020-10-25 18:23:20|    2|
|2020-10-25 19:13:20|    2|
|2020-10-25 20:20:00|    3|
|2020-10-25 20:53:20|    3|
|2020-10-25 21:10:00|    4|
|2020-10-25 22:16:40|    4|
|2020-10-25 22:21:40|    4|
|2020-10-25 22:55:00|    5|
|2020-10-25 23:45:00|    5|
|2020-10-26 00:01:40|    6|
|2020-10-26 00:51:40|    7|
|2020-10-26 01:08:20|    8|
|2020-10-26 01:25:00|    9|
+-------------------+-----+

+---+
|DAU|
+---+
|  5|
+---+



In [110]:
# PU - purchase
spark.sql("select p_time, p_uid from purchase").show()
pu = spark.sql("select count(distinct p_uid) as PU from purchase where p_time >= '2020-10-25 00:00:00' and p_time < '2020-10-26 00:00:00'")
pu.show()

+-------------------+-----+
|             p_time|p_uid|
+-------------------+-----+
|2020-10-25 18:45:50|    1|
|2020-10-26 06:45:55|    1|
|2020-10-26 00:51:40|    2|
|2020-10-25 18:55:55|    3|
|2020-10-26 01:08:20|    4|
|2020-10-25 22:45:55|    5|
|2020-10-25 22:49:15|    5|
+-------------------+-----+

+---+
| PU|
+---+
|  3|
+---+



In [111]:
v_dau = dau.collect()[0]["DAU"]
v_pu = pu.collect()[0]["PU"]

### 과제 2. 주어진 데이터를 이용하여 2020/10/25 기준의 ARPU, ARPPU 를 구하시오
* ARPU : Average Revenue Per User, 유저 당 평균 수익
  - 해당 일자의 전체 수익 (Total Purchase Amount) / 해당 일에 접속한 유저 수 (DAU)
* ARPPU : Average Revenue Per Purchase User, 구매 유저 당 평균 수익
  - 해당 일자의 전체 수익 (Total Purchase Amount) / 해당 일에 접속한 구매 유저 수 (PU)

In [118]:
# ARPU - total purchase amount, dau

query="select sum(p_amount) / {} from purchase where p_time >= '2020-10-25 00:00:00' and p_time < '2020-10-26 00:00:00'".format(v_dau)
print(query)

total_purchase_amount = spark.sql("select sum(p_amount) as total_purchase_amount from purchase where p_time >= '2020-10-25 00:00:00' and p_time < '2020-10-26 00:00:00'")
total_purchase_amount.show()

spark.sql("select sum(p_amount) / 5 from purchase where p_time >= '2020-10-25 00:00:00' and p_time < '2020-10-26 00:00:00'").show()

spark.sql("select sum(p_amount) / {} as ARPU from purchase where p_time >= '2020-10-25 00:00:00' and p_time < '2020-10-26 00:00:00'".format(v_dau)).show()

select sum(p_amount) / 5 from purchase where p_time >= '2020-10-25 00:00:00' and p_time < '2020-10-26 00:00:00'
+---------------------+
|total_purchase_amount|
+---------------------+
|              9000000|
+---------------------+

+-------------------------------------------------------------------+
|(CAST(sum(CAST(p_amount AS BIGINT)) AS DOUBLE) / CAST(5 AS DOUBLE))|
+-------------------------------------------------------------------+
|                                                          1800000.0|
+-------------------------------------------------------------------+

+---------+
|     ARPU|
+---------+
|1800000.0|
+---------+



In [116]:
# ARPPU - total purchase amount, pu
v_amt = total_purchase_amount.collect()[0]["total_purchase_amount"]
print("| ARPPU | {} |".format(v_amt / v_pu))

| ARPPU | 3000000.0 |


### 과제 3. 주어진 데이터를 이용하여 2020/10/26 현재의 "누적 매출 금액" 과 "누적 접속 유저수"를 구하시오
* 누적 매출 금액 : 10/25 (오픈) ~ 현재
  - 전체 로그를 읽어서 매출 금액의 합을 구한다
  - 유저별 매출 정보를 누적하여 저장해두고 재활용한다
* 누적 접속 유저수 : 10/25 (오픈) ~ 현재
  - 전체 로그를 읽어서 접속자의 유일한 수를 구한다
  - 유저별 접속 정보를 누적하여 저장해두고 재활용한다

In [120]:
# 누적 매출 금액
spark.sql("select sum(p_amount) from purchase ").show()

# 누적 접속 유저수
spark.sql("select count(distinct a_uid) from access").show()

+-------------+
|sum(p_amount)|
+-------------+
|     16700000|
+-------------+

+---------------------+
|count(DISTINCT a_uid)|
+---------------------+
|                    9|
+---------------------+



### User Dimension 테이블 설계
| 컬럼 명 | 컬럼 타입 | 컬럼 설명 |
| :- | :-: | :- |
| d_uid | int | 유저 아이디 |
| d_name | string | 고객 이름 |
| d_pamount | int | 누적 구매 금액 |
| d_pcount | int | 누적 구매 횟수 |
| d_acount | int | 누적 접속 횟수 |

In [24]:
# 오픈 첫 날의 경우 예외적으로 별도의 프로그램을 작성합니다
# 
# 1. 가장 큰 레코드 수를 가진 정보가 접속정보이므로 해당 일자의 이용자 별 접속 횟수를 추출합니다
# 단, login 횟수를 접속 횟수로 가정합니다 - logout 만 있는 경우는 login 유실 혹은 전일자의 로그이므로 이러한 경우는 제외합니다
spark.sql("describe access").show()
spark.sql("select * from access where a_id = 'login' and a_time >= '2020-10-25 00:00:00' and a_time < '2020-10-26 00:00:00'").show()
uids = spark.sql("select a_uid, count(a_uid) as acount from access where a_id = 'login' and a_time >= '2020-10-25 00:00:00' and a_time < '2020-10-26 00:00:00' group by a_uid")
uids.show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|  a_time|   string|   null|
|   a_uid|      int|   null|
|    a_id|   string|   null|
+--------+---------+-------+

+-------------------+-----+-----+
|             a_time|a_uid| a_id|
+-------------------+-----+-----+
|2020-10-25 17:00:00|    1|login|
|2020-10-25 18:06:40|    2|login|
|2020-10-25 19:13:20|    2|login|
|2020-10-25 20:20:00|    3|login|
|2020-10-25 21:10:00|    4|login|
|2020-10-25 22:21:40|    4|login|
|2020-10-25 22:55:00|    5|login|
+-------------------+-----+-----+

+-----+------+
|a_uid|acount|
+-----+------+
|    1|     1|
|    3|     1|
|    5|     1|
|    4|     2|
|    2|     2|
+-----+------+



In [26]:

# 2. 해당 일자의 이용자 별 총 매출 금액과, 구매 횟수를 추출합니다
spark.sql("describe purchase").show()
amts = spark.sql("select p_uid, sum(p_amount) as pamount, count(p_uid) as pcount from purchase where p_time >= '2020-10-25 00:00:00' and p_time < '2020-10-26 00:00:00' group by p_uid")
amts.show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|  p_time|   string|   null|
|   p_uid|      int|   null|
|    p_id|      int|   null|
|  p_name|   string|   null|
|p_amount|      int|   null|
+--------+---------+-------+

+-----+-------+------+
|p_uid|pamount|pcount|
+-----+-------+------+
|    1|2000000|     1|
|    3|1000000|     1|
|    5|6000000|     2|
+-----+-------+------+



In [49]:
# 3. 이용자 접속횟수 + 총구매금액 + 구매횟수 (uids + amts)

uids.printSchema()
amts.printSchema()

dim1 = uids.join(amts, uids["a_uid"] == amts["p_uid"], how="left").sort(uids["a_uid"].asc())
dim2 = dim1.withColumnRenamed("a_uid", "d_uid") \
.withColumnRenamed("acount", "d_acount") \
.drop("p_uid") \
.withColumnRenamed("pamount", "d_pamount") \
.withColumnRenamed("pcount", "d_pcount")
dim2.show()

root
 |-- a_uid: integer (nullable = true)
 |-- acount: long (nullable = false)

root
 |-- p_uid: integer (nullable = true)
 |-- pamount: long (nullable = true)
 |-- pcount: long (nullable = false)

+-----+--------+---------+--------+
|d_uid|d_acount|d_pamount|d_pcount|
+-----+--------+---------+--------+
|    1|       1|  2000000|       1|
|    2|       2|     null|    null|
|    3|       1|  1000000|       1|
|    4|       2|     null|    null|
|    5|       1|  6000000|       2|
+-----+--------+---------+--------+



In [56]:
# 4. 이용자 정보를 덧 붙입니다
user = spark.sql("select * from user")
user.show()

dim3 = dim2.join(user, dim2["d_uid"] == user["u_id"], "left")
dim4 = dim3.withColumnRenamed("u_name", "d_name") \
.withColumnRenamed("u_gender", "d_gender")

dim5 = dim4.select("d_uid", "d_name", "d_gender", "d_acount", "d_pamount", "d_pcount")
dimension = dim5.na.fill({"d_pamount":0, "d_pcount":0})
dimension.show()

+----+----------+--------+--------+
|u_id|    u_name|u_gender|u_signup|
+----+----------+--------+--------+
|   1|    정휘센|      남|19580808|
|   2|  김싸이언|      남|19590201|
|   3|    박트롬|      여|19951030|
|   4|    청소기|      남|19770329|
|   5|유코드제로|      여|20021029|
|   6|  윤디오스|      남|20040101|
|   7|  임모바일|      남|20040807|
|   8|  조노트북|      여|20161201|
|   9|  최컴퓨터|      남|20201124|
+----+----------+--------+--------+

+-----+----------+--------+--------+---------+--------+
|d_uid|    d_name|d_gender|d_acount|d_pamount|d_pcount|
+-----+----------+--------+--------+---------+--------+
|    1|    정휘센|      남|       1|  2000000|       1|
|    2|  김싸이언|      남|       2|        0|       0|
|    3|    박트롬|      여|       1|  1000000|       1|
|    4|    청소기|      남|       2|        0|       0|
|    5|유코드제로|      여|       1|  6000000|       2|
+-----+----------+--------+--------+---------+--------+



In [57]:
# 4. 다음날 해당 데이터를 사용하도록 하기 위해 일자별 경로에 저장합니다
# - ./users/dt=20201025/
target="./users/dt=20201025"
dimension.write.mode("overwrite").parquet(target)

In [58]:
yesterday = spark.read.parquet(target)
yesterday.sort(yesterday["d_uid"].asc()).show()

+-----+----------+--------+--------+---------+--------+
|d_uid|    d_name|d_gender|d_acount|d_pamount|d_pcount|
+-----+----------+--------+--------+---------+--------+
|    1|    정휘센|      남|       1|  2000000|       1|
|    2|  김싸이언|      남|       2|        0|       0|
|    3|    박트롬|      여|       1|  1000000|       1|
|    4|    청소기|      남|       2|        0|       0|
|    5|유코드제로|      여|       1|  6000000|       2|
+-----+----------+--------+--------+---------+--------+



In [None]:
# 5. 다음 날 동일한 지표를 생성하되 이전 일자의 정보에 누적한 지표를 생성합니다


### 과제 4. 주어진 데이터를 이용하여 2020/10/25 기준의 


### 과제 5. 주어진 데이터를 이용하여 2020/10/25 기준의 


### 과제 6. 주어진 데이터를 이용하여 2020/10/25 기준의 
