In [None]:
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count, lit, when,expr
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BooleanType, DoubleType, IntegerType, DateType

In [None]:
# Create (or reuse) the Spark session used by every cell below. The bare
# expression on the last line renders the session summary as the cell output.
session_builder = SparkSession.builder.appName("BreweryApp")
spark = session_builder.getOrCreate()
spark

In [None]:
# Input CSV locations, relative to the notebook's working directory.
data_dir = './craft_beer_bar_sales'
product_range_path = f'{data_dir}/Product_range.csv'
transactions_path = f'{data_dir}/Transactions.csv'

# Product

In [None]:
# Load the product catalogue, taking column names from the first CSV row.
# No schema is supplied or inferred, so every column is read as a string.
product_df = spark.read.option("header", True).csv(product_range_path)
product_df.show(n=2, truncate=False)

In [None]:
# Summary statistics for every product column.
product_df.describe().show()

In [None]:
# Print the number of distinct values in each product column
# (runs one Spark job per column).
for column_name in product_df.columns:
    distinct_total = product_df.select(column_name).distinct().count()
    print(f"Count Distinct {column_name}: {distinct_total}")

In [None]:
# One-row frame with the number of non-null values in each product column.
# `sum` is pyspark.sql.functions.sum (imported at the top), not the builtin.
present_value_sums = [
    sum(col(name).isNotNull().cast("int")).alias(name)
    for name in product_df.columns
]
not_null_counts = product_df.select(present_value_sums)
not_null_counts.show()

In [None]:
# Bar chart of the non-null value count per product column. The dashed red
# line marks the total row count, i.e. the height of a fully populated column.
not_null_counts_pandas = not_null_counts.toPandas().transpose()
ax = not_null_counts_pandas.plot(kind="bar", stacked=True, legend=False, colormap="plasma")
plt.title("Product Value Counts in Columns")
plt.xlabel("Columns")
# The bars come from not_null_counts, so the axis shows non-null (not null) values.
plt.ylabel("Number of Non-Null Values")
upper_limit = plt.axhline(y=product_df.count(), color='red', linestyle='--', label='Upper Limit')
# Draw a legend for the reference line only; without it the label is never shown.
plt.legend(handles=[upper_limit])
plt.show()

# Transactions

In [None]:
# Load the transactions, taking column names from the first CSV row.
# No schema is supplied or inferred, so every column is read as a string.
transaction_df = spark.read.option("header", True).csv(transactions_path)
transaction_df.show(n=2, truncate=False)

In [None]:
# Summary statistics for every transaction column.
transaction_df.describe().show()

In [None]:
# Print the number of distinct values in each transaction column
# (runs one Spark job per column).
for column_name in transaction_df.columns:
    distinct_total = transaction_df.select(column_name).distinct().count()
    print(f"Count Distinct {column_name}: {distinct_total}")

In [None]:
# One-row frame with the number of non-null values in each transaction column.
# `sum` is pyspark.sql.functions.sum (imported at the top), not the builtin.
present_value_sums = [
    sum(col(name).isNotNull().cast("int")).alias(name)
    for name in transaction_df.columns
]
not_null_counts = transaction_df.select(present_value_sums)
not_null_counts.show()

In [None]:
# Bar chart of the non-null value count per transaction column. The dashed red
# line marks the total row count, i.e. the height of a fully populated column.
not_null_counts_pandas = not_null_counts.toPandas().transpose()
ax = not_null_counts_pandas.plot(kind="bar", stacked=True, legend=False, colormap="plasma")
plt.title("Transactions Value Counts in Columns")
plt.xlabel("Columns")
# The bars come from not_null_counts, so the axis shows non-null (not null) values.
plt.ylabel("Number of Non-Null Values")
upper_limit = plt.axhline(y=transaction_df.count(), color='red', linestyle='--', label='Upper Limit')
# Draw a legend for the reference line only; without it the label is never shown.
plt.legend(handles=[upper_limit])
plt.show()

# Combined DataFrame Sales

In [None]:
# Enrich every transaction with its product attributes. The left join keeps
# transactions whose product code has no match in the catalogue (their product
# columns are null); the catalogue's copy of the join key is dropped so that
# only the transaction-side Product_code remains.
sales_columns = [
    "Date_and_time_of_unloading", "Product_code", "Vendor_code", "Name",
    "Retail_price", "Base_unit", "Country_of_Origin", "Size", "ABV",
    "Amount", "Sale_amount", "Discount_amount", "Profit",
    "Percentage_markup", "Discount_percentage",
]
same_product = transaction_df['product_code'] == product_df['Product_code']
sales_df = (
    transaction_df
    .join(product_df, on=same_product, how="left")
    .drop(product_df['Product_code'])
    .select(*sales_columns)
)
sales_df.show(2)

# Data Cleaning

In [None]:
# Row counts of the two inputs and of the joined frame.
frames_by_label = {
    "Product": product_df,
    "Transactions": transaction_df,
    "Sales": sales_df,
}
for label, frame in frames_by_label.items():
    print(f"Total {label} Rows:", frame.count())

In [None]:
# Products that never appear in any transaction: an anti-join keeps only the
# catalogue rows whose product_code has no match among the sold codes.
sold_product_codes = transaction_df.select("product_code").distinct()
unsold_products = product_df.join(sold_product_codes, on="product_code", how="left_anti")
inactive_product_count = unsold_products.count()
print(" Total Number of Products:", product_df.count())
print("Inactive List of Products:", inactive_product_count)

In [None]:
# Print the number of distinct values in each column of the joined sales frame
# (runs one Spark job per column).
for column_name in sales_df.columns:
    distinct_total = sales_df.select(column_name).distinct().count()
    print(f"Count Distinct {column_name}: {distinct_total}")

## Null Values

In [None]:
# One-row frame with the number of non-null values in each sales column.
# `sum` is pyspark.sql.functions.sum (imported at the top), not the builtin.
present_value_sums = [
    sum(col(name).isNotNull().cast("int")).alias(name)
    for name in sales_df.columns
]
not_null_counts = sales_df.select(present_value_sums)
print("Not Null Count")
not_null_counts.show()

In [None]:
# Bar chart of the non-null value count per sales column. The dashed red
# line marks the total row count, i.e. the height of a fully populated column.
not_null_counts_pandas = not_null_counts.toPandas().transpose()
ax = not_null_counts_pandas.plot(kind="bar", stacked=True, legend=False, colormap="plasma")
plt.title("Sales Value Counts in Columns")
plt.xlabel("Columns")
# The bars come from not_null_counts, so the axis shows non-null (not null) values.
plt.ylabel("Number of Non-Null Values")
upper_limit = plt.axhline(y=sales_df.count(), color='red', linestyle='--', label='Upper Limit')
# Draw a legend for the reference line only; without it the label is never shown.
plt.legend(handles=[upper_limit])
plt.show()

In [None]:
# Percentage of null values per sales column, shown as a transposed pandas table.
total_sales = sales_df.count()
null_share_columns = [
    (sum(col(name).isNull().cast("int")) * 100 / total_sales).alias(name)
    for name in sales_df.columns
]
null_percentage_df = sales_df.agg(*null_share_columns)
null_percentage_pandas = null_percentage_df.toPandas().transpose()
print("Null Percentage")
display(null_percentage_pandas)

In [None]:
# One-row frame with the number of null values in each sales column.
missing_value_sums = [
    sum(col(name).isNull().cast("int")).alias(name)
    for name in sales_df.columns
]
null_counts = sales_df.select(missing_value_sums)
print("Null Value Count")
null_counts.show()

In [None]:
# List the sales rows that have no retail price.
print('Sales Product without Retail Price')
rows_without_price = sales_df.where(col('Retail_price').isNull())
rows_without_price.select('Product_code', 'Name').show()

In [None]:
# Sample the rows tagged "Snacks", first by product name and then by vendor code.
for field in ('Name', 'Vendor_code'):
    print(f'{field}: Snacks')
    sales_df.filter(col(field) == 'Snacks').show(5)

In [None]:
# Ten most frequent vendor codes among the sales rows.
print("Vendor_Code Count")
rows_per_vendor = sales_df.groupBy('Vendor_code').count()
rows_per_vendor.orderBy(col('count').desc()).show(10)

In [None]:
# Inspect the rows whose product name is "Soft drinks".
print('Name: Soft drinks')
soft_drink_rows = sales_df.where(col('Name') == 'Soft drinks')
soft_drink_rows.show()

In [None]:
# Remove Snacks & Soft Drinks: drop a row when EITHER its Vendor_code or its
# Name is 'Snacks', or when its Name is 'Soft drinks'.
#
# The three keep-conditions must be AND-ed. OR-ing the two 'Snacks' tests only
# removed rows where both columns were 'Snacks', so rows tagged on just one
# column stayed in the data.
#
# Null handling follows Spark's three-valued logic: a comparison with null
# evaluates to null, and filter() drops such rows. Rows with a null Vendor_code
# are kept explicitly, because a later cell fills them with "Other". Rows with
# a null Name are dropped by the Name comparisons, exactly as before this change.
vendor_is_not_snacks = col('Vendor_code').isNull() | (col('Vendor_code') != 'Snacks')
sales_df = sales_df.filter(
    vendor_is_not_snacks
    & (col('Name') != 'Snacks')
    & (col('Name') != 'Soft drinks')
)

In [None]:
# Sample the rows that lack a vendor code or a country of origin.
for field in ('Vendor_code', 'Country_of_Origin'):
    print(f'Sales with {field} Null')
    sales_df.filter(col(field).isNull()).show(5)

In [None]:
# Sample the rows that have no Size value.
print('Sales with Size Null')
rows_without_size = sales_df.where(col('Size').isNull())
rows_without_size.show(5)

In [None]:
# Replace null Vendor_code and Country_of_Origin values with the placeholder "Other".
sales_df = sales_df.fillna("Other", subset=["Vendor_code", "Country_of_Origin"])

In [None]:
# Sample the rows whose country of origin is the unknown marker "???".
unknown_origin_rows = sales_df.where(col('Country_of_Origin') == "???")
unknown_origin_rows.show(5)

In [None]:
# Map the unknown marker "???" in Country_of_Origin to "Other"; every other
# value (including null) is left as it is.
origin = col("Country_of_Origin")
normalized_origin = when(origin == "???", "Other").otherwise(origin)
sales_df = sales_df.withColumn("Country_of_Origin", normalized_origin)

In [None]:
# Drop the two percentage columns that the rest of the notebook does not use.
unused_columns = ['Percentage_markup', 'Discount_percentage']
sales_df = sales_df.drop(*unused_columns)
sales_df.show(5)

In [None]:
# Treat a missing discount as no discount: null Discount_amount becomes 0.
discount = col("Discount_amount")
sales_df = sales_df.withColumn(
    "Discount_amount",
    when(discount.isNull(), 0).otherwise(discount),
)

In [None]:
# Recompute the sale amount as price * quantity - discount (rounded to 2 dp)
# and place it next to the recorded Sale_amount for comparison.
recalculated_sale = expr("round((Retail_price * Amount)-Discount_amount,2)")
comparison_columns = ['Retail_price', 'Amount', 'Discount_amount', 'Sale_amount', 'cal_sale']
temp_df = sales_df.withColumn('cal_sale', recalculated_sale).select(*comparison_columns)
print('Recalculate Sales_Amount')
temp_df.show(2)

In [None]:
# Count how many recorded sale amounts match the recalculated value and how
# many differ. The "Correct" line previously printed no value at all.
print('Correct & Incorrect Sales Amount')
print("Correct: ", temp_df.filter(col('Sale_amount') == col('cal_sale')).count())
print("Incorrect: ", temp_df.filter(col('Sale_amount') != col('cal_sale')).count())

In [None]:
# Pie chart of recorded sale amounts that match vs. differ from the recalculated value.
amount_matches = col('Sale_amount') == col('cal_sale')
amount_differs = col('Sale_amount') != col('cal_sale')

labels = ['Correct', 'Miscalculated']
data = [temp_df.filter(condition).count() for condition in (amount_matches, amount_differs)]

fig = plt.figure(figsize=(8, 5))
plt.pie(data, labels=labels, startangle=90, autopct='%1.1f%%')
plt.title('Miscalculated Sales Amount Ratio')
plt.show()

In [None]:
# Overwrite Sale_amount with the recalculated value:
# price * quantity - discount, rounded to two decimals.
corrected_sale = expr("round((Retail_price * Amount)-Discount_amount,2)")
sales_df = sales_df.withColumn('Sale_amount', corrected_sale)
sales_df.show(10)

## Casting Columns

In [None]:
# Derive Entry_date by casting the unloading timestamp column to a date, then
# drop the source column.
entry_date = col('Date_and_time_of_unloading').cast(DateType())
sales_df = (
    sales_df
    .withColumn('Entry_date', entry_date)
    .drop('Date_and_time_of_unloading')
)

In [None]:
# Cast the numeric measures from their CSV string form to doubles.
double_columns = [
    'Retail_price', 'Size', 'ABV', 'Amount',
    'Sale_amount', 'Discount_amount', 'Profit',
]
for name in double_columns:
    sales_df = sales_df.withColumn(name, col(name).cast(DoubleType()))

In [None]:
# Print each column name with its Spark data type to verify the casts.
for name, dtype in sales_df.dtypes:
    print(name, dtype)

In [None]:
# Preview the frame after the type casts.
sales_df.show()

## Rename Columns

In [None]:
# Rename every column to the short lower-case name used by the SQL queries below.
new_names = {
    "Product_code": "pid",
    "Vendor_code": "vendor",
    "Name": "product",
    "Retail_price": "retail_price",
    "Base_unit": "base_unit",
    "Country_of_Origin": "origin_country",
    "Size": "size",
    "ABV": "abv",
    "Amount": "quantity",
    "Sale_amount": "total_sale",
    "Discount_amount": "discount",
    "Profit": "profit",
    "Entry_date": "entry_date",
}
sales_df = sales_df.withColumnsRenamed(new_names)

In [None]:
# Reorder columns: move the last column to the front and keep the rest in order.
current_order = sales_df.columns
columns = current_order[-1:] + current_order[:-1]
sales_df = sales_df.select(*columns)

In [None]:
# Preview the renamed and reordered frame.
sales_df.show()

In [None]:
# sales_df.coalesce(1).write.csv("output/sales", header = True)

# Explore

In [None]:
# Register the cleaned frame as the temp view `sales` so it can be queried
# with SQL, then preview it through that view.
sales_df.createOrReplaceTempView('sales')
sales_preview = spark.sql("select * from sales")
sales_preview.show()

In [None]:
# Ten products with the highest summed profit over all sales rows.
print("Top 10 Profit Product of All Time")
top_profit_query = """
SELECT
    pid,
    product,
    round(sum(profit)) total_profit
FROM SALES
GROUP BY 1,2
ORDER BY 3 DESC
LIMIT 10
"""
spark.sql(top_profit_query).show(truncate=False)

In [None]:
# Five products with the largest summed loss. Only rows with negative profit
# are summed, so profitable rows of the same product do not offset the total.
print("Top 5 Loss Product of All Time")
top_loss_query = """
SELECT
    pid,
    product,
    round(sum(profit)) total_loss
FROM SALES
WHERE profit < 0
GROUP BY 1,2
ORDER BY 3
LIMIT 5
"""
spark.sql(top_loss_query).show(truncate=False)

In [None]:
# Five countries of origin with the highest summed sales, excluding the
# "Other" placeholder.
print("Top 5 Country By Sales")
top_country_query = """
SELECT
    origin_country,
    CAST(sum(total_sale) AS DECIMAL) total_sale
FROM SALES
WHERE origin_country <> 'Other'
GROUP BY 1
ORDER BY 2 DESC
LIMIT 5
"""
spark.sql(top_country_query).show(truncate=False)

In [None]:
# Five vendors with the highest summed sales, excluding the "Other" placeholder.
# The literal is single-quoted, as standard SQL requires and as the country
# query does. A double-quoted "Other" is parsed as a column identifier when
# ANSI double-quoted identifiers are enabled.
print("Top 5 Vendor By Sales")
spark.sql("""
SELECT
    vendor,
    CAST(sum(total_sale) AS DECIMAL) total_sale
FROM SALES
WHERE vendor <> 'Other'
GROUP BY 1
ORDER BY 2 DESC
LIMIT 5
""").show(truncate = False)

In [None]:
# Ten products with the highest summed discount.
print("Top 10 Discounted Product")
top_discount_query = """
SELECT
    pid,
    product,
    round(sum(discount)) total_discount
FROM SALES
GROUP BY 1,2
ORDER BY 3 DESC
LIMIT 10
"""
spark.sql(top_discount_query).show(truncate=False)

In [None]:
# Ten most-discounted products with their summed profit alongside; ties on
# discount are broken by higher profit.
print("Top 10 Discounted Product with Profit")
discount_profit_query = """
SELECT
    pid,
    product,
    round(sum(profit)) total_profit,
    round(sum(discount)) total_discount
FROM SALES
WHERE discount IS NOT NULL
GROUP BY 1,2
ORDER BY 4 DESC,3 DESC
LIMIT 10
"""
spark.sql(discount_profit_query).show(truncate=False)

# Clustering
Profitable and Sales Cluster


In [None]:
# Preview the cleaned sales frame that feeds the clustering step.
sales_df.show(5)

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

In [None]:
# Per-product clustering input: summed sales and summed profit.
# Rows without a profit are excluded before aggregating. A product whose
# total_sale values are all null aggregates to a null sum, and VectorAssembler
# raises on null inputs under its default handleInvalid='error'. Such products
# are therefore dropped after aggregation, which leaves the totals of every
# other product unchanged.
beer_data = (
    sales_df
    .filter(col('profit').isNotNull())
    .groupBy('product')
    .agg(sum("total_sale").alias("total_sale"), sum("profit").alias("profit"))
    .filter(col('total_sale').isNotNull())
)

In [None]:
# Pack the two numeric measures into the single vector column `features`.
feature_columns = ['total_sale', 'profit']
assembler = (
    VectorAssembler()
    .setInputCols(feature_columns)
    .setOutputCol('features')
)
transformed_data = assembler.transform(beer_data)

In [None]:
# Fit K-means with four clusters on the assembled features; the fixed seed
# makes the initialisation repeatable.
kmeans = KMeans(
    featuresCol="features",
    k=4,
    seed=42,
)
model = kmeans.fit(transformed_data)

In [None]:
# Assign each product to a cluster; the result gains a `prediction` column.
predictions = model.transform(transformed_data)
predictions.show(n=5)

In [None]:
# Collect only the columns needed for plotting into pandas; the bare
# expression on the last line displays the table.
plot_columns = ["prediction", "total_sale", "profit"]
predictions_pd = predictions.select(*plot_columns).toPandas()
predictions_pd

In [None]:
# Scatter plot of products by total sales and profit, coloured by cluster id.
plt.scatter(
    x=predictions_pd["total_sale"],
    y=predictions_pd["profit"],
    c=predictions_pd["prediction"],
    cmap='viridis',
)
plt.xlabel("total_sale")
plt.ylabel("profit")
plt.title("K-means Clustering of Beer Products")
plt.show()

In [None]:
# Clustering result without the `features` vector column; the CSV export
# below is commented out.
save_df = predictions.drop('features')
# save_df.coalesce(1).write.csv("output/clustering", header = True)

In [None]:
from pyspark.ml.evaluation import ClusteringEvaluator

# Score the clustering with the silhouette metric, computed from the
# `features` and `prediction` columns of the predictions frame.
evaluator = ClusteringEvaluator(
    metricName='silhouette',
    featuresCol='features',
    predictionCol='prediction',
)
silhouette = evaluator.evaluate(predictions)

print(f"Silhouette Score: {silhouette}")

In [None]:
# Shut down the Spark session; no Spark cell can run after this one.
spark.stop()