In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('csw').getOrCreate()

print(spark.version)

3.2.0


## 1. 해당 전체 기간에서, KST 기준으로 active user 수가 제일 큰 날짜를 구하세요
### 문제 이해 : 날짜 별 user_id의 개수가 가장 큰 날짜를 구한다. 이때 같은 날짜에 중복된 user_id는 1개만 count한다. data_format은 KST기준으로 수행됨
#### [주석]
#### nov = csv파일 불러오기
#### df_first = 날짜, 유저 추출
#### df_g = 날짜로 group화, 유저 중복 제거 카운트
#### df_d = count기준 내림차순 정렬
#### df_last = count가 제일 큰것 하나만 남기기
#### answer = 결과(active user 수가 제일 큰 날짜)

In [35]:
nov = spark.read.csv('2019-Nov.csv', header=True)
df_first = nov.select(date_format(col('event_time'), 'yyyy-MM-dd').alias('event_time'), col('user_id'))
df_g = df_first.groupby("event_time").agg(countDistinct(col('user_id')).alias('count'))
df_d = df_g.sort(desc('count'))
df_last = df_d.select(df_d.event_time).limit(1)
for i in df_last.collect():
    a = tuple(i)
answer = a[0]
print(answer)

2019-11-17


## 2. 1의 날짜에서, 세션이 가장 긴 사용자 10명에 대해 "user_id, session_id, 세션시간"를 구하세요
### 문제 이해 : 세션은 한번 접속에 있는 시간으로 정의, 같은 user_id일 경우 user_session이 같으면 계속 접속 중을 의미함, 세션시간은 한개의 세션의 시작시간과 끝나는 시간의 차이
#### [주석]
#### df_s = 날짜, 유저, 세션, 시간 select
#### df_fil = 1번 과제 날짜(2019-11-17)만 추출
#### df_ma_mi = 유저와 세션을 기준으로 group화 하고 같은 group에서 최대 시간과 최소 시간을 추출
#### df_sts = 최대 시간과 최소 시간의 차이를 초(sec)단위로 구함, 이 시간이 세션 시간이 됨
#### df_so = 세션 시간을 기준으로 내림차순 정렬
#### df_li = 유저, 세션, 세션 시간을 10개만 추출

In [3]:
df_s = nov.select(date_format(col('event_time'), 'yyyy-MM-dd').alias('event_day'), col('user_id'), col('user_session'), date_format(col('event_time'), 'HH:mm:ss').alias('event_time') )
df_fil = df_s.filter(df_s.event_day == answer)
df_ma_mi = df_fil.groupBy('user_id', 'user_session').agg(max('event_time').alias('ma'), min('event_time').alias('mi'))
df_sts = df_ma_mi.withColumn('to_ma',to_timestamp(col('ma'),'HH:mm:ss')).withColumn('to_mi',to_timestamp(col('mi'),'HH:mm:ss')).withColumn('session_time_sec', col('to_ma').cast('long') - col('to_mi').cast('long'))
df_so = df_sts.sort(desc('session_time_sec'))
df_li = df_so.select(df_so.user_id, df_so.user_session.alias('session_id'), df_so.session_time_sec.alias('세션시간')).limit(10)
df_li.show()

+---------+--------------------+--------+
|  user_id|          session_id|세션시간|
+---------+--------------------+--------+
|565022209|a7c5906e-5dd8-417...|   78377|
|568848552|f2d487ec-1a93-47c...|   76709|
|546179105|c28e0611-7590-49c...|   75847|
|557268031|a7b1bde2-4493-4c4...|   74193|
|554760857|c5fc6a55-1735-463...|   72733|
|543658395|8c85d761-88c9-45f...|   71769|
|532969916|1ffae9b4-50e0-4d7...|   70058|
|544784642|cfd07110-7047-4ac...|   66739|
|561409785|a7db81e5-7d03-404...|   65743|
|524688046|0193ea4b-88eb-40a...|   65539|
+---------+--------------------+--------+



## 3. 1의 날짜의 15분 단위로 active user 수를 구하세요
### 문제 이해 : 00:00:00부터 23:59:59 까지 15분 단위로 각 단위에서의 유저 수를 중복 제거하고 구한다. 구간은 start_time이상 finish_time이하이다.
#### [주석] 
#### df_th = 날짜, 유저, 시간 추출
#### df_thf = 1번 과제 날짜(2019-11-17)만 추출
#### df_hm = '시', '분'을 추출하고 '분'을 15로 나눈 몫(소수점 내림)*15를 추출한다. '분'을 15로 나눈 몫(소수점 내림)*15는 15분 단위 구간의 시작 시간(분)이 된다.
#### df_temp = '시', '분'을 15로 나눈 몫(소수점 내림)*15로 group화 하면 15분 단위로 나눌 수 있다. 이때 유저 수를 중복 제거하고 count한다.
#### df_tso = '시'단위로 오름차순 정렬 후 '시'가 같을 때  "'분'을 15로 나눈 몫(소수점 내림)*15"단위로 오름차순 정렬 한다.
#### df_tem = "'분'을 추출하고 '분'을 15로 나눈 몫(소수점 내림)*15"+14를 구한다. 이것은 15분 단위 구간의 끝나는 시간(분)이 된다.
#### df_check = 시작 시간은 시:분:00, 끝나는 시간은 시:분:59 형식으로 만든다
#### df_new = 시작 시간과 끝나는 시간을 timestamp 형식으로 바꿔준다
#### df_ans = 시작 시간과 끝나는 시간 중 시간값만 추출하고 유저 수를 select

In [16]:
df_th = nov.select(date_format(col('event_time'), 'yyyy-MM-dd').alias('event_day'), col('user_id'), date_format(col('event_time'), 'HH:mm:ss').alias('event_time') )
df_thf = df_th.filter(df_th.event_day == answer)
df_hm = df_thf.withColumn('hour', hour('event_time')).withColumn('minute', minute('event_time')).withColumn('minute//15',(floor(col('minute').cast('int')/15))*15)
df_temp = df_hm.groupby('hour','minute//15').agg(countDistinct(col('user_id')).alias('active user'))
df_tso = df_temp.sort(asc('hour'), asc('minute//15'))
df_tem = df_tso.withColumn('f_m', col('minute//15')+14)
df_check = df_tem.withColumn('str_hour',col('hour').cast('string')).withColumn('str_startm', col('minute//15').cast('string')).withColumn('str_finishm', col('f_m').cast('string')).withColumn('str_start', expr("str_hour ||':'|| str_startm ||':'|| 00")).withColumn('str_finish',expr(" str_hour ||':'|| str_finishm ||':'|| 59"))
df_new = df_check.withColumn('start', to_timestamp(col('str_start').cast('string'),'H:m:s')).withColumn('finish', to_timestamp(col('str_finish').cast('string'),'H:m:s'))
df_ans = df_new.select(date_format(col('start'), 'HH:mm:ss').alias('start_time'), (date_format(col('finish'),'HH:mm:ss')).alias('finish_time'), col('active user'))
df_ans.show(df_ans.count(), False)

+----------+-----------+-----------+
|start_time|finish_time|active user|
+----------+-----------+-----------+
|00:00:00  |00:14:59   |18397      |
|00:15:00  |00:29:59   |17563      |
|00:30:00  |00:44:59   |14695      |
|00:45:00  |00:59:59   |16119      |
|01:00:00  |01:14:59   |15257      |
|01:15:00  |01:29:59   |15763      |
|01:30:00  |01:44:59   |16024      |
|01:45:00  |01:59:59   |12350      |
|02:00:00  |02:14:59   |12825      |
|02:15:00  |02:29:59   |14053      |
|02:30:00  |02:44:59   |13852      |
|02:45:00  |02:59:59   |14612      |
|03:00:00  |03:14:59   |12782      |
|03:15:00  |03:29:59   |12220      |
|03:30:00  |03:44:59   |14555      |
|03:45:00  |03:59:59   |13697      |
|04:00:00  |04:14:59   |12041      |
|04:15:00  |04:29:59   |10895      |
|04:30:00  |04:44:59   |9556       |
|04:45:00  |04:59:59   |8205       |
|05:00:00  |05:14:59   |7099       |
|05:15:00  |05:29:59   |6040       |
|05:30:00  |05:44:59   |5243       |
|05:45:00  |05:59:59   |4398       |
|

## 4. 1의 날짜에서 view → cart → purchase 이벤트 진행에 따른 funnel 수치를 구하세요
### 문제 이해 : 제품을 구매하는 과정을 제품을 보고(view) 장바구니에 담고(cart) 장바구니에 담은 제품만 구매(purchase)한다고 이해, view → cart의 funnel은 (cart 수 / view 수)*100 [단위:%], cart → purchase의 funnel은 (purchase 수 / cart 수)*100 [단위:%]로 이해
#### [주석]
#### df_fo = 날짜, 이벤트 타입(view, cart, purchase), 유저 추출
#### df_fofi = 1번 과제 날짜(2019-11-17)만 추출
#### df_fogr = 이벤트 타입을 기준으로 group화, 이벤트 타입 개수 카운트
#### df_fote = 이벤트 타입을 하나의 group으로 묶을 수 있도록 임시 열(temp_group)을 같은 값 '1'을 갖도록 생성
#### df_fopi = temp_group로 group화를 하면서 이벤트 타입이 column이 될 수 있도록 pivot 사용, 값은 '각 이벤트 타입 개수' 이다.
#### df_fofu =  view → cart의 funnel은 (cart 수 / view 수)x100 [단위:%], cart → purchase의 funnel은 (purchase 수 / cart 수)x100 [단위:%], 소수 3번째 자리에서 반올림

In [34]:
df_fo = nov.select(date_format(col('event_time'), 'yyyy-MM-dd').alias('event_day'), col('event_type'), col('user_id'))
df_fofi = df_fo.filter(df_fo.event_day == answer)
df_fogr = df_fofi.groupBy('event_type').count().alias('count')
df_fote = df_fogr.withColumn('temp_group',expr('1'))
df_fopi = df_foso.groupBy('temp_group').pivot('event_type').agg(sum('count'))
df_fofu = df_fopi.select(round(col('cart')/col('view')*100, 2).alias('view→cart[%]'), round(col('purchase')/col('cart')*100, 2).alias('cart→purchase[%]'))
df_fofu.show()

+------------+----------------+
|view→cart[%]|cart→purchase[%]|
+------------+----------------+
|        6.91|           33.77|
+------------+----------------+

