In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import when
import pyspark.sql.functions as sf
import os
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import *
from pyspark.sql.functions import col, to_date, date_format

In [2]:
# Reuse the active Spark session if one exists, otherwise start one with a
# 4 GB driver heap.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Folder holding the daily <yyyyMMdd>.json log files (trailing separator is
# required because file names are appended by plain string concatenation).
filenamedatapath = "d:\\LEARNING\\Study_DE\\Data\\Dataset\\log_content\\"
# CSV destination; not referenced by any cell visible in this notebook.
savepath = "D:\\KQ\\result\\output.csv"

In [3]:
import datetime

# First and last day (both inclusive) of the daily log files to load.
first_day = datetime.date(2022, 4, 1)
last_day = datetime.date(2022, 4, 5)

# Step through real calendar days. Counting on the yyyymmdd integer only works
# inside a single month (e.g. 20220430 + 1 == 20220431, which is not a date).
listStr = [
    (first_day + datetime.timedelta(days=offset)).strftime("%Y%m%d")
    for offset in range((last_day - first_day).days + 1)
]
# Integer form of the same days, kept under its original name.
listInt = [int(day) for day in listStr]

# One JSON file per day: <filenamedatapath><yyyyMMdd>.json
filesname = [filenamedatapath + fname + ".json" for fname in listStr]

In [4]:
# Layout of the combined dataset: the four fields taken from each log record
# plus the day the record's file belongs to.
schema = StructType([
    StructField("AppName", StringType(), True),
    StructField("Contract", StringType(), True),
    StructField("Mac", StringType(), True),
    StructField("TotalDuration", IntegerType(), True),
    StructField("Date", StringType(), True),
])

# Start from an empty frame so every daily file can be appended the same way.
outputData = spark.createDataFrame([], schema=schema)

for name in filesname:
    # The record payload is nested under `_source`; lift its fields to
    # top-level columns.
    df = spark.read.json(name).select('_source.*')

    # The file name without its extension is the day in yyyyMMdd form.
    date_str = os.path.basename(name).split('.')[0]
    df = df.withColumn("Date", lit(date_str))

    # Match columns by name: plain `union` is positional and would silently
    # mix columns up if the inferred JSON field order ever differed from
    # `schema`.
    outputData = outputData.unionByName(df)

In [5]:
# Strip surrounding spaces from the contract id so identical contracts group together.
outputData = outputData.withColumn("Contract", sf.trim(sf.col("Contract")))

In [6]:
# Peek at the first rows of the combined raw log data.
outputData.show(n=5)

+-------+---------+------------+-------------+--------+
|AppName| Contract|         Mac|TotalDuration|    Date|
+-------+---------+------------+-------------+--------+
|  KPLUS|HNH579912|0C96E62FC55C|          254|20220401|
|  KPLUS|HUFD40665|CCEDDC333614|         1457|20220401|
|  KPLUS|HNH572635|B068E6A1C5F6|         2318|20220401|
|  KPLUS|HND141717|08674EE8D2C2|         1452|20220401|
|  KPLUS|HNH743103|402343C25D7D|          251|20220401|
+-------+---------+------------+-------------+--------+
only showing top 5 rows



In [7]:
# Map each application name to a content category; anything unrecognised
# (including a null AppName) is labelled "Error".
app_name = col("AppName")
content_type = (
    when(app_name.isin('CHANNEL', 'DSHD', 'KPLUS', 'KPlus'), "Truyền Hình")
    .when(app_name.isin('VOD', 'FIMS_RES', 'BHD_RES', 'VOD_RES', 'FIMS', 'BHD', 'DANET'), "Phim Truyện")
    .when(app_name.isin('RELAX', 'APP'), "Giải Trí")
    .when(app_name == 'CHILD', "Thiếu Nhi")
    .when(app_name == 'SPORT', "Thể Thao")
    .otherwise("Error")
)
outputData = outputData.withColumn("Type", content_type)

print("Chuẩn hóa AppName xong")

# Total watch time per contract, category and day.
outputData = (
    outputData
    .groupBy('Contract', 'Type', 'Date')
    .sum('TotalDuration')
    .withColumnRenamed('sum(TotalDuration)', 'TotalDuration')
)
print("GroupBy xong")

# Pin the pivot values so all five category columns always exist, even when a
# category has no rows in the loaded days. Without an explicit list Spark only
# creates columns for categories it finds, which breaks the later cells that
# reference every duration column. "Error" is deliberately left out, so
# unrecognised applications do not add a column of their own.
content_types = ["Giải Trí", "Phim Truyện", "Thiếu Nhi", "Thể Thao", "Truyền Hình"]
outputData = (
    outputData
    .groupBy('Contract', 'Date')
    .pivot('Type', content_types)
    .sum('TotalDuration')
)
print("Pivot xong")

Chuẩn hóa AppName xong
GroupBy xong
Pivot xong


In [8]:
# Give the pivoted category columns English names. The keys must match the
# category labels produced by the pivot exactly: 'Thiếu Nhi' was previously
# spelled 'Thiếu nhi', which only matched through case-insensitive column
# resolution and would be silently skipped with spark.sql.caseSensitive=true.
outputData = outputData.withColumnsRenamed({
    'Giải Trí': 'RelaxDuration',
    'Phim Truyện': 'MovieDuration',
    'Thiếu Nhi': 'ChildDuration',
    'Thể Thao': 'SportDuration',
    'Truyền Hình': 'TVDuration',
})
# A contract with no viewing in a category on a given day pivots to null;
# treat that as zero watch time.
outputData = outputData.fillna(0)

In [9]:
# Duration column -> label, in the priority order used to break ties.
duration_labels = [
    ("RelaxDuration", "Relax"),
    ("MovieDuration", "Movie"),
    ("ChildDuration", "Child"),
    ("SportDuration", "Sport"),
    ("TVDuration", "TV"),
]

# Largest of the five per-category durations on each row.
most_watch = greatest(*[col(column_name) for column_name, _ in duration_labels])

# Build the when-chain one branch at a time; the first category whose
# duration equals the row maximum wins.
most_watch_label = None
for column_name, label in duration_labels:
    is_top = col(column_name) == most_watch
    if most_watch_label is None:
        most_watch_label = when(is_top, label)
    else:
        most_watch_label = most_watch_label.when(is_top, label)

outputData = outputData.withColumn("Most_Watch", most_watch_label)

In [10]:
# Comma-separated list of the categories with non-zero watch time.
# `when` without `otherwise` yields null for unwatched categories and
# concat_ws skips nulls, so only the watched labels are joined.
cust_taste = concat_ws(
    ',',
    when(col("RelaxDuration") != 0, 'Relax'),
    when(col("MovieDuration") != 0, 'Movie'),
    when(col("ChildDuration") != 0, 'Child'),
    when(col("SportDuration") != 0, 'Sport'),
    when(col("TVDuration") != 0, 'TV'),
)

# Collapse "watched everything" to 'ALL'. The fallback reuses the expression
# itself rather than reading the column back, which previously referenced it
# as "Customer_taste" (wrong case) and relied on case-insensitive resolution.
outputData = outputData.withColumn(
    'Customer_Taste',
    when(cust_taste == 'Relax,Movie,Child,Sport,TV', 'ALL').otherwise(cust_taste),
)

In [11]:
# Rows per contract; after the pivot there is one row per (Contract, Date),
# so this is the number of days each contract was active.
outputData.groupBy("Contract").agg(
    sf.count("Contract").alias("Number_Active_Date")
).show()

+---------+------------------+
| Contract|Number_Active_Date|
+---------+------------------+
|BID006982|                 5|
|TBFD06119|                 5|
|SGH592445|                 5|
|HNH579514|                 5|
|BED003648|                 5|
|SGH068360|                 4|
|DAD046926|                 5|
|VLFD04186|                 3|
|BDH053839|                 5|
|SGH729350|                 5|
|HNJ158334|                 5|
|BGD025937|                 5|
|HNH720916|                 5|
|SLD000654|                 5|
|HYFD35051|                 5|
|GLFD18874|                 5|
|DAFD39282|                 5|
|QNFD87881|                 5|
|HDFD42710|                 5|
|PTFD05875|                 5|
+---------+------------------+
only showing top 20 rows



In [12]:
# Number of rows (one per active day) for every contract, kept for the join below.
count_active_date = (
    outputData
    .groupBy("Contract")
    .agg(sf.count("Contract").alias("Number_Active_Date"))
)

In [13]:
# Spot-check the active-day count for a single contract.
count_active_date.where(sf.col("Contract") == "BID006982").show()

+---------+------------------+
| Contract|Number_Active_Date|
+---------+------------------+
|BID006982|                 5|
+---------+------------------+



In [22]:
# Dry run of the join that attaches the active-day count to every daily row.
outputData.join(count_active_date, "Contract", "left").show()

+---------+--------+-------------+-------------+-------------+-------------+----------+----------+--------------+------------------+
| Contract|    Date|RelaxDuration|MovieDuration|ChildDuration|SportDuration|TVDuration|Most_Watch|Customer_Taste|Number_Active_Date|
+---------+--------+-------------+-------------+-------------+-------------+----------+----------+--------------+------------------+
|AGFD14221|20220401|            0|         9267|            0|            0|      8721|     Movie|      Movie,TV|                 5|
|BEFD22243|20220401|            0|            0|            0|            0|        40|        TV|            TV|                 4|
|DAFD76913|20220401|            0|           87|            0|            0|     59709|        TV|      Movie,TV|                 5|
|HND075685|20220401|            0|            0|            0|            0|     79984|        TV|            TV|                 5|
|HND330927|20220401|            0|         1457|            0|       

In [14]:
# Attach Number_Active_Date to each (Contract, Date) row; a left join keeps
# every row of outputData.
outputData = outputData.join(count_active_date, "Contract", "left")

In [15]:
# Length of the reference period in days — presumably the 30 days of
# April 2022; TODO confirm against the reporting requirement.
DAYS_IN_PERIOD = 30

# Percentage of the period on which the contract was active. format_number
# renders it with two decimals and returns a string column.
active_pct = col("Number_Active_Date") / DAYS_IN_PERIOD * 100
outputData = outputData.withColumn('Activeness', format_number(active_pct, 2))

In [24]:
# Spot-check all daily rows of a single contract.
outputData.where(sf.col("Contract") == "BID006982").show()

+---------+--------+-------------+-------------+-------------+-------------+----------+----------+--------------+------------------+
| Contract|    Date|RelaxDuration|MovieDuration|ChildDuration|SportDuration|TVDuration|Most_Watch|Customer_Taste|Number_Active_Date|
+---------+--------+-------------+-------------+-------------+-------------+----------+----------+--------------+------------------+
|BID006982|20220401|        10315|         1083|            0|            0|       962|     Relax|Relax,Movie,TV|                 5|
|BID006982|20220404|            0|         4991|            0|            0|      1685|     Movie|      Movie,TV|                 5|
|BID006982|20220405|            0|         3216|            0|            0|         0|     Movie|         Movie|                 5|
|BID006982|20220403|           33|         2073|            0|            0|        13|     Movie|Relax,Movie,TV|                 5|
|BID006982|20220402|        47301|            0|            0|       

In [22]:
# Peek at the enriched dataset.
outputData.show(n=5)

+---------+--------+-------------+-------------+-------------+-------------+----------+----------+--------------+------------------+----------+
| Contract|    Date|RelaxDuration|MovieDuration|ChildDuration|SportDuration|TVDuration|Most_Watch|Customer_Taste|Number_Active_Date|Activeness|
+---------+--------+-------------+-------------+-------------+-------------+----------+----------+--------------+------------------+----------+
|AGFD14221|20220401|            0|         9267|            0|            0|      8721|     Movie|      Movie,TV|                 5|     16.67|
|BEFD22243|20220401|            0|            0|            0|            0|        40|        TV|            TV|                 4|     13.33|
|NDFD13360|20220401|            0|        18481|            0|            0|      1112|     Movie|      Movie,TV|                 3|     10.00|
|VLFD11547|20220401|            0|            0|            0|            0|     82434|        TV|            TV|                 3|    

In [22]:
# Re-render the yyyyMMdd day string as yyyy-MM-dd. date_format returns a
# string, so the column stays text rather than becoming a date type.
outputData = outputData.withColumn(
    "Date",
    sf.date_format(sf.to_date(sf.col("Date"), "yyyyMMdd"), "yyyy-MM-dd"),
)

In [23]:
# Confirm the new date format on a few rows.
outputData.show(n=3)

+---------+----------+-------------+-------------+-------------+-------------+----------+----------+--------------+------------------+----------+
| Contract|      Date|RelaxDuration|MovieDuration|ChildDuration|SportDuration|TVDuration|Most_Watch|Customer_Taste|Number_Active_Date|Activeness|
+---------+----------+-------------+-------------+-------------+-------------+----------+----------+--------------+------------------+----------+
|BEFD22243|2022-04-01|            0|            0|            0|            0|        40|        TV|            TV|                 4|     13.33|
|NDFD13360|2022-04-01|            0|        18481|            0|            0|      1112|     Movie|      Movie,TV|                 3|     10.00|
|VLFD11547|2022-04-01|            0|            0|            0|            0|     82434|        TV|            TV|                 3|     10.00|
+---------+----------+-------------+-------------+-------------+-------------+----------+----------+--------------+---------

In [23]:
def import_To_MySql(data, dbname, table_name, username, password,
                    host="localhost", port=3306, mode="overwrite", batchsize=5000):
    """Write a DataFrame to a MySQL table through Spark's JDBC data source.

    Args:
        data: DataFrame to persist; only its ``write`` attribute is used.
        dbname: Name of the target MySQL database.
        table_name: Name of the target table.
        username: MySQL user name.
        password: MySQL password. Converted to ``str`` before being handed to
            the writer, so a numeric password is accepted as well.
        host: MySQL server host. Defaults to ``"localhost"``.
        port: MySQL server port. Defaults to ``3306``.
        mode: Save mode passed to the writer. Defaults to ``"overwrite"``,
            which replaces the contents of an existing table.
        batchsize: Value of the JDBC ``batchsize`` option. Defaults to ``5000``.

    Returns:
        None. Prints a confirmation line once ``save()`` has returned.
    """
    jdbc_url = f"jdbc:mysql://{host}:{port}/{dbname}"

    (
        data.write
        .mode(mode)
        .format("jdbc")
        .option("driver", "com.mysql.jdbc.Driver")
        .option("url", jdbc_url)
        .option("dbtable", table_name)
        .option("user", username)
        .option("batchsize", batchsize)
        .option("password", str(password))
        .save()
    )

    print("Import to MySQL completed")

In [24]:
# Keep database credentials out of the notebook: read them from the
# environment and prompt for the password only when it is not set.
mysql_user = os.environ.get("MYSQL_USER", "root")
mysql_password = os.environ.get("MYSQL_PASSWORD")
if mysql_password is None:
    from getpass import getpass
    mysql_password = getpass("MySQL password: ")

import_To_MySql(outputData, "storagedb", "customer_info", mysql_user, mysql_password)

Import to MySQL completed
