In [None]:
!pip install pyspark



In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
PRE_DIR = '/content/drive/MyDrive/BigData/Data'
MODEL_DATA_DIR = '/content/drive/MyDrive/BigData/Data/Model_Data'

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Colab Spark") \
    .getOrCreate()

In [95]:
from pyspark.sql.functions import col, when, count, lit, count
import shutil
import glob
import os
from pyspark.sql.functions import explode, col
import seaborn as sns
import matplotlib.pyplot as plt

def save_df_as_text(df, output_path, file_name):
    temp_output_path = f"{output_path}/_temp_output"
    if os.path.exists(temp_output_path):
        shutil.rmtree(temp_output_path)
    df.coalesce(1).rdd \
        .map(lambda row: "\t".join([str(c) for c in row])) \
        .saveAsTextFile(temp_output_path)
    part_files = glob.glob(f"{temp_output_path}/part-*")
    if part_files:
        part_file = part_files[0]
        final_path = os.path.join(output_path, file_name)
        if os.path.exists(final_path):
            os.remove(final_path)
        shutil.move(part_file, final_path)
        print(f"File saved as {final_path}")
    shutil.rmtree(temp_output_path)


def save_df_as_csv(df, output_path, file_name, header=True):
    temp_output_path = f"{output_path}/_temp_output"
    if os.path.exists(temp_output_path):
        shutil.rmtree(temp_output_path)
    df.coalesce(1).rdd \
        .map(lambda row: ",".join([str(c) for c in row])) \
        .saveAsTextFile(temp_output_path)
    part_files = glob.glob(f"{temp_output_path}/part-*")
    if part_files:
        part_file = part_files[0]
        final_path = os.path.join(output_path, file_name)
        if os.path.exists(final_path):
            os.remove(final_path)
        with open(final_path, "w", encoding="utf-8") as fout:
            if header is True:
                fout.write(",".join(df.columns) + "\n")
            elif isinstance(header, list):
                fout.write(",".join(header) + "\n")
            with open(part_file, "r", encoding="utf-8") as fin:
                shutil.copyfileobj(fin, fout)
        print(f"CSV file saved as {final_path}")
    shutil.rmtree(temp_output_path)

# KG

## course-concept

In [None]:
import os
course_concept_df = spark.read.option("delimiter", "\t") \
                              .csv(os.path.join(PRE_DIR, 'concept_course_filter.txt')) \
                              .toDF("course", "concept")

course_concept_df.show(5)

+--------+-------------------------------------+
|  course|                              concept|
+--------+-------------------------------------+
|C_682400|K_嵌入式实时操作系统_计算机科学与技术|
|C_682277|                    K_审美感受_艺术学|
|C_697825|              K_机械量_仪器科学与技术|
|C_707096|                    K_复制方式_教育学|
|C_735129|      K_电场的差_动力工程及工程热物理|
+--------+-------------------------------------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import lit
course_concept_df = course_concept_df.withColumnRenamed('course', 'h') \
                                     .withColumnRenamed('concept', 't') \
                                     .withColumn('r', lit('course.concept'))
course_concept_df = course_concept_df.select('h', 'r', 't')

course_concept_df.show(5)
print(f"Rows: {course_concept_df.count()}, Columns: {len(course_concept_df.columns)}")

+--------+--------------+-------------------------------------+
|       h|             r|                                    t|
+--------+--------------+-------------------------------------+
|C_682400|course.concept|K_嵌入式实时操作系统_计算机科学与技术|
|C_682277|course.concept|                    K_审美感受_艺术学|
|C_697825|course.concept|              K_机械量_仪器科学与技术|
|C_707096|course.concept|                    K_复制方式_教育学|
|C_735129|course.concept|      K_电场的差_动力工程及工程热物理|
+--------+--------------+-------------------------------------+
only showing top 5 rows

Rows: 350498, Columns: 3


## course-teacher

In [None]:
course_teacher_df = spark.read.option("delimiter", "\t") \
                              .csv(os.path.join(PRE_DIR, 'course_teacher_filter.txt')) \
                              .toDF("teacher", "course")
course_teacher_df.show(5)

+-------+---------+
|teacher|   course|
+-------+---------+
| T_1988| C_681086|
| T_7696|C_1750851|
| T_7697|C_1750851|
| T_7474| C_948392|
| T_7472| C_948392|
+-------+---------+
only showing top 5 rows



In [None]:
course_teacher_df = course_teacher_df.withColumnRenamed('course', 'h') \
                                     .withColumnRenamed('teacher', 't') \
                                     .withColumn('r', lit('course.teacher'))
course_teacher_df = course_teacher_df.select('h', 'r', 't')

course_teacher_df.show(5)
print(f"Rows: {course_teacher_df.count()}, Columns: {len(course_teacher_df.columns)}")

+---------+--------------+------+
|        h|             r|     t|
+---------+--------------+------+
| C_681086|course.teacher|T_1988|
|C_1750851|course.teacher|T_7696|
|C_1750851|course.teacher|T_7697|
| C_948392|course.teacher|T_7474|
| C_948392|course.teacher|T_7472|
+---------+--------------+------+
only showing top 5 rows

Rows: 10658, Columns: 3


## Course-school

In [None]:
course_school_df = spark.read.option("delimiter", "\t") \
                              .csv(os.path.join(PRE_DIR, 'course_school_filter.txt')) \
                              .toDF("school", "course")
course_school_df.show(5)

+------+---------+
|school|   course|
+------+---------+
| S_319| C_898312|
| S_319| C_873858|
| S_319|C_2221551|
| S_319| C_873860|
| S_319|C_2199450|
+------+---------+
only showing top 5 rows



In [None]:
course_school_df = course_school_df.withColumnRenamed('course', 'h') \
                                     .withColumnRenamed('school', 't') \
                                     .withColumn('r', lit('course.school'))
course_school_df = course_school_df.select('h', 'r', 't')

course_school_df.show(5)
print(f"Rows: {course_school_df.count()}, Columns: {len(course_school_df.columns)}")

+---------+-------------+-----+
|        h|            r|    t|
+---------+-------------+-----+
| C_898312|course.school|S_319|
| C_873858|course.school|S_319|
|C_2221551|course.school|S_319|
| C_873860|course.school|S_319|
|C_2199450|course.school|S_319|
+---------+-------------+-----+
only showing top 5 rows

Rows: 2816, Columns: 3


## Course - field

In [None]:
course_field_df = spark.read.option("delimiter", "\t") \
                              .csv(os.path.join(PRE_DIR, 'course_field_filter.txt')) \
                              .toDF("course", "field")
course_field_df.show(5)

+--------+--------------------+
|  course|               field|
+--------+--------------------+
|C_735405|    kinh tế ứng dụng|
|C_682734|khoa học và kỹ th...|
|C_696800|khoa học và kỹ th...|
|C_680975|           pháp luật|
|C_680923|      y học lâm sàng|
+--------+--------------------+
only showing top 5 rows



In [None]:
course_field_df = course_field_df.withColumnRenamed('course', 'h') \
                                     .withColumnRenamed('field', 't') \
                                     .withColumn('r', lit('course.field'))
course_field_df = course_field_df.select('h', 'r', 't')

course_field_df.show(5)
print(f"Rows: {course_field_df.count()}, Columns: {len(course_field_df.columns)}")

+--------+------------+--------------------+
|       h|           r|                   t|
+--------+------------+--------------------+
|C_735405|course.field|    kinh tế ứng dụng|
|C_682734|course.field|khoa học và kỹ th...|
|C_696800|course.field|khoa học và kỹ th...|
|C_680975|course.field|           pháp luật|
|C_680923|course.field|      y học lâm sàng|
+--------+------------+--------------------+
only showing top 5 rows

Rows: 560, Columns: 3


## course-topic

In [None]:
import os
course_topic_df = spark.read.option("delimiter", "\t") \
                              .csv(os.path.join(PRE_DIR, 'course_topic.txt')) \
                              .toDF("course", "topic")

course_topic_df.show(5)

+---------+-----+
|   course|topic|
+---------+-----+
| C_949439|    9|
|C_1918552|    9|
| C_948270|    7|
| C_682483|   69|
| C_682356|   85|
+---------+-----+
only showing top 5 rows



In [None]:
course_topic_df = course_topic_df.withColumnRenamed('course', 'h') \
                                     .withColumnRenamed('topic', 't') \
                                     .withColumn('r', lit('course.topic'))
course_topic_df = course_topic_df.select('h', 'r', 't')

course_topic_df.show(5)
print(f"Rows: {course_topic_df.count()}, Columns: {len(course_topic_df.columns)}")

+---------+------------+---+
|        h|           r|  t|
+---------+------------+---+
| C_949439|course.topic|  9|
|C_1918552|course.topic|  9|
| C_948270|course.topic|  7|
| C_682483|course.topic| 69|
| C_682356|course.topic| 85|
+---------+------------+---+
only showing top 5 rows

Rows: 2828, Columns: 3


## Filter invalid courses

In [None]:
n_core_interactions = spark.read.option("delimiter", "\t") \
                              .csv(os.path.join(PRE_DIR, 'user_course_filter.txt')) \
                              .toDF('user', 'course', 'time')
n_core_interactions.show()

+----------+---------+-------------------+
|      user|   course|               time|
+----------+---------+-------------------+
|U_12882889| C_948076|2020-02-06 10:31:52|
|U_19944695| C_707035|2020-03-02 23:41:20|
|U_12906326| C_758208|2019-11-14 19:19:44|
|U_12906326| C_948421|2020-07-17 10:22:11|
|U_12863490| C_936977|2020-03-27 18:47:47|
|U_12908233| C_936990|2020-03-19 12:30:58|
|U_18608313| C_676648|2020-05-03 00:59:12|
|U_12904769| C_681076|2020-09-21 16:50:38|
|U_12876806| C_948432|2020-06-13 16:34:44|
|U_12857163| C_911165|2019-12-29 20:39:45|
|U_12898701| C_682442|2020-09-18 15:47:06|
|U_16577873| C_677136|2020-02-05 12:07:45|
| U_2661892| C_696897|2019-10-14 23:23:11|
|U_28611745|C_1434593|2020-03-12 03:15:14|
|U_12891642| C_697152|2019-12-03 08:34:21|
|U_14817225| C_696740|2019-12-06 23:19:02|
|U_30962478| C_697791|2020-08-06 12:09:58|
|U_12891326|C_2059295|2020-11-12 09:38:27|
|U_12891326| C_800360|2020-06-14 20:02:30|
|U_25005722| C_697025|2020-02-19 22:44:59|
+----------

In [None]:
print("# of users: ", n_core_interactions.select("user").distinct().count())
print("# of courses: ", n_core_interactions.select("course").distinct().count())

# of users:  99970
# of courses:  2828


In [None]:
from pyspark.sql.functions import col

valid_courses = n_core_interactions.select("course").distinct().rdd.flatMap(lambda x: x).collect()
triplets = course_field_df \
    .unionByName(course_concept_df) \
    .unionByName(course_teacher_df) \
    .unionByName(course_school_df)  \
    .unionByName(course_topic_df)
triplets = triplets.filter(col("h").isin(valid_courses)).dropDuplicates()

triplets.show(5)
print(f"Rows: {triplets.count()}, Columns: {len(triplets.columns)}")

+--------+------------+--------------------+
|       h|           r|                   t|
+--------+------------+--------------------+
|C_680804|course.field|khoa học và kỹ th...|
|C_681047|course.field|        y học cơ bản|
|C_674924|course.field|    kinh tế ứng dụng|
|C_676643|course.field|              tâm lý|
|C_682618|course.field|khoa học và công ...|
+--------+------------+--------------------+
only showing top 5 rows

Rows: 367360, Columns: 3


In [None]:
from pyspark.sql.functions import col, countDistinct

def filter_invalid_relations_and_entities(triplets_df, min_entities=5, min_rel=25):
    old_size = -1
    current_size = triplets_df.count()

    while old_size != current_size:
        old_size = current_size

        # Filter entities (on column 't')
        entity_counts = triplets_df.groupBy("t").count()
        valid_entities = entity_counts.filter(col("count") >= min_entities).select("t")
        triplets_df = triplets_df.filter(col("t").isin(valid_entities.rdd.flatMap(lambda x: x).collect()))

        # Filter relations (on column 'r')
        rel_counts = triplets_df.groupBy("r").count()
        valid_rels = rel_counts.filter(col("count") >= min_rel).select("r")
        triplets_df = triplets_df.filter(col("r").isin(valid_rels.rdd.flatMap(lambda x: x).collect()))

        current_size = triplets_df.count()
        print(f"New size: {current_size}")

    print("================ Valid relations ===============")
    triplets_df.groupBy("r").count().orderBy("count", ascending=False).show()

    print("===== # of attribute type in each relation =====")
    rel_list = [row.r for row in triplets_df.select("r").distinct().collect()]

    for rel in rel_list:
        n_uni_attr = triplets_df.filter(col("r") == rel).select("t").distinct().count()
        print(f"+ {rel}: {n_uni_attr}")

    return triplets_df

In [None]:
fil_triplets = filter_invalid_relations_and_entities(triplets)

New size: 69174
New size: 69174
+--------------+-----+
|             r|count|
+--------------+-----+
|course.concept|63303|
|  course.topic| 2807|
| course.school| 2319|
|  course.field|  472|
|course.teacher|  273|
+--------------+-----+

===== # of attribute type in each relation =====
+ course.field: 41
+ course.concept: 7113
+ course.teacher: 41
+ course.school: 145
+ course.topic: 110


In [96]:
save_df_as_csv(fil_triplets, MODEL_DATA_DIR, 'fil_triplets.csv')

CSV file saved as /content/drive/MyDrive/BigData/Data/Model_Data/fil_triplets.csv


# Mapping

## items


In [39]:
item_df = spark.read.option("header", True).csv(os.path.join(PRE_DIR, 'course_map.csv')).toDF("org_id", "remap_id")
item_df.show()
print(f"Rows: {item_df.count()}, Columns: {len(item_df.columns)}")

+---------+--------+
|   org_id|remap_id|
+---------+--------+
| C_679390|       0|
| C_696994|       1|
| C_697791|       2|
| C_696911|       3|
| C_875624|       4|
| C_676664|       5|
| C_707135|       6|
| C_696968|       7|
|C_1824921|       8|
| C_677109|       9|
| C_707373|      10|
| C_735157|      11|
| C_707054|      12|
| C_735395|      13|
| C_682485|      14|
| C_697422|      15|
| C_756654|      16|
| C_629559|      17|
| C_677045|      18|
|C_1756063|      19|
+---------+--------+
only showing top 20 rows

Rows: 2828, Columns: 2


In [98]:
save_df_as_csv(item_df, MODEL_DATA_DIR, 'course_map.csv', header=False)

CSV file saved as /content/drive/MyDrive/BigData/Data/Model_Data/course_map.csv


## entities


In [38]:
from pyspark.sql.functions import col
from pyspark.sql import Row

item_df_selected = item_df.select("org_id", "remap_id")
entity_df = fil_triplets.select("t").distinct().withColumnRenamed("t", "org_id")

start_index = item_df_selected.count()
indexed_new = entity_df.rdd.zipWithIndex().map(
    lambda row_index: Row(**row_index[0].asDict(), remap_id=row_index[1] + start_index)
)
new_entities_with_id = indexed_new.toDF()
entity_df_with_id = item_df_selected.unionByName(new_entities_with_id)

entity_df_with_id.show()
print(f"Rows: {entity_df_with_id.count()}, Columns: {len(entity_df_with_id.columns)}")

+---------+--------+
|   org_id|remap_id|
+---------+--------+
| C_679390|       0|
| C_696994|       1|
| C_697791|       2|
| C_696911|       3|
| C_875624|       4|
| C_676664|       5|
| C_707135|       6|
| C_696968|       7|
|C_1824921|       8|
| C_677109|       9|
| C_707373|      10|
| C_735157|      11|
| C_707054|      12|
| C_735395|      13|
| C_682485|      14|
| C_697422|      15|
| C_756654|      16|
| C_629559|      17|
| C_677045|      18|
|C_1756063|      19|
+---------+--------+
only showing top 20 rows

Rows: 10278, Columns: 2


In [None]:
entity_df_with_id = entity_df_with_id.withColumn("remap_id", col("remap_id").cast(IntegerType()))

In [101]:
save_df_as_csv(entity_df_with_id, MODEL_DATA_DIR, 'entity_list.csv', header=False)

CSV file saved as /content/drive/MyDrive/BigData/Data/Model_Data/entity_list.csv


## relations

In [42]:
from pyspark.sql import Row

rel_df = fil_triplets.select('r').distinct()
indexed_rel_df = rel_df.rdd.zipWithIndex().map(
    lambda row_index: Row(org_id=row_index[0]['r'], remap_id=row_index[1])
)
rel_df_with_id = indexed_rel_df.toDF()
rel_df_with_id.show()
print(f"Rows: {rel_df_with_id.count()}, Columns: {len(rel_df_with_id.columns)}")

+--------------+--------+
|        org_id|remap_id|
+--------------+--------+
|  course.field|       0|
|course.concept|       1|
|course.teacher|       2|
| course.school|       3|
|  course.topic|       4|
+--------------+--------+

Rows: 5, Columns: 2


In [104]:
save_df_as_csv(rel_df_with_id, MODEL_DATA_DIR, 'relation_list.csv', header=False)

CSV file saved as /content/drive/MyDrive/BigData/Data/Model_Data/relation_list.csv


## KG

In [44]:
fil_triplets_pd = fil_triplets.toPandas()
entity_df_with_id_pd = entity_df_with_id.toPandas()
rel_df_pd = rel_df_with_id.toPandas()

entity_mapping = dict(zip(entity_df_with_id_pd["org_id"], entity_df_with_id_pd["remap_id"]))
rel_mapping = dict(zip(rel_df_pd["org_id"], rel_df_pd["remap_id"]))

fil_triplets_pd["h"] = fil_triplets_pd["h"].map(entity_mapping).fillna(-1).astype(int)
fil_triplets_pd["t"] = fil_triplets_pd["t"].map(entity_mapping).fillna(-1).astype(int)
fil_triplets_pd["r"] = fil_triplets_pd["r"].map(rel_mapping).fillna(-1).astype(int)

In [45]:
fil_triplets_pd.to_csv(MODEL_DATA_DIR + "/kg_final.txt", sep='\t', index=False, header=False)

## user

In [106]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql import Row

unique_users = n_core_interactions.select("user").distinct()

@udf(returnType=IntegerType())
def extract_int_user(user_id):
    return int(user_id[2:])

user_df = unique_users.withColumn("int_user", extract_int_user("user")) \
                      .orderBy("int_user")

user_df = user_df.rdd.zipWithIndex().map(lambda x: Row(org_id=x[0]['user'], remap_id=x[1])).toDF()
user_df.show()
print(f"Rows: {user_df.count()}, Columns: {len(user_df.columns)}")

+------+--------+
|org_id|remap_id|
+------+--------+
|  U_69|       0|
|  U_90|       1|
| U_105|       2|
| U_112|       3|
| U_172|       4|
| U_185|       5|
| U_205|       6|
| U_796|       7|
| U_817|       8|
|U_1361|       9|
|U_1368|      10|
|U_1461|      11|
|U_1485|      12|
|U_1551|      13|
|U_1748|      14|
|U_2059|      15|
|U_2325|      16|
|U_2369|      17|
|U_2452|      18|
|U_2561|      19|
+------+--------+
only showing top 20 rows

Rows: 99970, Columns: 2


In [144]:
save_df_as_csv(user_df, MODEL_DATA_DIR, 'user_list.csv', header=False)

CSV file saved as /content/drive/MyDrive/BigData/Data/Model_Data/user_list.csv


In [76]:
from pyspark.sql.functions import col, udf, collect_list
from pyspark.sql.types import IntegerType

user_mapping = dict(zip(
    user_df.select("org_id").rdd.flatMap(lambda x: x).collect(),
    user_df.select("remap_id").rdd.flatMap(lambda x: x).collect()
))
entity_mapping = dict(zip(
    entity_df_with_id.select("org_id").rdd.flatMap(lambda x: x).collect(),
    entity_df_with_id.select("remap_id").rdd.flatMap(lambda x: x).collect()
))

@udf(IntegerType())
def remap_user(user_id):
    return user_mapping.get(user_id, -1)  # -1 nếu không tồn tại

@udf(IntegerType())
def remap_course(course_id):
    return entity_mapping.get(course_id, -1)

mapped_df = n_core_interactions.withColumn("user", remap_user(col("user"))) \
                               .withColumn("course", remap_course(col("course")))

sorted_df = mapped_df.orderBy("time")
sorted_interactions = sorted_df.groupBy("user") \
                               .agg(collect_list("course").alias("course_order")) \
                               .orderBy("user")
sorted_interactions.show()
print(f"Rows: {sorted_interactions.count()}, Columns: {len(sorted_interactions.columns)}")

+----+--------------------+
|user|        course_order|
+----+--------------------+
|   0|     [0, 1, 2, 3, 4]|
|   1|     [5, 6, 7, 1, 8]|
|   2|[9, 10, 11, 12, 1...|
|   3|[1, 32, 33, 34, 3...|
|   4|[65, 73, 2, 77, 7...|
|   5|[91, 92, 39, 24, 93]|
|   6|[94, 95, 12, 96, ...|
|   7|[68, 100, 101, 10...|
|   8|[123, 124, 59, 12...|
|   9|[141, 142, 92, 14...|
|  10|[43, 120, 154, 10...|
|  11|[158, 108, 109, 1...|
|  12|[162, 103, 163, 1...|
|  13|[165, 166, 167, 2...|
|  14|[134, 49, 1, 169,...|
|  15|[171, 40, 163, 17...|
|  16|[183, 1, 97, 2, 5...|
|  17|[218, 219, 220, 2...|
|  18|[224, 226, 227, 2...|
|  19|[48, 169, 244, 24...|
+----+--------------------+
only showing top 20 rows

Rows: 99970, Columns: 2


In [77]:
save_df_as_text(sorted_interactions, MODEL_DATA_DIR, 'sorted_interactions.txt')

File saved as /content/drive/MyDrive/BigData/Data/Model_Data/sorted_interactions.txt


## Train val test

In [78]:
from pyspark.sql.functions import col, udf, concat_ws, concat
from pyspark.sql.types import StringType, IntegerType

@udf(StringType())
def get_train_courses_str(courses):
    return ' '.join(map(str, courses[:-2]))

@udf(IntegerType())
def get_val_course(courses):
    return courses[-2]

@udf(IntegerType())
def get_test_course(courses):
    return courses[-1]

train_df = sorted_interactions \
    .withColumn("train_courses", get_train_courses_str("course_order")) \
    .select("user", "train_courses")

val_df = sorted_interactions \
    .withColumn("val_course", get_val_course("course_order")) \
    .select("user", "val_course")

test_df = sorted_interactions \
    .withColumn("test_course", get_test_course("course_order")) \
    .select("user", "test_course")


In [135]:
from pyspark.sql.functions import col, expr

train = sorted_interactions \
    .withColumn("train_courses", expr("slice(course_order, 1, size(course_order) - 2)")) \
    .select("user", "train_courses")

val = sorted_interactions \
    .withColumn("val_course", expr("course_order[size(course_order) - 2]")) \
    .select("user", "val_course")

test = sorted_interactions \
    .withColumn("test_course", expr("course_order[size(course_order) - 1]")) \
    .select("user", "test_course")

In [136]:
train.show(5)
print(f"Rows: {train.count()}, Columns: {len(train.columns)}")

+----+--------------------+
|user|       train_courses|
+----+--------------------+
|   0|           [0, 1, 2]|
|   1|           [5, 6, 7]|
|   2|[9, 10, 11, 12, 1...|
|   3|[1, 32, 33, 34, 3...|
|   4|[65, 73, 2, 77, 7...|
+----+--------------------+
only showing top 5 rows

Rows: 99970, Columns: 2


In [128]:
train_pd = train.toPandas()
train_pd = train_pd.rename(columns={"user": "user", "train_courses": "feature"})
train_pd.to_csv(MODEL_DATA_DIR + "/train_df.csv", index=False)

In [137]:
val.show(5)
print(f"Rows: {val.count()}, Columns: {len(val.columns)}")

+----+----------+
|user|val_course|
+----+----------+
|   0|         3|
|   1|         1|
|   2|         5|
|   3|        75|
|   4|        89|
+----+----------+
only showing top 5 rows

Rows: 99970, Columns: 2


In [139]:
save_df_as_csv(val, MODEL_DATA_DIR, 'val_df.csv', header=["user", "val_label"])

CSV file saved as /content/drive/MyDrive/BigData/Data/Model_Data/val_df.csv


In [138]:
test.show(5)
print(f"Rows: {test.count()}, Columns: {len(test.columns)}")

+----+-----------+
|user|test_course|
+----+-----------+
|   0|          4|
|   1|          8|
|   2|         31|
|   3|         76|
|   4|         90|
+----+-----------+
only showing top 5 rows

Rows: 99970, Columns: 2


In [141]:
save_df_as_csv(test, MODEL_DATA_DIR, 'test_df.csv', header=["user", "test_label"])

CSV file saved as /content/drive/MyDrive/BigData/Data/Model_Data/test_df.csv


## Gender for user

In [85]:
from pyspark.sql.functions import split, col

user_df = spark.read.text(os.path.join(PRE_DIR, 'users.txt'))
user_df = user_df.withColumn("columns", split(user_df["value"], "\t")) \
    .select(
        col("columns")[0].alias("id"),
        col("columns")[1].alias("gender")
    )
user_df.show()

+----------+------+
|        id|gender|
+----------+------+
|U_17081735|   1.0|
|U_16707717|   1.0|
|U_11699254|   0.0|
|U_20573053|   1.0|
|U_17464819|   1.0|
|U_18655437|   0.0|
|U_18853686|   2.0|
|U_28735689|   0.0|
|U_21344253|   1.0|
|U_28864801|   1.0|
| U_1258926|   0.0|
|U_17172210|   1.0|
| U_8472878|   1.0|
|U_24193388|   0.0|
|U_12883936|   0.0|
|U_22776642|   2.0|
|U_22345366|   1.0|
|U_12859723|   0.0|
|U_15667313|   2.0|
|U_13351448|   0.0|
+----------+------+
only showing top 20 rows



In [86]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when
import os
import shutil

shutil.copyfile(
    os.path.join(MODEL_DATA_DIR, 'user_list.txt'),
    os.path.join(MODEL_DATA_DIR, 'user_list_without_gender.txt')
)

'/content/drive/MyDrive/BigData/Data/Model_Data/user_list_without_gender.txt'

In [87]:
user_mapping = dict(zip(
    user_df.select("id").rdd.flatMap(lambda x: x).collect(),
    user_df.select("gender").rdd.flatMap(lambda x: x).collect()
))

user_list_df = spark.read.option("delimiter", "\t").csv(
    os.path.join(MODEL_DATA_DIR, 'user_list_without_gender.txt')
).toDF("org_id", "remap_id")

user_with_gender = user_list_df.join(
    user_df,
    user_list_df.org_id == user_df.id,
    how="left"
).drop("id")

user_with_gender.show()

+------+--------+------+
|org_id|remap_id|gender|
+------+--------+------+
|  U_69|       0|   1.0|
|  U_90|       1|   0.0|
| U_105|       2|   1.0|
| U_112|       3|   1.0|
| U_172|       4|   1.0|
| U_185|       5|   2.0|
| U_205|       6|   1.0|
| U_796|       7|   0.0|
| U_817|       8|   2.0|
|U_1361|       9|   0.0|
|U_1368|      10|   0.0|
|U_1461|      11|   0.0|
|U_1485|      12|   0.0|
|U_1551|      13|   2.0|
|U_1748|      14|   2.0|
|U_2059|      15|   0.0|
|U_2325|      16|   0.0|
|U_2369|      17|   2.0|
|U_2452|      18|   0.0|
|U_2561|      19|   2.0|
+------+--------+------+
only showing top 20 rows



In [88]:
save_df_as_text(user_with_gender, MODEL_DATA_DIR, 'user_with_gender.txt')

File saved as /content/drive/MyDrive/BigData/Data/Model_Data/user_with_gender.txt
