Milestone 3

Name: Tazqia Ranyanisha A

Batch: CODA-RMT-008


This program applies data validation, exploration, and cleaning with Python and PySpark, and automates the ETL process with Airflow to make the data analysis workflow more efficient.

Slides (PPT): https://docs.google.com/presentation/d/18nDG6Y8enVH4ywTpDfKehhyIOKqnQXIM/edit?usp=sharing&ouid=102360167114929655326&rtpof=true&sd=true


Dataset: https://www.kaggle.com/datasets/ahsan81/superstore-marketing-campaign-dataset

# Superstore

Superstore is a retail platform that is preparing a year-end promotional campaign.

## Business Goal
The company wants to maximize the impact of the promotional campaign by targeting the customers most likely to respond positively.

## Challenge
As the customer base grows, data volume increases, so managing the data manually becomes inefficient and error-prone.

## Solution
- Data cleaning and transformation with **PySpark**
- Storing the results in **MongoDB**
- Pipeline automation with **Airflow**
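The three solution stages form an extract, transform, load chain. The sketch below is a minimal stdlib illustration of that ordering, assuming a tiny in-memory CSV and a plain Python list standing in for the MongoDB collection; the functions `extract`, `transform`, and `load` are hypothetical and are not the project's actual Airflow tasks.

```python
import csv
import io

# Hypothetical three-row sample standing in for superstore_data.csv (two columns only).
RAW_CSV = "Id,Income\n1826,84835\n1,\n10476,67267\n"


def extract(text):
    """Extract: parse CSV text into a list of dicts (every value is a string)."""
    return list(csv.DictReader(io.StringIO(text)))


def transform(rows):
    """Transform: drop rows with an empty Income and cast both fields to int."""
    return [
        {"Id": int(r["Id"]), "Income": int(r["Income"])}
        for r in rows
        if r["Income"] != ""
    ]


def load(rows, sink):
    """Load: append documents to a sink and return how many were written."""
    sink.extend(rows)
    return len(rows)


collection = []  # hypothetical stand-in for a MongoDB collection
loaded = load(transform(extract(RAW_CSV)), collection)
print(loaded, collection)
```

In an orchestrated pipeline each stage would typically become its own task, so a failed step can be retried without rerunning the others.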

## Attributes (features)
| Attribute             | Description |
|-----------------------|-------------|
| **Response (target)** | 1 = the customer accepted the offer in the last campaign, 0 = otherwise |
| **Id**                | Unique identifier of each customer |
| **Year_Birth**        | Year of birth (used to compute age) |
| **Complain**          | 1 = the customer filed a complaint in the last 2 years, 0 = otherwise |
| **Dt_Customer**       | Date the customer enrolled with the company |
| **Education**         | Customer's education level |
| **Marital_Status**    | Customer's marital status |
| **Kidhome**           | Number of small children in the household |
| **Teenhome**          | Number of teenagers in the household |
| **Income**            | Yearly household income |
| **MntFishProducts**   | Amount spent on fish products (last 2 years) |
| **MntMeatProducts**   | Amount spent on meat products (last 2 years) |
| **MntFruits**         | Amount spent on fruit (last 2 years) |
| **MntSweetProducts**  | Amount spent on sweet products (last 2 years) |
| **MntWines**          | Amount spent on wine (last 2 years) |
| **MntGoldProds**      | Amount spent on gold products (last 2 years) |
| **NumDealsPurchases** | Number of purchases made with a discount |
| **NumCatalogPurchases** | Number of purchases made via catalog (mail order) |
| **NumStorePurchases** | Number of purchases made directly in stores |
| **NumWebPurchases**   | Number of purchases made through the website |
| **NumWebVisitsMonth** | Number of visits to the website in the last month |
| **Recency**           | Number of days since the last purchase |



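In [1]:
import pyspark
# No manual SparkContext here: SparkSession.builder in the next cell creates one,
# reachable as spark.sparkContext. A context created beforehand would be reused by
# getOrCreate(), so launch-time settings such as spark.jars.packages would not apply.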

In [2]:
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("WriteToPostgres") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0") \
    .getOrCreate()
spark

In [3]:
# Read the CSV file
file_path = "superstore_data.csv"   
df = spark.read.csv(file_path, header=True, inferSchema=True)

df.limit(10).toPandas()

Unnamed: 0,Id,Year_Birth,Education,Marital_Status,Income,Kidhome,Teenhome,Dt_Customer,Recency,MntWines,...,MntFishProducts,MntSweetProducts,MntGoldProds,NumDealsPurchases,NumWebPurchases,NumCatalogPurchases,NumStorePurchases,NumWebVisitsMonth,Response,Complain
0,1826,1970,Graduation,Divorced,84835,0,0,6/16/2014,0,189,...,111,189,218,1,4,4,6,1,1,0
1,1,1961,Graduation,Single,57091,0,0,6/15/2014,0,464,...,7,0,37,1,7,3,7,5,1,0
2,10476,1958,Graduation,Married,67267,0,1,5/13/2014,0,134,...,15,2,30,1,3,2,5,2,0,0
3,1386,1967,Graduation,Together,32474,1,1,11/5/2014,0,10,...,0,0,0,1,1,0,2,7,0,0
4,5371,1989,Graduation,Single,21474,1,0,8/4/2014,0,6,...,11,0,34,2,3,1,2,7,1,0
5,7348,1958,PhD,Single,71691,0,0,3/17/2014,0,336,...,240,32,43,1,4,7,5,2,1,0
6,4073,1954,2n Cycle,Married,63564,0,0,1/29/2014,0,769,...,15,34,65,1,10,10,7,6,1,0
7,1991,1967,Graduation,Together,44931,0,1,1/18/2014,0,78,...,0,0,7,1,2,1,3,5,0,0
8,4047,1954,PhD,Married,65324,0,1,11/1/2014,0,384,...,21,32,5,3,6,2,9,4,0,0
9,9477,1954,PhD,Married,65324,0,1,11/1/2014,0,384,...,21,32,5,3,6,2,9,4,0,0


In [4]:
# Inspect the schema
df.printSchema()

# Count rows and columns
print(f"Rows: {df.count()}, Columns: {len(df.columns)}")

root
 |-- Id: integer (nullable = true)
 |-- Year_Birth: integer (nullable = true)
 |-- Education: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Kidhome: integer (nullable = true)
 |-- Teenhome: integer (nullable = true)
 |-- Dt_Customer: string (nullable = true)
 |-- Recency: integer (nullable = true)
 |-- MntWines: integer (nullable = true)
 |-- MntFruits: integer (nullable = true)
 |-- MntMeatProducts: integer (nullable = true)
 |-- MntFishProducts: integer (nullable = true)
 |-- MntSweetProducts: integer (nullable = true)
 |-- MntGoldProds: integer (nullable = true)
 |-- NumDealsPurchases: integer (nullable = true)
 |-- NumWebPurchases: integer (nullable = true)
 |-- NumCatalogPurchases: integer (nullable = true)
 |-- NumStorePurchases: integer (nullable = true)
 |-- NumWebVisitsMonth: integer (nullable = true)
 |-- Response: integer (nullable = true)
 |-- Complain: integer (nullable = true)

Rows: 2240, Columns: 

In [5]:
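# Check missing values

from pyspark.sql.functions import col, sum as spark_sum  # alias avoids shadowing Python's built-in sum

# Count null values per column (isNull() cast to int is 1 for null, 0 otherwise)
df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()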


+---+----------+---------+--------------+------+-------+--------+-----------+-------+--------+---------+---------------+---------------+----------------+------------+-----------------+---------------+-------------------+-----------------+-----------------+--------+--------+
| Id|Year_Birth|Education|Marital_Status|Income|Kidhome|Teenhome|Dt_Customer|Recency|MntWines|MntFruits|MntMeatProducts|MntFishProducts|MntSweetProducts|MntGoldProds|NumDealsPurchases|NumWebPurchases|NumCatalogPurchases|NumStorePurchases|NumWebVisitsMonth|Response|Complain|
+---+----------+---------+--------------+------+-------+--------+-----------+-------+--------+---------+---------------+---------------+----------------+------------+-----------------+---------------+-------------------+-----------------+-----------------+--------+--------+
|  0|         0|        0|             0|    24|      0|       0|          0|      0|       0|        0|              0|              0|               0|           0|         

In [6]:
# Drop the rows where Income is null (24 rows according to the null count above)
df = df.dropna(subset=['Income'])
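The null count and the `dropna` step can be mirrored in plain Python to show what the Spark expressions compute. This is a minimal sketch, assuming hypothetical rows where `None` plays the role of a Spark null; `null_counts` and `dropna` are illustrative helpers, not PySpark functions.

```python
# Hypothetical rows; None plays the role of a Spark null.
rows = [
    {"Id": 1826, "Income": 84835, "Response": 1},
    {"Id": 1, "Income": None, "Response": 1},
    {"Id": 10476, "Income": 67267, "Response": 0},
]


def null_counts(rows):
    """Per-column count of None values, mirroring sum(col(c).isNull().cast('int'))."""
    columns = rows[0].keys()
    return {c: sum(int(r[c] is None) for r in rows) for c in columns}


def dropna(rows, subset):
    """Keep only rows with no None in the given columns, like df.dropna(subset=...)."""
    return [r for r in rows if all(r[c] is not None for c in subset)]


print(null_counts(rows))  # {'Id': 0, 'Income': 1, 'Response': 0}
cleaned = dropna(rows, subset=["Income"])
print(len(cleaned))  # 2
```

Note that the helper relies on Python's built-in `sum`, which is exactly the name a bare `from pyspark.sql.functions import sum` would shadow in the notebook.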

In [7]:
# Check the schema after dropping nulls
df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- Year_Birth: integer (nullable = true)
 |-- Education: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Kidhome: integer (nullable = true)
 |-- Teenhome: integer (nullable = true)
 |-- Dt_Customer: string (nullable = true)
 |-- Recency: integer (nullable = true)
 |-- MntWines: integer (nullable = true)
 |-- MntFruits: integer (nullable = true)
 |-- MntMeatProducts: integer (nullable = true)
 |-- MntFishProducts: integer (nullable = true)
 |-- MntSweetProducts: integer (nullable = true)
 |-- MntGoldProds: integer (nullable = true)
 |-- NumDealsPurchases: integer (nullable = true)
 |-- NumWebPurchases: integer (nullable = true)
 |-- NumCatalogPurchases: integer (nullable = true)
 |-- NumStorePurchases: integer (nullable = true)
 |-- NumWebVisitsMonth: integer (nullable = true)
 |-- Response: integer (nullable = true)
 |-- Complain: integer (nullable = true)



In [8]:
from pyspark.sql.functions import to_date, col
# Convert Dt_Customer to DateType (format: month/day/year)
df = df.withColumn("Dt_Customer", to_date(col("Dt_Customer"), "M/d/yyyy"))
df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- Year_Birth: integer (nullable = true)
 |-- Education: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Kidhome: integer (nullable = true)
 |-- Teenhome: integer (nullable = true)
 |-- Dt_Customer: date (nullable = true)
 |-- Recency: integer (nullable = true)
 |-- MntWines: integer (nullable = true)
 |-- MntFruits: integer (nullable = true)
 |-- MntMeatProducts: integer (nullable = true)
 |-- MntFishProducts: integer (nullable = true)
 |-- MntSweetProducts: integer (nullable = true)
 |-- MntGoldProds: integer (nullable = true)
 |-- NumDealsPurchases: integer (nullable = true)
 |-- NumWebPurchases: integer (nullable = true)
 |-- NumCatalogPurchases: integer (nullable = true)
 |-- NumStorePurchases: integer (nullable = true)
 |-- NumWebVisitsMonth: integer (nullable = true)
 |-- Response: integer (nullable = true)
 |-- Complain: integer (nullable = true)
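The `M/d/yyyy` pattern accepts months and days without zero padding, as in "6/16/2014". The stdlib sketch below shows the same parsing rule, assuming that an unparseable string should become `None`; Spark's `to_date` typically behaves that way when ANSI mode is off, while with ANSI mode on it raises an error instead. `parse_customer_date` is a hypothetical helper.

```python
from datetime import date, datetime


def parse_customer_date(text):
    """Parse an M/d/yyyy string into a date; return None when it does not match."""
    try:
        # %m and %d in strptime accept both "6" and "06"
        return datetime.strptime(text, "%m/%d/%Y").date()
    except ValueError:
        return None


print(parse_customer_date("6/16/2014"))  # 2014-06-16
print(parse_customer_date("16/6/2014"))  # None (there is no month 16)
```

Because failed conversions silently turn into nulls in the non-ANSI case, rerunning the null count on `Dt_Customer` after the conversion is a cheap way to confirm that no dates were lost.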



In [9]:
from pyspark.sql.functions import lit, col

# Add an Age column (2025 - Year_Birth); the reference year 2025 is hardcoded
df = df.withColumn("Age", lit(2025) - col("Year_Birth"))

# Drop the Year_Birth column
df = df.drop("Year_Birth")

# Check the result
df.select("Id", "Age").show(10)


+-----+---+
|   Id|Age|
+-----+---+
| 1826| 55|
|    1| 64|
|10476| 67|
| 1386| 58|
| 5371| 36|
| 7348| 67|
| 4073| 71|
| 1991| 58|
| 4047| 71|
| 9477| 71|
+-----+---+
only showing top 10 rows
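Hardcoding 2025 means the Age feature drifts by one year every year the pipeline keeps running. A minimal sketch, assuming the reference year is passed in as a parameter instead; `add_age` is a hypothetical helper. In PySpark the analogous choice would be `year(current_date())` from `pyspark.sql.functions`, or the year of `Dt_Customer` if age at enrollment is what the analysis needs.

```python
def add_age(rows, reference_year=2025):
    """Return new rows with Age = reference_year - Year_Birth and Year_Birth removed."""
    out = []
    for r in rows:
        new = {k: v for k, v in r.items() if k != "Year_Birth"}
        new["Age"] = reference_year - r["Year_Birth"]
        out.append(new)
    return out


rows = [{"Id": 1826, "Year_Birth": 1970}, {"Id": 5371, "Year_Birth": 1989}]
print(add_age(rows))  # [{'Id': 1826, 'Age': 55}, {'Id': 5371, 'Age': 36}]
```

The two sample rows reproduce the Age values 55 and 36 shown for Ids 1826 and 5371 in the output above.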


In [10]:
from pyspark.sql.functions import col


# Filter: drop every row with Age > 95 (rows with a null Age would be dropped too)
df = df.filter(col("Age") <= 95)

# Check the result
df.select("Id", "Age").show(5)

+-----+---+
|   Id|Age|
+-----+---+
| 1826| 55|
|    1| 64|
|10476| 67|
| 1386| 58|
| 5371| 36|
+-----+---+
only showing top 5 rows
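The filter removes rows silently, so it helps to record how many were dropped. In PySpark that means comparing `df.count()` before and after the filter. The stdlib sketch below shows the idea, assuming hypothetical rows (the Age of 125 is invented for illustration); `filter_max_age` is a hypothetical helper.

```python
def filter_max_age(rows, max_age=95):
    """Keep rows with Age <= max_age and report how many rows were dropped."""
    kept = [r for r in rows if r["Age"] <= max_age]
    return kept, len(rows) - len(kept)


rows = [{"Id": 1, "Age": 64}, {"Id": 2, "Age": 125}, {"Id": 3, "Age": 36}]
kept, dropped = filter_max_age(rows)
print(len(kept), dropped)  # 2 1
```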


In [11]:
df

DataFrame[Id: int, Education: string, Marital_Status: string, Income: int, Kidhome: int, Teenhome: int, Dt_Customer: date, Recency: int, MntWines: int, MntFruits: int, MntMeatProducts: int, MntFishProducts: int, MntSweetProducts: int, MntGoldProds: int, NumDealsPurchases: int, NumWebPurchases: int, NumCatalogPurchases: int, NumStorePurchases: int, NumWebVisitsMonth: int, Response: int, Complain: int, Age: int]

DATA VALIDATION - GREAT EXPECTATIONS

In [12]:
!pip install "great-expectations==0.18.19"



In [13]:
!pip install numpy==1.26.4



In [16]:
# 1. Import and create a Data Context
from great_expectations.data_context import FileDataContext

context = FileDataContext.create(project_root_dir='./')


# 2. Create or update the Expectation Suite

expectation_suite_name = 'expectation-superstore'
context.add_or_update_expectation_suite(expectation_suite_name)


# 3. Register the datasource and asset
# Note: the asset points at the raw CSV, so the PySpark cleaning steps above
# (dropna on Income, to_date on Dt_Customer, the derived Age column) are not
# reflected in the batch validated here.
datasource_name = 'superstore-csv-v5'
asset_name = 'superstore-data'
path_to_data = './superstore_data.csv'

# Delete the existing datasource if present, to avoid a name-conflict error.
# context.list_datasources() returns config dicts rather than names, so the
# membership test uses the context.datasources mapping instead.
if datasource_name in context.datasources:
    context.delete_datasource(datasource_name)

# Add a new pandas datasource
datasource = context.sources.add_pandas(datasource_name)

# Add the CSV asset
asset = datasource.add_csv_asset(asset_name, filepath_or_buffer=path_to_data)

# Build the batch request
batch_request = asset.build_batch_request()

# 4. Create a validator from the batch request and the expectation suite
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name
)

# 5. Preview the first 5 rows of the dataset
validator.head()



Unnamed: 0,Id,Year_Birth,Education,Marital_Status,Income,Kidhome,Teenhome,Dt_Customer,Recency,MntWines,...,MntFishProducts,MntSweetProducts,MntGoldProds,NumDealsPurchases,NumWebPurchases,NumCatalogPurchases,NumStorePurchases,NumWebVisitsMonth,Response,Complain
0,1826,1970,Graduation,Divorced,84835.0,0,0,6/16/2014,0,189,...,111,189,218,1,4,4,6,1,1,0
1,1,1961,Graduation,Single,57091.0,0,0,6/15/2014,0,464,...,7,0,37,1,7,3,7,5,1,0
2,10476,1958,Graduation,Married,67267.0,0,1,5/13/2014,0,134,...,15,2,30,1,3,2,5,2,0,0
3,1386,1967,Graduation,Together,32474.0,1,1,11/5/2014,0,10,...,0,0,0,1,1,0,2,7,0,0
4,5371,1989,Graduation,Single,21474.0,1,0,8/4/2014,0,6,...,11,0,34,2,3,1,2,7,1,0


1. Expectation 1: the Response column (the customer's reaction to the campaign) must not be null

In [53]:
# Expectation: the 'Response' column must not contain nulls
validator.expect_column_values_to_not_be_null('Response')






{
  "success": true,
  "result": {
    "element_count": 2240,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": []
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}
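The result fields above (`element_count`, `unexpected_count`, `unexpected_percent`) follow a simple rule: count the nulls and express them as a share of all rows. A stdlib sketch of that rule, assuming hypothetical values; `expect_not_null` mimics the shape of the result and is not the Great Expectations API.

```python
def expect_not_null(values):
    """Count None values and report them in a GX-like result dict."""
    n = len(values)
    unexpected = sum(v is None for v in values)
    return {
        "success": unexpected == 0,
        "element_count": n,
        "unexpected_count": unexpected,
        "unexpected_percent": 100.0 * unexpected / n if n else 0.0,
    }


result = expect_not_null([1, 0, None, 1])
print(result)
```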

2. Expectation 2: the 'Id' column must be unique

In [54]:
validator.expect_column_values_to_be_unique('Id')





{
  "success": true,
  "result": {
    "element_count": 2240,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}
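A unique `Id` does not rule out duplicate customers: in the preview output, the rows with Ids 4047 and 9477 are identical in every other column. The sketch below contrasts the two checks, assuming hypothetical rows trimmed to a few columns; `non_unique_rows` flags every occurrence of a repeated value (one possible convention), and `duplicate_records` groups rows that match once `Id` is ignored. Both are illustrative helpers.

```python
from collections import Counter


def non_unique_rows(values):
    """Indices of rows whose value occurs more than once (every occurrence is flagged)."""
    counts = Counter(values)
    return [i for i, v in enumerate(values) if counts[v] > 1]


def duplicate_records(rows, ignore=("Id",)):
    """Group the Ids of rows that are identical once the ignored columns are removed."""
    groups = {}
    for r in rows:
        key = tuple(sorted((k, v) for k, v in r.items() if k not in ignore))
        groups.setdefault(key, []).append(r["Id"])
    return [ids for ids in groups.values() if len(ids) > 1]


rows = [
    {"Id": 4047, "Income": 65324, "Education": "PhD"},
    {"Id": 9477, "Income": 65324, "Education": "PhD"},
    {"Id": 1826, "Income": 84835, "Education": "Graduation"},
]
print(non_unique_rows([r["Id"] for r in rows]))  # []
print(duplicate_records(rows))  # [[4047, 9477]]
```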

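3. Expectation 3: Dt_Customer must be of date type. This check is only meaningful for the cleaned Spark DataFrame, where Dt_Customer was converted with to_date in In [8]; in the raw CSV read by the validator above, Dt_Customer holds text such as "6/16/2014", and 'DateType' is a Spark type name rather than a pandas dtype.

In [20]:
# Expectation: the column must be of DateType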
validator.expect_column_values_to_be_of_type(
    'Dt_Customer',
    'DateType'
)

{
  "success": true,
  "result": {
    "observed_value": "DateType"
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}
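A type expectation passes or fails depending on which version of the data it sees. The sketch below makes that concrete with the standard library, assuming two hypothetical raw values; `all_of_type` is an illustrative helper. The raw strings fail a date check, while the same values parsed with the `M/d/yyyy` rule pass.

```python
from datetime import date, datetime


def all_of_type(values, expected_type):
    """True when every non-None value is an instance of expected_type."""
    return all(isinstance(v, expected_type) for v in values if v is not None)


raw = ["6/16/2014", "6/15/2014"]  # as read from the CSV: plain strings
parsed = [datetime.strptime(s, "%m/%d/%Y").date() for s in raw]

print(all_of_type(raw, date))  # False
print(all_of_type(parsed, date))  # True
```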

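4. Expectation 4: the maximum value of 'Complain' must not exceed 1

In [56]:
# Expectation: the maximum of 'Complain' must lie between 0 and 1,
# i.e. no value is greater than 1. The bounds apply to the column maximum
# only, so a negative value elsewhere in the column would not be caught.

validator.expect_column_max_to_be_between(
    column='Complain',   # column to check
    min_value=0,         # lower bound for the column maximum
    max_value=1          # upper bound for the column maximum
)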






{
  "success": true,
  "result": {
    "observed_value": 1
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}
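Because only the maximum is bounded, a flag column containing an invalid negative value can still pass. The sketch below demonstrates that gap, assuming hypothetical values and ignoring `None`; `expect_max_between` mirrors the idea of the expectation and is not the GX implementation. A value-level check such as "values in {0, 1}" would be stricter.

```python
def expect_max_between(values, min_value, max_value):
    """Check only the column maximum (None ignored) against [min_value, max_value]."""
    observed = max(v for v in values if v is not None)
    return {"success": min_value <= observed <= max_value, "observed_value": observed}


print(expect_max_between([0, 1, 0], 0, 1))  # {'success': True, 'observed_value': 1}
print(expect_max_between([0, 1, -1], 0, 1))  # still succeeds: -1 is not caught
```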

5. Expectation 5: Education must belong to the set of known categories

In [62]:
validator.expect_column_values_to_be_in_set(
    "Education",
    ["Basic", "2n Cycle", "Graduation", "Master", "PhD"]
)





{
  "success": true,
  "result": {
    "element_count": 2240,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}
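A set-membership check compares exact strings, so a casing variant counts as unexpected. The sketch below shows this, assuming a hypothetical sample that includes an invented "Phd" variant; `values_not_in_set` is an illustrative helper that skips `None`.

```python
EDUCATION_LEVELS = {"Basic", "2n Cycle", "Graduation", "Master", "PhD"}


def values_not_in_set(values, allowed):
    """Return the non-None values that fall outside the allowed set."""
    return [v for v in values if v is not None and v not in allowed]


sample = ["Graduation", "PhD", "Phd", None, "2n Cycle"]
print(values_not_in_set(sample, EDUCATION_LEVELS))  # ['Phd']
```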

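6. Expectation 6: Income must be non-negative (and at most 1,000,000). Null values are skipped by this check: the result reports a missing_count of 24 and still succeeds.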

In [63]:
validator.expect_column_values_to_be_between("Income", min_value=0, max_value=1000000)





{
  "success": true,
  "result": {
    "element_count": 2240,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 24,
    "missing_percent": 1.0714285714285714,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}
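The result separates missing values from out-of-range values: `missing_count` is 24, yet `success` is true because nulls are excluded before the range test. A stdlib sketch of that bookkeeping, assuming a hypothetical five-value column with one null and one negative income; `expect_between` is an illustrative helper and not the GX API.

```python
def expect_between(values, min_value, max_value):
    """Range check that skips None and reports missing and out-of-range counts separately."""
    n = len(values)
    present = [v for v in values if v is not None]
    unexpected = [v for v in present if not (min_value <= v <= max_value)]
    return {
        "success": not unexpected,
        "element_count": n,
        "missing_count": n - len(present),
        "missing_percent": 100.0 * (n - len(present)) / n if n else 0.0,
        "unexpected_count": len(unexpected),
        "unexpected_percent_nonmissing": 100.0 * len(unexpected) / len(present) if present else 0.0,
    }


# Hypothetical column: three valid incomes, one missing, one negative.
result = expect_between([84835, None, 57091, -5, 67267], 0, 1_000_000)
print(result["missing_count"], result["unexpected_count"], result["success"])  # 1 1 False
```

With 24 missing out of 2,240 rows the same formula gives a missing_percent of about 1.07, in line with the output above.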

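7. Expectation 7: the Age column must exist. Age is derived in PySpark (In [9]); the raw CSV read by the validator contains Year_Birth instead, so this check is only meaningful against the transformed data.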

In [21]:
validator.expect_column_to_exist(column='Age')

{
  "success": true,
  "result": {},
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}
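A column-existence check simply compares the required names with the actual header. The sketch below shows why the outcome depends on which data is validated, assuming hypothetical subsets of the raw and cleaned column lists; `missing_columns` is an illustrative helper.

```python
RAW_HEADER = ["Id", "Year_Birth", "Education", "Income"]  # subset of the raw CSV header
CLEANED_COLUMNS = ["Id", "Education", "Income", "Age"]  # subset after the Age step


def missing_columns(columns, required):
    """Return the required column names that are absent from columns."""
    return [c for c in required if c not in columns]


print(missing_columns(RAW_HEADER, ["Age"]))  # ['Age']
print(missing_columns(CLEANED_COLUMNS, ["Age"]))  # []
```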

In [65]:
# Save into Expectation Suite

validator.save_expectation_suite(discard_failed_expectations=False)

CHECKPOINT

In [66]:
checkpoint_1 = context.add_or_update_checkpoint(
    name='checkpoint_superstore',
    validator=validator,
)


In [67]:
checkpoint_result = checkpoint_1.run()

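In an automated pipeline, a checkpoint is most useful as a gate: if any expectation fails, the load step should not run. The sketch below shows that gating logic in plain Python, assuming per-expectation pass/fail flags are already available (with Great Expectations 0.18 we assume the overall flag is exposed as `checkpoint_result.success`). `validation_gate` and the expectation labels are hypothetical.

```python
def validation_gate(expectation_results):
    """Raise if any expectation failed, so a downstream load step would not run."""
    failed = [name for name, ok in expectation_results.items() if not ok]
    if failed:
        raise ValueError("Validation failed for: " + ", ".join(failed))
    return True


print(validation_gate({"Response not null": True, "Id unique": True}))  # True
```

Raising an exception, rather than returning False, matters in an orchestrator such as Airflow, where an uncaught exception marks the task as failed and stops its downstream tasks.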

DATA DOCS

In [68]:
# Build data docs

context.build_data_docs()

{'local_site': 'file:///home/jovyan/gx/uncommitted/data_docs/local_site/index.html'}

## **KEY TAKEAWAYS**

1. **Reliable Pipeline**: An automated ETL pipeline extracts, transforms, validates, and loads data consistently without manual intervention, which minimizes errors and delays.

2. **Data Quality**: Great Expectations validates data types, value ranges, and categories, so segmentation is more accurate, feature engineering is more reliable, and customer response analysis stays consistent.

3. **Scalable Process**: Automating transformation and loading lets the pipeline handle a dataset that keeps growing with the customer base without sacrificing performance or accuracy.

4. **Customer Insights**: Engineered features (for example, Age) support more precise segmentation and better prediction of customer response.
