In [None]:
# Obtain the Spark session for this notebook (app name "log_reg")
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("log_reg")
    .getOrCreate()
)

# Load the beer review data from Parquet into a DataFrame
dfv = spark.read.parquet("dbfs:/FileStore/tables/beer_data.parquet")

In [None]:
# beer_type and beer_type_num are not useful for our purposes, so remove them
columns_to_drop = ('beer_type', 'beer_type_num')
df = dfv.drop(*columns_to_drop)

In [None]:
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler

In [None]:
# Every column except the target (review_overall) is an input to the VectorAssembler,
# which packs them into a single output column named "features".
feature_columns = [name for name in df.columns if name != 'review_overall']
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

In [None]:
# Apply the assembler so df gains the assembled output column
df = assembler.transform(dataset=df)

In [None]:
# Give the "count" column a clearer name: reviewer_total_reviews
df = df.withColumnRenamed(existing="count", new="reviewer_total_reviews")

In [None]:
# Preview the DataFrame as it stands (show()'s defaults spelled out)
df.show(n=20, truncate=True)

+--------------+------------+-----------------+-------------+------------+--------+----------------------+-------------------+------------------+----------------+--------------------+
|review_overall|review_aroma|review_appearance|review_palate|review_taste|beer_abv|reviewer_total_reviews|total_brewery_beers|total_beer_reviews|beer_type_vector|            features|
+--------------+------------+-----------------+-------------+------------+--------+----------------------+-------------------+------------------+----------------+--------------------+
|           4.0|         4.0|              4.0|          4.0|         3.5|    12.0|                  1587|                 52|                 4|  (10,[1],[1.0])|(18,[0,1,2,3,4,5,...|
|           4.0|         4.5|              4.0|          4.0|         4.5|    9.46|                   388|                 93|               238|  (10,[0],[1.0])|(18,[0,1,2,3,4,5,...|
|           4.5|         4.0|              4.0|          4.5|         4.5|     6

## Correlations between each column and the outcome variable, review_overall

In [None]:
from pyspark.ml.stat import Correlation

### Aroma

In [None]:

# Pair review_aroma with the target column and preview three rows
aroma_overall = df.select(["review_aroma", "review_overall"])
aroma_overall.show(n=3)

+------------+--------------+
|review_aroma|review_overall|
+------------+--------------+
|         4.0|           4.0|
|         4.5|           4.0|
|         4.0|           4.5|
+------------+--------------+
only showing top 3 rows



In [None]:
from pyspark.ml.feature import VectorAssembler
# Pack both columns into one "features" vector column for the correlation step
assembler = VectorAssembler(outputCol="features", inputCols=aroma_overall.columns)
df_new_aro_ov = assembler.transform(dataset=aroma_overall)

In [None]:
# Preview the assembled aroma/overall vectors
df_new_aro_ov.show(n=3)

+------------+--------------+---------+
|review_aroma|review_overall| features|
+------------+--------------+---------+
|         4.0|           4.0|[4.0,4.0]|
|         4.5|           4.0|[4.5,4.0]|
|         4.0|           4.5|[4.0,4.5]|
+------------+--------------+---------+
only showing top 3 rows



In [None]:
# Spearman correlation of review_aroma vs review_overall is ~0.5545:
# a moderately strong positive association.
spearman_co_aro = Correlation.corr(df_new_aro_ov, 'features', "spearman")
# Use DataFrame.show (standard PySpark, as in the appearance/palate cells) rather than
# DataFrame.display, which is a Databricks-specific extension.
spearman_co_aro.show(2, False)

spearman(features)
1.0 0.5545310454436013 0.5545310454436013 1.0


### Appearance

In [None]:
# Pair review_appearance with the target column and preview three rows
appear_overall = df.select(["review_appearance", "review_overall"])
appear_overall.show(n=3)

+-----------------+--------------+
|review_appearance|review_overall|
+-----------------+--------------+
|              4.0|           4.0|
|              4.0|           4.0|
|              4.0|           4.5|
+-----------------+--------------+
only showing top 3 rows



In [None]:
from pyspark.ml.feature import VectorAssembler
# Pack both columns into one "features" vector column for the correlation step
assembler = VectorAssembler(outputCol="features", inputCols=appear_overall.columns)
df_new_app_ov = assembler.transform(dataset=appear_overall)

In [None]:
# Preview the assembled appearance/overall vectors
df_new_app_ov.show(n=3)

+-----------------+--------------+---------+
|review_appearance|review_overall| features|
+-----------------+--------------+---------+
|              4.0|           4.0|[4.0,4.0]|
|              4.0|           4.0|[4.0,4.0]|
|              4.0|           4.5|[4.0,4.5]|
+-----------------+--------------+---------+
only showing top 3 rows



In [None]:
# Spearman correlation of review_appearance vs review_overall is ~0.4513: a moderate association.
spearman_co_app = Correlation.corr(
    dataset=df_new_app_ov,
    column='features',
    method="spearman",
)
spearman_co_app.show(n=2, truncate=False)

+--------------------------------------------------------------------------------------+
|spearman(features)                                                                    |
+--------------------------------------------------------------------------------------+
|1.0                  0.45131790078699796  \n0.45131790078699796  1.0                  |
+--------------------------------------------------------------------------------------+



### Palate

In [None]:
# Pair review_palate with the target column and preview three rows
palate_overall = df.select(["review_palate", "review_overall"])
palate_overall.show(n=3)

+-------------+--------------+
|review_palate|review_overall|
+-------------+--------------+
|          4.0|           4.0|
|          4.0|           4.0|
|          4.5|           4.5|
+-------------+--------------+
only showing top 3 rows



In [None]:
from pyspark.ml.feature import VectorAssembler
# Pack both columns into one "features" vector column for the correlation step
assembler = VectorAssembler(outputCol="features", inputCols=palate_overall.columns)
df_new_pal_ov = assembler.transform(dataset=palate_overall)

In [None]:
# Preview the assembled palate/overall vectors
df_new_pal_ov.show(n=3)

+-------------+--------------+---------+
|review_palate|review_overall| features|
+-------------+--------------+---------+
|          4.0|           4.0|[4.0,4.0]|
|          4.0|           4.0|[4.0,4.0]|
|          4.5|           4.5|[4.5,4.5]|
+-------------+--------------+---------+
only showing top 3 rows



In [None]:
# Spearman correlation of review_palate vs review_overall is ~0.6502: a fairly strong association.
spearman_co_pal = Correlation.corr(
    dataset=df_new_pal_ov,
    column='features',
    method="spearman",
)
spearman_co_pal.show(n=2, truncate=False)

+----------------------------------------------------------------------------------+
|spearman(features)                                                                |
+----------------------------------------------------------------------------------+
|1.0                 0.6501944858199998  \n0.6501944858199998  1.0                 |
+----------------------------------------------------------------------------------+



### Taste

In [None]:
# Pair review_taste with the target column and preview three rows
taste_overall = df.select(["review_taste", "review_overall"])
taste_overall.show(n=3)

+------------+--------------+
|review_taste|review_overall|
+------------+--------------+
|         3.5|           4.0|
|         4.5|           4.0|
|         4.5|           4.5|
+------------+--------------+
only showing top 3 rows



In [None]:
from pyspark.ml.feature import VectorAssembler
# Pack both columns into one "features" vector column for the correlation step
assembler = VectorAssembler(outputCol="features", inputCols=taste_overall.columns)
df_new_taste_ov = assembler.transform(dataset=taste_overall)

In [None]:
# Preview the assembled taste/overall vectors
df_new_taste_ov.show(n=3)

+------------+--------------+---------+
|review_taste|review_overall| features|
+------------+--------------+---------+
|         3.5|           4.0|[3.5,4.0]|
|         4.5|           4.0|[4.5,4.0]|
|         4.5|           4.5|[4.5,4.5]|
+------------+--------------+---------+
only showing top 3 rows



In [None]:
# Spearman correlation of review_taste vs review_overall is ~0.7267: a strong positive association.
spearman_co_taste = Correlation.corr(df_new_taste_ov, 'features', "spearman")
# Use DataFrame.show (standard PySpark, as in the appearance/palate cells) rather than
# DataFrame.display, which is a Databricks-specific extension.
spearman_co_taste.show(2, False)

spearman(features)
1.0 0.726737163589635 0.726737163589635 1.0


### Beer ABV

In [None]:
# Pair beer_abv with the target column and preview three rows
abv_overall = df.select(["beer_abv", "review_overall"])
abv_overall.show(n=3)

+--------+--------------+
|beer_abv|review_overall|
+--------+--------------+
|    12.0|           4.0|
|    9.46|           4.0|
|     6.0|           4.5|
+--------+--------------+
only showing top 3 rows



In [None]:
from pyspark.ml.feature import VectorAssembler
# Pack both columns into one "features" vector column for the correlation step
assembler = VectorAssembler(outputCol="features", inputCols=abv_overall.columns)
df_new_abv_ov = assembler.transform(dataset=abv_overall)

In [None]:
# Preview the assembled abv/overall vectors
df_new_abv_ov.show(n=3)

+--------+--------------+----------+
|beer_abv|review_overall|  features|
+--------+--------------+----------+
|    12.0|           4.0|[12.0,4.0]|
|    9.46|           4.0|[9.46,4.0]|
|     6.0|           4.5| [6.0,4.5]|
+--------+--------------+----------+
only showing top 3 rows



In [None]:
# Spearman correlation of beer_abv vs review_overall is ~0.1740: only a weak positive association.
spearman_co_abv = Correlation.corr(df_new_abv_ov, 'features', "spearman")
# Use DataFrame.show (standard PySpark, as in the appearance/palate cells) rather than
# DataFrame.display, which is a Databricks-specific extension.
spearman_co_abv.show(2, False)

spearman(features)
1.0 0.17401109082896948 0.17401109082896948 1.0


### Reviewer Total Reviews

In [None]:
# Pair reviewer_total_reviews with the target column and preview three rows
count_overall = df.select(["reviewer_total_reviews", "review_overall"])
count_overall.show(n=3)

+----------------------+--------------+
|reviewer_total_reviews|review_overall|
+----------------------+--------------+
|                  1587|           4.0|
|                   388|           4.0|
|                   144|           4.5|
+----------------------+--------------+
only showing top 3 rows



In [None]:
from pyspark.ml.feature import VectorAssembler
# Pack both columns into one "features" vector column for the correlation step
assembler = VectorAssembler(outputCol="features", inputCols=count_overall.columns)
df_new_ct_ov = assembler.transform(dataset=count_overall)

In [None]:
# Preview the assembled reviewer-count/overall vectors
df_new_ct_ov.show(n=3)

+----------------------+--------------+------------+
|reviewer_total_reviews|review_overall|    features|
+----------------------+--------------+------------+
|                  1587|           4.0|[1587.0,4.0]|
|                   388|           4.0| [388.0,4.0]|
|                   144|           4.5| [144.0,4.5]|
+----------------------+--------------+------------+
only showing top 3 rows



In [None]:
# Spearman correlation of reviewer_total_reviews vs review_overall is ~-0.0663:
# slightly negative, but effectively no monotonic relationship.
spearman_co_ct = Correlation.corr(df_new_ct_ov, 'features', "spearman")
# Use DataFrame.show (standard PySpark, as in the appearance/palate cells) rather than
# DataFrame.display, which is a Databricks-specific extension.
spearman_co_ct.show(2, False)

spearman(features)
1.0 -0.0663082712349847 -0.0663082712349847 1.0


### Total Brewery Beers

In [None]:
# Pair total_brewery_beers with the target column and preview three rows
tbb_overall = df.select(["total_brewery_beers", "review_overall"])
tbb_overall.show(n=3)

+-------------------+--------------+
|total_brewery_beers|review_overall|
+-------------------+--------------+
|                 52|           4.0|
|                 93|           4.0|
|                 10|           4.5|
+-------------------+--------------+
only showing top 3 rows



In [None]:
from pyspark.ml.feature import VectorAssembler
# Pack both columns into one "features" vector column for the correlation step
assembler = VectorAssembler(outputCol="features", inputCols=tbb_overall.columns)
df_new_tbb_ov = assembler.transform(dataset=tbb_overall)

In [None]:
# Preview the assembled brewery-beer-count/overall vectors
df_new_tbb_ov.show(n=3)

+-------------------+--------------+----------+
|total_brewery_beers|review_overall|  features|
+-------------------+--------------+----------+
|                 52|           4.0|[52.0,4.0]|
|                 93|           4.0|[93.0,4.0]|
|                 10|           4.5|[10.0,4.5]|
+-------------------+--------------+----------+
only showing top 3 rows



In [None]:
# Spearman correlation of total_brewery_beers vs review_overall is ~0.1107: only a weak association.
spearman_co_tbb = Correlation.corr(df_new_tbb_ov, 'features', "spearman")
# Use DataFrame.show (standard PySpark, as in the appearance/palate cells) rather than
# DataFrame.display, which is a Databricks-specific extension.
spearman_co_tbb.show(2, False)

spearman(features)
1.0 0.11074428992754695 0.11074428992754695 1.0


### Total Beer Reviews

In [None]:
# Pair total_beer_reviews with the target column and preview three rows
tbr_overall = df.select(["total_beer_reviews", "review_overall"])
tbr_overall.show(n=3)

+------------------+--------------+
|total_beer_reviews|review_overall|
+------------------+--------------+
|                 4|           4.0|
|               238|           4.0|
|               811|           4.5|
+------------------+--------------+
only showing top 3 rows



In [None]:
from pyspark.ml.feature import VectorAssembler
# Pack both columns into one "features" vector column for the correlation step
assembler = VectorAssembler(outputCol="features", inputCols=tbr_overall.columns)
df_new_tbr_ov = assembler.transform(dataset=tbr_overall)

In [None]:
# Preview the assembled beer-review-count/overall vectors
df_new_tbr_ov.show(n=3)

+------------------+--------------+-----------+
|total_beer_reviews|review_overall|   features|
+------------------+--------------+-----------+
|                 4|           4.0|  [4.0,4.0]|
|               238|           4.0|[238.0,4.0]|
|               811|           4.5|[811.0,4.5]|
+------------------+--------------+-----------+
only showing top 3 rows



In [None]:
# Spearman correlation of total_beer_reviews vs review_overall is ~0.1700: only a weak association.
# (The earlier comment quoted .1740, which is the beer_abv coefficient.)
spearman_co_tbr = Correlation.corr(df_new_tbr_ov, 'features', "spearman")
# Use DataFrame.show (standard PySpark, as in the appearance/palate cells) rather than
# DataFrame.display, which is a Databricks-specific extension.
spearman_co_tbr.show(2, False)

spearman(features)
1.0 0.17000425009366174 0.17000425009366174 1.0


## Scaling - MinMaxScaler

In [None]:
from pyspark.ml.feature import MinMaxScaler

In [None]:
# Min-max scale the "features" vector into the range [-1, 1]; output column is mm_scaled_features.
# NOTE(review): the scaler is fitted on the full DataFrame before the train/test split, and the
# models in the cells visible here train on "features" rather than "mm_scaled_features" —
# confirm whether the scaled column is meant to be used.
mm_scaler = MinMaxScaler(min=-1, max=1, inputCol="features", outputCol="mm_scaled_features")
mm_scaler_model = mm_scaler.fit(dataset=df)

In [None]:
# Add the scaled vector column and preview ten rows
rescaled_df = mm_scaler_model.transform(dataset=df)
rescaled_df.show(n=10)

+--------------+------------+-----------------+-------------+------------+--------+----------------------+-------------------+------------------+----------------+--------------------+--------------------+
|review_overall|review_aroma|review_appearance|review_palate|review_taste|beer_abv|reviewer_total_reviews|total_brewery_beers|total_beer_reviews|beer_type_vector|            features|  mm_scaled_features|
+--------------+------------+-----------------+-------------+------------+--------+----------------------+-------------------+------------------+----------------+--------------------+--------------------+
|           4.0|         4.0|              4.0|          4.0|         3.5|    12.0|                  1587|                 52|                 4|  (10,[1],[1.0])|(18,[0,1,2,3,4,5,...|[0.5,0.6000000000...|
|           4.0|         4.5|              4.0|          4.0|         4.5|    9.46|                   388|                 93|               238|  (10,[0],[1.0])|(18,[0,1,2,3,4,5,.

In [None]:
### Indexing target variable

In [None]:
from pyspark.ml.feature import StringIndexer

In [None]:

# Index review_overall into a "label" column, with indices assigned in ascending alphabetical order
label_indexer = StringIndexer(
    outputCol="label",
    inputCol="review_overall",
    stringOrderType="alphabetAsc",
)
label_indexer_model = label_indexer.fit(df)
df_lr = label_indexer_model.transform(df)

In [None]:
from pyspark.sql.functions import col

# Row count per label value, sorted by label
label_counts = (
    df_lr
    .groupBy('label')
    .count()
    .orderBy('label')
)

# Print the resulting label distribution
label_counts.show()


+-----+------+
|label| count|
+-----+------+
|  0.0|     7|
|  1.0| 10211|
|  2.0| 12032|
|  3.0| 35737|
|  4.0| 54660|
|  5.0|155840|
|  6.0|286901|
|  7.0|559790|
|  8.0|314303|
|  9.0| 88997|
+-----+------+



In [None]:
## Train, Test Split

In [None]:
# Random 80/20 split into training and test sets, seeded with 22
split_weights = [.8, .2]
train_data, test_data = df_lr.randomSplit(split_weights, seed=22)

In [None]:
# Import necessary libraries
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Configure the logistic regression estimator
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    regParam=0.01,
    elasticNetParam=0.01,
)

# Fit on the training split
lr_model = lr.fit(train_data)

# Score the test split
predictions = lr_model.transform(test_data)

# Accuracy of the predictions against the label column
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="label",
)
accuracy = evaluator.evaluate(predictions)

# Report the accuracy
print("Accuracy = %g" % accuracy)


Accuracy = 0.4628


In [None]:
# Keep only the prediction and label columns from the predictions DataFrame
from pyspark.sql.functions import col
predAndLabels = predictions.select(col("prediction"), col("label"))

# Build MulticlassMetrics from the underlying RDD of (prediction, label) rows
from pyspark.mllib.evaluation import MulticlassMetrics
metrics = MulticlassMetrics(predAndLabels.rdd)

# Per-label precision, recall and F1, keyed by every distinct value of the label column
labels = predAndLabels.rdd.map(lambda x: x[1]).distinct().collect()
precision = {label: metrics.precision(label) for label in labels}
recall = {label: metrics.recall(label) for label in labels}
f1Score = {label: metrics.fMeasure(label) for label in labels}

# Print the classification report: one section per metric, labels in ascending order
report_sections = (
    ("Precision:", precision),
    ("\nRecall:", recall),
    ("\nF1-score:", f1Score),
)
for heading, scores in report_sections:
    print(heading)
    for label in sorted(scores):
        print("Label %s: %.4f" % (label, scores[label]))



Precision:
Label 0.0: 0.0000
Label 1.0: 0.0000
Label 2.0: 0.0000
Label 3.0: 0.1721
Label 4.0: 0.0392
Label 5.0: 0.2949
Label 6.0: 0.3678
Label 7.0: 0.5046
Label 8.0: 0.4915
Label 9.0: 0.8821

Recall:
Label 0.0: 0.0000
Label 1.0: 0.0000
Label 2.0: 0.0000
Label 3.0: 0.0439
Label 4.0: 0.0044
Label 5.0: 0.2468
Label 6.0: 0.2622
Label 7.0: 0.8658
Label 8.0: 0.3210
Label 9.0: 0.0105

F1-score:
Label 0.0: 0.0000
Label 1.0: 0.0000
Label 2.0: 0.0000
Label 3.0: 0.0700
Label 4.0: 0.0080
Label 5.0: 0.2687
Label 6.0: 0.3061
Label 7.0: 0.6376
Label 8.0: 0.3884
Label 9.0: 0.0207


In [None]:
from pyspark.ml.classification import DecisionTreeClassifier, NaiveBayes

In [None]:
# Fit a decision tree classifier on the training split
dtc = DecisionTreeClassifier(labelCol="label", featuresCol="features")
dtc_model = dtc.fit(train_data)

# Score the test split
dtc_predictions = dtc_model.transform(test_data)

# Accuracy of the tree's predictions against the label column
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="label",
)
accuracy = evaluator.evaluate(dtc_predictions)

# Report the accuracy
print("Accuracy = %g" % accuracy)

Accuracy = 0.509421


In [None]:
# Fit a naive Bayes classifier on the training split
nb = NaiveBayes(labelCol="label", featuresCol="features")
nb_model = nb.fit(train_data)

# Score the test split
nb_predictions = nb_model.transform(test_data)

# Accuracy of the naive Bayes predictions against the label column
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="label",
)
accuracy = evaluator.evaluate(nb_predictions)

# Report the accuracy
print("Accuracy = %g" % accuracy)

Accuracy = 0.0895771


In [None]:
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier, NaiveBayes, MultilayerPerceptronClassifier

# Candidate classifiers to compare; none overrides the feature/label column names,
# so each relies on the estimator defaults.
models = [LogisticRegression(maxIter=10, regParam=0.01),
          DecisionTreeClassifier(),
          RandomForestClassifier(numTrees=10),
          NaiveBayes()]

# One {'Model': ..., 'Accuracy': ...} dict per classifier, in the order of `models`
results = []

# Loop-invariant setup, built once instead of on every iteration:
# the two-column projections of each split and the accuracy evaluator.
train_subset = train_data.select(['features', 'label'])
test_subset = test_data.select(['features', 'label'])
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')

# Train each model and evaluate its accuracy on the test data
for model in models:
    # Derive the display name once so the stored result and the printed line always agree
    model_name = type(model).__name__

    # Train the model
    model_fit = model.fit(train_subset)

    # Make predictions on the test data
    predictions = model_fit.transform(test_subset)

    # Evaluate the model's accuracy
    accuracy = evaluator.evaluate(predictions)

    # Store the model name and accuracy, then record it
    model_result = {'Model': model_name, 'Accuracy': accuracy}
    results.append(model_result)

    # Print the model's name and accuracy
    print(f'{model_name}: accuracy = {accuracy:.4f}')

LogisticRegression: accuracy = 0.4642
DecisionTreeClassifier: accuracy = 0.5094
RandomForestClassifier: accuracy = 0.4918
NaiveBayes: accuracy = 0.0896
