In [2]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [3]:
# Create (or reuse) the Spark session for this notebook, asking for 2 GB of
# driver memory.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

In [4]:
# Load the raw log records for 2022-04-01 from a JSON file on the local E: drive.
log_path = 'E:\\log_content\\20220401.json'
ds = spark.read.json(log_path)

In [5]:
# Preview the first 20 raw records; long cell values are truncated for display.
ds.show(n=20, truncate=True)

+--------------------+-------+------+--------------------+-----+
|                 _id| _index|_score|             _source|_type|
+--------------------+-------+------+--------------------+-----+
|AX_momhia1FFivsGrn9o|history|     0|{KPLUS, HNH579912...|kplus|
|AX_momhca1FFivsGrnvg|history|     0|{KPLUS, HUFD40665...|kplus|
|AX_momhaa1FFivsGrnny|history|     0|{KPLUS, HNH572635...|kplus|
|AX_momhca1FFivsGrnvv|history|     0|{KPLUS, HND141717...|kplus|
|AX_momhia1FFivsGrn98|history|     0|{KPLUS, HNH743103...|kplus|
|AX_momg9a1FFivsGrnkS|history|     0|{KPLUS, HNH893773...|kplus|
|AX_momhca1FFivsGrnwA|history|     0|{KPLUS, HND083642...|kplus|
|AX_momhfa1FFivsGrn2u|history|     0|{KPLUS, DNFD74404...|kplus|
|AX_momhca1FFivsGrnwP|history|     0|{KPLUS, DTFD21200...|kplus|
|AX_momhca1FFivsGrnwU|history|     0|{KPLUS, LDFD05747...|kplus|
|AX_momhfa1FFivsGrn24|history|     0|{KPLUS, HNH063566...|kplus|
|AX_momhia1FFivsGrn-W|history|     0|{KPLUS, HNH866786...|kplus|
|AX_momhia1FFivsGrn-a|his

In [6]:
# Print the inferred schema of the raw records; the fields of interest
# (AppName, Contract, Mac, TotalDuration) are nested inside the `_source` struct.
ds.printSchema()

root
 |-- _id: string (nullable = true)
 |-- _index: string (nullable = true)
 |-- _score: long (nullable = true)
 |-- _source: struct (nullable = true)
 |    |-- AppName: string (nullable = true)
 |    |-- Contract: string (nullable = true)
 |    |-- Mac: string (nullable = true)
 |    |-- TotalDuration: long (nullable = true)
 |-- _type: string (nullable = true)



In [7]:
# Keep only the three nested `_source` fields the analysis needs, promoted to
# top-level columns in this order.
source_fields = ['Contract', 'AppName', 'TotalDuration']
ds = ds.select(*['_source.' + field for field in source_fields])
ds.show()

+---------+-------+-------------+
| Contract|AppName|TotalDuration|
+---------+-------+-------------+
|HNH579912|  KPLUS|          254|
|HUFD40665|  KPLUS|         1457|
|HNH572635|  KPLUS|         2318|
|HND141717|  KPLUS|         1452|
|HNH743103|  KPLUS|          251|
|HNH893773|  KPLUS|          924|
|HND083642|  KPLUS|         1444|
|DNFD74404|  KPLUS|          691|
|DTFD21200|  KPLUS|         1436|
|LDFD05747|  KPLUS|         1434|
|HNH063566|  KPLUS|          687|
|HNH866786|  KPLUS|          248|
|NBAAA1128|  KPLUS|          247|
|HNH960439|  KPLUS|          683|
|HNJ035736|  KPLUS|          246|
|NTFD93673|  KPLUS|         2288|
|HNJ063267|  KPLUS|         2282|
|HNH790383|  KPLUS|          906|
|THFD12466|  KPLUS|          242|
|HNH566080|  KPLUS|          242|
+---------+-------+-------------+
only showing top 20 rows



In [8]:
# List every distinct application name present in the log, so each one can be
# mapped to a content type below.
app_names = ds.select('AppName').distinct()
app_names.show()

+-------+
|AppName|
+-------+
|  KPLUS|
|  RELAX|
|  CHILD|
|CHANNEL|
|    VOD|
|   FIMS|
|  SPORT|
+-------+



In [9]:
# Map each application name to a content type:
#   CHANNEL, KPLUS -> TV      VOD, FIMS -> Movie
#   CHILD -> Child            RELAX -> Relax            SPORT -> Sport
# Any other (or missing) AppName is labelled "Error".
app_name = col("AppName")
type_label = (
    when(app_name.isin('CHANNEL', 'KPLUS'), "TV")
    .when(app_name.isin('VOD', 'FIMS'), "Movie")
    .when(app_name == 'CHILD', "Child")
    .when(app_name == 'RELAX', "Relax")
    .when(app_name == 'SPORT', "Sport")
    .otherwise("Error")
)
ds = ds.withColumn("Type", type_label)

In [10]:
# Preview the first 20 rows to check the newly added Type column.
ds.show(n=20, truncate=True)

+---------+-------+-------------+----+
| Contract|AppName|TotalDuration|Type|
+---------+-------+-------------+----+
|HNH579912|  KPLUS|          254|  TV|
|HUFD40665|  KPLUS|         1457|  TV|
|HNH572635|  KPLUS|         2318|  TV|
|HND141717|  KPLUS|         1452|  TV|
|HNH743103|  KPLUS|          251|  TV|
|HNH893773|  KPLUS|          924|  TV|
|HND083642|  KPLUS|         1444|  TV|
|DNFD74404|  KPLUS|          691|  TV|
|DTFD21200|  KPLUS|         1436|  TV|
|LDFD05747|  KPLUS|         1434|  TV|
|HNH063566|  KPLUS|          687|  TV|
|HNH866786|  KPLUS|          248|  TV|
|NBAAA1128|  KPLUS|          247|  TV|
|HNH960439|  KPLUS|          683|  TV|
|HNJ035736|  KPLUS|          246|  TV|
|NTFD93673|  KPLUS|         2288|  TV|
|HNJ063267|  KPLUS|         2282|  TV|
|HNH790383|  KPLUS|          906|  TV|
|THFD12466|  KPLUS|          242|  TV|
|HNH566080|  KPLUS|          242|  TV|
+---------+-------+-------------+----+
only showing top 20 rows



In [11]:
# AppName has been folded into Type, so it is no longer needed.
ds = ds.drop('AppName')
ds.show()

+---------+-------------+----+
| Contract|TotalDuration|Type|
+---------+-------------+----+
|HNH579912|          254|  TV|
|HUFD40665|         1457|  TV|
|HNH572635|         2318|  TV|
|HND141717|         1452|  TV|
|HNH743103|          251|  TV|
|HNH893773|          924|  TV|
|HND083642|         1444|  TV|
|DNFD74404|          691|  TV|
|DTFD21200|         1436|  TV|
|LDFD05747|         1434|  TV|
|HNH063566|          687|  TV|
|HNH866786|          248|  TV|
|NBAAA1128|          247|  TV|
|HNH960439|          683|  TV|
|HNJ035736|          246|  TV|
|NTFD93673|         2288|  TV|
|HNJ063267|         2282|  TV|
|HNH790383|          906|  TV|
|THFD12466|          242|  TV|
|HNH566080|          242|  TV|
+---------+-------------+----+
only showing top 20 rows



In [12]:
# Reshape to one row per contract with one column per content type, where each
# cell is the sum of TotalDuration for that (Contract, Type) pair.
#
# The pivot values are listed explicitly so that:
#   * Spark does not need an extra pass over the data to discover the distinct
#     values of Type, and
#   * the result always has the same five columns, even when a content type
#     does not occur in this day's log. The later per-column renames and the
#     exported file layout depend on these names being present.
# Rows labelled "Error" get no column of their own in the pivoted result.
content_types = ['Child', 'Movie', 'Relax', 'Sport', 'TV']
ds = ds.groupBy('Contract').pivot('Type', content_types).sum('TotalDuration')
ds.show()

+---------+-----+-----+-----+-----+-----+
| Contract|Child|Movie|Relax|Sport|   TV|
+---------+-----+-----+-----+-----+-----+
|DLFD15794| NULL| NULL| NULL| NULL|   71|
|DAH029562| NULL| NULL| NULL| NULL| 1226|
|HNH655987| NULL| NULL| NULL| NULL| 5089|
|SGH975310| NULL| NULL| NULL| NULL|72628|
|VTFD32869| 2003| NULL| NULL| NULL|  338|
|NTFD24068| NULL| 3836| NULL| NULL|  281|
|HND372201| NULL| NULL| NULL| NULL|78260|
|HNH905038| NULL| NULL| NULL| NULL|   49|
|QNAAA2461| NULL| NULL| NULL|    0|63659|
|HBFD00992| NULL| NULL| NULL| NULL|15133|
|HNH526095| NULL| NULL| NULL| NULL|15908|
|HNH912466| NULL| NULL| NULL| NULL|86412|
|HNH913550| NULL| NULL| NULL| NULL|73211|
|DNFD62761| NULL| 4613| NULL| NULL| 1589|
|HNH927176| NULL| NULL| NULL| NULL|97179|
|HNFD43638| NULL| NULL| NULL| NULL|18947|
|HTFD11598| NULL| 2884| NULL| NULL|  707|
|HNH767925| NULL|  655| NULL| NULL|  101|
|DAFD26297| NULL|17274| NULL| NULL|77786|
|HNH884948| NULL|  594| NULL| NULL|40236|
+---------+-----+-----+-----+-----

In [13]:
# A contract with no activity for a content type has NULL in that column after
# the pivot; treat that as zero duration.
ds = ds.fillna(value=0)
ds.show()

+---------+-----+-----+-----+-----+-----+
| Contract|Child|Movie|Relax|Sport|   TV|
+---------+-----+-----+-----+-----+-----+
|DLFD15794|    0|    0|    0|    0|   71|
|DAH029562|    0|    0|    0|    0| 1226|
|HNH655987|    0|    0|    0|    0| 5089|
|SGH975310|    0|    0|    0|    0|72628|
|VTFD32869| 2003|    0|    0|    0|  338|
|NTFD24068|    0| 3836|    0|    0|  281|
|HND372201|    0|    0|    0|    0|78260|
|HNH905038|    0|    0|    0|    0|   49|
|QNAAA2461|    0|    0|    0|    0|63659|
|HBFD00992|    0|    0|    0|    0|15133|
|HNH526095|    0|    0|    0|    0|15908|
|HNH912466|    0|    0|    0|    0|86412|
|HNH913550|    0|    0|    0|    0|73211|
|DNFD62761|    0| 4613|    0|    0| 1589|
|HNH927176|    0|    0|    0|    0|97179|
|HNFD43638|    0|    0|    0|    0|18947|
|HTFD11598|    0| 2884|    0|    0|  707|
|HNH767925|    0|  655|    0|    0|  101|
|DAFD26297|    0|17274|    0|    0|77786|
|HNH884948|    0|  594|    0|    0|40236|
+---------+-----+-----+-----+-----

In [14]:
# Append "Duration" to each content-type column name so the column says what
# the number measures (e.g. TV -> TVDuration).
for type_name in ('Child', 'Movie', 'Relax', 'Sport', 'TV'):
    ds = ds.withColumnRenamed(type_name, type_name + 'Duration')
ds.show()

+---------+-------------+-------------+-------------+-------------+----------+
| Contract|ChildDuration|MovieDuration|RelaxDuration|SportDuration|TVDuration|
+---------+-------------+-------------+-------------+-------------+----------+
|DLFD15794|            0|            0|            0|            0|        71|
|DAH029562|            0|            0|            0|            0|      1226|
|HNH655987|            0|            0|            0|            0|      5089|
|SGH975310|            0|            0|            0|            0|     72628|
|VTFD32869|         2003|            0|            0|            0|       338|
|NTFD24068|            0|         3836|            0|            0|       281|
|HND372201|            0|            0|            0|            0|     78260|
|HNH905038|            0|            0|            0|            0|        49|
|QNAAA2461|            0|            0|            0|            0|     63659|
|HBFD00992|            0|            0|            0

In [15]:
# Print the final schema before export: Contract plus one *Duration column per
# content type.
ds.printSchema()

root
 |-- Contract: string (nullable = true)
 |-- ChildDuration: long (nullable = true)
 |-- MovieDuration: long (nullable = true)
 |-- RelaxDuration: long (nullable = true)
 |-- SportDuration: long (nullable = true)
 |-- TVDuration: long (nullable = true)



In [16]:
# Export the per-contract duration table as CSV, replacing any previous output
# at this location.
# header=True writes the column names as the first line of the output; without
# it the exported data carries no column names, and the *Duration names
# assigned earlier would be lost.
# Note: Spark creates a directory at this path containing part files, not a
# single .csv file.
ds.write.mode('overwrite').csv("E:\\output_logcontent\\20220401.csv", header=True)