In [20]:
from pyspark.sql.functions import col
import numpy as np
from pyspark.ml.feature import VectorAssembler
from pyspark import SparkContext, SparkConf
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import RandomForest, GradientBoostedTrees


In [2]:

import pyspark

from pyspark.sql import SparkSession


# Build (or reuse) the local Spark session for this notebook.
# getOrCreate() keeps the cell safe to re-run: constructing a second
# SparkContext directly raises because only one may be active at a time.
conf = pyspark.SparkConf().setAppName('winequality').setMaster('local')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Expose the session's underlying context under the name later cells use.
sc = spark.sparkContext

In [3]:
# Echo the session object as the cell result to confirm it was created.
spark

In [4]:
# Read the semicolon-delimited training file, taking column names from its first row.
# No schema is supplied, so every column is loaded as a string (see printSchema output).
csv_reader = spark.read.format("csv")
df = csv_reader.load("TrainingDataset.csv", sep=";", header=True)

# Inspect the resulting schema, then preview the first rows.
df.printSchema()
df.show()

root
 |-- """""fixed acidity"""": string (nullable = true)
 |-- """"volatile acidity"""": string (nullable = true)
 |-- """"citric acid"""": string (nullable = true)
 |-- """"residual sugar"""": string (nullable = true)
 |-- """"chlorides"""": string (nullable = true)
 |-- """"free sulfur dioxide"""": string (nullable = true)
 |-- """"total sulfur dioxide"""": string (nullable = true)
 |-- """"density"""": string (nullable = true)
 |-- """"pH"""": string (nullable = true)
 |-- """"sulphates"""": string (nullable = true)
 |-- """"alcohol"""": string (nullable = true)
 |-- """"quality""""": string (nullable = true)

+----------------------+------------------------+-------------------+----------------------+-----------------+---------------------------+----------------------------+---------------+----------+-----------------+---------------+----------------+
|"""""fixed acidity""""|""""volatile acidity""""|""""citric acid""""|""""residual sugar""""|""""chlorides""""|""""free sulfur dioxid

In [5]:
# NOTE(review): describe() only builds a summary DataFrame; without .show() the
# cell echoes just that DataFrame's schema repr, not the statistics themselves.
df.describe()

DataFrame[summary: string, """""fixed acidity"""": string, """"volatile acidity"""": string, """"citric acid"""": string, """"residual sugar"""": string, """"chlorides"""": string, """"free sulfur dioxide"""": string, """"total sulfur dioxide"""": string, """"density"""": string, """"pH"""": string, """"sulphates"""": string, """"alcohol"""": string, """"quality""""": string]

In [6]:
# All columns except the last one (the quality target): the feature columns.
df.columns[0:-1]

['"""""fixed acidity""""',
 '""""volatile acidity""""',
 '""""citric acid""""',
 '""""residual sugar""""',
 '""""chlorides""""',
 '""""free sulfur dioxide""""',
 '""""total sulfur dioxide""""',
 '""""density""""',
 '""""pH""""',
 '""""sulphates""""',
 '""""alcohol""""']

In [7]:

# Cast every column to float and rename the target to "label" in ONE projection.
# Calling withColumn() in a loop stacks a projection per column; a single select()
# is the recommended form. The target is taken as the last column instead of being
# looked up by its quote-mangled name, so the cell still works when re-run after
# the rename has already happened.
feature_cols = df.columns[:-1]
target_col = df.columns[-1]
df = df.select(
    [col(name).cast('float').alias(name) for name in feature_cols]
    + [col(target_col).cast('float').alias('label')]
)

In [8]:
# Echo the DataFrame repr to confirm the float casts and the "label" rename.
df

DataFrame["""""fixed acidity"""": float, """"volatile acidity"""": float, """"citric acid"""": float, """"residual sugar"""": float, """"chlorides"""": float, """"free sulfur dioxide"""": float, """"total sulfur dioxide"""": float, """"density"""": float, """"pH"""": float, """"sulphates"""": float, """"alcohol"""": float, label: float]

In [9]:
# Count the null entries in each column (one output column per input column).
from pyspark.sql.functions import isnull, when, count, col

# when() without otherwise() yields NULL for non-matching rows, and count()
# ignores NULLs, so each aggregate counts only the rows where the column is null.
null_counts = [
    count(when(isnull(name), name)).alias(name)
    for name in df.columns
]
df.select(null_counts).show()

+----------------------+------------------------+-------------------+----------------------+-----------------+---------------------------+----------------------------+---------------+----------+-----------------+---------------+-----+
|"""""fixed acidity""""|""""volatile acidity""""|""""citric acid""""|""""residual sugar""""|""""chlorides""""|""""free sulfur dioxide""""|""""total sulfur dioxide""""|""""density""""|""""pH""""|""""sulphates""""|""""alcohol""""|label|
+----------------------+------------------------+-------------------+----------------------+-----------------+---------------------------+----------------------------+---------------+----------+-----------------+---------------+-----+
|                     0|                       0|                  0|                     0|                0|                          0|                           0|              0|         0|                0|              0|    0|
+----------------------+------------------------+-----------

In [10]:
# List (column name, type) pairs to double-check the casts.
df.dtypes

[('"""""fixed acidity""""', 'float'),
 ('""""volatile acidity""""', 'float'),
 ('""""citric acid""""', 'float'),
 ('""""residual sugar""""', 'float'),
 ('""""chlorides""""', 'float'),
 ('""""free sulfur dioxide""""', 'float'),
 ('""""total sulfur dioxide""""', 'float'),
 ('""""density""""', 'float'),
 ('""""pH""""', 'float'),
 ('""""sulphates""""', 'float'),
 ('""""alcohol""""', 'float'),
 ('label', 'float')]

In [25]:
# Pull the data to the driver as NumPy arrays.
# collect() materialises every row locally before the conversion.
feature_rows = df.select(df.columns[0:-1]).collect()
features = np.array(feature_rows)

# Selecting a single column still yields one Row per record, so `label`
# ends up as a column array with one single-element row per sample.
label_rows = df.select('label').collect()
label = np.array(label_rows)

In [26]:
# Echo the collected labels; the output shows one single-element row per sample.
label

array([[6.],
       [5.],
       [5.],
       ...,
       [6.],
       [5.],
       [6.]])

In [27]:
#creating the feature vector
VectorAssembler = VectorAssembler(inputCols = df.columns[0:-1] , outputCol = 'features')
df_tr = VectorAssembler.transform(df)
df_tr = df_tr.select(['features','label'])

In [28]:
# Preview the first 20 rows of the assembled (features, label) frame.
df_tr.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[8.89999961853027...|  6.0|
|[7.59999990463256...|  5.0|
|[7.90000009536743...|  5.0|
|[8.5,0.4900000095...|  5.0|
|[6.90000009536743...|  6.0|
|[6.30000019073486...|  5.0|
|[7.59999990463256...|  5.0|
|[7.90000009536743...|  5.0|
|[7.09999990463256...|  5.0|
|[7.80000019073486...|  6.0|
|[6.69999980926513...|  5.0|
|[6.90000009536743...|  6.0|
|[8.30000019073486...|  5.0|
|[6.90000009536743...|  6.0|
|[5.19999980926513...|  5.0|
|[7.80000019073486...|  6.0|
|[7.80000019073486...|  6.0|
|[8.10000038146972...|  7.0|
|[5.69999980926513...|  4.0|
|[7.30000019073486...|  5.0|
+--------------------+-----+
only showing top 20 rows



In [15]:
# Builds a LabeledPoint for every (features, label) pair and parallelizes the list into an RDD.
def to_labeled_point(sc, features, labels, categorical=False):
    """Pair each feature row with its label as a LabeledPoint and distribute them.

    Args:
        sc: Object exposing ``parallelize`` (the notebook passes its SparkContext).
        features: Iterable of feature rows; each row is handed to LabeledPoint unchanged.
        labels: Iterable of labels aligned with ``features``. Each label may be a
            plain number or a single-element sequence/array (such as one row of an
            ``(n, 1)`` array); it is unwrapped to a Python float.
        categorical: Unused; kept so existing callers keep working.

    Returns:
        The result of ``sc.parallelize`` applied to the list of LabeledPoints.

    Raises:
        ValueError: If both inputs are sized and their lengths differ, or if a
            label holds more than one value.
    """
    # zip() would silently drop the unmatched tail, so check sized inputs up front.
    if hasattr(features, "__len__") and hasattr(labels, "__len__"):
        if len(features) != len(labels):
            raise ValueError(
                "features and labels must have the same length "
                "(got %d and %d)" % (len(features), len(labels))
            )

    # .item() unwraps scalars and single-element arrays explicitly instead of
    # relying on NumPy's deprecated implicit array-to-scalar conversion.
    labeled_points = [
        LabeledPoint(np.asarray(y, dtype=float).item(), x)
        for x, y in zip(features, labels)
    ]
    return sc.parallelize(labeled_points)

In [18]:
#rdd converted dataset
# Wrap the collected arrays in LabeledPoints and distribute them as an RDD.
dataset = to_labeled_point(sc, features, labels=label)

# Random 80/20 train/test split; the fixed seed keeps the split repeatable.
split_weights = [0.8, 0.2]
training, test = dataset.randomSplit(split_weights, seed=5)

In [23]:
# Echo the RDD of LabeledPoints; only its repr is shown, nothing is computed.
dataset

ParallelCollectionRDD[40] at readRDDFromFile at PythonRDD.scala:274

In [24]:
# Echo the training split; only its RDD repr is shown.
training

PythonRDD[43] at RDD at PythonRDD.scala:53