In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [6]:
# 1. kita baca CSV 
df_ramen = spark.read.csv("./MsRamen.csv", header=True, inferSchema=True)
df_th = spark.read.csv("./TransactionHeader.csv", header=True, inferSchema=True)
df_td = spark.read.csv("./TransactionDetail.csv", header=True, inferSchema=True)
df_customer = spark.read.csv("./MsCustomer.csv", header=True, inferSchema=True)
# df_ramen.show()

In [7]:
df_ramen.registerTempTable("MsRamen")
df_th.registerTempTable("TransactionHeader")
df_td.registerTempTable("TransactionDetail")
df_customer.registerTempTable("MsCustomer")

In [31]:
# Raw SQL
# 1. Cari expanse customer
# 2. Cari tau rata-rata expense customer

raw  = """
        WITH cte AS (
        SELECT AVG(sq.Expense) AS AverageExpense
        FROM (
            SELECT 
                mc.CustomerName,
                SUM(td.Quantity * mr.RamenPrice) AS Expense
                FROM MsCustomer mc JOIN TransactionHeader th 
                     ON mc.CustomerId = th.CustomerId
                     JOIN TransactionDetail td 
                     ON td.TransactionId = th.TransactionId
                     JOIN MsRamen mr 
                     ON mr.RamenId = td.RamenId
                GROUP BY CustomerName
            ) AS sq )
            SELECT
            mc.CustomerName, SUM(td.Quantity * mr.RamenPrice) AS Expense,
            CASE
                WHEN SUM(td.Quantity * mr.RamenPrice) > (SELECT AverageExpense FROM cte)
                THEN "Above Average"
                ELSE "Below Average"
            END AS SpendingCategory
            FROM MsCustomer mc JOIN TransactionHeader th 
                     ON mc.CustomerId = th.CustomerId
                     JOIN TransactionDetail td 
                     ON td.TransactionId = th.TransactionId
                     JOIN MsRamen mr 
                     ON mr.RamenId = td.RamenId
                     GROUP BY CustomerName
            
"""

# CTE -> Common Table Expression
spark.sql(raw).show()

+----------------+-------+----------------+
|    CustomerName|Expense|SpendingCategory|
+----------------+-------+----------------+
|Lorene MacParlan|1431000|   Above Average|
|   Jacobo Zemler|2055000|   Above Average|
|   Arney Dunkley|1061000|   Below Average|
|  Rhona Sutterby|1388000|   Above Average|
|     Marve Dagon|3172000|   Above Average|
|  Marilin Balfre|1925000|   Above Average|
| Salomi Georgins|2723000|   Above Average|
|  Clint Faulkner|1722000|   Above Average|
| Kaitlyn McKaile|1482000|   Above Average|
|    Stan Twydell| 495000|   Below Average|
|   Willey Mepham|2250000|   Above Average|
| Gabbey Silcocks|2294000|   Above Average|
|    Debee Kenson|2240000|   Above Average|
|Ellette Gonsalvo|1715000|   Above Average|
|     Rhett Marco|1220000|   Below Average|
| Windham Houlson| 946000|   Below Average|
|     Lenci Brace| 930000|   Below Average|
| Kingsly Dabling|1110000|   Below Average|
| Aila Tomaszczyk| 531000|   Below Average|
| Rudolph Tetther|1300000|   Abo

In [38]:
# Query Builder
from pyspark.sql.functions import *

customer_spending = df_th\
.join(df_td, "TransactionId")\
.join(df_customer, "CustomerId")\
.join(df_ramen, "RamenId")\
.groupBy("CustomerName")\
.agg(sum(col("Quantity") * col("RamenPrice"))\
     .alias("Expense"))




customer_spending.show()

+----------------+-------+
|    CustomerName|Expense|
+----------------+-------+
|Lorene MacParlan|1431000|
|   Jacobo Zemler|2055000|
|   Arney Dunkley|1061000|
|  Rhona Sutterby|1388000|
|     Marve Dagon|3172000|
|  Marilin Balfre|1925000|
| Salomi Georgins|2723000|
|  Clint Faulkner|1722000|
| Kaitlyn McKaile|1482000|
|    Stan Twydell| 495000|
|   Willey Mepham|2250000|
| Gabbey Silcocks|2294000|
|    Debee Kenson|2240000|
|Ellette Gonsalvo|1715000|
|     Rhett Marco|1220000|
| Windham Houlson| 946000|
|     Lenci Brace| 930000|
| Kingsly Dabling|1110000|
| Aila Tomaszczyk| 531000|
| Rudolph Tetther|1300000|
+----------------+-------+
only showing top 20 rows



In [39]:
# 2. Cari rata-rata

average_spending = customer_spending\
.agg(avg("Expense").alias("AverageSpending"))

average_spending.show()

+------------------+
|   AverageSpending|
+------------------+
|1291062.9539951575|
+------------------+



In [40]:
result = customer_spending.select(
    "CustomerName",
    "Expense",
    when(col("Expense") > average_spending.\
        first()["AverageSpending"], "Above Average")
    .when(col("Expense") == average_spending.\
         first()["AverageSpending"], "Average")
    .otherwise("Below Average")\
    .alias("SpendingCategory")
)

result.show()

+----------------+-------+----------------+
|    CustomerName|Expense|SpendingCategory|
+----------------+-------+----------------+
|Lorene MacParlan|1431000|   Above Average|
|   Jacobo Zemler|2055000|   Above Average|
|   Arney Dunkley|1061000|   Below Average|
|  Rhona Sutterby|1388000|   Above Average|
|     Marve Dagon|3172000|   Above Average|
|  Marilin Balfre|1925000|   Above Average|
| Salomi Georgins|2723000|   Above Average|
|  Clint Faulkner|1722000|   Above Average|
| Kaitlyn McKaile|1482000|   Above Average|
|    Stan Twydell| 495000|   Below Average|
|   Willey Mepham|2250000|   Above Average|
| Gabbey Silcocks|2294000|   Above Average|
|    Debee Kenson|2240000|   Above Average|
|Ellette Gonsalvo|1715000|   Above Average|
|     Rhett Marco|1220000|   Below Average|
| Windham Houlson| 946000|   Below Average|
|     Lenci Brace| 930000|   Below Average|
| Kingsly Dabling|1110000|   Below Average|
| Aila Tomaszczyk| 531000|   Below Average|
| Rudolph Tetther|1300000|   Abo

In [None]:
# Bar Plot

raw_query = """
    SELECT MONTH(TransactionDate) AS Month,
    COUNT(*) AS TransactionCount
    From TransactionHeader
    GROUP BY MONTH(TransactionDate)
    ORDER BY Month
    
"""

import mathlib.plot


In [None]:
# Pie Plot

plt.pie(transaction_count["TransactionCOunt"],\
        labels=transaction_count["Month"],
       autopct="%1.1f%%",
       startangle=140,
       )
plt.title("Transaction Per Month")
plt.axis("equal")
plt.show()

In [None]:
# Line Plot

plt.line(transaction_count["Month"],\
         transaction_count["TransactionCount"],\
         marker="o"
         linestyle="-"
         color="red"     
       )
plt.title("Transaction Per Month")
plt.xlabel("Month")
plt.ylabel("Count")
plt.grid(True)
plt.show()