In [24]:
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *

In [7]:
def get_spark_session(driver_memory='4g', driver_cores='2',
                      executor_memory='4g', executor_cores='2'):
    """Build (or fetch) a SparkSession with the requested resource settings.

    Called with no arguments this behaves exactly like the previous
    hard-coded version (4g / 2 cores for both driver and executor).

    Args:
        driver_memory: value for ``spark.driver.memory`` (e.g. ``'4g'``).
        driver_cores: value for ``spark.driver.cores``.
        executor_memory: value for ``spark.executor.memory``.
        executor_cores: value for ``spark.executor.cores``.

    Returns:
        The session returned by ``SparkSession.builder.getOrCreate()``.
        NOTE: per the PySpark docs, ``getOrCreate`` hands back an already
        existing session when there is one, so the resource settings are
        only guaranteed to shape a session that is newly created.
    """
    settings = {
        'spark.driver.memory': driver_memory,
        'spark.driver.cores': driver_cores,
        'spark.executor.memory': executor_memory,
        'spark.executor.cores': executor_cores,
    }
    builder = SparkSession.builder
    for key, value in settings.items():
        # str() lets callers pass plain ints for the core counts.
        builder = builder.config(key, str(value))
    return builder.getOrCreate()

# Notebook-wide session handle; the later cells read data through `spark`.
spark = get_spark_session()

In [8]:
# Folder that holds the log JSON files (a later cell reads 20220401.json from it).
# Raw string on purpose: in a normal literal the Windows backslashes form the
# invalid escape sequences '\D', '\d' and '\l', which modern Python warns about.
# The resulting value is unchanged.
path = r'D:\DE-class\data\log_content'
# ISO-formatted date strings. They are not referenced by the cells visible here;
# presumably the date window to process -- TODO confirm against later cells.
start_date = '2022-04-01'
end_date = '2022-04-04'

In [9]:
# Read the 20220401.json log file from the data folder into a DataFrame.
df = spark.read.json(f'{path}\\20220401.json')

In [10]:
# Peek at the fields nested under `_source` (show()'s defaults spelled out).
df.select('_source.*').show(n=20, truncate=True)

+-------+---------+------------+-------------+
|AppName| Contract|         Mac|TotalDuration|
+-------+---------+------------+-------------+
|  KPLUS|HNH579912|0C96E62FC55C|          254|
|  KPLUS|HUFD40665|CCEDDC333614|         1457|
|  KPLUS|HNH572635|B068E6A1C5F6|         2318|
|  KPLUS|HND141717|08674EE8D2C2|         1452|
|  KPLUS|HNH743103|402343C25D7D|          251|
|  KPLUS|HNH893773|B84DEE76D3B8|          924|
|  KPLUS|HND083642|B84DEE849A0F|         1444|
|  KPLUS|DNFD74404|90324BB44C39|          691|
|  KPLUS|DTFD21200|B84DEED27709|         1436|
|  KPLUS|LDFD05747|0C96E6C95E53|         1434|
|  KPLUS|HNH063566|B84DEEDD1C85|          687|
|  KPLUS|HNH866786|10394E2790A5|          248|
|  KPLUS|NBAAA1128|10394E47C1AF|          247|
|  KPLUS|HNH960439|B84DEED34371|          683|
|  KPLUS|HNJ035736|CCD4A1FA86A5|          246|
|  KPLUS|NTFD93673|B84DEEEF4763|         2288|
|  KPLUS|HNJ063267|10394E172CA7|         2282|
|  KPLUS|HNH790383|4CEBBD53378B|          906|
|  KPLUS|THFD

In [11]:
# Flatten the `_source` struct so its fields become top-level columns.
source_df = df.select(col('_source.*'))

In [12]:
# List every distinct AppName value present in the data.
(
    source_df
    .select('AppName')
    .distinct()
    .show(n=20, truncate=True)
)

+-------+
|AppName|
+-------+
|  KPLUS|
|  RELAX|
|  CHILD|
|   FIMS|
|CHANNEL|
|  SPORT|
|    VOD|
+-------+



In [13]:
# Number of rows in the flattened frame, before any filtering or aggregation.
source_df.count()

1654519

In [14]:
# Derive a content category ("Type") from AppName.
# Any AppName not listed below is tagged "Error".
source_df = source_df.withColumn(
    "Type",
    when(col("AppName").isin('CHANNEL', 'KPLUS', 'KPlus'), "Truyền Hình")
    .when(col("AppName").isin('VOD', 'FIMS'), "Phim Truyện")
    .when(col("AppName") == 'RELAX', "Giải Trí")
    .when(col("AppName") == 'CHILD', "Thiếu Nhi")
    .when(col("AppName") == 'SPORT', "Thể Thao")
    .otherwise("Error"),
)


In [15]:
# Keep only the three columns the aggregation below works with.
source_df = source_df.select(['Contract', 'Type', 'TotalDuration'])

In [16]:
# Drop rows whose Contract is the literal '0' and rows tagged with Type 'Error'.
source_df = source_df.where(
    (col('Contract') != '0') & (col('Type') != 'Error')
)

In [17]:
# Total duration per (Contract, Type); the aggregate column produced by Spark
# is named 'sum(TotalDuration)', so rename it back to 'TotalDuration'.
source_df = (
    source_df
    .groupBy('Contract', 'Type')
    .agg({'TotalDuration': 'sum'})
    .withColumnRenamed('sum(TotalDuration)', 'TotalDuration')
)

In [18]:
# Inspect the per-contract, per-type totals (show()'s defaults spelled out).
source_df.show(n=20, truncate=True)

+---------+-----------+-------------+
| Contract|       Type|TotalDuration|
+---------+-----------+-------------+
|DNH014998|Phim Truyện|         3365|
|HND486882|Phim Truyện|         5545|
|HUFD07189|Truyền Hình|         2264|
|HDFD36288|Truyền Hình|        11904|
|CTFD04401|Truyền Hình|        55881|
|HNH954607|Phim Truyện|        13115|
|HNH855959|Truyền Hình|          327|
|SGH034683|Truyền Hình|        82195|
|NTFD35330|Truyền Hình|        19139|
|NTFD48198|Phim Truyện|        55202|
|HNH443856|Truyền Hình|         7687|
|NAFD05338|Truyền Hình|        81934|
|LCFD20510|Phim Truyện|        10852|
|QNFD29007|Truyền Hình|        82705|
|SGH569599|Truyền Hình|        18769|
|SGH701752|Truyền Hình|        31028|
|HNH712164|Phim Truyện|         6106|
|PYFD01920|Truyền Hình|        82664|
|HTFD13716|Truyền Hình|        18759|
|SGJ039473|Truyền Hình|        18695|
+---------+-----------+-------------+
only showing top 20 rows



In [19]:
# Reshape to one row per Contract with one duration column per content type.
# The pivot values are listed explicitly (in the order the implicit pivot
# emitted them) so that:
#   * the output always has all five columns, even when a category does not
#     occur in the data -- otherwise withColumnRenamed would silently do nothing
#     and the schema would vary with the input;
#   * Spark does not have to compute the distinct Type values first.
pivotDF = source_df.groupBy("Contract")\
  .pivot("Type", ["Giải Trí", "Phim Truyện", "Thiếu Nhi", "Thể Thao", "Truyền Hình"])\
  .sum("TotalDuration")\
  .withColumnRenamed("Truyền Hình", 'TVDuration')\
  .withColumnRenamed("Thể Thao", 'SportDuration')\
  .withColumnRenamed("Thiếu Nhi", 'ChildDuration')\
  .withColumnRenamed("Giải Trí", 'RelaxDuration')\
  .withColumnRenamed("Phim Truyện", 'MovieDuration')

In [20]:
# Inspect the pivoted frame; a type with no rows for a contract shows as null.
pivotDF.show(n=20, truncate=True)

+---------+-------------+-------------+-------------+-------------+----------+
| Contract|RelaxDuration|MovieDuration|ChildDuration|SportDuration|TVDuration|
+---------+-------------+-------------+-------------+-------------+----------+
|HTFD11598|         null|         2884|         null|         null|       707|
|HPFD48556|           69|         null|         null|         null|     92976|
|NBFD10014|         null|         null|         null|         null|     84628|
|HNH619088|         null|         8456|          234|         null|     65210|
|HNH036174|         null|         null|         null|         null|      6049|
|DNH067877|         null|         null|         null|         null|      5760|
|SGH806190|         null|         null|         null|         null|      1131|
|HNH582022|         null|         null|         null|         null|     86400|
|HNH795510|         null|         5840|         null|         null|     68589|
|DNFD91557|         null|         null|         null

In [21]:
# Replace the nulls left by the pivot with 0.
result = pivotDF.na.fill(0)

In [22]:
# Stamp every row with a constant Date column.
result = result.withColumn(
    'Date',
    lit('2022-04-01'),
)

In [23]:
# Show the first 10 rows of the final table without truncating cell contents.
result.show(n=10, truncate=False)

+---------+-------------+-------------+-------------+-------------+----------+----------+
|Contract |RelaxDuration|MovieDuration|ChildDuration|SportDuration|TVDuration|Date      |
+---------+-------------+-------------+-------------+-------------+----------+----------+
|HTFD11598|0            |2884         |0            |0            |707       |2022-04-01|
|HPFD48556|69           |0            |0            |0            |92976     |2022-04-01|
|NBFD10014|0            |0            |0            |0            |84628     |2022-04-01|
|HNH619088|0            |8456         |234          |0            |65210     |2022-04-01|
|HNH036174|0            |0            |0            |0            |6049      |2022-04-01|
|DNH067877|0            |0            |0            |0            |5760      |2022-04-01|
|SGH806190|0            |0            |0            |0            |1131      |2022-04-01|
|HNH582022|0            |0            |0            |0            |86400     |2022-04-01|
|HNH795510