In [1]:
import pyspark
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os
import pandas as pd

In [2]:
spark=SparkSession\
.builder\
.appName("transaction")\
.getOrCreate()

## 1. Loading Data 

### 1.1 Transaction Records

Prepare a merged table for all the transacitons records in the assigned folder

In [3]:
from pyspark.sql.types import StructType,StructField,StringType,DoubleType,BooleanType,IntegerType

#create schema
schema=StructType([
    StructField("user_id",StringType(),False),
    StructField("short_session",StringType(),True),
    StructField("long_session",StringType(),True),
    StructField("url",StringType(),True),
    StructField("path",StringType(),True),
    StructField("action",StringType(),True),
    StructField("amount",DoubleType(),True),
    StructField("product_code",StringType(),True),
    StructField("time_stamp",StringType(),True),
])

#create empty dataframe with schema
emptyRDD=spark.sparkContext.emptyRDD()
trans_merged=spark.createDataFrame(emptyRDD,schema)


#load all the data and merge them into one dataframe
transaction_path=r"D:\academic\academic_record\acamedic_record\generator\output_storage"
for folder in os.listdir(transaction_path):
    full_=os.path.join(transaction_path,folder,'transaction.parquet')
    df=spark.read.format('parquet').load(full_)
    df=(df.select("*",F.col("transaction.amount").alias("amount"),F.col("transaction.product_code").alias("product_code"),
                  F.col("transaction.time_stamp").alias("time_stamp")).drop("transaction"))
    #append the loaded dataframe to the merged datframe
    trans_merged=trans_merged.unionAll(df)

del df

### 1.2  Student Personal Information 

In [4]:
info_path=r"D:\academic\academic_record\acamedic_record\generator\input_storage"
os.chdir(info_path)

#Prepare a merged table for all the student informations in the assigned folder
#define the schema for transcation table
#discarding unrelevant infomration 
info_schema=StructType(
    [
        StructField("user_id",StringType(),False),
        StructField("sex",StringType(),False),
        StructField("grade",StringType(),True),
        StructField('weekly_study_hours',DoubleType(),True),
        StructField("daily_study_hours",DoubleType(),True),
        StructField("is_online",IntegerType(),True),
        StructField("off_line_numbers",IntegerType(),True),
        StructField("satisfaction_last",IntegerType(),True),
        StructField("no_interest",IntegerType(),True),
        StructField("no_prize",IntegerType(),True),
        StructField("no_competitions",IntegerType(),True),
        StructField("age",IntegerType(),True),
        StructField("register_date",StringType(),False)
        ]
)


info_merged=spark.createDataFrame(emptyRDD,info_schema)
for file in os.listdir(info_path):
    full_path=os.path.join(info_path,file,'student_info_temp.json')
    temp=spark.read.option("multiline",True).json(full_path)
    temp=temp.select("user_id","sex","grade","weekly_study_hours","daily_study_hours",
                    "is_online","off_line_numbers","satisfaction_last","no_interest","no_prizes",
                    "no_competitions","age","register_date")
    
    info_merged=info_merged.unionAll(temp)
    
del temp


    

### 1.3 Login  Records

In [5]:
study_hour_path=r"D:\academic\academic_record\acamedic_record\generator\study_hours_output"
os.chdir(study_hour_path)

login_schema=StructType(
    [
        StructField('id',StringType(),False),
        StructField("login_timestamp",StringType(),False),
        StructField("logout_timestamp",StringType(),False),
        StructField("device",StringType(),True)
    ]
)

login_merged=spark.createDataFrame(emptyRDD,login_schema)

for file in os.listdir(study_hour_path):
    full_path=os.path.join(study_hour_path,file,'study_hours.parquet')
    temp=spark.read.format('parquet').load(full_path)
    login_merged=login_merged.unionAll(temp)

del temp
#ogin_merged=spark.createDataFrame(emptyRDD,)

In [6]:
#chnage the column name for the later use
login_merged=login_merged.withColumnRenamed('id','user_id')

### 2 Preprocessing ( Padding leading zero)

In [7]:
year_month_date=F.split(F.split(F.col("register_date"),':')[0],'-')

#padding the leading zero 
year,month,day=year_month_date[0],year_month_date[1],year_month_date[2]
day=F.when(F.length(day)>1,day).otherwise(F.lpad(day,2,'0'))
month=F.when(F.length(month)>1,month).otherwise(F.lpad(month,2,'0'))


info_merged=info_merged.withColumn('register_date',F.concat_ws('-',year,month,day))

### 

### 3. Merge Data

In [8]:
# merge two data sets together
merged_tb=trans_merged.join(info_merged,how='left',on='user_id').join(login_merged,how='left',on='user_id')

In [9]:
merged_tb.createOrReplaceTempView("merged")

In [10]:
def print_(query,toPD=False):
    if toPD:
        return spark.sql(query).toPandas()
    return spark.sql(query).show()
    

In [11]:
merged_tb.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- short_session: string (nullable = true)
 |-- long_session: string (nullable = true)
 |-- url: string (nullable = true)
 |-- path: string (nullable = true)
 |-- action: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- product_code: string (nullable = true)
 |-- time_stamp: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- weekly_study_hours: double (nullable = true)
 |-- daily_study_hours: double (nullable = true)
 |-- is_online: long (nullable = true)
 |-- off_line_numbers: long (nullable = true)
 |-- satisfaction_last: long (nullable = true)
 |-- no_interest: long (nullable = true)
 |-- no_prize: long (nullable = true)
 |-- no_competitions: long (nullable = true)
 |-- age: long (nullable = true)
 |-- register_date: string (nullable = true)
 |-- login_timestamp: string (nullable = true)
 |-- logout_timestamp: string (nullable = true)
 |-- device: string (nullable = tr

### 4 Digital Marketing Analysis

### 4.1 Registration Students Trend


<b>English</b>

A high number of new members comes for every January of the year. The registration number is highly affected by the unique consumer behavior( i.e., "new year resolution") this month of the year.  The effect is demonstrated through our result bleow. 

<b>Korean</b>

매년 일월달이 학생들이 회원가입을 많이 하는 시기입니다. 아무래도 새해의 의지를 다질 겸, 학생 수가 증가하는 것은 일반적인 흐름인거 같습니다. 이러한 경향은 아래에 주어질 결과에서도 확인할 수 있습니다. 


In [12]:
query='''
WITH year_month AS(
    SELECT user_id,
           SUBSTRING(register_date,1,7) AS year_month
           FROM merged)
    SELECT year_month,
           COUNT(DISTINCT user_id) AS register_counts,
           LAG(COUNT(DISTINCT user_id)) OVER(ORDER BY year_month) AS previous_month,
           COUNT(DISTINCT user_id)/LAG(COUNT(DISTINCT user_id)) OVER(ORDER BY year_month)*100 AS monthly_over_ratio
           FROM year_month
           GROUP BY year_month
        
'''

print_(query)

+----------+---------------+--------------+------------------+
|year_month|register_counts|previous_month|monthly_over_ratio|
+----------+---------------+--------------+------------------+
|   2022-01|           1010|          null|              null|
|   2022-03|            992|          1010| 98.21782178217822|
+----------+---------------+--------------+------------------+



## 4.2 Repeat Rate


In [13]:
merged_tb.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- short_session: string (nullable = true)
 |-- long_session: string (nullable = true)
 |-- url: string (nullable = true)
 |-- path: string (nullable = true)
 |-- action: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- product_code: string (nullable = true)
 |-- time_stamp: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- weekly_study_hours: double (nullable = true)
 |-- daily_study_hours: double (nullable = true)
 |-- is_online: long (nullable = true)
 |-- off_line_numbers: long (nullable = true)
 |-- satisfaction_last: long (nullable = true)
 |-- no_interest: long (nullable = true)
 |-- no_prize: long (nullable = true)
 |-- no_competitions: long (nullable = true)
 |-- age: long (nullable = true)
 |-- register_date: string (nullable = true)
 |-- login_timestamp: string (nullable = true)
 |-- logout_timestamp: string (nullable = true)
 |-- device: string (nullable = tr

In [14]:
query='''
WITH  schedule(interval,day) AS(
    SELECT '01-Day-repeat' as interval,1 
    UNION ALL SELECT '02-Day-repeat', 2 
    UNION ALL SELECT '03-Day-repeat', 3 
    UNION ALL SELECT '04-Day-repeat', 4 
    UNION ALL SELECT '05-Day-repeat', 5 
    UNION ALL SELECT '06-Day-repeat',6 
    UNION ALL SELECT '07-Day-repeat',7)
,action_record AS(
    SELECT m.user_id,
           m.register_date,
           SUBSTRING(m.login_timestamp,1,10) AS login_date,
           MAX(SUBSTRING(m.login_timestamp,1,10)) OVER() AS latest_date,
           DATE_ADD(CAST(SUBSTRING(m.register_date,1,10) AS DATE),s.day) AS index_date,
           s.interval AS index_name
    FROM merged AS m
    CROSS JOIN schedule AS s)
,action_flag AS(
 SELECT user_id,
        register_date,
        index_name,
        index_date,
        SIGN(SUM(CASE WHEN index_date<=latest_date THEN
        CASE WHEN index_date=login_date THEN 1 ELSE 0 END END)) AS index_date_action
        FROM action_record
        GROUP BY user_id,register_date,index_name,index_date
        )
SELECT register_date,
       index_name,
       ROUND(AVG(100*index_date_action),2) AS repeat_rate
    FROM action_flag 
    GROUP BY register_date,index_name

'''
repeat_rate=print_(query,toPD=True)

In [15]:
repeat_table=pd.pivot_table(repeat_rate,index='register_date',columns='index_name',values='repeat_rate')
#remove unnecessary label 
repeat_table.columns.name=None
#present the table with all values in %
(repeat_table.round(2).astype(str)+'%').head()

Unnamed: 0_level_0,01-Day-repeat,02-Day-repeat,03-Day-repeat,04-Day-repeat,05-Day-repeat,06-Day-repeat,07-Day-repeat
register_date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2022-01-01,21.88%,62.5%,75.0%,84.38%,81.25%,87.5%,87.5%
2022-01-02,10.0%,50.0%,85.0%,90.0%,85.0%,80.0%,90.0%
2022-01-03,8.33%,61.11%,77.78%,83.33%,83.33%,88.89%,86.11%
2022-01-04,17.65%,76.47%,82.35%,82.35%,80.39%,86.27%,74.51%
2022-01-05,20.69%,82.76%,89.66%,72.41%,89.66%,89.66%,86.21%


In [16]:
#average of each repeat period
repeat_rate.groupby("index_name")['repeat_rate'].mean().round(2).astype(str)+'%'

index_name
01-Day-repeat    14.94%
02-Day-repeat    73.44%
03-Day-repeat    79.23%
04-Day-repeat    80.87%
05-Day-repeat    79.42%
06-Day-repeat     80.2%
07-Day-repeat    80.18%
Name: repeat_rate, dtype: object


<b>English</b>

The good news is the repeat rate shows a steady increase as the number of days increases. The rate indirectly tells us that users visit our website regularly. Given 7-day periods, it is a reference point to show the intention of our users to continue using our services. However, the analysis based on the 7-day results can not   tell us a  full strocy of  consumer bevhavior.In the upcoming section, additiona analysis is called for to get an compelete undestanding. Meanwhile,If our customers are satisfied with the provision of services, we need to find what features of our platform have attracted our customers. 

<b>Korean</b>

정해진 7일 동안,고객들의 지속률이 지속적으로 상승하는 모습을 보여줍니다. 이는 고객들이 앞으로 주기적으로 우리의 서비스이용을 지속할 것이라는 것을 이야기해줍니다. 여기서 선택된 7일은 학원 온라인 시스템에서는 중요한 지표인데,이를 통해서 고객의 행동에 대한 단기적인 시각을 가질 수 있기 때문입니다. 하지만 이 분석으로 고객의 행동에 대한 전체적인 측면을 보여주는 것이 아니므로, 중단기적인 지표가 필요할거 같습니다.





### 5. Monthly Active User( MAU)

In [93]:
query='''
WITH monthly_active_user AS(
   SELECT distinct user_id,
          SUBSTRING(register_date,1,7) AS register_year_month,
          SUBSTRING(ADD_MONTHS(CAST(SUBSTRING(register_date,1,7) AS DATE),-1),1,7)  AS previous_month,
          SUBSTRING(login_timestamp,1,7) AS action_month
   FROM merged)
  ,customer_types AS(
   SELECT user_id,
       action_month,
       previous_month,
       CASE WHEN register_year_month=action_month THEN 'new_user' 
         WHEN previous_month=LAG(action_month) OVER(PARTITION BY user_id ORDER BY action_month) THEN 'repeat_user'
         ELSE 'comeback user'END AS user_type
   FROM monthly_active_user)
    ,monthly_users AS(
    SELECT m2.action_month,
           COUNT(m2.user_id) AS mau,
           COUNT(CASE WHEN m2.user_type='new_user' THEN 1 END) AS new_users,
           COUNT(CASE WHEN m2.user_type='repeat_user' THEN 1 END) AS repeat_user,
           COUNT(CASE WHEN m2.user_type='comback_user' THEN 1 END) AS comeback_user,
           COUNT(CASE WHEN m2.user_type='repeat_user' AND m1.user_type='new_user' THEN 1 END) AS new_repeat_user,
           COUNT(CASE WHEN m2.user_type='repeat_user' AND m1.user_type='repeat_user' THEN 1 END) AS continous_repeat_user,
           COUNT(CASE WHEN m2.user_type='repeat_user' AND m1.user_type='comeback_user' THEN 1 END) AS comeback_repeat_user
    FROM customer_types AS m1
    LEFT OUTER JOIN customer_types AS  m2
    ON m1.user_id=m2.user_id AND m2.previous_month=m1.action_month
    GROUP BY m2.action_month)
    SELECT *
    FROM monthly_users
'''

print(query_)

Unnamed: 0,action_month,mau,new_user,repeat_user,comback_user,new_repeat_user,continous_repeat_user,comeback_repeat_user
0,2022-01,202313,202313,0,0,0,0,0
1,2022-02,681305,140201,101030,204391,68130,34065,133488
2,2022-03,557890,9810,95001,167367,55789,27894,202029
3,2022-04,447902,8810,78239,134370,44790,22395,159298


<b>English</b>

The education industry in South Korea follows a specific pattern; a significant number of new users are at the beginning of the year and during the exam periods. This pattern is demonstrated through our results above. One concern is that the number of repeated users shows a general downstream.  The relative decrease alerts that the company should be struggling with building the strong attachment of users to the company itself.  

<b>Korean</b>

학교 사교육업계는 큰 특징을 가지고 있습니다. 매년 초기 그리고 내신 기간에 새로운 고객의 유입이 이루어집니다.이러한 패턴은 위에 
나온 결과물을 통해서 다시한번 확일 할 수 있습니다. 다만 반복고객이 시간에 흐름에 따라 떨어지고 있다는 점은 우려할 점 입니다. 주로, 
하향추세가 나오는 것은 학원이 새로운 또는 기존 고객들의 충성심을 제대로 확보하고 있지 않다는 점을 암시해줍니다. 



### 4.  Rentation Rate


In [85]:
query='''
WITH interval(period) AS(
    SELECT 1
    UNION ALL SELECT 2 
    UNION ALL SELECT 3
    UNION ALL SELECT 4
    UNION ALL SELECT 5
    UNION ALL SELECT 6
    UNION ALL SELECT 7
    UNION ALL SELECT 8
    UNION ALL SELECT 9
    UNION ALL SELECT 10
    UNION ALL SELECT 11
    UNION ALL SELECT 12)
,index_info AS(
     SELECT m.user_id,
            SUBSTRING(register_date,1,7) AS register_month,
            SUBSTRING(CAST(ADD_MONTHS(SUBSTRING(register_date,1,7),r.period) AS STRING),1,7) AS index_month
     FROM merged AS m
     CROSS JOIN interval AS r)
,login_track AS(
   SELECT DISTINCT user_id,
          SUBSTRING(login_timestamp,1,7)AS action_month
          FROM merged)
  SELECT i.register_month,
         i.index_month,
         SUM(CASE WHEN action_month IS NOT NULL THEN 1 ELSE 0 END) AS users,
         AVG(CASE WHEN action_month IS NOT NULL THEN 1 ELSE 0 END)*100 AS rentation_rate
  FROM index_info AS i
  LEFT JOIN login_track AS t
  ON i.user_id=t.user_id and  i.index_month=t.action_month
  GROUP BY i.register_month,i.index_month
'''

print_(query)

Unnamed: 0,register_month,index_month,usres,rentation_rate
0,2022-01,2022-02,1343208,99.23
1,2022-02,2022-03,926745,67.6463
2,2022-03,2022-04,557890,37.9759
3,2022-04,2022-05,687902,42.9759


In [95]:
merged_tb.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- short_session: string (nullable = true)
 |-- long_session: string (nullable = true)
 |-- url: string (nullable = true)
 |-- path: string (nullable = true)
 |-- action: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- product_code: string (nullable = true)
 |-- time_stamp: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- weekly_study_hours: double (nullable = true)
 |-- daily_study_hours: double (nullable = true)
 |-- is_online: long (nullable = true)
 |-- off_line_numbers: long (nullable = true)
 |-- satisfaction_last: long (nullable = true)
 |-- no_interest: long (nullable = true)
 |-- no_prize: long (nullable = true)
 |-- no_competitions: long (nullable = true)
 |-- age: long (nullable = true)
 |-- register_date: string (nullable = true)
 |-- login_timestamp: string (nullable = true)
 |-- logout_timestamp: string (nullable = true)
 |-- device: string (nullable = tr