In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col

# Inisialisasi SparkSession
spark = SparkSession.builder \
    .appName("ETL_Pipeline") \
    .getOrCreate()

# Extract: Baca file CSV
df = spark.read.csv("sales_data.csv", header=True, inferSchema=True)

# Transform: Filter Revenue > 100 dan hitung total penjualan per kategori
df_filtered = df.filter(col("Revenue") > 100)
df_result = df_filtered.groupBy("Product_Category") \
    .agg(sum("Revenue").alias("total_sales"))

# Tampilkan hasil transformasi
df_result.show()

# Load: Simpan ke file Parquet
df_result.write.mode("overwrite").parquet("output_sales.parquet")

# Tutup sesi Spark
spark.stop()

+----------------+-----------+
|Product_Category|total_sales|
+----------------+-----------+
|        Clothing|    8198902|
|     Accessories|   13559164|
|           Bikes|   61782134|
+----------------+-----------+



In [6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Sales Analysis") \
    .getOrCreate()

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import month, sum, count

# 1. Start ulang SparkSession
spark = SparkSession.builder \
    .appName("Sales Analysis") \
    .getOrCreate()

# 2. Load file CSV
df = spark.read.csv("sales_data.csv", header=True, inferSchema=True)

# 3. Hitung pendapatan per bulan
df_revenue = df.withColumn("month", month("Date")) \
    .groupBy("month") \
    .agg(sum(df["Unit_Price"] * df["Order_Quantity"]).alias("total_revenue"))

df_revenue.show()

# 4. Produk terlaris
df_top_products = df.groupBy("Product") \
    .agg(count("*").alias("total_orders")) \
    .orderBy("total_orders", ascending=False) \
    .limit(5)

df_top_products.show()

+-----+-------------+
|month|total_revenue|
+-----+-------------+
|   12|     10158080|
|    1|      7832338|
|    6|     10085537|
|    3|      8201790|
|    5|      9859851|
|    9|      6517880|
|    4|      8485163|
|    8|      6348349|
|    7|      6392045|
|   10|      6709394|
|   11|      6977157|
|    2|      7608734|
+-----+-------------+

+--------------------+------------+
|             Product|total_orders|
+--------------------+------------+
|Water Bottle - 30...|       10794|
| Patch Kit/8 Patches|       10416|
|  Mountain Tire Tube|        6816|
|        AWC Logo Cap|        4358|
|Sport-100 Helmet,...|        4220|
+--------------------+------------+



In [10]:
df_revenue.write.mode("overwrite").parquet("revenue_by_month.parquet")
df_top_products.write.mode("overwrite").parquet("top_products.parquet")