## Initialize spark session

In [1]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext('local')
spark = SparkSession(sc)

## Create data schema

In [2]:
from pyspark.sql.types import StructType

action_schema = StructType().add("student_code", "integer")\
    .add("activity", "string")\
    .add("numberOfFile", "integer")\
    .add("timestamp", "string")

sv_de_schema = StructType().add("student_code", "integer")\
    .add("fullname", "string")

## Save danh_sach_sv_de to hdfs with path: /raw_zone/danh_sach_sv_de.csv

In [3]:
df = spark.read.format("csv").option("header", "false").schema(sv_de_schema)\
    .load("./data/danh_sach_sv_de.csv")
df.show()

+------------+--------------------+
|student_code|            fullname|
+------------+--------------------+
|           1|          Mai Đức An|
|           2|      Nguyễn Mai Anh|
|           3|   Ngô Ngọc Tuấn Anh|
|           4|      Trần Trung Anh|
|           5|       Trần Ngọc Bảo|
|           6|  Nguyễn Vũ Hòa Bình|
|           7|    Nguyễn Thành Đạt|
|           8|        Đỗ Thành Đạt|
|           9|    Nguyễn Khoa Đoàn|
|          10|    Nguyễn Quốc Dũng|
|          11|     Đường Minh Quân|
|          12|   Dương Quang Giang|
|          13|    Nguyễn Minh Hiếu|
|          14|        Ngô Phi Hùng|
|          15|Nguyễn Đình Thiên...|
|          16|        Đỗ Doãn Khắc|
|          17|      Châu Minh Khải|
|          18|      Phạm Đình Khôi|
|          19|        Lê Bảo Khánh|
|          20|        Lê Minh Phúc|
+------------+--------------------+
only showing top 20 rows



In [4]:
df.write.format("csv").option("header", "false").mode("overwrite")\
    .save("hdfs://namenode:9000/raw_zone/danh_sach_sv_de")

## Read data in hdfs danh_sach_sv_de và /raw_zone/fact/activity 

In [5]:
sv_de_df = spark.read.format("csv").option("header", "false").schema(sv_de_schema)\
    .load("hdfs://namenode:9000/raw_zone/danh_sach_sv_de")
sv_de_df.show(40)

+------------+--------------------+
|student_code|            fullname|
+------------+--------------------+
|           1|          Mai Đức An|
|           2|      Nguyễn Mai Anh|
|           3|   Ngô Ngọc Tuấn Anh|
|           4|      Trần Trung Anh|
|           5|       Trần Ngọc Bảo|
|           6|  Nguyễn Vũ Hòa Bình|
|           7|    Nguyễn Thành Đạt|
|           8|        Đỗ Thành Đạt|
|           9|    Nguyễn Khoa Đoàn|
|          10|    Nguyễn Quốc Dũng|
|          11|     Đường Minh Quân|
|          12|   Dương Quang Giang|
|          13|    Nguyễn Minh Hiếu|
|          14|        Ngô Phi Hùng|
|          15|Nguyễn Đình Thiên...|
|          16|        Đỗ Doãn Khắc|
|          17|      Châu Minh Khải|
|          18|      Phạm Đình Khôi|
|          19|        Lê Bảo Khánh|
|          20|        Lê Minh Phúc|
|          21| Trần Phúc Mạnh Linh|
|          22|       Huỳnh Tấn Lộc|
|          23|Nguyễn Địch Nhật ...|
|          24|     Nguyễn Hoài Nam|
|          25|    Đào Thanh 

In [8]:
action_df = spark.read.parquet("hdfs://namenode:9000/raw_zone/fact/activity",
                               schema=action_schema)
action_df.show()
print(action_df.count())

+------------+--------+------------+---------+
|student_code|activity|numberOfFile|timestamp|
+------------+--------+------------+---------+
|           4|   write|           7|6/10/2024|
|          33|    read|           5|6/12/2024|
|          33| execute|           1|6/13/2024|
|           6|   write|           6|6/15/2024|
|          24| execute|           8|6/12/2024|
|          22|   write|           2|6/12/2024|
|          31|   write|           9|6/13/2024|
|           8|   write|           4|6/13/2024|
|          21|    read|           5|6/12/2024|
|          26| execute|           2|6/10/2024|
|          24|    read|          10|6/12/2024|
|          10|    read|           6|6/15/2024|
|          20|   write|           7|6/14/2024|
|          14| execute|           7|6/11/2024|
|           5| execute|           1|6/10/2024|
|           3|    read|           9|6/13/2024|
|          11|   write|           7|6/11/2024|
|           7|    read|           1|6/15/2024|
|          22

## Process data from /raw_zone/danh_sach_sv_de.csv and save to data/Tran_Phuc_Manh_Linh.csv

Đưa ra tổng số file được tương tác hàng ngày theo mỗi loại activity
mà sinh viên đó thực hiện. Lưu ra 1 file ouput.

In [15]:
from pyspark.sql.functions import date_format, to_date

output_df = action_df\
    .withColumn("timestamp", date_format(to_date(action_df["timestamp"], "M/d/yyyy"), "yyyyMMdd"))\
    .groupBy(["student_code", "activity", "timestamp"])\
    .sum("numberOfFile")\
    .join(sv_de_df, "student_code", "left")\
    .orderBy("timestamp", "student_code", "activity")

In [20]:
with open("./data/Tran_Phuc_Manh_Linh.csv", "w", encoding='utf-8') as f:
    f.write("date,student_code,student_name,activity,totalFile\n")
    for row in output_df.collect():
        f.write(f"{row.timestamp},{row.student_code},{row.fullname},{row.activity},{row['sum(numberOfFile)']}\n")

Lưu file bằng DataFrame API

In [None]:
output_df.withColumnRenamed("sum(numberOfFile)", "totalFile")\
    .withColumnRenamed("timestamp", "date")\
    .withColumnRenamed("fullname", "student_name")\
    .select("date", "student_code", "student_name", "activity", "totalFile")

output_df.write.format("csv").option("header", "true").mode("overwrite")\
    .save("./data/Tran_Phuc_Manh_Linh")