In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MinIO Read Example") \
    .config("spark.jars", "jars/hadoop-aws-3.3.2.jar,jars/aws-java-sdk-bundle-1.11.1026.jar") \
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin123") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .getOrCreate()

In [4]:
df_transactions = spark.read.option("header", True) \
    .csv("s3a://raw-data/transactions.csv")

In [5]:
df_customers = spark.read.option("header", True) \
    .csv("s3a://raw-data/customers.csv")

In [6]:
df_transactions.printSchema()

root
 |-- Invoice ID: string (nullable = true)
 |-- Line: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Unit Price: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Line Total: string (nullable = true)
 |-- Store ID: string (nullable = true)
 |-- Employee ID: string (nullable = true)
 |-- Currency: string (nullable = true)
 |-- Currency Symbol: string (nullable = true)
 |-- SKU: string (nullable = true)
 |-- Transaction Type: string (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Invoice Total: string (nullable = true)



In [7]:
df_customers.printSchema()

root
 |-- Customer ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Telephone: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Date Of Birth: string (nullable = true)
 |-- Job Title: string (nullable = true)



In [8]:
from pyspark.sql.functions import col, sum, when

missing_counts = df_transactions.select([
    sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df_transactions.columns
])

missing_counts.show()

+----------+----+-----------+----------+------+-------+----------+--------+----+--------+----------+--------+-----------+--------+---------------+---+----------------+--------------+-------------+
|Invoice ID|Line|Customer ID|Product ID|  Size|  Color|Unit Price|Quantity|Date|Discount|Line Total|Store ID|Employee ID|Currency|Currency Symbol|SKU|Transaction Type|Payment Method|Invoice Total|
+----------+----+-----------+----------+------+-------+----------+--------+----+--------+----------+--------+-----------+--------+---------------+---+----------------+--------------+-------------+
|         0|   0|          0|         0|413102|4350783|         0|       0|   0|       0|         0|       0|          0|       0|              0|  0|               0|             0|            0|
+----------+----+-----------+----------+------+-------+----------+--------+----+--------+----------+--------+-----------+--------+---------------+---+----------------+--------------+-------------+



In [9]:
from pyspark.sql.functions import col, when, count

total_rows = df_transactions.count()

missing_size = df_transactions.filter(col("Size").isNull()).count()
missing_color = df_transactions.filter(col("Color").isNull()).count()

print(f"Size missing: {missing_size} ({missing_size/total_rows:.2%})")
print(f"Color missing: {missing_color} ({missing_color/total_rows:.2%})")

Size missing: 413102 (6.44%)
Color missing: 4350783 (67.80%)


In [10]:
from pyspark.sql.functions import col, count, when

# 1. Hitung nilai yang paling sering muncul (modus) untuk kolom kategorikal
most_common_size = (
    df_transactions.filter(col("Size").isNotNull())
      .groupBy("Size")
      .count()
      .orderBy("count", ascending=False)
      .first()[0]
)

most_common_color = (
    df_transactions.filter(col("Color").isNotNull())
      .groupBy("Color")
      .count()
      .orderBy("count", ascending=False)
      .first()[0]
)

# 2. Isi missing value dengan nilai modus tersebut
df_transactions_filled = df_transactions.fillna({
    "Size": most_common_size,
    "Color": most_common_color
})

In [11]:
from pyspark.sql.functions import col, sum, when

missing_counts = df_transactions_filled.select([
    sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df_transactions_filled.columns
])

missing_counts.show()

+----------+----+-----------+----------+----+-----+----------+--------+----+--------+----------+--------+-----------+--------+---------------+---+----------------+--------------+-------------+
|Invoice ID|Line|Customer ID|Product ID|Size|Color|Unit Price|Quantity|Date|Discount|Line Total|Store ID|Employee ID|Currency|Currency Symbol|SKU|Transaction Type|Payment Method|Invoice Total|
+----------+----+-----------+----------+----+-----+----------+--------+----+--------+----------+--------+-----------+--------+---------------+---+----------------+--------------+-------------+
|         0|   0|          0|         0|   0|    0|         0|       0|   0|       0|         0|       0|          0|       0|              0|  0|               0|             0|            0|
+----------+----+-----------+----------+----+-----+----------+--------+----+--------+----------+--------+-----------+--------+---------------+---+----------------+--------------+-------------+



In [12]:
from pyspark.sql.functions import col, sum, when

missing_counts = df_customers.select([
    sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df_customers.columns
])

missing_counts.show()

+-----------+----+-----+---------+----+-------+------+-------------+---------+
|Customer ID|Name|Email|Telephone|City|Country|Gender|Date Of Birth|Job Title|
+-----------+----+-----+---------+----+-------+------+-------------+---------+
|          0|   0|    0|        0|   0|      0|     0|            0|   584185|
+-----------+----+-----+---------+----+-------+------+-------------+---------+



In [13]:
most_common_size = (
    df_customers.filter(col("Job Title").isNotNull())
      .groupBy("Job Title")
      .count()
      .orderBy("count", ascending=False)
      .first()[0]
)

# 2. Isi missing value dengan nilai modus tersebut
df_customers_filled = df_customers.fillna({
    "Job Title": most_common_size
})

In [14]:
from pyspark.sql.functions import col, sum, when

missing_counts = df_customers_filled.select([
    sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df_customers_filled.columns
])

missing_counts.show()

+-----------+----+-----+---------+----+-------+------+-------------+---------+
|Customer ID|Name|Email|Telephone|City|Country|Gender|Date Of Birth|Job Title|
+-----------+----+-----+---------+----+-------+------+-------------+---------+
|          0|   0|    0|        0|   0|      0|     0|            0|        0|
+-----------+----+-----+---------+----+-------+------+-------------+---------+



In [15]:
df_transactions_filled.show(2)
df_customers_filled.show(2)

+-------------------+----+-----------+----------+----+-----+----------+--------+-------------------+--------+----------+--------+-----------+--------+---------------+-----------+----------------+--------------+-------------+
|         Invoice ID|Line|Customer ID|Product ID|Size|Color|Unit Price|Quantity|               Date|Discount|Line Total|Store ID|Employee ID|Currency|Currency Symbol|        SKU|Transaction Type|Payment Method|Invoice Total|
+-------------------+----+-----------+----------+----+-----+----------+--------+-------------------+--------+----------+--------+-----------+--------+---------------+-----------+----------------+--------------+-------------+
|INV-US-001-03558761|   1|      47162|       485|   M| BLUE|      80.5|       1|2023-01-01 15:42:00|     0.0|      80.5|       1|          7|     USD|              $| MASU485-M-|            Sale|          Cash|        126.7|
|INV-US-001-03558761|   2|      47162|      2779|   G| BLUE|      31.5|       1|2023-01-01 15:42:00|

In [19]:
from pyspark.sql import functions as F

# Join kedua tabel di customer ID
df_joined = df_transactions_filled.join(df_customers_filled, on="Customer ID", how="inner")

# 1. Total penjualan dan jumlah transaksi per customer
agg_customer = df_joined.groupBy("Customer ID", "Name").agg(
    F.countDistinct("Invoice ID").alias("num_transactions"),
    F.sum("Invoice Total").alias("total_sales"),
    F.avg("Invoice Total").alias("avg_sales")
).orderBy(F.desc("total_sales"))

agg_customer.show(5, truncate=False)

+-----------+------+----------------+-----------------+------------------+
|Customer ID|Name  |num_transactions|total_sales      |avg_sales         |
+-----------+------+----------------+-----------------+------------------+
|577063     |李秀华|15              |86566.68999999999|2278.070789473684 |
|671898     |王慧  |10              |81611.0          |3022.6296296296296|
|450001     |张峰  |18              |79720.96         |1771.576888888889 |
|694236     |赵娟  |19              |77675.53         |1726.1228888888888|
|482569     |花刚  |18              |75297.15         |2596.453448275862 |
+-----------+------+----------------+-----------------+------------------+
only showing top 5 rows



In [20]:
# 2. Total penjualan per city
agg_city = df_joined.groupBy("City").agg(
    F.countDistinct("Invoice ID").alias("num_transactions"),
    F.sum("Invoice Total").alias("total_sales"),
    F.countDistinct("Customer ID").alias("num_customers")
).orderBy(F.desc("total_sales"))

agg_city.show(5, truncate=False)

+----+----------------+--------------------+-------------+
|City|num_transactions|total_sales         |num_customers|
+----+----------------+--------------------+-------------+
|深圳|207214          |2.1415250447999996E8|55941        |
|上海|203540          |2.1169211219E8      |40976        |
|广州|193603          |2.0375263251999995E8|32996        |
|北京|159529          |1.667636002999999E8 |46232        |
|重庆|107921          |1.1371054553000012E8|24003        |
+----+----------------+--------------------+-------------+
only showing top 5 rows



In [21]:
# 3. Penjualan dan jumlah customer berdasarkan gender
agg_gender = df_joined.groupBy("Gender").agg(
    F.countDistinct("Invoice ID").alias("num_transactions"),
    F.sum("Invoice Total").alias("total_sales"),
    F.countDistinct("Customer ID").alias("num_customers")
)

agg_gender.show()

+------+----------------+-------------------+-------------+
|Gender|num_transactions|        total_sales|num_customers|
+------+----------------+-------------------+-------------+
|     F|         2901929|9.343590403999864E8|       596099|
|     M|         1635652|6.273548965599934E8|       686397|
|     D|            2823|          954399.91|         1211|
+------+----------------+-------------------+-------------+



In [22]:
# 4. Top 10 customer by total sales
top_customers = agg_customer.limit(10)

top_customers.show(truncate=False)

+-----------+------+----------------+-----------------+------------------+
|Customer ID|Name  |num_transactions|total_sales      |avg_sales         |
+-----------+------+----------------+-----------------+------------------+
|577063     |李秀华|15              |86566.68999999999|2278.070789473684 |
|671898     |王慧  |10              |81611.0          |3022.6296296296296|
|450001     |张峰  |18              |79720.96         |1771.576888888889 |
|694236     |赵娟  |19              |77675.53         |1726.1228888888888|
|482569     |花刚  |18              |75297.15         |2596.453448275862 |
|363327     |陈慧  |21              |72841.45         |1348.9157407407406|
|502106     |齐秀芳|26              |72105.18000000001|1360.4750943396227|
|411743     |郑涛  |5               |72068.25         |2771.855769230769 |
|484353     |柏秀华|22              |71872.74         |1437.4548000000002|
|549169     |孙建军|16              |70282.75         |2129.780303030303 |
+-----------+------+----------------+------------