<a href="https://colab.research.google.com/github/mangkalapiratjr/spark/blob/main/Cleansing_and_Transform_Codvid_19_data_with_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Cleansing and Transform Covid19 cases data using PySpark**

**Steps**
1. Import Covid19 dataset (csv file from data.go.th)
2. Drop uninterested columns
3. Handle with missing values
4. Create a new column to group age by range
5. Find the wrong or inconsistent Province name in the dataset and correct them
6. Save output as parquet file

# Install libraries to use

In [21]:
pip install pyspark



# Import libraries and create SparkSession

In [22]:
from pyspark.sql import  SparkSession, Row, Column
from pyspark.sql.functions import regexp_replace, col, count, when, isnan

spark = SparkSession.builder.getOrCreate()

# Exploration, Cleansing and Transformation of Covid19 cases data

In [23]:
# Import Covid19 cases data
df_covid = spark.read.csv('datasets/covid19-case.csv', header=True, inferSchema=True)

In [24]:
# Explore data
df_covid.printSchema()

df_covid.show(5)

df_covid.summary().show()

root
 |-- No.: string (nullable = true)
 |-- announce_date: string (nullable = true)
 |-- Notified date: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- Unit: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- province_of_isolation: string (nullable = true)
 |-- risk: string (nullable = true)
 |-- province_of_onset: string (nullable = true)
 |-- district_of_onset: string (nullable = true)

+---+-------------+-------------+----+----+----+-----------+---------------------+--------------------+-----------------+-----------------+
|No.|announce_date|Notified date| sex| age|Unit|nationality|province_of_isolation|                risk|province_of_onset|district_of_onset|
+---+-------------+-------------+----+----+----+-----------+---------------------+--------------------+-----------------+-----------------+
|  1|   12/01/2020|         null|หญิง|61.0|  ปี|      China|        กรุงเทพมหานคร|คนต่างชาติเดินทาง...|    กรุงเท

In [25]:
# Drop unused columns
df_covid = df_covid.drop('No.', 'Notified date', 'district_of_isolation', 'province_of_isolation', 'district_of_onset')
print(df_covid.show(5))

# Check missing values of all columns
df_covid.select( [ count( when(col(c).isNull() | isnan(col(c)), c) ).alias(c) for c in df_covid.columns]).show()

+-------------+----+----+----+-----------+--------------------+-----------------+
|announce_date| sex| age|Unit|nationality|                risk|province_of_onset|
+-------------+----+----+----+-----------+--------------------+-----------------+
|   12/01/2020|หญิง|61.0|  ปี|      China|คนต่างชาติเดินทาง...|    กรุงเทพมหานคร|
|   17/01/2020|หญิง|74.0|  ปี|      China|คนต่างชาติเดินทาง...|    กรุงเทพมหานคร|
|   22/01/2020|หญิง|73.0|  ปี|   Thailand|คนต่างชาติเดินทาง...|           นครปฐม|
|   22/01/2020| ชาย|68.0|  ปี|      China|คนต่างชาติเดินทาง...|    กรุงเทพมหานคร|
|   24/01/2020|หญิง|66.0|  ปี|      China|คนต่างชาติเดินทาง...|          นนทบุรี|
+-------------+----+----+----+-----------+--------------------+-----------------+
only showing top 5 rows

None
+-------------+-----+-----+-----+-----------+----+-----------------+
|announce_date|  sex|  age| Unit|nationality|risk|province_of_onset|
+-------------+-----+-----+-----+-----------+----+-----------------+
|           10|22175|3438

In [26]:
# Filter only row where age and province_of_onset is not null
df_covid_filtered = df_covid.where( col('age').isNotNull() & col('Unit').isNotNull() & col('province_of_onset').isNotNull()  )

# Verified that column age and province_of_onset have no missing values
df_covid_filtered.select( [ count( when(col(c).isNull() | isnan(col(c)), c) ).alias(c) for c in df_covid.columns]).show()

+-------------+----+---+----+-----------+----+-----------------+
|announce_date| sex|age|Unit|nationality|risk|province_of_onset|
+-------------+----+---+----+-----------+----+-----------------+
|            0|8891|  0|   0|      17845| 924|                0|
+-------------+----+---+----+-----------+----+-----------------+



In [27]:
# Create a new column named age_range for grouping age by range
df_covid_group_age = df_covid_filtered.withColumn('age_range', when( (df_covid_filtered.Unit=='วัน')  | (df_covid_filtered.Unit=='เดือน')  | ( (df_covid_filtered.Unit=='ปี') & (df_covid_filtered.age <=5) )  , '0-5 ปี') 
                                                  .when( (df_covid_filtered.Unit=='ปี') & (df_covid_filtered.age >= 6) & (df_covid_filtered.age <=17) , '6-17 ปี') 
                                                  .when( (df_covid_filtered.Unit=='ปี') & (df_covid_filtered.age >= 18) & (df_covid_filtered.age <= 30), '18-30 ปี') 
                                                  .when( (df_covid_filtered.Unit=='ปี') & (df_covid_filtered.age >= 31) & (df_covid_filtered.age <= 60), '31-60 ปี') 
                                                  .otherwise('มากกว่า 60 ปี') )
df_covid_group_age.show(10)

+-------------+----+----+----+-----------+--------------------+-----------------+-------------+
|announce_date| sex| age|Unit|nationality|                risk|province_of_onset|    age_range|
+-------------+----+----+----+-----------+--------------------+-----------------+-------------+
|   12/01/2020|หญิง|61.0|  ปี|      China|คนต่างชาติเดินทาง...|    กรุงเทพมหานคร|มากกว่า 60 ปี|
|   17/01/2020|หญิง|74.0|  ปี|      China|คนต่างชาติเดินทาง...|    กรุงเทพมหานคร|มากกว่า 60 ปี|
|   22/01/2020|หญิง|73.0|  ปี|   Thailand|คนต่างชาติเดินทาง...|           นครปฐม|มากกว่า 60 ปี|
|   22/01/2020| ชาย|68.0|  ปี|      China|คนต่างชาติเดินทาง...|    กรุงเทพมหานคร|มากกว่า 60 ปี|
|   24/01/2020|หญิง|66.0|  ปี|      China|คนต่างชาติเดินทาง...|          นนทบุรี|มากกว่า 60 ปี|
|   25/01/2020|หญิง|33.0|  ปี|      China|คนต่างชาติเดินทาง...|    กรุงเทพมหานคร|     31-60 ปี|
|   26/01/2020|หญิง|57.0|  ปี|      China|คนต่างชาติเดินทาง...|    กรุงเทพมหานคร|     31-60 ปี|
|   26/01/2020|หญิง|73.0|  ปี|      Chin

In [28]:
# Check unique value of Province name
df_covid_group_age.select('province_of_onset').distinct().count()

81

In [29]:
# Since total number of Province need to be less than or equal 77, so we need to cross check with Province name list which collect in province.csv
df_province = spark.read.csv('datasets/province.csv', header=True, inferSchema=True)

In [38]:
# Check total number of Province name and Explore schema
print(f'Total province = {df_province.distinct().count()}')

df_province.printSchema()

Total province = 77
root
 |-- ProvinceNo: integer (nullable = true)
 |-- ProvinceMOI_ID: integer (nullable = true)
 |-- ProvinceNameThai: string (nullable = true)
 |-- ProvinceNameThaiAbbr: string (nullable = true)
 |-- ProvinceBudgetCode_2563: integer (nullable = true)
 |-- ProvinceNameEnglish: string (nullable = true)
 |-- ProvinceOfficialWebsite: string (nullable = true)
 |-- TopExecutivePositionandProvinceName: string (nullable = true)
 |-- AddressforCorrespondence: string (nullable = true)
 |-- RegionID: integer (nullable = true)
 |-- RegionName: string (nullable = true)
 |-- ProvincialClusterID: integer (nullable = true)
 |-- ProvincialClusterName: string (nullable = true)
 |-- RegionCGDWelfareIntegrationSystem: string (nullable = true)
 |-- RegionTMD: string (nullable = true)
 |-- RegionECT: string (nullable = true)
 |-- RegionNSO: string (nullable = true)
 |-- RegionOPSIndustry: string (nullable = true)
 |-- RegionArmy: string (nullable = true)
 |-- RegionPolice: string (nullab

In [31]:
# Select only the ProvinceNameThai column, rename to Province column and remove word 'จังหวัด' from province name
df_province_name = df_province.select('ProvinceNameThai').withColumn('province', regexp_replace('ProvinceNameThai', 'จังหวัด', '')).drop('ProvinceNameThai')

# Keep Province names as list
provinces =[row.province for row in df_province_name.collect()]
print( len(provinces) )

77


In [32]:
# Find incorrect or inconsistent Province name from Covid19 data
df_covid_group_age.select('province_of_onset').distinct().filter( ~ df_covid_group_age['province_of_onset'].isin(provinces) ).show()

+-----------------+
|province_of_onset|
+-----------------+
|         กรุงเทพฯ|
|            โคราช|
|             กทม.|
|          กรุงเทพ|
+-----------------+



In [33]:
# Replace correct Province name
df_covid_final = df_covid_group_age.withColumn('province',  when(( df_covid_group_age['province_of_onset']=='กรุงเทพฯ') | (df_covid_group_age['province_of_onset']=='กรุงเทพ') \
                                                                 | (df_covid_group_age['province_of_onset']=='กทม.') , 'กรุงเทพมหานคร')  \
                                                                 .when( df_covid_group_age['province_of_onset']=='โคราช', 'นครราชสีมา')
                                                                  .otherwise( df_covid_group_age['province_of_onset'])  )

# Verify that total number of Province is 77
df_covid_final.select('province').distinct().count()

77

In [34]:
# Final Output
df_covid_final.show()

+-------------+----+----+----+-----------+--------------------+-----------------+-------------+---------------+
|announce_date| sex| age|Unit|nationality|                risk|province_of_onset|    age_range|       province|
+-------------+----+----+----+-----------+--------------------+-----------------+-------------+---------------+
|   12/01/2020|หญิง|61.0|  ปี|      China|คนต่างชาติเดินทาง...|    กรุงเทพมหานคร|มากกว่า 60 ปี|  กรุงเทพมหานคร|
|   17/01/2020|หญิง|74.0|  ปี|      China|คนต่างชาติเดินทาง...|    กรุงเทพมหานคร|มากกว่า 60 ปี|  กรุงเทพมหานคร|
|   22/01/2020|หญิง|73.0|  ปี|   Thailand|คนต่างชาติเดินทาง...|           นครปฐม|มากกว่า 60 ปี|         นครปฐม|
|   22/01/2020| ชาย|68.0|  ปี|      China|คนต่างชาติเดินทาง...|    กรุงเทพมหานคร|มากกว่า 60 ปี|  กรุงเทพมหานคร|
|   24/01/2020|หญิง|66.0|  ปี|      China|คนต่างชาติเดินทาง...|          นนทบุรี|มากกว่า 60 ปี|        นนทบุรี|
|   25/01/2020|หญิง|33.0|  ปี|      China|คนต่างชาติเดินทาง...|    กรุงเทพมหานคร|     31-60 ปี|  กรุงเทพ

In [35]:
# Save DataFrame to Parquet File
df_covid_final.write.parquet('output', mode='overwrite')

In [36]:
# Test Output File
df = spark.read.parquet('output')
df.show()

+-------------+----+----+----+-----------+--------------------+-----------------+-------------+---------------+
|announce_date| sex| age|Unit|nationality|                risk|province_of_onset|    age_range|       province|
+-------------+----+----+----+-----------+--------------------+-----------------+-------------+---------------+
|   12/01/2020|หญิง|61.0|  ปี|      China|คนต่างชาติเดินทาง...|    กรุงเทพมหานคร|มากกว่า 60 ปี|  กรุงเทพมหานคร|
|   17/01/2020|หญิง|74.0|  ปี|      China|คนต่างชาติเดินทาง...|    กรุงเทพมหานคร|มากกว่า 60 ปี|  กรุงเทพมหานคร|
|   22/01/2020|หญิง|73.0|  ปี|   Thailand|คนต่างชาติเดินทาง...|           นครปฐม|มากกว่า 60 ปี|         นครปฐม|
|   22/01/2020| ชาย|68.0|  ปี|      China|คนต่างชาติเดินทาง...|    กรุงเทพมหานคร|มากกว่า 60 ปี|  กรุงเทพมหานคร|
|   24/01/2020|หญิง|66.0|  ปี|      China|คนต่างชาติเดินทาง...|          นนทบุรี|มากกว่า 60 ปี|        นนทบุรี|
|   25/01/2020|หญิง|33.0|  ปี|      China|คนต่างชาติเดินทาง...|    กรุงเทพมหานคร|     31-60 ปี|  กรุงเทพ