In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory tree and print the full path of every file found.
for dirname, _, filenames in os.walk('/kaggle/input'):
    full_paths = (os.path.join(dirname, name) for name in filenames)
    for path in full_paths:
        print(path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
pip install pyspark

**IMPORTING ALL THE NECESSARY LIBRARIES**

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
#from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
from pyspark import sql
from pyspark.sql import SparkSession

**CREATING SPARK SESSION**

In [None]:
# Create the SparkSession for this notebook (or reuse one that already exists),
# registered under the application name "wine_prediction".
spark = (
    SparkSession.builder
    .appName("wine_prediction")
    .getOrCreate()
)

In [None]:
# Display the SparkSession object as the cell output.
spark

#![](http://)

![WINE](https://images.unsplash.com/photo-1569153482031-a3cebdedf294?ixlib=rb-4.0.3&ixid=MnwxMjA3fDB8MHxzZWFyY2h8Mnx8d2luZSUyMGdsYXNzfGVufDB8fDB8fA)

**READING THE DATA FILES**

In [None]:
# Load the competition training set. The first row is used as the header and
# column types are inferred from the data.
df = spark.read.csv(
    '/kaggle/input/playground-series-s3e5/train.csv',
    header=True,
    inferSchema=True,
)

In [None]:
# Load the competition test set (the rows to predict for the submission),
# with a header row and inferred column types.
test_main = spark.read.csv(
    '/kaggle/input/playground-series-s3e5/test.csv',
    header=True,
    inferSchema=True,
)

In [None]:
# Load the sample submission file; it is later filled with the model's
# predictions and written back out.
sub = spark.read.csv(
    '/kaggle/input/playground-series-s3e5/sample_submission.csv',
    header=True,
    inferSchema=True,
)

In [None]:
# Print the first rows of the training DataFrame (Spark's default preview).
df.show()

**showing the data in pandas format**

In [None]:
# Convert the training DataFrame to pandas and display it as the cell output.
# NOTE(review): toPandas() collects every row to the driver; for a preview,
# df.limit(n).toPandas() would be cheaper if the data grows.
df.toPandas()

**CHECKING THE NULL VALUES**

In [None]:
from pyspark.sql.functions import col,isnan, when, count

# For each column, count the rows whose value is NaN or NULL, and show the
# counts as a single row with one field per column.
null_count_exprs = [
    count(when(isnan(c) | col(c).isNull(), c)).alias(c)
    for c in df.columns
]
df.select(null_count_exprs).show()

In [None]:
# Drop the 'Id' column from the training DataFrame; it is not one of the
# feature columns assembled later.
df=df.drop('Id')

In [None]:
# Project the eleven feature columns plus the `quality` label and preview the
# first ten rows. (`data` is reassigned by the feature-assembly cell below.)
data = df.select(
    'fixed acidity',
    'volatile acidity',
    'citric acid',
    'residual sugar',
    'chlorides',
    'free sulfur dioxide',
    'total sulfur dioxide',
    'density',
    'pH',
    'sulphates',
    'alcohol',
    'quality',
)
data.show(10)

**CHECKING DATA TYPES**

In [None]:
# Display the (column name, Spark type) pairs of the training DataFrame.
df.dtypes

**CREATING VECTORS**

In [None]:
# Combine the eleven feature columns into a single vector column named
# "features", which is the input column the classifiers below read.
featVect = VectorAssembler(
    inputCols=[
        'fixed acidity',
        'volatile acidity',
        'citric acid',
        'residual sugar',
        'chlorides',
        'free sulfur dioxide',
        'total sulfur dioxide',
        'density',
        'pH',
        'sulphates',
        'alcohol',
    ],
    outputCol="features",
)
# The assembler is applied to `df`, so every original column (including the
# `quality` label) is kept alongside the new "features" column.
data = featVect.transform(df)

**Split the Data**

I will use 70% of the data for training and reserve 30% for testing. The label column (`quality`) is kept under the same name in the testing data so that predicted labels can later be compared with the known actual values.

In [None]:

# Split the assembled data roughly 70/30 into a training set and a hold-out set.
# A fixed seed keeps the split, and every metric computed from it, the same
# on each Restart & Run All.
splits = data.randomSplit([0.7, 0.3], seed=42)
train, test = splits
# The label stays in the `quality` column in both splits: the estimators and
# evaluators below read it through labelCol='quality'. The earlier rename of a
# "label" column was removed; this notebook never creates a column with that
# name, so the rename had no effect.
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

**Define the Pipeline**


A pipeline consists of a series of transformer and estimator stages that typically prepare a DataFrame for modeling and then train a predictive model. For a dataset with both categorical and continuous columns, such a pipeline could have seven stages:

- A StringIndexer estimator that converts string values to indexes for categorical features
- A VectorAssembler that combines categorical features into a single vector
- A VectorIndexer that creates indexes for a vector of categorical features
- A VectorAssembler that creates a vector of continuous numeric features
- A MinMaxScaler that normalizes continuous numeric features
- A VectorAssembler that creates a vector of categorical and continuous features
- A DecisionTreeClassifier that trains a classification model.

In this notebook the feature columns have already been assembled into the `features` vector above, so no `Pipeline` object is built: the next cell only defines a `LogisticRegression` estimator, which is later fitted directly on the training split.

In [None]:
# Logistic-regression estimator: reads the assembled "features" vector,
# predicts the "quality" label, runs at most 10 iterations and uses a
# regularization parameter of 0.3. It is only defined here; fitting happens in
# a later cell.
lr = LogisticRegression(
    labelCol="quality",
    featuresCol="features",
    maxIter=10,
    regParam=0.3,
)


In [None]:
# Preview the first rows of the training split.
train.show()

In [None]:
# Preview the first rows of the hold-out split.
test.show()

**RANDOM FOREST MODEL**

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Random-forest estimator: reads the assembled "features" vector, predicts the
# "quality" label, and limits each tree to depth 5.
# Training a random forest is randomized, so a fixed seed is set to make the
# fitted model (and its reported accuracy) repeatable between runs.
rf = RandomForestClassifier(
    labelCol='quality',
    featuresCol='features',
    maxDepth=5,
    seed=42,
)

In [None]:
# Fit the logistic-regression estimator on the training split.
model1=lr.fit(train)

In [None]:
# Score the hold-out split with the fitted logistic-regression model.
predictions1=model1.transform(test)

In [None]:
# Imported in this cell so the evaluator class is available when the notebook
# is run top to bottom on a fresh kernel; none of the earlier import cells
# bring this name into scope.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy evaluator for the logistic-regression predictions: compares the
# "prediction" column against the "quality" label.
evaluator1 = MulticlassClassificationEvaluator(
    labelCol='quality',
    predictionCol='prediction',
    metricName='accuracy',
)

In [None]:
# Accuracy of the logistic-regression model on the hold-out split.
# This uses the logistic-regression evaluator and predictions (`evaluator1`,
# `predictions1`). The un-suffixed `evaluator` / `predictions` belong to the
# random-forest model and are only created in later cells, so referring to them
# here fails on a fresh run, or reports the wrong model's score on a stale
# kernel.
accuracy = evaluator1.evaluate(predictions1)
print(accuracy)

In [None]:
# Fit the random-forest estimator on the training split. `model` is the model
# later used to score the competition test set.
model=rf.fit(train)

In [None]:
# Score the hold-out split with the fitted random-forest model.
predictions=model.transform(test)

**EVALUATION METRICS**

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Accuracy evaluator for the random-forest predictions: compares the
# "prediction" column against the "quality" label.
evaluator = MulticlassClassificationEvaluator(
    labelCol='quality',
    predictionCol='prediction',
    metricName='accuracy',
)

**CHECKING THE ACCURACY**

In [None]:
# Accuracy of the random-forest model on the hold-out split
# (overwrites any earlier value of `accuracy`).
accuracy=evaluator.evaluate(predictions)

In [None]:
# Print the random-forest accuracy computed in the previous cell.
print(accuracy)

In [None]:
# Drop the identifier column from the competition test set.
# NOTE(review): the training cell drops 'Id' (capital I) while this one uses
# "id". This presumably relies on Spark resolving column names
# case-insensitively (its default setting) — confirm against the test.csv
# header.
test_main=test_main.drop("id")

In [None]:
# Preview the competition test set after dropping the identifier column.
test_main.show()

In [None]:
# Assemble the same eleven feature columns used for training into a "features"
# vector for the competition test set, so the fitted model can score it.
featVect1 = VectorAssembler(
    inputCols=[
        'fixed acidity',
        'volatile acidity',
        'citric acid',
        'residual sugar',
        'chlorides',
        'free sulfur dioxide',
        'total sulfur dioxide',
        'density',
        'pH',
        'sulphates',
        'alcohol',
    ],
    outputCol="features",
)
data1 = featVect1.transform(test_main)

In [None]:
# Preview the competition test set with its assembled "features" column.
data1.show()

**PREDICTIONS**

In [None]:
# Score the competition test set with `model`, the fitted random forest.
pred=model.transform(data1)

In [None]:
# Preview the scored competition test set.
pred.show()

In [None]:
# Number of scored rows; compare with the submission row count in the next cell.
pred.count()

In [None]:
# Number of rows in the sample submission; this should equal the prediction
# count above.
sub.count()

In [None]:
# Keep only the predicted class, cast to integer for the submission file.
pred1=pred.select(col('prediction').cast('integer'))

In [None]:
# Collect the predictions to the driver as a pandas DataFrame.
# Note: `pred1` is rebound from a Spark DataFrame to a pandas one, so this cell
# cannot be re-run without first re-running the cell above.
pred1=pred1.toPandas()

In [None]:
# Display the pandas predictions as the cell output.
pred1

In [None]:
# Preview the sample submission (still a Spark DataFrame at this point).
sub.show()

In [None]:
# Convert the sample submission to pandas so its column can be overwritten.
# Note: `sub` is rebound from a Spark DataFrame to a pandas one, so the
# preceding sub.show() / sub.count() cells no longer work after this runs.
sub=sub.toPandas()

In [None]:
# Overwrite the submission's 'quality' column with the model's predictions.
# pandas aligns the assignment on the row index, so this assumes `pred1` and
# `sub` have the same number of rows in the same order — presumably true
# because test.csv and sample_submission.csv list the same ids in the same
# order; verify.
sub['quality']=pred1['prediction']

**CONVERTING PREDICTIONS TO CSV FILE**

In [None]:
# Write the submission to "wine_quality.csv" in the current working directory,
# without the pandas index column.
sub.to_csv("wine_quality.csv",index=False)