In [1]:
from pyspark.sql import SparkSession

In [2]:
# Build the notebook's Spark session (getOrCreate returns an already-running
# session if there is one). It runs with a 'local' master and sets Hadoop's
# fs.defaultFS to the namenode1 HDFS endpoint.
spark = (
    SparkSession.builder
    .master('local')
    .appName("Read_Youtube_Comments_Data")
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode1:9000")
    .getOrCreate()
)

In [3]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Explicit six-column schema; every field is declared nullable.
# NOTE(review): none of the visible cells passes `schema` to a reader. It is
# presumably meant for the dataset that carries a `target` column - confirm.
# NOTE(review): the `id` values displayed further down (~1.47e9) sit close to
# the 32-bit IntegerType ceiling (2147483647); consider LongType if larger ids
# can occur - TODO confirm.
schema = StructType([
    StructField(column, dtype(), True)
    for column, dtype in (
        ("target", IntegerType),
        ("id", IntegerType),
        ("date", StringType),
        ("query", StringType),
        ("author", StringType),
        ("text", StringType),
    )
])

In [7]:
# Load the raw YouTube video records from the data lake into `df`.
# No `header` option is passed: that is a CSV reader setting and has no
# meaning for Parquet, which stores its schema inside the files.
df = spark.read.parquet("hdfs://namenode1:9000/youtube_DE_project/datalake/youtube_video/raw")
# Preview the first rows to sanity-check the columns that were read.
df.show()

+--------------------+-----------+--------------------+--------------------+-------------------+----------+----------+-------------+----+-----+---+
|          channel_id|   video_id|               title|         description|       published_at|view_count|like_count|comment_count|year|month|day|
+--------------------+-----------+--------------------+--------------------+-------------------+----------+----------+-------------+----+-----+---+
|UCtxD0x6AuNNqdXO9...|2auLO7ea9tQ|   ...And Action! 🎬|I'm excited to an...|2025-04-11 07:00:03|   1081131|     94028|         9047|2025|    5|  5|
|UCtxD0x6AuNNqdXO9...|mLNz-_X1aGA|24 hours with Gio...|                    |2025-04-07 11:13:34|   1120672|     59694|         2916|2025|    5|  5|
|UCtxD0x6AuNNqdXO9...|pdjt7bCn558|Join me in a Fash...|                    |2025-04-01 09:23:31|   1271065|     58337|         1918|2025|    5|  5|
|UCtxD0x6AuNNqdXO9...|6C-iqXvAE2M|The MOST EXCLUSIV...|                    |2025-03-31 14:03:23|    822991|     5

In [8]:
# Show only the rows whose `day` column equals 6.
df.where(df.day == 6).show()

+--------------------+-----------+--------------------+--------------------+-------------------+----------+----------+-------------+----+-----+---+
|          channel_id|   video_id|               title|         description|       published_at|view_count|like_count|comment_count|year|month|day|
+--------------------+-----------+--------------------+--------------------+-------------------+----------+----------+-------------+----+-----+---+
|UCtxD0x6AuNNqdXO9...|2auLO7ea9tQ|   ...And Action! 🎬|I'm excited to an...|2025-04-11 07:00:03|   1099422|     95028|         9143|2025|    5|  6|
|UCtxD0x6AuNNqdXO9...|mLNz-_X1aGA|24 hours with Gio...|                    |2025-04-07 11:13:34|   1136059|     60381|         2950|2025|    5|  6|
|UCtxD0x6AuNNqdXO9...|pdjt7bCn558|Join me in a Fash...|                    |2025-04-01 09:23:31|   1282121|     58726|         1922|2025|    5|  6|
|UCtxD0x6AuNNqdXO9...|6C-iqXvAE2M|The MOST EXCLUSIV...|                    |2025-03-31 14:03:23|    830958|     5

In [9]:
from pyspark.sql import functions as F

# Assumes `df` is the DataFrame under inspection.
# One-row summary of NULL counts per column: when() yields the column name for
# NULL cells and NULL otherwise, and count() only counts the non-NULL results.
df.select(*(
    F.count(F.when(F.col(name).isNull(), name)).alias(name)
    for name in df.columns
)).show()


+----------+--------+-----+-----------+------------+----------+----------+-------------+----+-----+---+
|channel_id|video_id|title|description|published_at|view_count|like_count|comment_count|year|month|day|
+----------+--------+-----+-----------+------------+----------+----------+-------------+----+-----+---+
|         0|       0|    0|          0|           0|         0|         0|            0|   0|    0|  0|
+----------+--------+-----+-----------+------------+----------+----------+-------------+----+-----+---+



In [11]:
# Total number of rows in df.
df.count()

204

In [12]:
# Number of rows labelled target == 0.
# NOTE(review): `target` is not among the columns of the frame loaded by the
# visible spark.read.parquet cell, so this only runs when `df` is bound to a
# different dataset - confirm which load step produced it.
df.where(df.target == 0).count()

799999

In [13]:
# Number of rows labelled target == 2 (the recorded run found none).
# NOTE(review): requires `df` to have a `target` column - confirm its source.
df.where(df.target == 2).count()

0

In [14]:
# Number of rows labelled target == 4.
# NOTE(review): requires `df` to have a `target` column - confirm its source.
df.where(df.target == 4).count()

800000

In [15]:
# Total number of rows in df.
# NOTE(review): the result recorded for this cell (1599999) differs from the
# earlier df.count() result (204), so `df` must have been rebound in between
# by a cell that is not present here - confirm and restore that load step.
df.count()

1599999

In [24]:
from pyspark.sql.functions import when
# Derive a binary label column: target_new is 1 where target == 4, else 0.
df1 = df.withColumn(
    'target_new',
    when(df.target == 4, 1).otherwise(0),
)
# Preview the frame with the new column appended.
df1.show()

+------+----------+--------------------+--------+---------------+--------------------+----------+
|target|        id|                date|   query|         author|                text|target_new|
+------+----------+--------------------+--------+---------------+--------------------+----------+
|     0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|         0|
|     0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|         0|
|     0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|         0|
|     0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|         0|
|     0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|         0|
|     0|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |         0|
|     0|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|         0|
|     0|1467811795|M

In [25]:
# Spot-check the relabelling: show rows whose target_new is 1.
df1.where(df1.target_new == 1).show()

+------+----------+--------------------+--------+---------------+--------------------+----------+
|target|        id|                date|   query|         author|                text|target_new|
+------+----------+--------------------+--------+---------------+--------------------+----------+
|     4|1467822272|Mon Apr 06 22:22:...|NO_QUERY|          ersle|I LOVE @Health4Ua...|         1|
|     4|1467822273|Mon Apr 06 22:22:...|NO_QUERY|       becca210|im meeting up wit...|         1|
|     4|1467822283|Mon Apr 06 22:22:...|NO_QUERY|      Wingman29|@DaRealSunisaKim ...|         1|
|     4|1467822287|Mon Apr 06 22:22:...|NO_QUERY|      katarinka|Being sick can be...|         1|
|     4|1467822293|Mon Apr 06 22:22:...|NO_QUERY|    _EmilyYoung|@LovesBrooklyn2 h...|         1|
|     4|1467822391|Mon Apr 06 22:22:...|NO_QUERY|  ajarofalmonds|@ProductOfFear Yo...|         1|
|     4|1467822447|Mon Apr 06 22:22:...|NO_QUERY|      vmdavinci|@r_keith_hill Tha...|         1|
|     4|1467822465|M

In [28]:
# List the distinct values present in the `target` column.
df.select(df.target).distinct().show()


+------+
|target|
+------+
|     4|
|     0|
+------+

