In [0]:
%pyspark

In [2]:
%pyspark
# Echo the SparkContext handle as a sanity check that the interpreter is up.
# NOTE(review): `sc` is not defined anywhere in this notebook; presumably it is
# injected by the %pyspark interpreter — confirm.
sc

In [4]:
%pyspark
# Show the version of the Python interpreter that runs the %pyspark paragraphs.
import sys
sys.version_info

In [5]:
%pyspark
import json

# A JSON array of three objects, kept as text so that it can be decoded below.
raw = '[{"name": "grab"}, {"name": "graby"}, {"name": "new grab"}]'

# Decode the JSON text with the default decoder and print the resulting
# Python list of dicts.
print(json.JSONDecoder().decode(raw))

In [6]:
%sh
# Download the User Analytics data set and unpack it.
# NOTE(review): --no-check-certificate disables TLS verification; acceptable for
# a throwaway sandbox only.
# NOTE(review): -r (recursive) is redundant when -O names a single output file —
# confirm and drop.
wget --no-check-certificate -r 'https://docs.google.com/uc?export=download&id=1MdN9jWZfiw1gFIfCzJlm8t0b2ooBYLSP' -O raw_data.zip
# -o overwrites previously extracted files without prompting, so this paragraph
# can be re-run non-interactively.
unzip -o raw_data.zip
# Quick look at the first lines of one extracted file.
# NOTE(review): this path is under /zeppelin while the Spark paragraph reads
# from /raw_data — confirm where the archive actually extracts to.
head /zeppelin/session_timestamp.csv

In [7]:
%pyspark
# Read each CSV into a DataFrame (header=True takes the column names from the
# first line) and register it as the temp view that the SQL paragraphs query.
df_session_ts = spark.read.csv('/raw_data/session_timestamp.csv', header=True)
df_session_ts.createOrReplaceTempView("st")

df_session_tx = spark.read.csv('/raw_data/session_transaction.csv', header=True)
df_session_tx.createOrReplaceTempView("tx")

df_channel = spark.read.csv('/raw_data/channel.csv', header=True)
df_channel.createOrReplaceTempView("ch")

df_usc = spark.read.csv('/raw_data/user_session_channel.csv', header=True)
df_usc.createOrReplaceTempView("usc")

# To print a whole DataFrame:
# df_session_ts.show()

# To inspect the execution plan:
# df_session_ts.select("ts").explain()

# Peek at a single row of every table.
df_usc.limit(1).show()
df_session_ts.limit(1).show()
df_session_tx.limit(1).show()
df_channel.limit(1).show()



In [8]:
%pyspark
# Reference snippets for the DataFrame API. Everything in this paragraph is
# commented out; uncomment a snippet to try it.

# 1. Select specific columns only
# df_session_ts.select('sessionid').show()

# NOTE: importing `max` this way shadows Python's built-in max() in the notebook.
# from pyspark.sql.functions import max
# df_session_tx.select(max("amount")).take(1)

# 2. Filter specific rows
# df_session_ts.filter(df_session_ts['ts'] < '2019-05-01 01:00:00').show()

# 3. Group by
# df_session_ts.groupby("sessionid").agg({'ts': 'count'}).show()

# 4. Sort and take only 2 rows
# Sorting in descending order requires importing `desc`
# from pyspark.sql.functions import desc

# df_session_ts.sort('ts').take(2)
# df_session_ts.sort(desc('ts')).take(2)







In [9]:
%


In [10]:
%pyspark
# Monthly Active Users (MAU) with the DataFrame API.
#
# Sessions are joined to their users, bucketed by calendar month ("yyyy-MM"),
# and the number of DISTINCT users per month is counted. Grouping by the raw
# `ts` value or counting non-distinct userids would yield per-timestamp session
# counts rather than monthly active users.

# df_usc.show()
# df_session_ts.show()
from pyspark.sql.functions import countDistinct, date_format

(
    df_session_ts.alias("st")
    .join(df_usc.alias("usc"), df_session_ts.sessionid == df_usc.sessionid, how='left')
    # Truncate the timestamp to its year-month so that each group is one month.
    .groupBy(date_format("st.ts", "yyyy-MM").alias("month"))
    # A user with several sessions in a month must be counted once.
    .agg(countDistinct("usc.userid").alias("mau"))
    .orderBy("month")
    .show()
)



In [12]:
SQL로 Growth 지표 뽑아보기

In [13]:
%pyspark

# Monthly Active Users (MAU): number of DISTINCT users with at least one
# session in each calendar month. A plain COUNT(userid) would count sessions,
# so a user with several sessions in a month would be counted repeatedly.
# Sessions in `usc` without a matching row in `st` end up in a NULL month
# because of the LEFT JOIN.
spark.sql("""
SELECT date_format(st.ts, 'yyyy-MM') AS month, COUNT(DISTINCT usc.userid) AS mau
FROM usc
LEFT JOIN st ON usc.sessionid = st.sessionid
GROUP BY 1
ORDER BY 1
""").show()


In [14]:
%pyspark

# Find the 10 user ids with the largest net revenue.
# Per user, sums tx.amount over transactions where tx.refunded = false; any
# other transaction contributes 0. Because tx is LEFT JOINed to usc,
# transactions whose session has no row in usc are grouped under a NULL uid.
spark.sql("""
SELECT usc.userid as uid, SUM(IF(tx.refunded = false, tx.amount, 0)) as total
FROM tx
LEFT JOIN usc
ON tx.sessionid = usc.sessionid
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
""").show()


In [15]:
%pyspark
# Number of users per channel.
# Counts DISTINCT user ids so that a user with several sessions on the same
# channel is counted once; a plain COUNT(userid) would report session counts.
# Channels with no sessions still appear, with a count of 0, because `ch` is
# the left side of the LEFT JOIN.
spark.sql("""
SELECT ch.channelname, COUNT(DISTINCT usc.userid) AS count
FROM ch
LEFT JOIN usc
ON ch.channelname = usc.channel
GROUP BY ch.channelname
ORDER BY 2 DESC
""").show()


In [16]:

%sql

-- For every user, the channel of the earliest session (first) and of the
-- latest session (last), ordered by session timestamp. Both window frames span
-- the whole partition, so every row of a user carries the same pair and
-- DISTINCT collapses them into one row per user.
SELECT DISTINCT
    usc.userid,
    FIRST_VALUE(usc.channel) OVER (
        PARTITION BY usc.userid
        ORDER BY st.ts
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) AS first,
    LAST_VALUE(usc.channel) OVER (
        PARTITION BY usc.userid
        ORDER BY st.ts
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) AS last
FROM usc
LEFT JOIN st
    ON st.sessionid = usc.sessionid




In [17]:
%sql
-- Monthly summary per channel: unique users, paying users, conversion rate and
-- revenue.
--   * month uses the calendar-year pattern 'yyyy-MM'. 'YYYY' is the week-based
--     year, which is wrong around year boundaries on Spark 2.x and rejected on
--     Spark 3.x.
--   * paidUsers counts DISTINCT users that have at least one transaction, so
--     that conversionRate = paidUsers / uniqueUsers cannot exceed 100%.
--   * growthRevenue sums every transaction amount, refunded or not; netRevenue
--     only sums amounts where refunded = FALSE.
--     NOTE(review): the alias was probably meant to be "grossRevenue"; it is
--     kept so the output column name does not change.
SELECT
    DATE_FORMAT(st.ts, 'yyyy-MM') AS month,
    usc.channel,
    COUNT(DISTINCT usc.userid) AS uniqueUsers,
    COUNT(DISTINCT IF(tx.amount IS NOT NULL, usc.userid, NULL)) AS paidUsers,
    CONCAT(
        IF(
            COUNT(DISTINCT IF(tx.amount IS NOT NULL, usc.userid, NULL)) = 0,
            0,
            ROUND(
                COUNT(DISTINCT IF(tx.amount IS NOT NULL, usc.userid, NULL))
                    / COUNT(DISTINCT usc.userid) * 100,
                1
            )
        ),
        '%'
    ) AS conversionRate,
    SUM(tx.amount) AS growthRevenue,
    SUM(IF(tx.refunded = FALSE, tx.amount, 0)) AS netRevenue
FROM ch
LEFT JOIN usc
    ON ch.channelname = usc.channel
LEFT JOIN st
    ON st.sessionid = usc.sessionid
LEFT JOIN tx
    ON tx.sessionid = usc.sessionid
-- Drops channels that have no sessions at all.
WHERE usc.channel IS NOT NULL
GROUP BY 1, 2
ORDER BY 1, 2




In [18]:
%pyspark
# NOTE(review): these Python helpers are not used by the SQL below; the import
# is kept in case other paragraphs rely on the names.
from pyspark.sql.functions import months_between, countDistinct, col

# Monthly cohort retention.
#   cohort : each user's first active month (cohort_month, 'yyyy-MM').
#   visit  : every distinct (user, month) in which the user had a session.
# For each cohort month and each month offset since then (month_diff), count
# the distinct users of that cohort who were active.
#
# month_diff must be computed from the *columns* visit_month / cohort_month.
# Writing them in double quotes makes Spark SQL treat them as string literals,
# which cannot be parsed as dates and yields NULL for every row.
cohorts = spark.sql("""
SELECT
    cohort.cohort_month,
    CAST(
        months_between(
            to_date(visit.visit_month, 'yyyy-MM'),
            to_date(cohort.cohort_month, 'yyyy-MM')
        ) AS INT
    ) AS month_diff,
    COUNT(DISTINCT cohort.userid) AS unique_users
FROM (
    SELECT userid, MIN(DATE_FORMAT(ts, 'yyyy-MM')) AS cohort_month
    FROM usc
    JOIN st ON st.sessionid = usc.sessionid
    GROUP BY 1
) AS cohort
LEFT JOIN (
    SELECT DISTINCT userid, DATE_FORMAT(ts, 'yyyy-MM') AS visit_month
    FROM usc
    JOIN st ON st.sessionid = usc.sessionid
) AS visit
ON cohort.cohort_month <= visit.visit_month AND cohort.userid = visit.userid
GROUP BY 1, 2
ORDER BY 1, 2
""").cache()  # cached so later paragraphs can reuse the result without recomputing
cohorts.show()

In [19]:
%%sql
