In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.context import SparkContext
import pandas as pd
from pyspark.sql.window import Window


In [2]:
spark = SparkSession.builder.config("spark.driver.memory", "8g").getOrCreate()

In [3]:
path = "D:\\WORKSPACE\\DE\\study_de\\Big_Data\\Items Shared on 4-29-2023\\Dataset\\log_search\\20220601"

In [4]:
data = spark.read.parquet(path)

In [5]:
data.printSchema()

root
 |-- eventID: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- keyword: string (nullable = true)
 |-- category: string (nullable = true)
 |-- proxy_isp: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- networkType: string (nullable = true)
 |-- action: string (nullable = true)
 |-- userPlansMap: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [6]:
data.show()

+--------------------+--------------------+--------+--------------------+--------+---------+--------------------+-----------+------+--------------------+
|             eventID|            datetime| user_id|             keyword|category|proxy_isp|            platform|networkType|action|        userPlansMap|
+--------------------+--------------------+--------+--------------------+--------+---------+--------------------+-----------+------+--------------------+
|61804316-6b89-4cf...|2022-06-01 18:59:...|    NULL|            trữ tình|   enter|     vnpt|   fplay-ottbox-2019|   ethernet|search|                NULL|
|22c35287-9fe1-487...|2022-06-01 18:59:...|44887906|            trữ tình|   enter|     vnpt|   fplay-ottbox-2019|   ethernet|search|                  []|
|f9af5a95-1f72-486...|2022-06-01 18:59:...| 2719170|              bolero|   enter|  viettel|   fplay-ottbox-2019|   ethernet|search|[Kênh Gia Đình:pr...|
|fd53edee-132d-4ac...|2022-06-01 15:00:...|    NULL|amy schumer: trực...|   

In [7]:
def processing_data(data):
    data = data.select("user_id", "keyword")
    data = data.groupBy("user_id", "keyword").count()
    data = data.withColumnRenamed("count", "search_count").orderBy("search_count", ascending=False)
    window = Window.partitionBy("user_id").orderBy(col("search_count").desc())
    data = data.withColumn("rank", row_number().over(window))
    data = data.filter(col("rank") == 1)
    data = data.withColumnRenamed("keyword", "Most_Search")
    data = data.select(col('user_id'), col('Most_Search'))
    data = data.fillna({'Most_Search': 'trống','user_id': 'Chưa xác định'})
    
    return data

In [8]:
data_key = spark.read.csv("D:\\WORKSPACE\\DE\\study_de\\Practice\\Class7\\ETL_MOST_SEARCH\\key_search.csv", header=True)
data_key.show()

+--------------------+---------+
|         Most_Search| Category|
+--------------------+---------+
|               trống|  UNKNOWN|
|           du?ng h?m| Romantic|
|    bong da viet nam|undefined|
|           HUNTER XH|   Horror|
|   phàm nhân tu tiên|  Science|
|  bật thầy hoá trang|   Comedy|
|        gã không mặt|   Action|
|                 cây| Category|
|Nhanh Và Nguy Hiểm 9|   Action|
|          bé học chữ|   Comedy|
|                   K|  K-DRAMA|
|aikatsu - nhi?t h...|    Child|
| c�ng t? chuy?n sinh|  K-DRAMA|
|C� N�ng �?u B?p C...|   Comedy|
|      phim lu?i tr?i| Romantic|
|               anime|undefined|
|     tien nu ha pham|    Anime|
|chuy?n sinh th�nh...|   Action|
| t�nh ngu?i duy�n ma|RealityTV|
|             d� banh|   Horror|
+--------------------+---------+
only showing top 20 rows



In [9]:
path = "D:\\WORKSPACE\\DE\\study_de\\Big_Data\\Items Shared on 4-29-2023\\Dataset\\log_search\\20220601"
data_f_t6 = spark.read.parquet(path)
data_p_t6 = processing_data(data_f_t6)
data_p_t6 = data_p_t6.join(data_key, data_p_t6.Most_Search == data_key.Most_Search, how='left').drop(data_key.Most_Search)
data_p_t6 = data_p_t6.fillna({'Category': 'UNKNOWN'})
data_p_t6 = data_p_t6.withColumnRenamed("user_id", "User_ID").withColumnRenamed("Most_Search", "Most_Search_T6").withColumnRenamed("Category", "Category_T6")
data_p_t6.show()
data_p_t6.count()

+-------------+--------------------+-----------+
|      User_ID|      Most_Search_T6|Category_T6|
+-------------+--------------------+-----------+
|Chưa xác định|               trống|    UNKNOWN|
|      0003021|               trống|    UNKNOWN|
|      0005063|                vtv5|      Sport|
|      0005117|           YOUTUBIII|   TH-DRAMA|
|      0005178|   bored to death s1|  RealityTV|
|      0006773|                  30|      Child|
|      0007729|              KUROKO|     Comedy|
|      0007996|   phàm nhân tu tiên|    Science|
|      0010414| bật thầy hoá trang |    UNKNOWN|
|      0014661|        gã không mặt|     Action|
|      0016455|                 cây|   Category|
|      0017222|            BAO NGAM|     Action|
|      0018591|Nhanh Và Nguy Hiểm 9|     Action|
|      0022333|               trống|    UNKNOWN|
|      0023968|          bé học chữ|     Comedy|
|      0027338|          fairy tail|      Music|
|      0027338|          fairy tail|    Science|
|      0028131|     

82350

In [587]:
path = "D:\\WORKSPACE\\DE\\study_de\\Big_Data\\Items Shared on 4-29-2023\\Dataset\\log_search\\20220701"
data_f_t7 = spark.read.parquet(path)
data_p_t7 = processing_data(data_f_t7)
data_p_t7 = data_p_t7.join(data_key, data_p_t7.Most_Search == data_key.Most_Search, how='left').drop(data_key.Most_Search)
data_p_t7 = data_p_t7.fillna({'Category': 'UNKNOWN'})
data_p_t7 = data_p_t7.withColumnRenamed("user_id", "User_ID").withColumnRenamed("Most_Search", "Most_Search_T7").withColumnRenamed("Category", "Category_T7")
data_p_t7.show()

+-------------+--------------------+-----------+
|      User_ID|      Most_Search_T7|Category_T7|
+-------------+--------------------+-----------+
|Chưa xác định|               trống|    UNKNOWN|
|      0008153| thương ngày nắng về|    UNKNOWN|
|      0008700|pháp y tần minh: ...|    UNKNOWN|
|      0008802|      người ấy là ai|    UNKNOWN|
|      0009576|          dữ quân ca|    UNKNOWN|
|      0009683|  nàng vệ sĩ của tôi|    UNKNOWN|
|      0010270|kênh truyền hình ...|    UNKNOWN|
|      0011634|liên khúc con đườ...|    UNKNOWN|
|      0012110|        vo tac thien|    C-DRAMA|
|      0012401|boku no hero acad...|    V-DRAMA|
|      0013823|boruto: naruto th...|     Comedy|
|       001804|thế giới này khôn...|    UNKNOWN|
|      0018346|           penthouse|    V-DRAMA|
|      0018346|           penthouse|     Comedy|
|      0023415| TAM QUOC  DIENNGHIA|    UNKNOWN|
|      0024233|                   M|   Category|
|      0025492|            hoàn hồn|    UNKNOWN|
|      0026258|     

In [588]:
data_result = data_p_t6.join(data_p_t7, on='User_ID', how='inner')
data_result = data_result.cache()
data_result.show()

+--------+--------------------+-----------+--------------------+-----------+
| User_ID|      Most_Search_T6|Category_T6|      Most_Search_T7|Category_T7|
+--------+--------------------+-----------+--------------------+-----------+
|41752947|               trống|    UNKNOWN|               trống|    UNKNOWN|
| 7316968|  chung kết việt nam|    UNKNOWN|sợi dây chuyền đị...|    UNKNOWN|
|90141654|   tình chị duyên em|    UNKNOWN|        khúc côn cầu|    UNKNOWN|
|90919545|          gọi tôi là|    UNKNOWN|  hoa gian tân nương|    UNKNOWN|
|92481088|           penthouse|     Comedy|           penthouse|    V-DRAMA|
|92481088|           penthouse|    V-DRAMA|           penthouse|    V-DRAMA|
|92481088|           penthouse|     Comedy|           penthouse|     Comedy|
|92481088|           penthouse|    V-DRAMA|           penthouse|     Comedy|
|93302448|       thực tập sinh|    UNKNOWN|               trống|    UNKNOWN|
|93714458|     sắc đẹp ẩn giấu|    UNKNOWN|         cô nàng béo|    UNKNOWN|

In [589]:
data_result = data_result.withColumn("Trending_Type", 
    when(
        col("Category_T6") == col("Category_T7"), "Unchanged"
    ).otherwise("Changed")
)
data_result.show()

+--------+--------------------+-----------+--------------------+-----------+-------------+
| User_ID|      Most_Search_T6|Category_T6|      Most_Search_T7|Category_T7|Trending_Type|
+--------+--------------------+-----------+--------------------+-----------+-------------+
|41752947|               trống|    UNKNOWN|               trống|    UNKNOWN|    Unchanged|
| 7316968|  chung kết việt nam|    UNKNOWN|sợi dây chuyền đị...|    UNKNOWN|    Unchanged|
|90141654|   tình chị duyên em|    UNKNOWN|        khúc côn cầu|    UNKNOWN|    Unchanged|
|90919545|          gọi tôi là|    UNKNOWN|  hoa gian tân nương|    UNKNOWN|    Unchanged|
|92481088|           penthouse|     Comedy|           penthouse|    V-DRAMA|      Changed|
|92481088|           penthouse|    V-DRAMA|           penthouse|    V-DRAMA|    Unchanged|
|92481088|           penthouse|     Comedy|           penthouse|     Comedy|    Unchanged|
|92481088|           penthouse|    V-DRAMA|           penthouse|     Comedy|      Changed|

In [592]:
data_result = data_result.withColumn("Previous", 
    when(
        col("Category_T6") == col("Category_T7"), "Unchanged"
    ).otherwise(concat_ws(" - ",col("Category_T6"), col("Category_T7")))
)
data_result.show()

+--------+--------------------+-----------+--------------------+-----------+-------------+----------------+
| User_ID|      Most_Search_T6|Category_T6|      Most_Search_T7|Category_T7|Trending_Type|        Previous|
+--------+--------------------+-----------+--------------------+-----------+-------------+----------------+
|41752947|               trống|    UNKNOWN|               trống|    UNKNOWN|    Unchanged|       Unchanged|
| 7316968|  chung kết việt nam|    UNKNOWN|sợi dây chuyền đị...|    UNKNOWN|    Unchanged|       Unchanged|
|90141654|   tình chị duyên em|    UNKNOWN|        khúc côn cầu|    UNKNOWN|    Unchanged|       Unchanged|
|90919545|          gọi tôi là|    UNKNOWN|  hoa gian tân nương|    UNKNOWN|    Unchanged|       Unchanged|
|92481088|           penthouse|     Comedy|           penthouse|    V-DRAMA|      Changed|Comedy - V-DRAMA|
|92481088|           penthouse|    V-DRAMA|           penthouse|    V-DRAMA|    Unchanged|       Unchanged|
|92481088|           penthou