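# Data Cleansing

This notebook loads the raw data with Spark and runs each cleansing transformer on its own. It then applies the full `CleansingPipelineSpark` and writes the result to a single CSV file, `../data/processed/cleansed_data_spark.csv` (relative to the notebook directory).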

## Import necessary libraries

In [1]:
%pip install -qq -r ../requirements.txt

Note: you may need to restart the kernel to use updated packages.


In [2]:
# Add current directory to Python path for imports
import os
import sys
from dotenv import load_dotenv

import findspark

from pyspark.sql import SparkSession

# Make the project root importable so the `src` package can be used below.
# The notebook runs from a sub-directory of the project, so the root is taken
# to be the parent of the current working directory.
working_dir = os.getcwd()
project_root = os.path.dirname(working_dir)
already_on_path = project_root in sys.path
if not already_on_path:
    sys.path.append(project_root)

In [3]:
# Utility Functions
from src.utils import get_dot_env_path

# Populate the process environment from the project's .env file.
load_dotenv(dotenv_path=get_dot_env_path())

# Tool install locations; each one falls back to "" when the variable is undefined.
HADOOP_HOME, SPARK_HOME, JAVA_HOME = (
    os.getenv(var_name, "") for var_name in ("HADOOP_HOME", "SPARK_HOME", "JAVA_HOME")
)

In [4]:
# Make PySpark use this kernel's interpreter for both the driver and the
# Python workers.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

# Export Hadoop, Spark and Java home, but only when a value was actually found:
# exporting "" would turn an unset variable into a set-but-empty one.
for env_name, env_value in (
    ("HADOOP_HOME", HADOOP_HOME),
    ("SPARK_HOME", SPARK_HOME),
    ("JAVA_HOME", JAVA_HOME),
):
    if env_value:
        os.environ[env_name] = env_value

# Put Hadoop's bin directory on PATH. Guarded so that an empty HADOOP_HOME does
# not add the relative entry "bin", and so re-running this cell does not append
# the same directory again.
if HADOOP_HOME:
    hadoop_bin = os.path.join(HADOOP_HOME, "bin")
    current_path = os.environ.get("PATH", "")
    if hadoop_bin not in current_path.split(os.pathsep):
        os.environ["PATH"] = (
            current_path + os.pathsep + hadoop_bin if current_path else hadoop_bin
        )

In [5]:
# Initialise findspark with the configured Spark installation.
findspark.init(SPARK_HOME)

# Build the Spark session (getOrCreate reuses an already-running one). Executors
# get the project root on PYTHONPATH so they can import the `src` package.
spark_settings = {
    "spark.executorEnv.PYTHONPATH": project_root,
    "spark.driver.memory": "4g",
    "spark.executor.memory": "4g",
}
session_builder = SparkSession.builder.appName("CleansingData")
for setting_key, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_key, setting_value)
spark = session_builder.getOrCreate()

## Loading the Raw Dataset

In [6]:
from src.utils import read_config_path

# Resolve the raw CSV location from the project configuration.
filepath = read_config_path(key="raw_data_path")

# Reader options: first row is the header, column types are inferred, quoted
# fields may span lines, and '"' is both the quote and the escape character.
csv_reader = spark.read.options(
    header=True,
    inferSchema=True,
    multiLine=True,
    escape='"',
    quote='"',
)
df = csv_reader.csv(filepath)

df.show(n=10)

+-----------+-------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+-----------+--------+-------------+--------------------+---------+----+------------+--------------------+
|  ticket_id|               type|        organization|             comment|               photo|         photo_after|            coords|             address|subdistrict|district|     province|           timestamp|    state|star|count_reopen|       last_activity|
+-----------+-------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+-----------+--------+-------------+--------------------+---------+----+------------+--------------------+
|2021-FYJTFP|        {ความสะอาด}|          เขตบางซื่อ|             ขยะเยอะ|https://storage.g...|                NULL|100.53084,13.81865|12/14 ถนน กรุงเทพ...|       NULL|    NULL|กรุงเทพมหานคร|2021-09-03 19:51:..

---

## Testing Transformers

### Ingestion Preprocessor

In [None]:
from src.pipelines_spark import IngestionPreprocessorSpark

# Run only the ingestion preprocessing step on the raw frame.
preprocessor = IngestionPreprocessorSpark()
cleaned_df = preprocessor.transform(df)
cleaned_df.show(n=10)

In [None]:
# Inspect the column names and types produced by the ingestion preprocessor.
cleaned_df.printSchema()

### Date Transformer

In [None]:
from src.pipelines_spark import DateTransformerSpark

# Run only the date transformer on the raw frame.
dt = DateTransformerSpark()
df_transformed = dt.transform(df)
df_transformed.show(n=10)

### Province Transformer

In [None]:
from src.pipelines_spark import ProvinceTransformerSpark

# Run only the province transformer, then list the ten most frequent provinces.
pt = ProvinceTransformerSpark()
df_transformed = pt.transform(df)

df_area_count = df_transformed.groupBy("province").count()
(
    df_area_count
    .orderBy(df_area_count["count"].desc())
    .show(n=10)
)

### District and Subdistrict Transformer

In [None]:
from src.pipelines_spark import DistrictSubdistrictTransformerSpark

# Run only the district/subdistrict transformer, then list the ten most
# frequent (district, subdistrict) pairs.
dst = DistrictSubdistrictTransformerSpark()
df_transformed = dst.transform(df)

df_area_count = df_transformed.groupBy("district", "subdistrict").count()
(
    df_area_count
    .orderBy(df_area_count["count"].desc())
    .show(n=10)
)

### Coordinate Transformer (currently disabled)

In [None]:
# NOTE(review): this cell is disabled. It imports from `src.pipelines` (not
# `src.pipelines_spark` like the other transformer cells) and wraps the result
# in `pd.DataFrame`, but `pd` is never imported in this notebook.
# TODO: port it to a Spark transformer or remove the cell.
# from src.pipelines import CoordinateTransformer

# ct = CoordinateTransformer()
# df_transformed = pd.DataFrame(ct.fit_transform(df))

# df_transformed.head(10)

### Address Transformer

In [None]:
from src.pipelines_spark import AddressTransformerSpark

# Run only the address transformer on the raw frame.
at = AddressTransformerSpark()
df_transformed = at.transform(df)
df_transformed.show(n=10)

### State to Status Transformer

In [None]:
from src.pipelines_spark import StateToStatusTransformerSpark

# Run only the state-to-status transformer, then count tickets per status.
# Like every other transformer cell, this reads the raw `df` rather than the
# `df_transformed` left behind by whichever cell ran last, so the result does
# not depend on cell execution order.
stst = StateToStatusTransformerSpark()
df_transformed = stst.transform(df)

df_status_count = df_transformed.groupBy("status").count()
df_status_count.show()

---

## Applying Cleansing Pipeline

In [7]:
from src.pipelines_spark import CleansingPipelineSpark

# Apply the complete cleansing pipeline to the raw frame.
cleansing_pipeline = CleansingPipelineSpark()
df_cleansed = cleansing_pipeline.transform(df)
df_cleansed.show(n=10)

+-----------+-------------------+--------------------+--------------------+------------------+--------------------+-----------+--------+-------------+--------------+---------------+--------------+------------------+-------------------+------------------+------+
|  ticket_id|               type|        organization|             comment|            coords|             address|subdistrict|district|     province|timestamp_date|timestamp_month|timestamp_year|last_activity_date|last_activity_month|last_activity_year|status|
+-----------+-------------------+--------------------+--------------------+------------------+--------------------+-----------+--------+-------------+--------------+---------------+--------------+------------------+-------------------+------------------+------+
|2021-CGPMUN|{น้ำท่วม,ร้องเรียน}|เขตประเวศ,ฝ่ายโยธ...|น้ำท่วมเวลาฝนตกแล...|100.66709,13.67891|189 เฉลิมพระเกียร...|    หนองบอน|  ประเวศ|กรุงเทพมหานคร|            19|              9|          2021|                21

In [8]:
# Check the final column names and types of the cleansed frame before writing it.
df_cleansed.printSchema()

root
 |-- ticket_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- organization: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- coords: string (nullable = true)
 |-- address: string (nullable = true)
 |-- subdistrict: string (nullable = true)
 |-- district: string (nullable = true)
 |-- province: string (nullable = true)
 |-- timestamp_date: integer (nullable = true)
 |-- timestamp_month: integer (nullable = true)
 |-- timestamp_year: integer (nullable = true)
 |-- last_activity_date: integer (nullable = true)
 |-- last_activity_month: integer (nullable = true)
 |-- last_activity_year: integer (nullable = true)
 |-- status: string (nullable = true)



In [9]:
import shutil

# Define base data directory, Spark's temporary output directory and final file path
base_data_dir = os.path.join("..", "data", "processed")
temp_output_dir = os.path.join(base_data_dir, "cleansed_data_spark-temp")
final_file = os.path.join(base_data_dir, "cleansed_data_spark.csv")

# Spark writes a directory of part files; coalesce(1) yields a single part so
# it can be promoted to one plain CSV file below.
df_cleansed.coalesce(1).write.csv(temp_output_dir, header=True, mode="overwrite")

# Locate the generated part file (sorted so the pick is deterministic).
part_files = sorted(
    name
    for name in os.listdir(temp_output_dir)
    if name.startswith("part-") and name.endswith(".csv")
)

# Fail loudly rather than deleting the temp directory with nothing saved.
if not part_files:
    raise FileNotFoundError(
        f"No part-*.csv file was written to {temp_output_dir!r}; "
        f"{final_file!r} was not updated."
    )

shutil.move(os.path.join(temp_output_dir, part_files[0]), final_file)

# Remove the leftover Spark output directory (best effort).
shutil.rmtree(temp_output_dir, ignore_errors=True)

---