## 1. Introduction

This notebook shows how to connect a Jupyter notebook to a Spark cluster, read Parquet and CSV files from HDFS, join and aggregate them, and write the result back to Hadoop.

## 2. Connection to Spark Cluster

To connect to the Spark cluster, create a SparkSession object with the following parameters:

+ **appName:** application name displayed in the [Spark Master Web UI](http://localhost:8080/);
+ **master:** Spark Master URL, the same one used by the Spark Workers;
+ **spark.executor.memory:** must be less than or equal to the `SPARK_WORKER_MEMORY` setting in the Docker Compose file.
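The memory rule in the last bullet can be checked before the session is built. The following is a plain-Python sketch, not part of PySpark: `to_mib` and `executor_fits_worker` are hypothetical helpers, and the `1g` worker value is an assumed Docker Compose setting.

```python
# Hypothetical helpers (not part of PySpark): compare Spark-style memory strings.
# Multipliers convert each unit suffix to mebibytes (MiB).
UNIT_TO_MIB = {"k": 1 / 1024, "m": 1, "g": 1024, "t": 1024 * 1024}


def to_mib(size: str) -> float:
    """Convert a size string such as "512m" or "1g" to MiB."""
    size = size.strip().lower()
    unit = size[-1]
    if unit not in UNIT_TO_MIB:
        raise ValueError(f"expected a k/m/g/t suffix, got {size!r}")
    return float(size[:-1]) * UNIT_TO_MIB[unit]


def executor_fits_worker(executor_memory: str, worker_memory: str) -> bool:
    """True if spark.executor.memory <= SPARK_WORKER_MEMORY."""
    return to_mib(executor_memory) <= to_mib(worker_memory)


# Assumed Docker Compose setting: SPARK_WORKER_MEMORY=1g
print(executor_fits_worker("512m", "1g"))  # True
print(executor_fits_worker("2g", "1g"))    # False
```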

In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

24/06/03 16:45:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## 3. Load and Store Data
We will now load data from HDFS, transform it with Spark, and store the result back to Hadoop.
Afterward, you can explore the saved files in the Hadoop UI at http://localhost:9870 (Utilities -> Browse the file system).

In [2]:
import os
import time

import pandas
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Current Unix time in seconds
epochNow = int(time.time())

### Read files from HDFS

In [22]:
log_path = "hdfs://namenode:9000/raw_zone/fact/activity"
list_path = "hdfs://namenode:9000/raw_zone/vdt2024/data_engineering/danh_sach_sv_de.csv"

try:
    logDF = spark.read.parquet(log_path)
    
    # Show a few rows of data
    logDF.show(5)
except Exception as e:
    print(f"Error reading parquet files: {e}")
    
try:
    listDF = spark.read \
                .format("csv") \
                .option("header", "false") \
                .option("inferSchema", "true") \
                .load(list_path)
    # Show a few rows of data
    listDF.show(5)
except Exception as e:
    print(f"Error reading csv file: {e}")

+------+-------+------+---------+
|field1| field2|field3|   field4|
+------+-------+------+---------+
|    31|  write|     9|6/13/2024|
|    11|  write|     7|6/11/2024|
|    37|   read|     8|6/10/2024|
|     8|   read|     6|6/15/2024|
|    38|execute|     9|6/11/2024|
+------+-------+------+---------+
only showing top 5 rows

+---+-----------------+
|_c0|              _c1|
+---+-----------------+
|  1|       Mai Đức An|
|  2|   Nguyễn Mai Anh|
|  3|Ngô Ngọc Tuấn Anh|
|  4|   Trần Trung Anh|
|  5|    Trần Ngọc Bảo|
+---+-----------------+
only showing top 5 rows
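Because `header` is `false`, Spark names the columns `_c0`, `_c1`, and so on, and `inferSchema` picks a type per column by scanning the values, which is why the student codes come back as integers. The plain-Python sketch below mimics only the naming and a simplified integer-vs-string check; Spark's real inference also recognizes other types such as doubles and timestamps. `infer_type` and `headerless_schema` are hypothetical helpers.

```python
import csv
import io


def infer_type(values):
    """Rough stand-in for inferSchema: "integer" if every value parses as int."""
    try:
        for value in values:
            int(value)
        return "integer"
    except ValueError:
        return "string"


def headerless_schema(text):
    """Name columns _c0, _c1, ... as Spark does when header=false."""
    rows = list(csv.reader(io.StringIO(text)))
    columns = list(zip(*rows))  # transpose rows into columns
    return [(f"_c{i}", infer_type(col)) for i, col in enumerate(columns)]


sample = "1,Mai Đức An\n2,Nguyễn Mai Anh\n3,Ngô Ngọc Tuấn Anh\n"
print(headerless_schema(sample))  # [('_c0', 'integer'), ('_c1', 'string')]
```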



### Rename columns

In [23]:
listDF = listDF.withColumnRenamed("_c0", "student_code") \
                        .withColumnRenamed("_c1", "name")

logDF = logDF.withColumnRenamed("field1", "student_code") \
                        .withColumnRenamed("field2", "activity") \
                        .withColumnRenamed("field3", "numberOfFile") \
                        .withColumnRenamed("field4", "timestamp")

### Convert the timestamp column to 'yyyyMMdd' format and rename it to date

In [26]:
# Parse the M/d/yyyy strings into dates, format them as yyyyMMdd strings, then rename the column
logDF = logDF.withColumn("timestamp", F.date_format(F.to_date(F.col("timestamp"), "M/d/yyyy"), "yyyyMMdd")) \
                .withColumnRenamed("timestamp", "date")

In [27]:
logDF.printSchema()

root
 |-- student_code: integer (nullable = true)
 |-- activity: string (nullable = true)
 |-- numberOfFile: integer (nullable = true)
 |-- date: string (nullable = true)
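The Spark datetime patterns in this conversion correspond closely to Python's `strptime`/`strftime` codes: `M/d/yyyy` to `%m/%d/%Y`, and `yyyyMMdd` to `%Y%m%d`. The plain-Python sketch below reproduces the conversion for values seen in the output; one difference is that Spark's `to_date` generally returns `null` for a value it cannot parse (with ANSI mode off), whereas `strptime` raises `ValueError`. `to_yyyymmdd` is a hypothetical helper.

```python
from datetime import datetime


def to_yyyymmdd(value: str) -> str:
    """Convert an M/d/yyyy string such as "6/13/2024" to "20240613"."""
    # %m and %d accept one- or two-digit values when parsing
    return datetime.strptime(value, "%m/%d/%Y").strftime("%Y%m%d")


for raw in ["6/13/2024", "6/11/2024", "6/10/2024"]:
    print(raw, "->", to_yyyymmdd(raw))
```

A fixed-width `yyyyMMdd` string also sorts chronologically as plain text, so ordering by the string `date` column gives the same order as ordering by the real date.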

### Check the result after renaming

In [28]:
listDF.show(5)

logDF.show(5)

+------------+-----------------+
|student_code|             name|
+------------+-----------------+
|           1|       Mai Đức An|
|           2|   Nguyễn Mai Anh|
|           3|Ngô Ngọc Tuấn Anh|
|           4|   Trần Trung Anh|
|           5|    Trần Ngọc Bảo|
+------------+-----------------+
only showing top 5 rows

+------------+--------+------------+--------+
|student_code|activity|numberOfFile|    date|
+------------+--------+------------+--------+
|          31|   write|           9|20240613|
|          11|   write|           7|20240611|
|          37|    read|           8|20240610|
|           8|    read|           6|20240615|
|          38| execute|           9|20240611|
+------------+--------+------------+--------+
only showing top 5 rows



### Join the two processed DataFrames

In [29]:
joinedDF = logDF.join(listDF, "student_code", "inner")
joinedDF.orderBy("student_code").show()

+------------+--------+------------+--------+-----------------+
|student_code|activity|numberOfFile|    date|             name|
+------------+--------+------------+--------+-----------------+
|           1|    read|           5|20240613|       Mai Đức An|
|           1| execute|           3|20240611|       Mai Đức An|
|           1|   write|           6|20240610|       Mai Đức An|
|           1| execute|          10|20240612|       Mai Đức An|
|           1|    read|           8|20240610|       Mai Đức An|
|           1|    read|           5|20240611|       Mai Đức An|
|           1|    read|           9|20240613|       Mai Đức An|
|           1|    read|           8|20240610|       Mai Đức An|
|           1|   write|           4|20240614|       Mai Đức An|
|           1|   write|          10|20240613|       Mai Đức An|
|           1|    read|           9|20240610|       Mai Đức An|
|           1|    read|           7|20240615|       Mai Đức An|
|           1|    read|           6|2024
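`join(listDF, "student_code", "inner")` keeps only log rows whose `student_code` appears in the student list, and because the key is passed as a column name it appears only once in the result. Below is a plain-Python sketch of the same semantics on a toy subset: only students 1 and 2 are in the list, so the log row for student 31 has no match here. `inner_join` is a hypothetical helper, not Spark code.

```python
def inner_join(left, right, key):
    """Inner-join two lists of dicts on `key`; the key column is kept once."""
    # Index the right-hand rows by key so each lookup is a dict access
    index = {}
    for right_row in right:
        index.setdefault(right_row[key], []).append(right_row)

    joined = []
    for left_row in left:
        # Left rows without a matching key produce no output (inner join)
        for right_row in index.get(left_row[key], []):
            extra = {k: v for k, v in right_row.items() if k != key}
            joined.append({**left_row, **extra})
    return joined


logs = [
    {"student_code": 31, "activity": "write", "numberOfFile": 9, "date": "20240613"},
    {"student_code": 1, "activity": "read", "numberOfFile": 5, "date": "20240613"},
]
students = [
    {"student_code": 1, "name": "Mai Đức An"},
    {"student_code": 2, "name": "Nguyễn Mai Anh"},
]
for row in inner_join(logs, students, "student_code"):
    print(row)
```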

### View the schema

In [30]:
joinedDF.printSchema()

root
 |-- student_code: integer (nullable = true)
 |-- activity: string (nullable = true)
 |-- numberOfFile: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- name: string (nullable = true)



### Sort

In [31]:
joinedDF.orderBy("student_code", "date", "activity").show()

+------------+--------+------------+--------+-----------------+
|student_code|activity|numberOfFile|    date|             name|
+------------+--------+------------+--------+-----------------+
|           1|    read|           8|20240610|       Mai Đức An|
|           1|    read|           8|20240610|       Mai Đức An|
|           1|    read|           9|20240610|       Mai Đức An|
|           1|   write|           6|20240610|       Mai Đức An|
|           1| execute|           3|20240611|       Mai Đức An|
|           1|    read|           5|20240611|       Mai Đức An|
|           1|    read|           6|20240611|       Mai Đức An|
|           1| execute|          10|20240612|       Mai Đức An|
|           1|    read|           9|20240613|       Mai Đức An|
|           1|    read|           5|20240613|       Mai Đức An|
|           1|   write|          10|20240613|       Mai Đức An|
|           1|   write|           4|20240614|       Mai Đức An|
|           1|    read|           7|2024
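`orderBy("student_code", "date", "activity")` fixes only those three columns. Rows that tie on all three, such as the two `read` rows for student 1 on `20240613` (9 and 5 files), may come back in either order, since Spark does not guarantee the relative order of tied rows. Adding `numberOfFile` as a final key, for example `joinedDF.orderBy("student_code", "date", "activity", "numberOfFile")`, makes the order fully determined. The plain-Python sketch below shows the effect of that extra key on rows taken from the output above.

```python
# (student_code, date, activity, numberOfFile) rows for student 1 on 20240613,
# listed in the order they appear in the output above
rows = [
    (1, "20240613", "read", 9),
    (1, "20240613", "read", 5),
    (1, "20240613", "write", 10),
]

# Sort key with numberOfFile as a tiebreaker, mirroring
# orderBy("student_code", "date", "activity", "numberOfFile")
ordered = sorted(rows, key=lambda r: (r[0], r[1], r[2], r[3]))
for row in ordered:
    print(row)
```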

### Register a temporary view for Spark SQL

In [32]:
joinedDF.createOrReplaceTempView("student_activity")

### Solve the task with Spark SQL

In [37]:
result_sql = spark.sql("""
    SELECT date,
           student_code,
           name AS student_name,
           activity,
           SUM(numberOfFile) AS totalFile
    FROM student_activity
    GROUP BY date, activity, student_code, name
    ORDER BY student_code, date, activity ASC
""")
result_sql.show()

+--------+------------+-----------------+--------+---------+
|    date|student_code|     student_name|activity|totalFile|
+--------+------------+-----------------+--------+---------+
|20240610|           1|       Mai Đức An|    read|       25|
|20240610|           1|       Mai Đức An|   write|        6|
|20240611|           1|       Mai Đức An| execute|        3|
|20240611|           1|       Mai Đức An|    read|       11|
|20240612|           1|       Mai Đức An| execute|       10|
|20240613|           1|       Mai Đức An|    read|       14|
|20240613|           1|       Mai Đức An|   write|       10|
|20240614|           1|       Mai Đức An|   write|        4|
|20240615|           1|       Mai Đức An|    read|        7|
|20240611|           2|   Nguyễn Mai Anh|   write|        1|
|20240612|           2|   Nguyễn Mai Anh| execute|        1|
|20240612|           2|   Nguyễn Mai Anh|   write|       19|
|20240613|           2|   Nguyễn Mai Anh|    read|        3|
|20240615|           2| 
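The query itself is ordinary SQL, so its logic can be checked outside Spark. A minimal sketch using Python's built-in `sqlite3` module (SQLite standing in for Spark SQL), loaded with student 1's 13 joined rows from the outputs above; the totals should match the first nine rows of the result.

```python
import sqlite3

NAME = "Mai Đức An"
# (date, activity, numberOfFile) for student 1, taken from the outputs above
raw = [
    ("20240610", "read", 8), ("20240610", "read", 8), ("20240610", "read", 9),
    ("20240610", "write", 6), ("20240611", "execute", 3), ("20240611", "read", 5),
    ("20240611", "read", 6), ("20240612", "execute", 10), ("20240613", "read", 9),
    ("20240613", "read", 5), ("20240613", "write", 10), ("20240614", "write", 4),
    ("20240615", "read", 7),
]

conn = sqlite3.connect(":memory:")  # throwaway in-memory database
conn.execute(
    "CREATE TABLE student_activity "
    "(student_code INTEGER, activity TEXT, numberOfFile INTEGER, date TEXT, name TEXT)"
)
conn.executemany(
    "INSERT INTO student_activity VALUES (?, ?, ?, ?, ?)",
    [(1, activity, n, date, NAME) for date, activity, n in raw],
)

# Same query as the Spark SQL cell
result = conn.execute("""
    SELECT date, student_code, name AS student_name, activity,
           SUM(numberOfFile) AS totalFile
    FROM student_activity
    GROUP BY date, activity, student_code, name
    ORDER BY student_code, date, activity ASC
""").fetchall()
conn.close()

for row in result:
    print(row)
```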

### Solve the task with the Spark DataFrame API

In [42]:
result_df = joinedDF.groupBy("date", "student_code", "name", "activity") \
              .agg(F.sum("numberOfFile").alias("totalFile")) \
              .orderBy(F.col("student_code"), F.col("date").asc(), F.col("activity"))

result_df.count()

298
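The DataFrame chain reads as three steps: `groupBy` forms one group per distinct (`date`, `student_code`, `name`, `activity`), `agg(F.sum(...))` adds up `numberOfFile` within each group, and `orderBy` sorts the groups. `count()` therefore returns the number of groups, not the number of log rows. A plain-Python sketch of those steps on three of student 1's rows from `20240611` (`group_sum` is a hypothetical helper, not Spark code):

```python
from collections import defaultdict


def group_sum(rows):
    # groupBy("date", "student_code", "name", "activity") + agg(sum("numberOfFile"))
    totals = defaultdict(int)
    for r in rows:
        key = (r["date"], r["student_code"], r["name"], r["activity"])
        totals[key] += r["numberOfFile"]
    # orderBy(student_code, date, activity)
    ordered = sorted(totals.items(), key=lambda kv: (kv[0][1], kv[0][0], kv[0][3]))
    return [(*key, total) for key, total in ordered]


rows = [
    {"date": "20240611", "student_code": 1, "name": "Mai Đức An", "activity": "read", "numberOfFile": 5},
    {"date": "20240611", "student_code": 1, "name": "Mai Đức An", "activity": "read", "numberOfFile": 6},
    {"date": "20240611", "student_code": 1, "name": "Mai Đức An", "activity": "execute", "numberOfFile": 3},
]
result = group_sum(rows)
print(result)
print(len(result))  # number of groups, analogous to result_df.count()
```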

In [47]:
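# coalesce(1) merges the already-sorted partitions into one output file without a
# new shuffle; repartition(1) would shuffle the rows and may not keep the sort order
result_df.coalesce(1).write.csv("hdfs://namenode:9000/gold_zone/asignments/result/38_Vu_Huu_Sy", header=True, mode="overwrite")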
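Even with a single partition, Spark writes a directory at the given path rather than a single file; it typically contains one `part-*.csv` file plus a `_SUCCESS` marker. With `header=True`, the part file starts with the column names of `result_df`. A plain-Python sketch of the first lines of that file, using the `csv` module and the first two result rows for student 1 shown above:

```python
import csv
import io

# Column names of result_df (the DataFrame version keeps "name")
header = ["date", "student_code", "name", "activity", "totalFile"]
rows = [
    ("20240610", 1, "Mai Đức An", "read", 25),
    ("20240610", 1, "Mai Đức An", "write", 6),
]

buffer = io.StringIO()
writer = csv.writer(buffer, lineterminator="\n")  # use "\n" line endings
writer.writerow(header)
writer.writerows(rows)
print(buffer.getvalue())
```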