
## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("WineQualityModel").getOrCreate()

# File location and type
file_location = "/FileStore/tables/TrainingDataset.csv"
file_type = "csv"

df = spark.read.format('csv').load(path=file_location, header=True, inferSchema=True, sep=';')



In [0]:
df.printSchema()

root
 |-- """""fixed acidity"""": double (nullable = true)
 |-- """"volatile acidity"""": double (nullable = true)
 |-- """"citric acid"""": double (nullable = true)
 |-- """"residual sugar"""": double (nullable = true)
 |-- """"chlorides"""": double (nullable = true)
 |-- """"free sulfur dioxide"""": double (nullable = true)
 |-- """"total sulfur dioxide"""": double (nullable = true)
 |-- """"density"""": double (nullable = true)
 |-- """"pH"""": double (nullable = true)
 |-- """"sulphates"""": double (nullable = true)
 |-- """"alcohol"""": double (nullable = true)
 |-- """"quality""""": integer (nullable = true)



In [0]:
for col_names in df.columns:
    clean_names=col_names.replace('"','').strip()
    df = df.withColumnRenamed(col_names, clean_names)

df.printSchema()


root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [0]:
df.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          8.9|            0.22|       0.48|           1.8|    0.077|               29.0|                60.0| 0.9968|3.39|     0.53|    9.4|      6|
|          7.6|            0.39|       0.31|           2.3|    0.082|               23.0|                71.0| 0.9982|3.52|     0.65|    9.7|      5|
|          7.9|            0.43|       0.21|           1.6|    0.106|               10.0|                37.0| 0.9966|3.17|     0.91|    9.5|      5|
|          8.5|            0.49|       0.11|           2.3|    0.084|                9.0|           

In [0]:
df.columns

Out[5]: ['fixed acidity',
 'volatile acidity',
 'citric acid',
 'residual sugar',
 'chlorides',
 'free sulfur dioxide',
 'total sulfur dioxide',
 'density',
 'pH',
 'sulphates',
 'alcohol',
 'quality']

In [0]:
df.groupBy("quality").count().show()

+-------+-----+
|quality|count|
+-------+-----+
|      6|  522|
|      3|    9|
|      5|  529|
|      4|   45|
|      8|   13|
|      7|  161|
+-------+-----+



In [0]:
from pyspark.sql.functions import col
majority_classes=[5,6]
minority_classes=[3,4,7,8]
df_majority = df.filter(col("quality").isin(majority_classes))
oversampled_dataframes = [df_majority]
majority_count=df_majority.count()
for minority_class in minority_classes:
    df_minority=df.filter(col("quality")==minority_class)
    minority_count=df_minority.count()

    if minority_count>0:
        oversample_ratio=majority_count/minority_count
        df_oversampled=df_minority.sample(withReplacement=True, fraction=oversample_ratio)
        oversampled_dataframes.append(df_oversampled)

    

df_balanced = oversampled_dataframes[0]
for oversampled_df in oversampled_dataframes[1:]:
    df_balanced=df_balanced.union(oversampled_df)

df_balanced.groupby("quality").count().show()


+-------+-----+
|quality|count|
+-------+-----+
|      6|  522|
|      5|  529|
|      3| 1018|
|      4| 1066|
|      7|  997|
|      8| 1076|
+-------+-----+



In [0]:
df=df_balanced

In [0]:
feature_cols = df.columns[:-1]
from pyspark.ml.feature import VectorAssembler, StandardScaler
vect_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

In [0]:
df_features = vect_assembler.transform(df)
df_features.show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|            features|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+--------------------+
|          8.9|            0.22|       0.48|           1.8|    0.077|               29.0|                60.0| 0.9968|3.39|     0.53|    9.4|      6|[8.9,0.22,0.48,1....|
|          7.6|            0.39|       0.31|           2.3|    0.082|               23.0|                71.0| 0.9982|3.52|     0.65|    9.7|      5|[7.6,0.39,0.31,2....|
|          7.9|            0.43|       0.21|           1.6|    0.106|               10.0|                37.0| 0.9966|3.17|     0.91|    9.5|    

In [0]:
df_final=df_features.select("features","quality")
df_final.show(5)

+--------------------+-------+
|            features|quality|
+--------------------+-------+
|[8.9,0.22,0.48,1....|      6|
|[7.6,0.39,0.31,2....|      5|
|[7.9,0.43,0.21,1....|      5|
|[8.5,0.49,0.11,2....|      5|
|[6.9,0.4,0.14,2.4...|      6|
+--------------------+-------+
only showing top 5 rows



In [0]:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)
scaler_model=scaler.fit(df_final)
df_train_scaled=scaler_model.transform(df_final)
df_scaled_final=df_train_scaled.select("scaledFeatures", "quality")

In [0]:
df_scaled_final.describe().show()

+-------+------------------+
|summary|           quality|
+-------+------------------+
|  count|              5208|
|   mean| 5.507296466973886|
| stddev|1.8588424679297129|
|    min|                 3|
|    max|                 8|
+-------+------------------+



In [0]:
df_scaled_final.printSchema()

root
 |-- scaledFeatures: vector (nullable = true)
 |-- quality: integer (nullable = true)



In [0]:
df_scaled_final.show(5)

+--------------------+-------+
|      scaledFeatures|quality|
+--------------------+-------+
|[0.37679404013317...|      6|
|[-0.3697371460185...|      5|
|[-0.1974607184450...|      5|
|[0.14709213670188...|      5|
|[-0.7717154770233...|      6|
+--------------------+-------+
only showing top 5 rows



In [0]:
df_scaled_final.groupBy("quality").count().show()

+-------+-----+
|quality|count|
+-------+-----+
|      6|  522|
|      5|  529|
|      3| 1018|
|      4| 1066|
|      7|  997|
|      8| 1076|
+-------+-----+



In [0]:
print(f"Training feature vector size: {len(df_scaled_final.select('scaledFeatures').first()[0])}")

Training feature vector size: 11


In [0]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol="scaledFeatures", labelCol="quality")
model=lr.fit(df_scaled_final)

In [0]:
file_location = "/FileStore/tables/ValidationDataset.csv"
file_type = "csv"

df_test = spark.read.format('csv').load(path=file_location, header=True, inferSchema=True, sep=';')

In [0]:
for col_name in df_test.columns:
    new_clean_names = col_name.replace('"','').strip()
    df_test = df_test.withColumnRenamed(col_name, new_clean_names)

df_test.printSchema()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: integer (nullable = true)
 |-- total sulfur dioxide: integer (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)



In [0]:
df_test.columns

Out[21]: ['fixed acidity',
 'volatile acidity',
 'citric acid',
 'residual sugar',
 'chlorides',
 'free sulfur dioxide',
 'total sulfur dioxide',
 'density',
 'pH',
 'sulphates',
 'alcohol',
 'quality']

In [0]:
from pyspark.ml.feature import StandardScaler
feature_cols_test=df_test.columns[:-1]
from pyspark.ml.feature import VectorAssembler
vect_assembler_test = VectorAssembler(inputCols=feature_cols_test, outputCol="features_assembled")
df_test = vect_assembler_test.transform(df_test)
df_features_final=df_test.select("features_assembled", "quality")


df_features_final.show(5)

+--------------------+-------+
|  features_assembled|quality|
+--------------------+-------+
|[7.4,0.7,0.0,1.9,...|      5|
|[7.8,0.88,0.0,2.6...|      5|
|[7.8,0.76,0.04,2....|      5|
|[11.2,0.28,0.56,1...|      6|
|[7.4,0.7,0.0,1.9,...|      5|
+--------------------+-------+
only showing top 5 rows



In [0]:
df_features_final.printSchema()


root
 |-- features_assembled: vector (nullable = true)
 |-- quality: integer (nullable = true)



In [0]:
scaler_test = StandardScaler(inputCol="features_assembled", outputCol="scaledFeatures", withStd=True, withMean=True)
scaler_model_test=scaler_test.fit(df_features_final)
df_test_scaled=scaler_model_test.transform(df_features_final)
df_test_final=df_test_scaled.select("scaledFeatures", "quality")

In [0]:
df_test_final.describe().show()

+-------+------------------+
|summary|           quality|
+-------+------------------+
|  count|               160|
|   mean|           5.73125|
| stddev|0.8296538264228721|
|    min|                 3|
|    max|                 8|
+-------+------------------+



In [0]:
df_test_final.printSchema()

root
 |-- scaledFeatures: vector (nullable = true)
 |-- quality: integer (nullable = true)



In [0]:
df_test_final.show(5)

+--------------------+-------+
|      scaledFeatures|quality|
+--------------------+-------+
|[-0.9458482018892...|      5|
|[-0.7512667653064...|      5|
|[-0.7512667653064...|      5|
|[0.90267544564740...|      6|
|[-0.9458482018892...|      5|
+--------------------+-------+
only showing top 5 rows



In [0]:
print(f"Test feature vector size: {len(df_test_final.select('scaledFeatures').first()[0])}")

Test feature vector size: 11


In [0]:
predictions = model.transform(df_test_final)
predictions.printSchema()


root
 |-- scaledFeatures: vector (nullable = true)
 |-- quality: integer (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [0]:
predictions.select("quality", "prediction").show(10)

+-------+----------+
|quality|prediction|
+-------+----------+
|      5|       3.0|
|      5|       4.0|
|      5|       4.0|
|      6|       6.0|
|      5|       3.0|
|      5|       4.0|
|      5|       4.0|
|      7|       4.0|
|      7|       4.0|
|      5|       4.0|
+-------+----------+
only showing top 10 rows



In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="quality", predictionCol="prediction", metricName="accuracy")
accuracy=evaluator.evaluate(predictions)
print(f"Validation Accuracy: {accuracy:.2f}")


Validation Accuracy: 0.14


In [0]:
predictions.select("quality", "prediction", "scaledFeatures").show(10)



+-------+----------+--------------------+
|quality|prediction|      scaledFeatures|
+-------+----------+--------------------+
|      5|       3.0|[-0.9458482018892...|
|      5|       4.0|[-0.7512667653064...|
|      5|       4.0|[-0.7512667653064...|
|      6|       6.0|[0.90267544564740...|
|      5|       3.0|[-0.9458482018892...|
|      5|       4.0|[-0.9458482018892...|
|      5|       4.0|[-0.7026214061607...|
|      7|       4.0|[-0.9944935610349...|
|      7|       4.0|[-0.7512667653064...|
|      5|       4.0|[-0.8972028427435...|
+-------+----------+--------------------+
only showing top 10 rows



In [0]:
evaluator.setMetricName("f1")
f1_score=evaluator.evaluate(predictions)
print(f"Validation F1 Score: {f1_score:.2f}")

Validation F1 Score: 0.19


In [0]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

rf=RandomForestClassifier(featuresCol="scaledFeatures", labelCol="quality", numTrees=50, maxDepth=10, seed=42)

In [0]:
rf_model=rf.fit(df_scaled_final)
rf_predictions=rf_model.transform(df_test_final)
rf_predictions.printSchema()

root
 |-- scaledFeatures: vector (nullable = true)
 |-- quality: integer (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [0]:
rf_predictions.select("quality", "prediction", "scaledFeatures").show(10)

+-------+----------+--------------------+
|quality|prediction|      scaledFeatures|
+-------+----------+--------------------+
|      5|       4.0|[-0.9458482018892...|
|      5|       6.0|[-0.7512667653064...|
|      5|       3.0|[-0.7512667653064...|
|      6|       6.0|[0.90267544564740...|
|      5|       4.0|[-0.9458482018892...|
|      5|       4.0|[-0.9458482018892...|
|      5|       3.0|[-0.7026214061607...|
|      7|       6.0|[-0.9944935610349...|
|      7|       4.0|[-0.7512667653064...|
|      5|       6.0|[-0.8972028427435...|
+-------+----------+--------------------+
only showing top 10 rows



In [0]:
rf_evaluator = MulticlassClassificationEvaluator(labelCol="quality", predictionCol="rf_prediction", metricName="rf_accuracy")
rf_accuracy = evaluator.evaluate(rf_predictions)
print(f"Random Forest Validation Accuracy: {rf_accuracy:.2f}")

Random Forest Validation Accuracy: 0.55


In [0]:
rf_evaluator.setMetricName("f1")
rf_f1_score = evaluator.evaluate(rf_predictions)
print(f"Random Forest Validation F1 Score: {rf_f1_score:.2f}")

Random Forest Validation F1 Score: 0.55


In [0]:
rf_predictions.select("quality", "prediction", "scaledFeatures")

Out[39]: DataFrame[quality: int, prediction: double, scaledFeatures: vector]

In [0]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

gbt = GBTClassifier(featuresCol="scaledFeatures", labelCol="quality", maxIter=100, maxDepth=5)

gbt_model=gbt.fit(df_scaled_final)

predictions_gbt = gbt_model.transform(df_test_final)




Unexpected exception formatting exception. Falling back to standard exception


Traceback (most recent call last):
  File "/databricks/python/lib/python3.9/site-packages/IPython/core/interactiveshell.py", line 3378, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<command-3988865129309903>", line 6, in <module>
    gbt_model=gbt.fit(df_scaled_final)
  File "/databricks/python_shell/dbruntime/MLWorkloadsInstrumentation/_pyspark.py", line 30, in patched_method
    result = original_method(self, *args, **kwargs)
  File "/databricks/spark/python/pyspark/ml/base.py", line 205, in fit
    return self._fit(dataset)
  File "/databricks/spark/python/pyspark/ml/wrapper.py", line 383, in _fit
    java_model = self._fit_java(dataset)
  File "/databricks/spark/python/pyspark/ml/wrapper.py", line 380, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/databricks/spark/python/pyspark/errors/exceptio

