In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType
from pyspark.sql.functions import col, date_format, to_timestamp
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline


In [2]:
spark = SparkSession.builder.appName("Beer analysis").getOrCreate()

train = spark.read.csv(
    # "gs://shaun05122024/brewery_data_complete_extended.csv",
    "data/15_percent_validation_data.csv",
    header=True,
    inferSchema=True,
)

In [3]:
train.count()

1500000

In [4]:
train = train.dropDuplicates()
train = train.na.drop()

In [5]:
train = train.withColumn("Total_Sales", col("Total_Sales").cast(FloatType()))

train = train.withColumn(
    "Brew_Date", to_timestamp(col("Brew_Date"), "yyyy-MM-dd HH:mm:ss")
)
train = (
    train.withColumn("Month", date_format(col("Brew_Date"), "MM"))
    .withColumn("Day", date_format(col("Brew_Date"), "dd"))
    .withColumn("Year", date_format(col("Brew_Date"), "yyyy"))
)

In [6]:
train.show(3)

+--------+-------------------+----------+-------+-----------+-----------------+------------------+-----------------+------------------+-----------------+----------+-----+----------------+---------------+-----------+-----------------+--------------------+-------------------+------------------------+----------------------------+-----+---+----+
|Batch_ID|          Brew_Date|Beer_Style|    SKU|   Location|Fermentation_Time|       Temperature|         pH_Level|           Gravity|  Alcohol_Content|Bitterness|Color|Ingredient_Ratio|Volume_Produced|Total_Sales|    Quality_Score|Brewhouse_Efficiency|Loss_During_Brewing|Loss_During_Fermentation|Loss_During_Bottling_Kegging|Month|Day|Year|
+--------+-------------------+----------+-------+-----------+-----------------+------------------+-----------------+------------------+-----------------+----------+-----+----------------+---------------+-----------+-----------------+--------------------+-------------------+------------------------+-------------

In [7]:
categoricalColumns = ['Beer_Style', 'SKU', 'Location']

In [8]:
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "OHE"])
    stages += [stringIndexer, encoder]

In [9]:
numeric_columns = ['Fermentation_Time', 'Temperature', 'pH_Level', 'Gravity', 'Alcohol_Content', 'Bitterness', 'Color', 'Volume_Produced', 'Quality_Score', 'Brewhouse_Efficiency', 'Loss_During_Brewing', 'Loss_During_Fermentation', 'Loss_During_Bottling_Kegging']

In [10]:
assemblerInputs = [c + "OHE" for c in categoricalColumns] + numeric_columns

In [11]:
assembler = VectorAssembler(
    inputCols=assemblerInputs,
    outputCol="features",
)
stages += [assembler]

In [12]:
train_data, test_data = train.randomSplit([0.8, 0.2], seed=42)

In [13]:
rf = LinearRegression(featuresCol="features", labelCol="Total_Sales")
stages += [rf]

In [14]:
pipeline = Pipeline(stages=stages)

In [15]:
model = pipeline.fit(train_data)

In [17]:
predictions = model.transform(test_data)

evaluator = RegressionEvaluator(
    labelCol="Total_Sales", predictionCol="prediction", metricName="rmse"
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

evaluator = RegressionEvaluator(
    labelCol="Total_Sales", predictionCol="prediction", metricName="mse"
)

mse = evaluator.evaluate(predictions)
print(f"Mean Squared Error on test data = {mse}")

Root Mean Squared Error (RMSE) on test data = 5489.715745223387
Mean Squared Error on test data = 30136978.96335357
