In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [3]:
spark = SparkSession.builder \
    .appName("User-Course") \
    .getOrCreate()

## Read

In [4]:
user_df = spark.read.json("/content/drive/MyDrive/Big Data/Input/user.json")

In [5]:
print(user_df.count())

3330294


In [None]:
user_df.show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
user_df.describe().show()

+-------+------------------+--------+-------+--------------------+------------------+
|summary|            gender|      id|   name|              school|     year_of_birth|
+-------+------------------+--------+-------+--------------------+------------------+
|  count|           3330240| 3330294|3330240|             3330240|             48530|
|   mean|0.9455747934082829|    NULL|    NaN|2.598243233699481E15|2039.0162991963734|
| stddev|0.8321099128426235|    NULL|    NaN|3.852631940997602...| 358.6743025264895|
|    min|                 0| U_10000|       |                    |              1111|
|    max|               232|U_999999|     Ùèø∞|         üöÄ Â∑•Á®ãÂ§ßÂ≠¶|              9989|
+-------+------------------+--------+-------+--------------------+------------------+



In [None]:
user_df.printSchema()

root
 |-- course_order: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- enroll_time: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- gender: long (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- school: string (nullable = true)
 |-- year_of_birth: long (nullable = true)



In [None]:
user_df = user_df.withColumn("course_count", size(col("course_order")))
user_df.select("id", "course_count").show(truncate=False)

+-----+------------+
|id   |course_count|
+-----+------------+
|U_22 |2           |
|U_24 |65          |
|U_25 |1           |
|U_53 |8           |
|U_54 |9           |
|U_67 |1           |
|U_68 |2           |
|U_69 |18          |
|U_90 |5           |
|U_104|2           |
|U_105|49          |
|U_108|2           |
|U_112|71          |
|U_118|3           |
|U_119|3           |
|U_120|1           |
|U_123|4           |
|U_129|3           |
|U_141|1           |
|U_144|1           |
+-----+------------+
only showing top 20 rows



In [None]:
course_df = spark.read.json("/content/drive/MyDrive/Big Data/Input/course.json")

In [None]:
course_df.describe().show()

+-------+------------------------------------+---------+-----------------------------------+--------------------------+
|summary|                               about|       id|                               name|             prerequisites|
+-------+------------------------------------+---------+-----------------------------------+--------------------------+
|  count|                                3779|     3781|                               3781|                      3779|
|   mean|                1.587301587301592E19|     NULL|                               NULL|                     111.0|
| stddev|                4.199605255658078E19|     NULL|                               NULL|                      NULL|
|    min|                                    |C_1017355|                    Food Chemistry |                          |
|    max|Ôºà1ÔºâÁâπËâ≤ÔºöËØæÁ®ãËµÑÊ∫êÂª∫ËÆæÔºåÂÖÖÂàÜ‰ΩìÁé∞...| C_956450|ÔºàÁñæÈ£éËÆ°ÂàíÔºâÈù¢ÂêëÂØπË±°Á®ãÂ∫èËÆæËÆ°ÔºàC++Ôºâ|È´òÁ∫ßËØ≠Ë®ÄÁ®ãÂ∫èËÆæËÆ°„ÄÅÊï∞ÊçÆÁªì

In [None]:
print("S·ªë l∆∞·ª£ng course: ", course_df.count())

S·ªë l∆∞·ª£ng course:  3781


In [None]:
# Th√™m C_ v√†o tr∆∞·ªõc m·ªói id trong course_order
user_df = user_df.withColumn("course_order", expr("transform(course_order, x -> concat('C_', x))"))
user_df.show(truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## Filter

### Course

In [None]:
# B∆∞·ªõc 1: Explode course_order ƒë·ªÉ c√≥ t·ª´ng course_id ri√™ng bi·ªát
exploded_df = user_df.select(
    col("id"),
    explode(col("course_order")).alias("course_id")
)

print(f"\nS·ªë l∆∞·ª£ng user-course pairs sau khi explode: {exploded_df.count()}")


S·ªë l∆∞·ª£ng user-course pairs sau khi explode: 11807090


In [None]:
# B∆∞·ªõc 2: ƒê·∫øm s·ªë l∆∞·ª£ng user cho m·ªói course
course_user_count = exploded_df.groupBy("course_id").agg(
    count("id").alias("user_count")
).orderBy(col("user_count").desc())
course_user_count.show(20)

+---------+----------+
|course_id|user_count|
+---------+----------+
| C_936971|    231674|
| C_696994|    181697|
| C_676932|    125789|
| C_697791|     96210|
| C_883345|     78374|
| C_677049|     68762|
| C_629559|     67051|
|C_2019328|     66702|
| C_696911|     61055|
| C_696700|     55873|
| C_801420|     50848|
| C_674903|     48029|
| C_696942|     46328|
| C_866756|     46060|
| C_735351|     45079|
| C_697821|     44600|
| C_697034|     44291|
| C_677010|     42690|
| C_696679|     41847|
| C_696968|     39824|
+---------+----------+
only showing top 20 rows



In [None]:
print("Th·ªëng k√™ ph√¢n ph·ªëi s·ªë user:")
course_user_count.describe("user_count").show()

Th·ªëng k√™ ph√¢n ph·ªëi s·ªë user:
+-------+-----------------+
|summary|       user_count|
+-------+-----------------+
|  count|             4701|
|   mean|2511.612422888747|
| stddev|7232.724744062156|
|    min|                1|
|    max|           231674|
+-------+-----------------+



In [None]:
# B∆∞·ªõc 3: T√¨m c√°c course c√≥ √≠t h∆°n 5 users
courses_less_than_5_users = course_user_count.filter(col("user_count") < 5)

print(f"\n=== COURSES C√ì √çT H·ª†N 5 USERS ===")
print(f"S·ªë l∆∞·ª£ng course c√≥ √≠t h∆°n 5 users: {courses_less_than_5_users.count()}")


=== COURSES C√ì √çT H·ª†N 5 USERS ===
S·ªë l∆∞·ª£ng course c√≥ √≠t h∆°n 5 users: 1106


In [None]:
courses_less_than_5_users.show(10)

+---------+----------+
|course_id|user_count|
+---------+----------+
| C_928746|         4|
| C_681272|         4|
| C_784036|         4|
| C_629508|         4|
| C_597263|         4|
|C_2266767|         4|
| C_597373|         4|
| C_735399|         4|
| C_605008|         4|
| C_597291|         4|
+---------+----------+
only showing top 10 rows



In [None]:
# B∆∞·ªõc 4: L·∫•y danh s√°ch course_id c·∫ßn lo·∫°i b·ªè
courses_to_remove = [row.course_id for row in courses_less_than_5_users.collect()]
# B∆∞·ªõc 5: T·∫°o UDF ƒë·ªÉ l·ªçc course_order (lo·∫°i b·ªè c√°c course c√≥ √≠t h∆°n 5 users). Broadcast courses_to_remove ƒë·ªÉ t·ªëi ∆∞u hi·ªáu su·∫•t
courses_to_remove_broadcast = spark.sparkContext.broadcast(set(courses_to_remove))

In [None]:
def filter_courses_with_enough_users(course_list):
    if course_list is None:
        return []
    courses_to_remove_set = courses_to_remove_broadcast.value
    return [course_id for course_id in course_list if course_id not in courses_to_remove_set]

filter_courses_udf = udf(filter_courses_with_enough_users, ArrayType(StringType()))

In [None]:
# B∆∞·ªõc 6: √Åp d·ª•ng filter l√™n user_df
filtered_user_df = user_df.withColumn(
    "course_order_filtered",
    filter_courses_udf(col("course_order"))
)

In [None]:
filtered_user_df.count()

3330294

In [None]:
filtered_user_df.show()

+--------------------+--------------------+------+-----+------+--------+-------------+------------+---------------------+
|        course_order|         enroll_time|gender|   id|  name|  school|year_of_birth|course_count|course_order_filtered|
+--------------------+--------------------+------+-----+------+--------+-------------+------------+---------------------+
|[C_682129, C_2294...|[2019-10-12 10:28...|     0| U_22|    Êàë|        |         2015|           2| [C_682129, C_2294...|
|[C_597214, C_6055...|[2019-05-20 16:06...|     1| U_24|ÁéãÂ∏ÖÂõΩ|Ê∏ÖÂçéÂ§ßÂ≠¶|         6558|          65| [C_597214, C_6055...|
|         [C_1903985]|[2020-08-07 18:59...|     0| U_25|ÁéãÂ∏ÖÂõΩ|Ê∏ÖÂçéÂ§ßÂ≠¶|         NULL|           1|          [C_1903985]|
|[C_696679, C_1704...|[2020-03-01 21:24...|     1| U_53|‰∫éÊ≠ÜÊù∞|Ê∏ÖÂçéÂ§ßÂ≠¶|         1973|           8| [C_696679, C_1704...|
|[C_682442, C_6821...|[2019-10-09 02:17...|     2| U_54|È©¨Êò±Êò•|Ê∏ÖÂçéÂ§ßÂ≠¶|         NULL|           9| [C_682442, C_6821

In [None]:
courses_at_least_5_users = course_user_count.filter(col("user_count") >= 5)

print(f"\n=== COURSES C√ì √çT NH·∫§T 5 USERS ===")
print(f"S·ªë l∆∞·ª£ng course c√≥ √≠t nh·∫•t 5 users: {courses_at_least_5_users.count()}")


=== COURSES C√ì √çT NH·∫§T 5 USERS ===
S·ªë l∆∞·ª£ng course c√≥ √≠t nh·∫•t 5 users: 3595


In [None]:
filter_valid_courses = course_df.join(
    courses_at_least_5_users,
    course_df.id == courses_at_least_5_users.course_id,
    "inner"
).select(
    course_df.id,
    courses_at_least_5_users.user_count
)

In [None]:
print(f"S·ªë l∆∞·ª£ng kh√≥a h·ªçc: {course_df.count()}")

S·ªë l∆∞·ª£ng kh√≥a h·ªçc: 3781


In [None]:
print(f"S·ªë l∆∞·ª£ng kh√≥a h·ªçc h·ª£p l·ªá c√≥ √≠t nh·∫•t 5 ng∆∞·ªùi ƒëƒÉng k√≠: {filter_valid_courses.count()}")

S·ªë l∆∞·ª£ng kh√≥a h·ªçc h·ª£p l·ªá c√≥ √≠t nh·∫•t 5 ng∆∞·ªùi ƒëƒÉng k√≠: 3148


#### Mapping course

In [None]:
# Add a new column with numbers starting from 1
window_spec = Window.orderBy("id")
df_mapped = filter_valid_courses.withColumn("mapped_id", row_number().over(window_spec) - 1)

In [None]:
# Select only the id and the new number
mapping_df = df_mapped.select(filter_valid_courses["id"].alias("original_id"), "mapped_id")

In [None]:
mapping_df.show()

+-----------+---------+
|original_id|mapped_id|
+-----------+---------+
|  C_1017355|        0|
|  C_1017419|        1|
|  C_1025064|        2|
|  C_1025076|        3|
|  C_1025079|        4|
|  C_1073350|        5|
|  C_1123814|        6|
|  C_1123848|        7|
|  C_1123944|        8|
|  C_1123979|        9|
|  C_1124039|       10|
|  C_1159827|       11|
|  C_1214863|       12|
|  C_1320505|       13|
|  C_1328548|       14|
|  C_1410076|       15|
|  C_1410096|       16|
|  C_1410106|       17|
|  C_1410117|       18|
|  C_1410126|       19|
+-----------+---------+
only showing top 20 rows



In [None]:
# Save as a single .txt file with two columns (CSV format)
mapping_df.coalesce(1).write \
    .option("header", True) \
    .option("delimiter", ",") \
    .mode("overwrite") \
    .csv("output/mapping_txt")

### User

In [None]:
# Thu th·∫≠p t·∫•t c·∫£ course_id h·ª£p l·ªá v√†o m·ªôt set
valid_course_ids = [row.id for row in filter_valid_courses.collect()]
valid_course_ids_broadcast = spark.sparkContext.broadcast(set(valid_course_ids))
valid_ids = valid_course_ids_broadcast.value

In [None]:
def filter_valid_courses(course_list):
    if course_list is None:
        return []
    return [course_id for course_id in course_list if course_id in valid_ids]

In [None]:
filter_udf = udf(filter_valid_courses, ArrayType(StringType()))

In [None]:
# √Åp d·ª•ng filter l√™n user_df
filtered_user_df = user_df.withColumn("course_order", filter_udf(col("course_order"))) \
                          .withColumn("course_count", size(col("course_order")))

In [None]:
filtered_user_df.show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
filtered_user_df.select("id", "course_count").show(truncate=False)

+-----+------------+
|id   |course_count|
+-----+------------+
|U_22 |1           |
|U_24 |40          |
|U_25 |1           |
|U_53 |6           |
|U_54 |6           |
|U_67 |1           |
|U_68 |2           |
|U_69 |7           |
|U_90 |5           |
|U_104|2           |
|U_105|26          |
|U_108|1           |
|U_112|48          |
|U_118|3           |
|U_119|3           |
|U_120|0           |
|U_123|3           |
|U_129|3           |
|U_141|1           |
|U_144|1           |
+-----+------------+
only showing top 20 rows



In [None]:
def get_invalid_courses(course_list):
    if course_list is None:
        return []
    return [course_id for course_id in course_list if course_id not in valid_ids]

In [None]:
invalid_courses_udf = udf(get_invalid_courses, ArrayType(StringType()))

In [None]:
# T·∫°o DataFrame ch·ª©a c√°c course_id kh√¥ng h·ª£p l·ªá
user_with_invalid_courses = user_df.withColumn("invalid_courses", invalid_courses_udf(col("course_order"))) \
                                   .withColumn("invalid_course_count", size(col("invalid_courses")))

In [None]:
user_with_invalid_courses.show(truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
user_with_invalid_courses.select("id", "invalid_course_count").show(truncate=False)

+-----+--------------------+
|id   |invalid_course_count|
+-----+--------------------+
|U_22 |1                   |
|U_24 |25                  |
|U_25 |0                   |
|U_53 |2                   |
|U_54 |3                   |
|U_67 |0                   |
|U_68 |0                   |
|U_69 |11                  |
|U_90 |0                   |
|U_104|0                   |
|U_105|23                  |
|U_108|1                   |
|U_112|23                  |
|U_118|0                   |
|U_119|0                   |
|U_120|1                   |
|U_123|1                   |
|U_129|0                   |
|U_141|0                   |
|U_144|0                   |
+-----+--------------------+
only showing top 20 rows



In [None]:
final_df = filtered_user_df.filter(size(col("course_order")) >= 10)
final_df.select("id", "course_order").show(truncate=False)

+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id    |course_order                               

In [None]:
# ƒê·∫øm s·ªë l∆∞·ª£ng user (s·ªë h√†ng trong b·∫£ng)
user_count_original = user_df.count()
user_count_filtered = final_df.count()

print(f"T·ªïng s·ªë l∆∞·ª£ng user ban ƒë·∫ßu: {user_count_original}")
print(f"S·ªë l∆∞·ª£ng user c√≥ >= 10 course h·ª£p l·ªá: {user_count_filtered}")
print(f"T·ª∑ l·ªá user c√≥ >= 10 course h·ª£p l·ªá: {user_count_filtered/user_count_original*100:.2f}%")

T·ªïng s·ªë l∆∞·ª£ng user ban ƒë·∫ßu: 3330294
S·ªë l∆∞·ª£ng user c√≥ >= 10 course h·ª£p l·ªá: 182207
T·ª∑ l·ªá user c√≥ >= 10 course h·ª£p l·ªá: 5.47%


In [None]:
final_df = final_df.orderBy(rand()).limit(70000)
final_df.count()

70000

#### Mapping User

In [None]:
# Add a new column with numbers starting from 1
window_spec = Window.orderBy("id")
user_mapped = final_df.withColumn("mapped_id", row_number().over(window_spec) - 1)

In [None]:
# Select only the id and the new number
mapping_df = user_mapped.select(final_df["id"].alias("original_id"), "mapped_id")

In [None]:
mapping_df.show()

+-----------+---------+
|original_id|mapped_id|
+-----------+---------+
|   U_100090|        0|
|   U_100140|        1|
|   U_100195|        2|
|  U_1002858|        3|
| U_10030854|        4|
| U_10032601|        5|
| U_10033495|        6|
| U_10035650|        7|
| U_10036035|        8|
| U_10036250|        9|
|   U_100378|       10|
|   U_100485|       11|
| U_10056951|       12|
| U_10057608|       13|
| U_10058172|       14|
| U_10059042|       15|
| U_10059977|       16|
| U_10060130|       17|
| U_10060973|       18|
| U_10064340|       19|
+-----------+---------+
only showing top 20 rows



In [None]:
# Save as a single .txt file with two columns (CSV format)
mapping_df.coalesce(1).write \
    .option("header", True) \
    .option("delimiter", ",") \
    .mode("overwrite") \
    .csv("output/user_txt")

## Mapping

In [None]:
# ƒê·ªçc c√°c file CSV mapping
user_mapping_df = spark.read.option("header", "true").csv("/content/drive/MyDrive/Big Data/Output/user_mapping.csv")
course_mapping_df = spark.read.option("header", "true").csv("/content/drive/MyDrive/Big Data/Output/course_mapping.csv")

# Hi·ªÉn th·ªã mapping data ƒë·ªÉ ki·ªÉm tra
print("User Mapping:")
user_mapping_df.show(10)
print("Course Mapping:")
course_mapping_df.show(10)

User Mapping:
+-----------+---------+
|original_id|mapped_id|
+-----------+---------+
|   U_100090|        0|
|   U_100140|        1|
|   U_100195|        2|
|  U_1002858|        3|
| U_10030854|        4|
| U_10032601|        5|
| U_10033495|        6|
| U_10035650|        7|
| U_10036035|        8|
| U_10036250|        9|
+-----------+---------+
only showing top 10 rows

Course Mapping:
+-----------+---------+
|original_id|mapped_id|
+-----------+---------+
|  C_1017355|        0|
|  C_1017419|        1|
|  C_1025064|        2|
|  C_1025076|        3|
|  C_1025079|        4|
|  C_1073350|        5|
|  C_1123814|        6|
|  C_1123848|        7|
|  C_1123944|        8|
|  C_1123979|        9|
+-----------+---------+
only showing top 10 rows



In [None]:
# B∆∞·ªõc 1: Map user_id t·ª´ original sang mapped
user_mapped_df = final_df.join(
    user_mapping_df,
    final_df.id == user_mapping_df.original_id,
    "inner"
).select(
    user_mapping_df.mapped_id.alias("mapped_user_id"),
    final_df.course_order
)

In [None]:
# B∆∞·ªõc 2: Explode course_order ƒë·ªÉ c√≥ t·ª´ng course_id ri√™ng bi·ªát
exploded_df = user_mapped_df.select(
    col("mapped_user_id"),
    explode(col("course_order")).alias("original_course_id")
)

In [None]:
# B∆∞·ªõc 3: Map course_id t·ª´ original sang mapped
course_mapped_df = exploded_df.join(
    course_mapping_df,
    exploded_df.original_course_id == course_mapping_df.original_id,
    "inner"
).select(
    col("mapped_user_id"),
    course_mapping_df.mapped_id.alias("mapped_course_id")
)

In [None]:
# B∆∞·ªõc 4: Group by user_id v√† collect t·∫•t c·∫£ mapped course_ids
final_mapped_df = course_mapped_df.groupBy("mapped_user_id").agg(
    collect_list("mapped_course_id").alias("mapped_course_list")
)

In [None]:
final_mapped_df.show(5)

+--------------+--------------------+
|mapped_user_id|  mapped_course_list|
+--------------+--------------------+
|             0|[2345, 1216, 2207...|
|             1|[2816, 2845, 2282...|
|            10|[2552, 1883, 2716...|
|         10000|[2665, 1814, 2168...|
|         10001|[2690, 2595, 2615...|
+--------------+--------------------+
only showing top 5 rows



In [None]:
# B∆∞·ªõc 5: T·∫°o chu·ªói text theo format y√™u c·∫ßu: user_id course_id1 course_id2 ...
output_df = final_mapped_df.select(
        col("mapped_user_id").alias('user'),
        concat_ws(",", col("mapped_course_list")).alias('course_order')
)

In [None]:
output_df.show(10, truncate=False)

+-----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user |course_order                                                                                                                                                                                                                                                                                                                    |
+-----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0    |2345,1

In [None]:
output_df.coalesce(1).write \
    .option("header", True) \
    .option("delimiter", ",") \
    .mode("overwrite") \
    .csv("output/user-course")