<a href="https://colab.research.google.com/github/suben-mk/Apache-Spark-Data-Cleansing-R2DE2.0/blob/main/Apache_Spark_Data_Cleansing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# R2DE2.0 Workshop : Data Cleansing with Apache Spark

## 1) ติดตั้ง Spark และ PySpark

In [None]:
!apt-get update                                                                          # อัพเดท Package ทั้งหมดใน VM ตัวนี้
!apt-get install openjdk-8-jdk-headless -qq > /dev/null                                  # ติดตั้ง Java Development Kit (จำเป็นสำหรับการติดตั้ง Spark)
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz # ติดตั้ง Spark 3.1.2
!tar xzvf spark-3.1.2-bin-hadoop2.7.tgz                                                  # Unzip ไฟล์ Spark 3.1.2
!pip install -q findspark==1.3.0                                                         # ติดตั้ง Package Python สำหรับเชื่อมต่อกับ Spark

In [4]:
# Set enviroment variable ให้ Python รู้จัก Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
# ติดตั้ง PySpark ลงใน Python
!pip install pyspark==3.1.2

## 2) Data Cleansing with Spark

In [143]:
# Import module
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.functions import when

# สร้าง Spark Session เพื่อใช้งาน Spark
spark = SparkSession.builder.master("local[*]").getOrCreate()

# อ่านข้อมูลจากไฟล์ CSV
df_transection = spark.read.csv('/content/ws2_data.csv', header = True, inferSchema = True, )

### 2.1 Spark DataFrames

#### Data Type
* type คอลัมน์ timestamp จาก string เป็น datetime

In [144]:
# เช็ค Data type
df_transection.dtypes

[('timestamp', 'string'),
 ('user_id', 'string'),
 ('book_id', 'int'),
 ('country', 'string'),
 ('price', 'double')]

In [145]:
# แปลง type คอลัมน์ timestamp จาก string เป็น datetime
df_transection_clean = df_transection.withColumn("timestamp", f.to_timestamp(df_transection.timestamp, 'yyyy-MM-dd HH:mm:ss'))

#### Syntactical Anomalies
* คอลัมน์ Country เปลี่ยนชื่อประเทศจาก Japane เป็น Japan

In [146]:
# เช็คคอลัมน์ Country == 'Japane'
df_transection_clean.where(df_transection_clean['Country'] == "Japane").show(5)

+-------------------+--------+-------+-------+-----+
|          timestamp| user_id|book_id|country|price|
+-------------------+--------+-------+-------+-----+
|2021-05-01 13:11:11|a904217a|   1129| Japane|29.65|
|2021-05-02 10:27:42|a6052e97|   1466| Japane| 35.0|
|2021-05-04 10:44:41|a0f44c26|   1497| Japane|17.49|
|2021-05-04 05:40:46|ad0f9907|   2269| Japane|35.71|
|2021-05-05 01:08:25|aa169f5d|   1378| Japane|59.93|
+-------------------+--------+-------+-------+-----+
only showing top 5 rows



In [147]:
# เปลี่ยนชื่อประเทศ japane เป็น japan และสร้างคอลัมน์ใหม่ชื่อ "CountryUpdate"
df_clean_country = df_transection_clean.withColumn("CountryUpdate", when(df_transection_clean['Country'] == 'Japane', 'Japan')
                                                  .otherwise(df_transection_clean['Country']))

# เอาคอลัมน์ CountryUpdate ไปแทนที่คอลัมน์ Country
df_transection_clean = df_clean_country.drop("Country").withColumnRenamed('CountryUpdate', 'Country')

#### Semantic Anomalies
* ข้อกำหนดคอลัมน์ user_id ค่าจะต้องเป็นตัวเลขหรือตัวหนังสือ 8 ตัวอักษร

In [148]:
# ใช้ Regular Expression สำหรับเช็ครูปแบบของคอลัมน์ user_id
df_correct_userid = df_transection_clean.filter(df_transection_clean["user_id"].rlike("^[a-z0-9]{8}$"))
df_incorrect_userid = df_transection_clean.subtract(df_correct_userid)
df_incorrect_userid.show()

+-------------------+----------+-------+-----+--------------------+
|          timestamp|   user_id|book_id|price|             Country|
+-------------------+----------+-------+-----+--------------------+
|2021-05-01 14:41:20|      null|    516|27.37|              Brazil|
|2021-06-30 02:27:00|      null|   1390|27.99|               Japan|
|2021-06-19 16:00:22|ca86d17200|   2269|35.71|United States of ...|
|2021-07-15 18:53:09|ca86d17200|    322|30.79|United States of ...|
+-------------------+----------+-------+-----+--------------------+



In [149]:
# เปลี่ยน ca86d17200 เป็น ca86d172 และสร้างคอลัมน์ใหม่ชื่อ "user_id_update"
df_clean_userid = df_transection_clean.withColumn("user_id_update", when(df_transection_clean['user_id'] == 'ca86d17200', 'ca86d172')
                                                  .otherwise(df_transection_clean['user_id']))

# เอาคอลัมน์ user_id_update ไปแทนที่ user_id
df_transection_clean = df_clean_userid.drop("user_id").withColumnRenamed('user_id_update', 'user_id')

#### Missing values
* การเช็คและแก้ไข Missing Values (หากจำเป็น)

In [150]:
# เช็ค Missing Value
df_transection_clean.summary("count").show()

+-------+-------+------+-------+-------+
|summary|book_id| price|Country|user_id|
+-------+-------+------+-------+-------+
|  count| 500000|500000| 500000| 499998|
+-------+-------+------+-------+-------+



In [151]:
# เช็ค Missing Value จากคอลัมน์ user_id เป็นค่าว่างเปล่า
df_transection_clean.where(df_transection_clean.user_id.isNull()).show()

+-------------------+-------+-----+-------+-------+
|          timestamp|book_id|price|Country|user_id|
+-------------------+-------+-----+-------+-------+
|2021-05-01 14:41:20|    516|27.37| Brazil|   null|
|2021-06-30 02:27:00|   1390|27.99|  Japan|   null|
+-------------------+-------+-----+-------+-------+



In [152]:
# แทน user_id ที่เป็น NULL ด้วย 00000000
df_clean_userid = df_transection_clean.withColumn("user_id_update", when(df_transection_clean['user_id'].isNull(), '00000000')
                                      .otherwise(df_transection_clean['user_id']))

# เอาคอลัมน์ user_id_update ไปแทนที่ user_id
df_transection_clean = df_clean_userid.drop("user_id").withColumnRenamed('user_id_update', 'user_id')

### 2.2 Spark SQL

In [153]:
# แปลงข้อมูลจาก Spark DataFrame ให้เป็น TempView
df_transection.createOrReplaceTempView("transection")
sql_transection = spark.sql("SELECT * FROM transection")
sql_transection.show(5)

+-------------------+--------+-------+------------+-----+
|          timestamp| user_id|book_id|     country|price|
+-------------------+--------+-------+------------+-----+
|2021-05-01 08:03:44|824dc295|      2|      Israel|24.95|
|2021-05-02 02:48:34|f80fe106|      6|    Thailand|31.49|
|2021-05-01 16:54:57|01cb9fe0|      9|       Italy|29.65|
|2021-05-01 21:45:28|59508f47|     13|South Africa| 22.4|
|2021-05-01 14:50:52|3553ebbb|     14|      Canada| 28.0|
+-------------------+--------+-------+------------+-----+
only showing top 5 rows



In [154]:
sql_transection_clean = spark.sql("""

SELECT
/*แปลง type คอลัมน์ timestamp จาก string เป็น datetime*/
to_timestamp(timestamp, 'yyyy-MM-dd HH:mm:ss') AS timestamp,
CASE
  /*เปลี่ยน 'ca86d17200' เป็น 'ca86d172'*/
  WHEN user_id = 'ca86d17200' THEN 'ca86d172'
  /*เปลี่ยน NULL เป็น '00000000'*/
  WHEN user_id IS NULL THEN '00000000'
  ELSE user_id
END AS user_id,
book_id,
/*เปลี่ยนชื่อประเทศจาก Japane เป็น Japan*/
CASE
  WHEN country = 'Japane' THEN 'Japan'
  ELSE country
END AS country,
price
FROM transection

""")

### 2.3 Save data to CSV

In [156]:
# save เป็น 1 ไฟล์ (ใช้ single worker)
df_transection_clean.coalesce(1).write.csv('/content/output_transection_clean.csv', header = True)