In [1]:
# Import các thư viện cần thiết
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum
from pyspark.sql.functions import countDistinct
from pyspark.sql import functions as F
from pyspark.sql.functions import col, count, isnan, when
from pyspark.sql.functions import expr
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

In [2]:
# Khởi tạo Spark Session
spark = SparkSession.builder.appName("DataProcessing").getOrCreate()

23/12/03 01:43:06 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# Đọc file
df = spark.read.csv("Live_raw.csv", header=True, inferSchema=True)

                                                                                

In [4]:
df.show()

+--------------------+-----------+-------------------+-------------+------------+----------+---------+---------+--------+---------+--------+----------+-------+-------+-------+-------+
|           status_id|status_type|   status_published|num_reactions|num_comments|num_shares|num_likes|num_loves|num_wows|num_hahas|num_sads|num_angrys|Column1|Column2|Column3|Column4|
+--------------------+-----------+-------------------+-------------+------------+----------+---------+---------+--------+---------+--------+----------+-------+-------+-------+-------+
|246675545449582_1...|      video|2018-04-22 06:00:00|          529|         512|       262|      432|       92|       3|        1|       1|         0|   NULL|   NULL|   NULL|   NULL|
|246675545449582_1...|      photo|2018-04-21 22:45:00|          150|           0|         0|      150|        0|       0|        0|       0|         0|   NULL|   NULL|   NULL|   NULL|
|246675545449582_1...|      video|2018-04-21 06:17:00|          227|         236

In [5]:
# Xem thống kê dữ liệu
df.describe().show()

23/12/03 01:45:23 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 5:>                                                          (0 + 1) / 1]

+-------+--------------------+-----------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+-------------------+-------------------+-------+-------+-------+-------+
|summary|           status_id|status_type|     num_reactions|     num_comments|        num_shares|        num_likes|         num_loves|          num_wows|         num_hahas|           num_sads|         num_angrys|Column1|Column2|Column3|Column4|
+-------+--------------------+-----------+------------------+-----------------+------------------+-----------------+------------------+------------------+------------------+-------------------+-------------------+-------+-------+-------+-------+
|  count|                7050|       7050|              7050|             7050|              7050|             7050|              7050|              7050|              7050|               7050|               7050|      0|      0|      0|      0|
|   mean|       

                                                                                

In [6]:
# Hiển thị thông tin cấu trúc dữ liệu df
df.printSchema()

root
 |-- status_id: string (nullable = true)
 |-- status_type: string (nullable = true)
 |-- status_published: timestamp (nullable = true)
 |-- num_reactions: integer (nullable = true)
 |-- num_comments: integer (nullable = true)
 |-- num_shares: integer (nullable = true)
 |-- num_likes: integer (nullable = true)
 |-- num_loves: integer (nullable = true)
 |-- num_wows: integer (nullable = true)
 |-- num_hahas: integer (nullable = true)
 |-- num_sads: integer (nullable = true)
 |-- num_angrys: integer (nullable = true)
 |-- Column1: string (nullable = true)
 |-- Column2: string (nullable = true)
 |-- Column3: string (nullable = true)
 |-- Column4: string (nullable = true)



In [7]:
# Hiển thị số giá trị NaN cho mỗi cột
df.select([_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

+---------+-----------+----------------+-------------+------------+----------+---------+---------+--------+---------+--------+----------+-------+-------+-------+-------+
|status_id|status_type|status_published|num_reactions|num_comments|num_shares|num_likes|num_loves|num_wows|num_hahas|num_sads|num_angrys|Column1|Column2|Column3|Column4|
+---------+-----------+----------------+-------------+------------+----------+---------+---------+--------+---------+--------+----------+-------+-------+-------+-------+
|        0|          0|               0|            0|           0|         0|        0|        0|       0|        0|       0|         0|   7050|   7050|   7050|   7050|
+---------+-----------+----------------+-------------+------------+----------+---------+---------+--------+---------+--------+----------+-------+-------+-------+-------+



In [8]:
# Xóa các cột thừa, trống dữ liệu
df = df[df.columns[:-4]]
df.printSchema()

root
 |-- status_id: string (nullable = true)
 |-- status_type: string (nullable = true)
 |-- status_published: timestamp (nullable = true)
 |-- num_reactions: integer (nullable = true)
 |-- num_comments: integer (nullable = true)
 |-- num_shares: integer (nullable = true)
 |-- num_likes: integer (nullable = true)
 |-- num_loves: integer (nullable = true)
 |-- num_wows: integer (nullable = true)
 |-- num_hahas: integer (nullable = true)
 |-- num_sads: integer (nullable = true)
 |-- num_angrys: integer (nullable = true)



In [9]:
# Đếm các giá trị duy nhất ở mỗi thuộc tính
df.agg(*(countDistinct(col(c)).alias(c) for c in df.columns)).show()

[Stage 9:>                                                          (0 + 1) / 1]

+---------+-----------+----------------+-------------+------------+----------+---------+---------+--------+---------+--------+----------+
|status_id|status_type|status_published|num_reactions|num_comments|num_shares|num_likes|num_loves|num_wows|num_hahas|num_sads|num_angrys|
+---------+-----------+----------------+-------------+------------+----------+---------+---------+--------+---------+--------+----------+
|     6997|          4|            6913|         1067|         993|       501|     1044|      229|      65|       42|      24|        14|
+---------+-----------+----------------+-------------+------------+----------+---------+---------+--------+---------+--------+----------+



                                                                                

In [10]:
# Xóa các cột không dùng để phân tích
df = df.drop("status_id", "status_published")

Kiểm tra sự nhất quán dữ liệu

In [11]:
# Tạo cột mới 'total_react' bằng cách tính tổng 6 loại tương tác
df = df.withColumn('total_react', col('num_likes') + col('num_loves') + col('num_wows') + col('num_hahas') + col('num_sads') + col('num_angrys'))

In [12]:
df.select('total_react').show()

+-----------+
|total_react|
+-----------+
|        529|
|        150|
|        227|
|        111|
|        213|
|        217|
|        503|
|        295|
|        203|
|        170|
|        210|
|        222|
|        313|
|        209|
|        346|
|        332|
|        135|
|        150|
|        221|
|        152|
+-----------+
only showing top 20 rows



In [13]:
# Tạo cột mới để kiểm tra num_reactions bằng total_react
df = df.withColumn('check', expr("num_reactions = total_react"))

In [14]:
# Đếm số lượng giá trị True và False trong cột 'check'
df.groupBy('check').count().show()

+-----+-----+
|check|count|
+-----+-----+
| true| 7041|
|false|    9|
+-----+-----+



In [15]:
# Cập nhật giá trị của 'num_reactions' bằng với 'total_react'
df = df.withColumn('num_reactions', col('total_react'))

In [16]:
# Xóa 2 cột total_react và check
df = df.drop('total_react', 'check')

In [17]:
df.printSchema()

root
 |-- status_type: string (nullable = true)
 |-- num_reactions: integer (nullable = true)
 |-- num_comments: integer (nullable = true)
 |-- num_shares: integer (nullable = true)
 |-- num_likes: integer (nullable = true)
 |-- num_loves: integer (nullable = true)
 |-- num_wows: integer (nullable = true)
 |-- num_hahas: integer (nullable = true)
 |-- num_sads: integer (nullable = true)
 |-- num_angrys: integer (nullable = true)



In [16]:
# Hiển thị các giá trị duy nhất của cột 'status_type'
df.select('status_type').distinct().show()

+-----------+
|status_type|
+-----------+
|       link|
|     status|
|      video|
|      photo|
+-----------+



In [22]:
# Tạo và huấn luyện StringIndexer
indexer = StringIndexer(inputCol="status_type", outputCol="status_type_indexed")
model = indexer.fit(df)
df = model.transform(df)


# Cập nhật cột 'status_type' với giá trị của 'status_type_indexed'
df = df.withColumn("status_type", col("status_type_indexed").cast('int'))

# Xóa cột 'status_type_indexed'
df = df.drop("status_type_indexed")

# Hiển thị DataFrame sau khi được mã hóa
df.show()

# Lấy danh sách các nhãn
labels = model.labels

# Tạo DataFrame từ danh sách nhãn và chỉ số
labels_df = spark.createDataFrame([(labels[i], i) for i in range(len(labels))], ["label", "index"])

# Hiển thị ánh xạ giữa nhãn gốc và chỉ số
labels_df.show()

+-----------+-------------+------------+----------+---------+---------+--------+---------+--------+----------+
|status_type|num_reactions|num_comments|num_shares|num_likes|num_loves|num_wows|num_hahas|num_sads|num_angrys|
+-----------+-------------+------------+----------+---------+---------+--------+---------+--------+----------+
|          1|          529|         512|       262|      432|       92|       3|        1|       1|         0|
|          0|          150|           0|         0|      150|        0|       0|        0|       0|         0|
|          1|          227|         236|        57|      204|       21|       1|        1|       0|         0|
|          0|          111|           0|         0|      111|        0|       0|        0|       0|         0|
|          0|          213|           0|         0|      204|        9|       0|        0|       0|         0|
|          0|          217|           6|         0|      211|        5|       1|        0|       0|         0|
|

[Stage 28:>                                                         (0 + 1) / 1]

+------+-----+
| label|index|
+------+-----+
| photo|    0|
| video|    1|
|status|    2|
|  link|    3|
+------+-----+



                                                                                

In [23]:
df.printSchema()

root
 |-- status_type: integer (nullable = true)
 |-- num_reactions: integer (nullable = true)
 |-- num_comments: integer (nullable = true)
 |-- num_shares: integer (nullable = true)
 |-- num_likes: integer (nullable = true)
 |-- num_loves: integer (nullable = true)
 |-- num_wows: integer (nullable = true)
 |-- num_hahas: integer (nullable = true)
 |-- num_sads: integer (nullable = true)
 |-- num_angrys: integer (nullable = true)



In [24]:
# Danh sách các cột cần chuẩn hóa
columns_to_scale = ["num_reactions", "num_comments", "num_shares", "num_likes", 
                    "num_loves", "num_wows", "num_hahas", "num_sads", "num_angrys"]

# Thực hiện Min-Max Scaling cho mỗi cột
for col_name in columns_to_scale:
    # Tính min và max cho cột
    min_col = df.agg({col_name: "min"}).collect()[0][0]
    max_col = df.agg({col_name: "max"}).collect()[0][0]

    # Áp dụng Min-Max Scaling
    df = df.withColumn(col_name, (col(col_name) - min_col) / (max_col - min_col))

# Hiển thị DataFrame sau khi được chuẩn hóa
df.show()

+-----------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+-------------------+
|status_type|       num_reactions|        num_comments|          num_shares|           num_likes|         num_loves|            num_wows|           num_hahas|          num_sads|         num_angrys|
+-----------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+-------------------+
|          1| 0.11231422505307856|0.024392567889471178| 0.07651869158878505| 0.09171974522292993|0.1400304414003044| 0.01079136690647482|0.006369426751592357|0.0196078431372549|                0.0|
|          0| 0.03184713375796178|                 0.0|                 0.0| 0.03184713375796178|               0.0|                 0.0|                 0.0|               0.0|                0.0|
|         

In [25]:
# Xuất ra file mới
df.coalesce(1).write.csv("Live.csv", header=True)

                                                                                