In [1]:
import findspark

findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import numpy as np
from pyspark.sql.window import Window

In [3]:
# Create a SparkSession with default builder settings, or reuse the one that
# is already active in this kernel.
spark = SparkSession.builder.getOrCreate()

In [4]:
# Load the per-contract duration table. Only header=True is passed, so no
# schema inference is requested and every column is loaded as a string; the
# duration columns are cast to integers further down in the notebook.
df = spark.read.csv("./Clean_Data/cleaned_data.csv", header=True)

In [5]:
df.show()

+---------+-------------+----------------+--------------+-------------+----------------+
| Contract|sum(Giải Trí)|sum(Phim Truyện)|sum(Thiếu Nhi)|sum(Thể Thao)|sum(Truyền Hình)|
+---------+-------------+----------------+--------------+-------------+----------------+
|AGAAA0848|         null|            null|          null|         null|           12141|
|AGAAA2588|         null|            null|          null|         null|         1078595|
|AGD003807|         null|          153369|           164|         null|            4352|
|AGD004253|         null|            null|          null|         null|          264972|
|AGD008179|        13225|          135699|          null|         null|          115222|
|AGD011212|         null|            3799|          null|         null|          308980|
|AGD022636|           35|            null|          null|         null|           26553|
|AGD026510|         null|            null|          null|         null|          676380|
|AGD029035|         n

In [6]:
# Replace the aggregate column names from the CSV header with short English
# names, one rename per (old name, new name) pair.
duration_column_names = [
    ('sum(Giải Trí)', 'RelaxDuration'),
    ('sum(Phim Truyện)', 'MovieDuration'),
    ('sum(Thiếu Nhi)', 'ChildDuration'),
    ('sum(Thể Thao)', 'SportDuration'),
    ('sum(Truyền Hình)', 'TVDuration'),
]
for source_name, english_name in duration_column_names:
    df = df.withColumnRenamed(source_name, english_name)

In [7]:
df.show()

+---------+-------------+-------------+-------------+-------------+----------+
| Contract|RelaxDuration|MovieDuration|ChildDuration|SportDuration|TVDuration|
+---------+-------------+-------------+-------------+-------------+----------+
|AGAAA0848|         null|         null|         null|         null|     12141|
|AGAAA2588|         null|         null|         null|         null|   1078595|
|AGD003807|         null|       153369|          164|         null|      4352|
|AGD004253|         null|         null|         null|         null|    264972|
|AGD008179|        13225|       135699|         null|         null|    115222|
|AGD011212|         null|         3799|         null|         null|    308980|
|AGD022636|           35|         null|         null|         null|     26553|
|AGD026510|         null|         null|         null|         null|    676380|
|AGD029035|         null|         9345|         null|         null|    822316|
|AGFD02813|         null|        98056|         null

In [8]:
def unpivot_data(df):
    """Reshape the wide per-contract duration table into long form.

    Each of the five duration columns is turned into rows with the columns
    (Contract, TotalDuration, Category). The five pieces are stacked with
    positional unions in the order Relax, Movie, Child, Sport, TV, and
    TotalDuration is cast to integer on the stacked result.
    """
    category_sources = [
        ('Relax', 'RelaxDuration'),
        ('Movie', 'MovieDuration'),
        ('Child', 'ChildDuration'),
        ('Sport', 'SportDuration'),
        ('TV', 'TVDuration'),
    ]

    stacked = None
    for label, source_col in category_sources:
        piece = df.select(
            col('Contract'),
            col(source_col).alias('TotalDuration'),
            lit(label).alias('Category'),
        )
        stacked = piece if stacked is None else stacked.union(piece)

    return stacked.withColumn('TotalDuration', col('TotalDuration').cast(IntegerType()))

In [9]:
def calc_most_watch(data_unpivot):
    """Pick exactly one most-watched category for every contract.

    Parameters
    ----------
    data_unpivot : DataFrame
        Long-form table with the columns Contract, TotalDuration and Category.

    Returns
    -------
    DataFrame
        One row per contract with the columns Contract and Most_Watch.

    Notes
    -----
    row_number() is used instead of rank(): rank() assigns 1 to every tied
    category (and to all categories when every duration is null), which yields
    several rows for one contract and duplicates that contract in any later
    join on Contract. Ties are broken by Category in ascending order so the
    result is deterministic.
    """
    per_contract = Window.partitionBy('Contract').orderBy(
        # Largest duration first; null durations sort after every real value.
        col('TotalDuration').desc_nulls_last(),
        # Deterministic tie-break between categories with equal duration.
        col('Category').asc(),
    )
    ranked = data_unpivot.withColumn('Rank', row_number().over(per_contract))
    return (
        ranked.filter(col('Rank') == 1)
        .select('Contract', col('Category').alias('Most_Watch'))
    )

In [10]:
# Reshape the wide table to (Contract, TotalDuration, Category) rows, then
# keep the top-ranked category per contract as Most_Watch.
data_unpivot = unpivot_data(df)
most_watch = calc_most_watch(data_unpivot)

In [11]:
# Attach the Most_Watch label. The inner join keeps only contracts that
# appear in most_watch.
# NOTE(review): df is reassigned in place, so re-running this cell on its own
# joins Most_Watch a second time; re-run from the load cell instead.
df = df.join(most_watch, "Contract", "inner")

In [12]:
df.show()

+--------------+-------------+-------------+-------------+-------------+----------+----------+
|      Contract|RelaxDuration|MovieDuration|ChildDuration|SportDuration|TVDuration|Most_Watch|
+--------------+-------------+-------------+-------------+-------------+----------+----------+
|113.182.209.48|           89|         null|         null|         null|        63|     Relax|
|14.182.110.125|           92|         null|         null|         null|       404|        TV|
|     AGAAA0338|         null|         null|         null|         null|    278633|        TV|
|     AGAAA0342|          204|         null|         null|         null|    117788|        TV|
|     AGAAA0346|         null|         null|         null|         null|   2056249|        TV|
|     AGAAA0353|         null|         1665|         null|         null|     25982|        TV|
|     AGAAA0372|         null|         null|         null|         null|     13123|        TV|
|     AGAAA0391|          373|          129|      

In [13]:
def calc_taste(df):
    """Build a 'Taste' label listing every category a contract has watched.

    Parameters
    ----------
    df : DataFrame
        Must contain Contract and the five duration columns RelaxDuration,
        MovieDuration, ChildDuration, SportDuration and TVDuration. Any other
        columns are ignored.

    Returns
    -------
    DataFrame
        Columns Contract and Taste. Taste joins, with '-', the names of the
        categories whose duration is not null, in the fixed order
        Relax, Movie, Child, Sport, TV (for example 'Relax-TV').
    """
    # The category columns are named explicitly. Selecting "every column except
    # Contract and Most_Watch" would concatenate the raw values of any extra
    # column present in df into the label.
    category_columns = [
        ('RelaxDuration', 'Relax'),
        ('MovieDuration', 'Movie'),
        ('ChildDuration', 'Child'),
        ('SportDuration', 'Sport'),
        ('TVDuration', 'TV'),
    ]

    # when() without otherwise() evaluates to null for unwatched categories,
    # and concat_ws skips null inputs.
    watched_labels = [
        when(col(duration_col).isNotNull(), label)
        for duration_col, label in category_columns
    ]
    return df.select('Contract', concat_ws('-', *watched_labels).alias('Taste'))

In [14]:
# Build the (Contract, Taste) lookup from the table that already carries
# Most_Watch.
calculate_taste = calc_taste(df)

In [15]:
# Attach Taste by joining the lookup back onto df by Contract.
# NOTE(review): both sides are derived from df, so if Contract is not unique
# in df this join multiplies the duplicated rows — confirm uniqueness.
df1 = df.join(calculate_taste, "Contract", "inner")

In [16]:
# Cast each duration column to IntegerType, replacing the column in place.
for duration_col in ('RelaxDuration', 'MovieDuration', 'ChildDuration', 'SportDuration', 'TVDuration'):
    df1 = df1.withColumn(duration_col, col(duration_col).cast(IntegerType()))

In [17]:
df1.printSchema()

root
 |-- Contract: string (nullable = true)
 |-- RelaxDuration: integer (nullable = true)
 |-- MovieDuration: integer (nullable = true)
 |-- ChildDuration: integer (nullable = true)
 |-- SportDuration: integer (nullable = true)
 |-- TVDuration: integer (nullable = true)
 |-- Most_Watch: string (nullable = false)
 |-- Taste: string (nullable = false)



In [18]:
df1.show()

+--------------+-------------+-------------+-------------+-------------+----------+----------+--------------+
|      Contract|RelaxDuration|MovieDuration|ChildDuration|SportDuration|TVDuration|Most_Watch|         Taste|
+--------------+-------------+-------------+-------------+-------------+----------+----------+--------------+
|113.182.209.48|           89|         null|         null|         null|        63|     Relax|      Relax-TV|
|14.182.110.125|           92|         null|         null|         null|       404|        TV|      Relax-TV|
|     AGAAA0338|         null|         null|         null|         null|    278633|        TV|            TV|
|     AGAAA0342|          204|         null|         null|         null|    117788|        TV|      Relax-TV|
|     AGAAA0346|         null|         null|         null|         null|   2056249|        TV|            TV|
|     AGAAA0353|         null|         1665|         null|         null|     25982|        TV|      Movie-TV|
|     AGAA

In [19]:
def calc_type(df, low_quantile=0.25, high_quantile=0.75, relative_error=0.0):
    """Add TotalDuration and a Low / Medium / High usage Type to each contract.

    Parameters
    ----------
    df : DataFrame
        Must contain Contract, the five numeric duration columns, Most_Watch
        and Taste.
    low_quantile, high_quantile : float
        Quantiles of TotalDuration used as cut points. A total strictly below
        the low cut is 'Low', strictly above the high cut is 'High', anything
        else is 'Medium'. Defaults are the first and third quartiles.
    relative_error : float
        Precision passed to approxQuantile; 0.0 requests exact quantiles.

    Returns
    -------
    DataFrame
        Columns Contract, the five duration columns (nulls replaced by 0),
        TotalDuration, Most_Watch, Taste and Type.
    """
    duration_cols = [
        'RelaxDuration',
        'MovieDuration',
        'ChildDuration',
        'SportDuration',
        'TVDuration',
    ]

    # A missing duration means the category was never watched; treat it as 0 so
    # the sum below is not nulled out. Only the duration columns are filled.
    df = df.na.fill(0, subset=duration_cols)

    total = col(duration_cols[0])
    for name in duration_cols[1:]:
        total = total + col(name)
    df = df.withColumn('TotalDuration', total)

    quantiles = df.approxQuantile(
        'TotalDuration', [low_quantile, high_quantile], relative_error
    )

    if len(quantiles) == 2:
        lower, upper = quantiles
        usage_type = (
            when(col('TotalDuration') < lower, "Low")
            .when(col('TotalDuration') > upper, "High")
            .otherwise("Medium")
        )
    else:
        # approxQuantile returns no values when there is nothing to rank
        # (empty input); keep the schema and leave Type null instead of
        # failing on a missing cut point.
        usage_type = lit(None).cast(StringType())

    df = df.withColumn("Type", usage_type)

    return df.select('Contract', 'RelaxDuration', 'MovieDuration', 'ChildDuration', 'SportDuration', 'TVDuration', 'TotalDuration', 'Most_Watch', 'Taste', 'Type')

In [20]:
# Add TotalDuration and the quantile-based Type label.
df2 = calc_type(df1)

In [21]:
df2.show()

+--------------+-------------+-------------+-------------+-------------+----------+-------------+----------+--------------+------+
|      Contract|RelaxDuration|MovieDuration|ChildDuration|SportDuration|TVDuration|TotalDuration|Most_Watch|         Taste|  Type|
+--------------+-------------+-------------+-------------+-------------+----------+-------------+----------+--------------+------+
|113.182.209.48|           89|            0|            0|            0|        63|          152|     Relax|      Relax-TV|   Low|
|14.182.110.125|           92|            0|            0|            0|       404|          496|        TV|      Relax-TV|   Low|
|     AGAAA0338|            0|            0|            0|            0|    278633|       278633|        TV|            TV|Medium|
|     AGAAA0342|          204|            0|            0|            0|    117788|       117992|        TV|      Relax-TV|Medium|
|     AGAAA0346|            0|            0|            0|            0|   2056249|

In [22]:
def read_all_30_days(path="D:\\Dataset\\log_content\\", month_prefix="202204", num_days=30):
    """Read the daily JSON logs and stack their per-contract duration totals.

    Parameters
    ----------
    path : str
        Directory holding the daily files. It is concatenated directly with
        the file name, so it must end with a path separator.
    month_prefix : str
        'YYYYMM' prefix of the file names; files are named
        '<month_prefix><DD>.json' with a zero-padded day.
    num_days : int
        Number of consecutive days to read, starting at day 1.

    Returns
    -------
    DataFrame
        Columns Contract and sum(TotalDuration). Each daily file contributes
        one row per contract found in it, so a contract appears once for
        every day it has log entries.

    Raises
    ------
    ValueError
        If num_days is smaller than 1.
    """
    if num_days < 1:
        raise ValueError("num_days must be at least 1")

    daily_totals = None
    for day in range(1, num_days + 1):
        file_name = "{}{:02d}.json".format(month_prefix, day)

        # Only Contract and TotalDuration are needed for the aggregation.
        day_totals = (
            spark.read.json(path + file_name)
            .select("_source.Contract", "_source.TotalDuration")
            .groupBy("Contract")
            .sum('TotalDuration')
        )

        daily_totals = day_totals if daily_totals is None else daily_totals.union(day_totals)

    return daily_totals

In [23]:
def calc_activeness():
    """Count how many daily aggregates each contract appears in.

    read_all_30_days() returns one row per contract for every daily file that
    contains it; the number of such rows per contract is returned in the
    Activeness column next to Contract.
    """
    daily_rows = read_all_30_days()
    rows_per_contract = count("*").alias("Activeness")
    return daily_rows.groupBy("Contract").agg(rows_per_contract)

In [24]:
# Per-contract Activeness, computed from the daily log files.
activeness_data = calc_activeness()

In [25]:
# Attach Activeness. The inner join drops contracts that have no row in
# activeness_data.
df3 = df2.join(activeness_data, 'Contract', 'inner')

In [26]:
df3.show()

+--------------+-------------+-------------+-------------+-------------+----------+-------------+----------+--------------------+------+----------+
|      Contract|RelaxDuration|MovieDuration|ChildDuration|SportDuration|TVDuration|TotalDuration|Most_Watch|               Taste|  Type|Activeness|
+--------------+-------------+-------------+-------------+-------------+----------+-------------+----------+--------------------+------+----------+
|113.182.209.48|           89|            0|            0|            0|        63|          152|     Relax|            Relax-TV|   Low|         1|
|14.182.110.125|           92|            0|            0|            0|       404|          496|        TV|            Relax-TV|   Low|         1|
|     AGAAA0338|            0|            0|            0|            0|    278633|       278633|        TV|                  TV|Medium|        30|
|     AGAAA0342|          204|            0|            0|            0|    117788|       117992|        TV|    

In [26]:
def calc_clinginess(df):
    """Add a Clinginess label derived from Type and Activeness.

    Each rule below pairs a Type with an Activeness range; the first matching
    rule supplies the label. Rows that match no rule (for example a null Type
    or Activeness) get a null Clinginess.
    """
    usage = col('Type')
    active_days = col('Activeness')

    # (Type, exclusive lower bound, inclusive upper bound, Clinginess)
    rules = [
        ('Low', None, 20, 'Low'),
        ('Low', 20, None, 'Medium'),
        ('Medium', None, 10, 'Low'),
        ('Medium', 10, 20, 'Medium'),
        ('Medium', 20, None, 'High'),
        ('High', None, 10, 'Medium'),
        ('High', 10, None, 'High'),
    ]

    label = None
    for usage_type, above, at_most, clinginess in rules:
        matches = usage == usage_type
        if above is not None:
            matches = matches & (active_days > above)
        if at_most is not None:
            matches = matches & (active_days <= at_most)
        label = when(matches, clinginess) if label is None else label.when(matches, clinginess)

    return df.withColumn('Clinginess', label)

In [27]:
# Combine Type and Activeness into the Clinginess label.
df4 = calc_clinginess(df3)

In [29]:
df4.show()

+--------------+-------------+-------------+-------------+-------------+----------+-------------+----------+--------------------+------+----------+----------+
|      Contract|RelaxDuration|MovieDuration|ChildDuration|SportDuration|TVDuration|TotalDuration|Most_Watch|               Taste|  Type|Activeness|Clinginess|
+--------------+-------------+-------------+-------------+-------------+----------+-------------+----------+--------------------+------+----------+----------+
|113.182.209.48|           89|            0|            0|            0|        63|          152|     Relax|            Relax-TV|   Low|         1|       Low|
|14.182.110.125|           92|            0|            0|            0|       404|          496|        TV|            Relax-TV|   Low|         1|       Low|
|     AGAAA0338|            0|            0|            0|            0|    278633|       278633|        TV|                  TV|Medium|        30|      High|
|     AGAAA0342|          204|            0|  

In [28]:
# Save data
# repartition(1) collapses the result into a single partition so one CSV part
# file is written. mode='overwrite' replaces a previous export; with the
# default mode the write raises an error when './Final_Data' already exists,
# which breaks a second "Run All".
df4.repartition(1).write.csv('./Final_Data', mode='overwrite', header=True)