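## PySpark GCS Project

Read raw financial transactions from a GCS bucket, standardize and clean them with PySpark, and write the cleaned data back to GCS as Parquet.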

### Connect to GCS and Read Raw Data

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

In [2]:
import os

# Service-account key file used by the GCS connector. Override it with the
# GCS_CREDENTIALS_PATH environment variable so the notebook is not tied to one
# machine's filesystem layout; the default is the original container path.
credentials_location = os.environ.get(
    "GCS_CREDENTIALS_PATH", "/home/jovyan/cred/credentials.json"
)

# GCS connector jar that Spark downloads at startup.
# TODO: pin an explicit connector version that matches this Spark/Hadoop build.
# "latest" can resolve to a different release on a later run.
GCS_CONNECTOR_JAR = "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar"

# Local Spark configuration. The spark.hadoop.* keys are forwarded into the
# Hadoop configuration and enable keyfile-based service-account auth for gs:// paths.
conf = (
    SparkConf()
    .setMaster("local[*]")
    .setAppName("Test Connection")
    .set("spark.jars", GCS_CONNECTOR_JAR)
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)
)

In [3]:
# Only one SparkContext may be active per JVM, so a plain SparkContext(conf=conf)
# fails when this cell is re-run. getOrCreate() returns the running context
# instead. If a context already exists, `conf` is not re-applied to it.
sc = SparkContext.getOrCreate(conf=conf)

# `_jsc` is PySpark's private handle to the JVM JavaSparkContext. It is used here
# to set Hadoop options directly.
hadoop_conf = sc._jsc.hadoopConfiguration()

# Register the GCS connector's FileSystem implementations for the gs:// scheme.
hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
# Keyfile auth under the fs.gs.* key names.
# NOTE(review): which key family the connector honours (fs.gs.auth.* vs
# google.cloud.auth.*) depends on its version; confirm for the pinned jar.
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

In [4]:
# Build the SparkSession (DataFrame/SQL entry point) on top of the context's
# configuration. getOrCreate() reuses an already-active session if there is one.
spark = (
    SparkSession.builder
    .config(conf=sc.getConf())
    .getOrCreate()
)

In [5]:
# Display the active SparkSession as a quick check that it started.
spark

In [6]:
# Load the raw CSV from the bucket, taking column names from the header row.
# No schema is supplied or inferred, so every column arrives as a string and is
# cast in later cells.
df = (
    spark.read
    .option("header", True)
    .csv("gs://supawat-workshop-bucket/raw_data/financial_transactions.csv")
)

In [7]:
# Inspect the schema. With no schema or inferSchema option on the read, every
# CSV column is a string.
df.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- status: string (nullable = true)



In [8]:
# Preview the first 20 raw rows; long values are truncated for display.
df.show(n=20, truncate=True)

+--------------+-----------+----------------+-------+-------------+-----------------+--------------+--------------+------------+---------+
|transaction_id|customer_id|transaction_date| amount|     merchant|         category|payment_method|          city|     country|   status|
+--------------+-----------+----------------+-------+-------------+-----------------+--------------+--------------+------------+---------+
|         T0001|       C164|      2025-09-16|  714.9|  Tops Market|Mobile & Internet|   Credit Card|     khon kaen|    THAILAND|   Failed|
|         T0002|       C167|      2025/09/23|   NULL|Central World|         Shopping|     PromptPay|        phuket|    thailand|  Pending|
|         T0003|       C025|      20-09-2025|3654.06| SCB Easy Pay|         Shopping|          Cash|       hat yai|    thailand|   Failed|
|         T0004|       C059|      2025-09-19|1913.03|   TrueMove H|         Shopping|          Cash|       hat yai|    THAILAND|   Failed|
|         T0005|       C187

In [9]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

### Standardize Data

In [10]:
# Normalise both identifier columns in one projection: upper-case them, then
# strip surrounding whitespace.
df = df.withColumns({
    "transaction_id": F.trim(F.upper(F.col("transaction_id"))),
    "customer_id": F.trim(F.upper(F.col("customer_id"))),
})

In [11]:
# The raw dates mix several layouts. Try each pattern in order and keep the
# first one that parses; coalesce yields null if none of them match.
# NOTE(review): under Spark's default timeParserPolicy (EXCEPTION) or with ANSI
# mode on, a failed to_date may raise instead of returning null. Confirm on the
# Spark version in use.
df = df.withColumn(
    "transaction_date",
    F.coalesce(*[
        F.to_date("transaction_date", pattern)
        for pattern in ("yyyy-MM-dd", "yyyy/MM/dd", "dd-MM-yyyy", "dd/MM/yyyy")
    ]),
)

In [12]:
# Tidy merchant names:
#   - collapse runs of 2+ whitespace characters into one space
#   - title-case each word
#   - strip leading/trailing whitespace
# The pattern is a raw string, because "\s" in a normal string literal is an
# invalid escape sequence (a DeprecationWarning, and a SyntaxWarning on 3.12+).
# NOTE(review): initcap lower-cases the rest of each word, so brand casing is
# lost (e.g. "SCB Easy Pay" -> "Scb Easy Pay"). A lookup table may suit better.
df = df.withColumn("merchant", F.trim(F.initcap(F.regexp_replace(F.col("merchant"), r"\s{2,}", " "))))

In [13]:
# Tidy categories: collapse runs of 2+ whitespace into one space, title-case
# each word, then strip surrounding whitespace. The pattern is a raw string,
# because "\s" in a normal string literal is an invalid escape sequence.
df = df.withColumn("category", F.trim(F.initcap(F.regexp_replace(F.col("category"), r"\s{2,}", " "))))

In [14]:
# Tidy payment methods: collapse runs of 2+ whitespace into one space,
# title-case each word, then strip surrounding whitespace. The pattern is a raw
# string, because "\s" in a normal string literal is an invalid escape sequence.
# NOTE(review): initcap changes mixed-case names such as "PromptPay" -> "Promptpay".
df = df.withColumn("payment_method", F.trim(F.initcap(F.regexp_replace(F.col("payment_method"), r"\s{2,}", " "))))

In [15]:
# Tidy city names: collapse runs of 2+ whitespace into one space, strip the
# ends, then title-case (e.g. "khon kaen" -> "Khon Kaen"). The pattern is a raw
# string, because "\s" in a normal string literal is an invalid escape sequence.
df = df.withColumn("city", F.initcap(F.trim(F.regexp_replace(F.col("city"), r"\s{2,}", " "))))

In [16]:
# Trim and title-case the country name (e.g. "THAILAND" / "thailand" -> "Thailand").
df = df.withColumn("country", F.expr("initcap(trim(country))"))

In [17]:
# Trim and title-case the transaction status so casing variants compare equal.
df = df.withColumn("status", F.expr("initcap(trim(status))"))

### Correct Data Types

In [18]:
# Convert the string amount to a number rounded to 2 decimals.
# Cast to double rather than single-precision "Float": float32 keeps only about
# 7 significant digits, which is too coarse for monetary values and aggregates.
# With ANSI mode off, unparseable strings become null; they are imputed in the
# next section.
df = df.withColumn("amount", F.round(F.col("amount").cast(T.DoubleType()), 2))

### Handle Missing Values

In [19]:
# Median of the non-null amounts, used below to impute missing values. It is
# rounded in Spark (HALF_UP) to the same 2 decimals as the column, because the
# median can interpolate between two middle values.
median_amount = (
    df.agg(F.round(F.median("amount"), 2).alias("median_amount"))
    .first()["median_amount"]
)
# The aggregate is null when every amount is null. Fail here with a clear
# message instead of an opaque TypeError from the fill step.
if median_amount is None:
    raise ValueError("Cannot impute 'amount': no non-null values to take a median of.")

In [20]:
# Replace missing amounts with the median computed above; other columns are left untouched.
df = df.fillna(median_amount, subset=["amount"])

In [21]:
# Preview the first 20 rows after standardisation and imputation.
df.show(n=20, truncate=True)

+--------------+-----------+----------------+-------+-------------+-----------------+--------------+----------+--------+---------+
|transaction_id|customer_id|transaction_date| amount|     merchant|         category|payment_method|      city| country|   status|
+--------------+-----------+----------------+-------+-------------+-----------------+--------------+----------+--------+---------+
|         T0001|       C164|      2025-09-16|  714.9|  Tops Market|Mobile & Internet|   Credit Card| Khon Kaen|Thailand|   Failed|
|         T0002|       C167|      2025-09-23| 2250.9|Central World|         Shopping|     Promptpay|    Phuket|Thailand|  Pending|
|         T0003|       C025|      2025-09-20|3654.06| Scb Easy Pay|         Shopping|          Cash|   Hat Yai|Thailand|   Failed|
|         T0004|       C059|      2025-09-19|1913.03|   Truemove H|         Shopping|          Cash|   Hat Yai|Thailand|   Failed|
|         T0005|       C187|      2025-09-18|4628.45| Bts Skytrain|  Food & Beverag

### Drop Duplicate Transactions

In [22]:
# Keep a single row per transaction_id. Spark does not specify which duplicate
# survives, so the choice is arbitrary when duplicates differ in other columns.
# NOTE(review): the imputation median was computed before this step, so
# duplicated rows may have skewed it. Consider deduplicating first.
df = df.drop_duplicates(["transaction_id"])

In [23]:
# Preview the first 20 rows after deduplication.
df.show(n=20, truncate=True)

+--------------+-----------+----------------+-------+-------------+-----------------+--------------+----------+--------+---------+
|transaction_id|customer_id|transaction_date| amount|     merchant|         category|payment_method|      city| country|   status|
+--------------+-----------+----------------+-------+-------------+-----------------+--------------+----------+--------+---------+
|         T0001|       C164|      2025-09-16|  714.9|  Tops Market|Mobile & Internet|   Credit Card| Khon Kaen|Thailand|   Failed|
|         T0002|       C167|      2025-09-23| 2250.9|Central World|         Shopping|     Promptpay|    Phuket|Thailand|  Pending|
|         T0003|       C025|      2025-09-20|3654.06| Scb Easy Pay|         Shopping|          Cash|   Hat Yai|Thailand|   Failed|
|         T0004|       C059|      2025-09-19|1913.03|   Truemove H|         Shopping|          Cash|   Hat Yai|Thailand|   Failed|
|         T0005|       C187|      2025-09-18|4628.45| Bts Skytrain|  Food & Beverag

### Save Data to GCS

In [24]:
# Persist the cleaned data as Parquet, replacing any previous output at this path.
df.write.parquet(
    "gs://supawat-workshop-bucket/clean_data/financial_transactions_parquet",
    mode="overwrite",
)

In [25]:
# Read the written dataset back by its directory path, the standard way to load
# Spark-written Parquet. A "/*" glob is unnecessary: on partitioned output it
# would make each partition directory a root path, so the partition columns
# would not be discovered.
df_test = spark.read.parquet("gs://supawat-workshop-bucket/clean_data/financial_transactions_parquet")

In [26]:
# Spot-check the round-tripped Parquet data (first 20 rows).
df_test.show(n=20, truncate=True)

+--------------+-----------+----------------+-------+-------------+-----------------+--------------+----------+--------+---------+
|transaction_id|customer_id|transaction_date| amount|     merchant|         category|payment_method|      city| country|   status|
+--------------+-----------+----------------+-------+-------------+-----------------+--------------+----------+--------+---------+
|         T0001|       C164|      2025-09-16|  714.9|  Tops Market|Mobile & Internet|   Credit Card| Khon Kaen|Thailand|   Failed|
|         T0002|       C167|      2025-09-23| 2250.9|Central World|         Shopping|     Promptpay|    Phuket|Thailand|  Pending|
|         T0003|       C025|      2025-09-20|3654.06| Scb Easy Pay|         Shopping|          Cash|   Hat Yai|Thailand|   Failed|
|         T0004|       C059|      2025-09-19|1913.03|   Truemove H|         Shopping|          Cash|   Hat Yai|Thailand|   Failed|
|         T0005|       C187|      2025-09-18|4628.45| Bts Skytrain|  Food & Beverag