# MLlib PySpark

As you may know, PySpark is the Python API for Apache Spark, an open-source framework for fast, distributed data processing maintained by the Apache Software Foundation.

Spark ships with several libraries, each with a particular purpose. Here we are going to use MLlib, Spark's distributed machine learning library, which provides algorithms for regression, classification and clustering (among others) along with tools to train and evaluate the resulting models.

We will use the diabetes dataset, where the goal is to predict whether a particular person will have diabetes based on several blood and health indicators.

As always, the first step is to import the libraries we will use:

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler

### CREATE SESSION

The first thing to do is create a SparkSession, the entry point through which our script talks to Spark. Here we name the application *diabetesPrediction*:

In [2]:
session = SparkSession.builder.appName("diabetesPrediction").getOrCreate()

### READ DATA

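Once the session is created, it is time to load the data. We read the CSV file treating the first line as a header, let Spark infer the column types, and use a comma as the separator: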

In [3]:
df = session.read.option("header","True").option("inferSchema","True").option("sep",",").csv("./dataPySpark/diabetes.csv")

We can count the rows and columns of the dataset and show its first rows. This will feel familiar if you come from pandas, although the methods are not the same: `df.count()` returns the number of rows, `df.columns` is the list of column names, and `df.show()` prints the first rows (20 by default).

In [4]:
print("There are", df.count(), "rows and", len(df.columns), "columns in the data")

There are 768 rows and 9 columns in the data


In [5]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|     0.35100000000000003| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|     0.16699999999999998| 21|      0|
|          0|    137|           40|           35|    168|43.1|      2.2880000000000003| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|


We can also print the schema (column names and types) of the dataset:

In [6]:
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



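Next, we rename the target column and build the list of feature columns we will use later. Renaming `Outcome` to `label` matters because MLlib estimators read the target from a column named `label` by default (the `labelCol` parameter). The feature list below uses seven of the eight predictors; it leaves out `Pregnancies`: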

In [None]:
df = df.withColumnRenamed("Outcome","label")
columnas = ['Glucose','BloodPressure','SkinThickness','Insulin','BMI','DiabetesPedigreeFunction','Age']

### PIPELINE CREATION FOR DATA TRANSFORMATION

With the target renamed, we start building the pipeline. The first stage is a `VectorAssembler`, which combines the selected columns of each row into a single vector. Its output is the `vectorized_features` column:

In [8]:
assembler = VectorAssembler(inputCols = columnas,outputCol = "vectorized_features")
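Conceptually, `VectorAssembler` takes the listed input columns of each row, in the order given, and packs their values into one vector. Below is a plain-Python sketch of that per-row step, using the first row of the dataset shown earlier. The `assemble` helper is hypothetical, and Spark produces an MLlib `Vector` (possibly stored in sparse form) rather than a Python list:

```python
def assemble(row, input_cols):
    """Pack the selected columns of one row, in order, into a single vector."""
    return [float(row[c]) for c in input_cols]


columnas = ['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin',
            'BMI', 'DiabetesPedigreeFunction', 'Age']

# First row of the dataset, with Outcome already renamed to label.
first_row = {'Pregnancies': 6, 'Glucose': 148, 'BloodPressure': 72,
             'SkinThickness': 35, 'Insulin': 0, 'BMI': 33.6,
             'DiabetesPedigreeFunction': 0.627, 'Age': 50, 'label': 1}

# Pregnancies and label are not in columnas, so they stay out of the vector.
print(assemble(first_row, columnas))  # [148.0, 72.0, 35.0, 0.0, 33.6, 0.627, 50.0]
```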

Then we add a `MinMaxScaler`, which rescales each feature to the [0, 1] range using that feature's minimum and maximum. Its output is the `features` column:

In [None]:
scaler = MinMaxScaler(inputCol = "vectorized_features",outputCol = "features")
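For each feature, `MinMaxScaler` uses the minimum and maximum seen in the data it is fitted on and maps a value `x` to `(x - min) / (max - min)`, then stretches that to the target range (`min=0.0`, `max=1.0` by default); a constant feature is mapped to the middle of the range. The plain-Python sketch below applies that formula to one column; `min_max_scale` is a hypothetical helper, not part of PySpark. As a sanity check, the first component of the first `features` vector shown later, 0.7437..., equals 148/199, which is what this formula gives for Glucose 148 if that column spans 0 to 199 in the full dataset (an inference from the output, not something the notebook prints).

```python
def min_max_scale(values, new_min=0.0, new_max=1.0):
    """Rescale one feature column to [new_min, new_max] using its own min and max."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # Constant column: every value maps to the midpoint of the target range.
        return [0.5 * (new_max + new_min) for _ in values]
    return [(v - lo) / (hi - lo) * (new_max - new_min) + new_min for v in values]


# Hypothetical Glucose column reduced to three values spanning 0..199.
scaled = min_max_scale([0, 148, 199])
print(scaled)
```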

We collect both stages, in order, into a list:

In [9]:
stages = []
stages += [assembler]
stages += [scaler]

Now we fit the pipeline on the data and use the fitted model to transform it:

In [10]:
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)

In [11]:
df = pipelineModel.transform(df)
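`pipeline.fit(df)` follows MLlib's Estimator/Transformer pattern: estimator stages such as `MinMaxScaler` learn parameters from the data (here, each feature's minimum and maximum), and the returned `PipelineModel` applies those learned values in `transform`. The plain-Python sketch below mimics that pattern with two hypothetical classes; it is not Spark code. It also illustrates a consequence of fitting before the train/test split, as this notebook does: the test rows help determine the min/max used for scaling, whereas a scaler fitted on training rows alone can map unseen values outside [0, 1].

```python
class MinMaxScalerSketch:
    """Hypothetical stand-in for an Estimator: fit() learns per-feature min/max."""

    def fit(self, rows):
        n_features = len(rows[0])
        mins = [min(row[i] for row in rows) for i in range(n_features)]
        maxs = [max(row[i] for row in rows) for i in range(n_features)]
        return MinMaxModelSketch(mins, maxs)


class MinMaxModelSketch:
    """Hypothetical stand-in for a fitted Transformer: transform() reuses min/max."""

    def __init__(self, mins, maxs):
        self.mins = mins
        self.maxs = maxs

    def transform(self, rows):
        scaled_rows = []
        for row in rows:
            scaled = []
            for value, lo, hi in zip(row, self.mins, self.maxs):
                # Constant feature: map to the middle of [0, 1].
                scaled.append(0.5 if hi == lo else (value - lo) / (hi - lo))
            scaled_rows.append(scaled)
        return scaled_rows


train_rows = [[0.0], [100.0]]
test_rows = [[150.0]]

# Fitted on training rows only: the unseen test value lands outside [0, 1].
print(MinMaxScalerSketch().fit(train_rows).transform(test_rows))  # [[1.5]]
# Fitted on all rows, as with df here: the test row shapes the scale.
print(MinMaxScalerSketch().fit(train_rows + test_rows).transform(test_rows))  # [[1.0]]
```

If you want the test set to play no part in preprocessing, one option is to call `randomSplit` first, then fit the pipeline on the training DataFrame and apply the fitted model to both sets.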

The transformation adds the `vectorized_features` and `features` columns to the DataFrame. We keep only `features`, which holds one scaled vector per row, and `label`, the target for supervised training:

In [13]:
df = df.select(["features","label"])
df.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.74371859296482...|    1|
|[0.42713567839195...|    0|
|[0.91959798994974...|    1|
|[0.44723618090452...|    0|
|[0.68844221105527...|    1|
|[0.58291457286432...|    0|
|[0.39195979899497...|    1|
|[0.57788944723618...|    0|
|[0.98994974874371...|    1|
|[0.62814070351758...|    1|
|[0.55276381909547...|    0|
|[0.84422110552763...|    1|
|[0.69849246231155...|    0|
|[0.94974874371859...|    1|
|[0.83417085427135...|    1|
|[0.50251256281407...|    1|
|[0.59296482412060...|    1|
|[0.53768844221105...|    1|
|[0.51758793969849...|    0|
|[0.57788944723618...|    1|
+--------------------+-----+
only showing top 20 rows



### TRAIN-TEST SPLIT

Next, we split the dataset into a training set (about 80% of the rows) and a test set (about 20%). Fixing the seed makes the split reproducible:

In [12]:
train, test = df.randomSplit([0.8,0.2],seed = 1234)
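`randomSplit` does not cut the data at exact positions. Conceptually, each row is assigned to one of the splits at random with probability proportional to its weight (weights are normalized if they do not sum to 1), so `[0.8, 0.2]` on 768 rows gives roughly, not exactly, 614 and 154 rows. The sketch below is a simplified, single-machine illustration of that idea in plain Python; the `random_split` helper is hypothetical, and Spark's actual implementation samples within partitions, so its result will not match this one row for row.

```python
import random


def random_split(rows, weights, seed):
    """Simplified per-row random assignment; NOT Spark's implementation."""
    total = float(sum(weights))
    # Cumulative boundaries, e.g. [0.8, 1.0] for weights [0.8, 0.2].
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # uniform in [0, 1)
        for i, b in enumerate(bounds):
            if u < b:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guards against floating-point round-off
    return splits


train_ids, test_ids = random_split(list(range(768)), [0.8, 0.2], seed=1234)
print(len(train_ids), len(test_ids))  # roughly an 80/20 split, not exactly 614/154
```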

### MODEL EVALUATION WITH AUC DEFINITION

Now it is time to define the models. We will try a simple logistic regression and a decision tree classifier, and measure their quality with the area under the ROC curve (AUC). PySpark provides the `BinaryClassificationEvaluator` class for this; by default it computes `areaUnderROC` from the `label` and `rawPrediction` columns:

In [14]:
evaluator = BinaryClassificationEvaluator()
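The AUC can be read as the probability that a randomly chosen positive example gets a higher score than a randomly chosen negative one (0.5 is random ranking, 1.0 is perfect). The plain-Python sketch below computes it by direct pairwise counting; `roc_auc` is a hypothetical helper. Spark instead builds the ROC curve from the `rawPrediction` scores and integrates it, so on large data its value can differ slightly from this exact count if it bins the scores.

```python
def roc_auc(scores, labels):
    """AUC as the probability that a random positive outranks a random negative.

    Ties count as half a win. O(P * N) comparisons, fine for a small illustration.
    """
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Positive 0.35 ranks below negative 0.4; the other three pairs are ordered correctly.
print(roc_auc([0.1, 0.4, 0.35, 0.8], [0, 0, 1, 1]))  # 0.75
```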

### LOGISTIC REGRESSION MODEL

Let's start with `LogisticRegression`, specifying the input column (`features`), the target column (`label`) and the maximum number of optimizer iterations (`maxIter`):

In [15]:
lr = LogisticRegression(featuresCol = "features", labelCol = "label", maxIter = 10)
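A fitted binary logistic regression scores a row by computing a linear margin from its feature vector and passing it through the sigmoid to get the probability of `label = 1`; with the default `threshold` of 0.5, rows whose probability exceeds 0.5 are predicted as 1. The plain-Python sketch below uses made-up coefficients; after training, the real values are available as `lrModel.coefficients` and `lrModel.intercept`. The helper names are hypothetical.

```python
import math


def sigmoid(z):
    """Numerically stable logistic function 1 / (1 + exp(-z))."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


def probability_of_one(x, coefficients, intercept):
    """P(label = 1 | x) for a binary logistic regression model."""
    margin = intercept + sum(c * xi for c, xi in zip(coefficients, x))
    return sigmoid(margin)


def predict(x, coefficients, intercept, threshold=0.5):
    """Hard 0/1 prediction: 1 when the probability exceeds the threshold."""
    return 1 if probability_of_one(x, coefficients, intercept) > threshold else 0


# Made-up coefficients for two scaled features (NOT values fitted by lrModel).
coefficients = [2.0, -1.0]
intercept = -0.5
x = [0.74, 0.59]
print(probability_of_one(x, coefficients, intercept), predict(x, coefficients, intercept))
```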

#### Training model over train dataset

Now we train the model on the training set:

In [16]:
lrModel = lr.fit(train)

Then we use the model to predict on both the training and the test sets:

#### Prediction over train dataset

In [17]:
lr_predictionsTrain = lrModel.transform(train)

#### Prediction over test dataset

In [18]:
lr_predictionsTest = lrModel.transform(test)

#### Logistic regression evaluation

In [19]:
print('Training Area Under ROC', evaluator.evaluate(lr_predictionsTrain))

Training Area Under ROC 0.8357641867875993


In [20]:
print('Test Area Under ROC', evaluator.evaluate(lr_predictionsTest))

Test Area Under ROC 0.8190164954870834


The logistic regression reaches an AUC of about 0.84 on the training set and 0.82 on the test set.

### DECISION TREE CLASSIFIER

Let's repeat the process with a `DecisionTreeClassifier`.

As before, we specify the input column (`featuresCol`) and the target column (`labelCol`):

In [21]:
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
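`DecisionTreeClassifier` grows the tree by repeatedly choosing the feature and threshold that most reduce impurity, Gini impurity by default (`impurity="gini"`, with `maxDepth=5`). The plain-Python sketch below shows the impurity measure and an exhaustive threshold search on a single feature; the helper names are hypothetical. Spark differs in that it only considers a limited set of candidate thresholds per continuous feature, controlled by `maxBins` (32 by default).

```python
def gini(labels):
    """Gini impurity 1 - sum_k p_k^2 of a list of class labels."""
    n = len(labels)
    if n == 0:
        return 0.0
    counts = {}
    for y in labels:
        counts[y] = counts.get(y, 0) + 1
    return 1.0 - sum((c / n) ** 2 for c in counts.values())


def best_threshold(values, labels):
    """Try each distinct value as a 'value <= t' split; return (t, weighted impurity)."""
    n = len(labels)
    best = (None, gini(labels))  # no split at all
    for t in sorted(set(values)):
        left = [y for v, y in zip(values, labels) if v <= t]
        right = [y for v, y in zip(values, labels) if v > t]
        if not left or not right:
            continue
        score = len(left) / n * gini(left) + len(right) / n * gini(right)
        if score < best[1]:
            best = (t, score)
    return best


# One scaled feature that separates the two classes perfectly at 0.2.
print(best_threshold([0.1, 0.2, 0.8, 0.9], [0, 0, 1, 1]))  # (0.2, 0.0)
```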

#### Training model over train dataset

Then, train the model:

In [22]:
dtModel = dt.fit(train)

Finally, we predict on the training and test data:

#### Prediction over train dataset

In [23]:
dt_predictionTrain = dtModel.transform(train)

#### Prediction over test dataset

In [24]:
dt_predictionTest = dtModel.transform(test)

#### Decision tree evaluation

In [25]:
print('Training Area Under ROC', evaluator.evaluate(dt_predictionTrain))

Training Area Under ROC 0.7174592474567961


In [26]:
print('Test Area Under ROC', evaluator.evaluate(dt_predictionTest))

Test Area Under ROC 0.6316526610644257


The decision tree reaches an AUC of about 0.72 on the training set and 0.63 on the test set.

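In this case, logistic regression performs better than the decision tree: its test AUC is 0.82 versus 0.63, and the gap between its training and test AUC is smaller. Keep in mind that this comes from a single train/test split with default hyperparameters.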