# Logistic Regression with PySpark

## Download dataset

In [None]:
# Download the Pima Indians Diabetes CSV into ./dataset.
# -nc (no-clobber) skips the transfer when the file is already present, so
# re-running this cell does not pile up "pima-indians-diabetes.csv.1" copies.
! wget -nc -P dataset https://www.dropbox.com/s/d0t4lgw1gsq9t2r/pima-indians-diabetes.csv

In [None]:
# List the download directory (with sizes) to confirm the CSV is there.
! ls -lah dataset

In [None]:
# Count the lines in the downloaded CSV file.
! wc -l dataset/pima-indians-diabetes.csv

In [None]:
# Peek at the first lines of the raw file.
! head dataset/pima-indians-diabetes.csv

Dataset description

*   Pregnancies: Number of times pregnant
*   Glucose: Plasma glucose concentration at 2 hours in an oral glucose tolerance test
*   BloodPressure: Diastolic blood pressure (mm Hg)
*   SkinThickness: Triceps skin fold thickness (mm)
*   Insulin: 2-Hour serum insulin (mu U/ml)
*   BMI: Body mass index (weight in kg/(height in m)^2)
*   DiabetesPedigreeFunction: Diabetes pedigree function
*   Age: Age (years)
*   Outcome: Class variable --> 0 : no diabetes, 1: diabetes

https://www.kaggle.com/uciml/pima-indians-diabetes-database


## Apache Spark Installation

Installing OpenJDK 8 (the `openjdk-8-jdk-headless` package)

In [None]:
# Install the headless OpenJDK 8 package; -qq plus the redirect to /dev/null keeps the cell output quiet.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Installing Apache Spark 2.4.7

In [None]:
# Fetch the Spark 2.4.7 (Hadoop 2.7 build) tarball from the Apache archive.
# -nc skips the download when the tarball is already in the working directory,
# so re-running the notebook neither re-downloads it nor leaves a ".tgz.1" copy.
!wget -nc https://archive.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz

In [None]:
# Unpack the Spark distribution into the current directory.
!tar xf spark-2.4.7-bin-hadoop2.7.tgz

In [None]:
# Install findspark (quiet mode); it is imported and initialised in the session-setup cell below.
!pip install -q findspark

## Creating Spark Session

In [None]:
import os
# Point this process at the Java and Spark installations set up above.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.7-bin-hadoop2.7",
})

In [None]:
import findspark
# Called before the pyspark import below; presumably this is what makes the
# pyspark shipped under SPARK_HOME importable (TODO confirm against findspark docs).
findspark.init()
from pyspark.sql import SparkSession
# Build (or reuse) a session with master "local[*]".
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
# Report the version of the running Spark session.
print(f"Spark version : {spark.version}")

In [None]:
import sys
# Report the interpreter version the kernel is running on.
print(f"Python version : {sys.version}")

In [None]:
#sc=spark.sparkContext
#sqlContext=SQLContext(sc)

## Read File

In [None]:
# Load the downloaded CSV: first row is the header, column types are inferred.
df_diabetes = spark.read.csv(
    path='dataset/pima-indians-diabetes.csv',
    sep=',',
    header=True,
    inferSchema=True,
)

In [None]:
# Number of rows loaded.
df_diabetes.count()

In [None]:
# Display the first 4 rows.
df_diabetes.show(4)

In [None]:
# Show the column names and the types inferred at load time.
df_diabetes.printSchema()

## Statistics

In [None]:
# Summary statistics for the DataFrame's columns.
df_diabetes.describe().show()

In [None]:
# Columns whose inferred Spark type is numeric. Matching only 'int' would drop
# any double-typed column (e.g. BMI, if it was inferred as double) from a list
# that is meant to hold every numeric feature.
numeric_dtypes = ('int', 'bigint', 'float', 'double')
numeric_features = [name for name, dtype in df_diabetes.dtypes if dtype in numeric_dtypes]
# Summary statistics for those columns, rendered as a pandas table.
df_diabetes.select(numeric_features).describe().toPandas()

## Distribution of Target/Outcome

In [None]:
# Row count per Outcome value.
df_diabetes.groupby("Outcome").count().show()

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Use seaborn's 'notebook' plotting context for the figures that follow.
sns.set_context('notebook')

In [None]:
# Per-Outcome row counts, collected into a pandas DataFrame for plotting.
data = df_diabetes.groupby("Outcome").count().toPandas()

In [None]:
# Inspect the collected counts.
data.head()

In [None]:
# Bar chart of the row count for each Outcome value.
sns.barplot(x='Outcome', y='count', data=data)

## Distribution of Features

In [None]:
# Row count per distinct Pregnancies value.
df_diabetes.groupby("Pregnancies").count().show()

In [None]:
# Collect the per-value counts first, then draw them on a wide figure.
pregnancy_counts = df_diabetes.groupby("Pregnancies").count().toPandas()
plt.figure(figsize=(15, 8))
sns.barplot(data=pregnancy_counts, x='Pregnancies', y='count')

In [None]:
# Collect the per-value counts first, then draw them on an extra-wide figure.
glucose_counts = df_diabetes.groupby("Glucose").count().toPandas()
plt.figure(figsize=(25, 8))
sns.barplot(data=glucose_counts, x='Glucose', y='count')

## Correlation

In [None]:
# Every column that was not inferred as a string goes into the correlation analysis.
numeric_features = [
    col_name
    for col_name, col_type in df_diabetes.dtypes
    if col_type != 'string'
]
numeric_features_df = df_diabetes.select(numeric_features)
# Preview the selection as a pandas table.
numeric_features_df.toPandas().head()

In [None]:
from pyspark.mllib.stat import Statistics

# Turn each row into a plain sequence of its values and compute the Pearson
# correlation matrix over the resulting RDD.
features = numeric_features_df.rdd.map(lambda record: record[0:])
col_names = numeric_features_df.columns
corr_mat = Statistics.corr(features, method="pearson")

corr_mat

In [None]:
# Wrap the correlation matrix in a pandas DataFrame labelled by column name on both axes.
corr_df = pd.DataFrame(corr_mat, index=col_names, columns=col_names)

corr_df

In [None]:
# Heatmap of the correlation table, using its column names as tick labels on both axes.
sns.heatmap(corr_df, 
        xticklabels=corr_df.columns,
        yticklabels=corr_df.columns)

## Feature Extraction

In [None]:
from pyspark.ml.feature import VectorAssembler

# Columns combined into the single vector column that the classifier reads.
inputCols = [
    'Glucose',
    'Insulin',
    'BMI',
]
outputCol = "features"

df_va = VectorAssembler(inputCols=inputCols, outputCol=outputCol)

# Drop any "features" column left behind by a previous run of this cell before
# assembling again; otherwise re-running the cell fails because the output
# column already exists. drop() does nothing when the column is absent, so the
# first run behaves exactly as before.
df_diabetes = df_va.transform(df_diabetes.drop(outputCol))

In [None]:
# Display the first 5 rows, now including the assembled "features" column.
df_diabetes.show(5)

## Model Training

In [None]:
# 80/20 random split with a fixed seed, then report the size of each part.
train, test = df_diabetes.randomSplit([0.8, 0.2], seed=2018)
for split_name, split_df in (("Training", train), ("Test", test)):
    print(f"{split_name} Dataset Count: {split_df.count()}")

In [None]:
# Display the first 5 rows of the training split.
train.show(5)

In [None]:
from pyspark.ml.classification import LogisticRegression

# Configure the estimator on the assembled vector column and the Outcome label,
# then fit it on the training split.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='Outcome',
    maxIter=5,
)
lrModel = lr.fit(train)

In [None]:
# ROC curve of the fitted model on the training data.
trainingSummary = lrModel.summary
roc = trainingSummary.roc.toPandas()
# FPR is plotted on the x-axis and TPR on the y-axis, so the axis labels must
# follow that order (they were previously swapped).
plt.plot(roc['FPR'], roc['TPR'])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()
print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))

## Evaluate Model

In [None]:
# Score the held-out split and preview the label next to the model's outputs.
predictions = lrModel.transform(test)
preview_cols = ['Outcome', 'features', 'rawPrediction', 'prediction', 'probability']
predictions.select(preview_cols).toPandas().head(5)

In [None]:
# Accuracy = rows where the prediction equals the label / all scored rows.
n_total = predictions.count()
n_correct = predictions.filter(predictions.Outcome == predictions.prediction).count()
accuracy = n_correct / float(n_total)
print("Accuracy : ",accuracy)

In [None]:
from sklearn.metrics import confusion_matrix
# Label order passed to confusion_matrix(labels=...) and used as the plot's tick labels further down.
class_names=[1.0,0.0]
import itertools
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    Print a confusion matrix and draw it on the current pyplot figure.

    Parameters
    ----------
    cm : 2-D numpy array
        Confusion matrix, indexed as ``cm[i, j]``; rows go on the y-axis
        and columns on the x-axis.
    classes : sequence
        Tick labels, used for both axes in the given order.
    normalize : bool, default False
        When True, every row is divided by its row sum before printing and
        plotting. A row whose sum is zero is left as all zeros instead of
        turning into NaN.
    title : str
        Title of the plot.
    cmap : matplotlib colormap
        Colormap handed to ``plt.imshow``.

    Returns
    -------
    None. The function does not call ``plt.show()``; that is left to the caller.
    """
    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        # Only divide rows with a non-zero total: a class with no samples
        # would otherwise produce 0/0 = NaN cells (and a NaN colour threshold).
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_totals != 0)
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # Normalised cells are fractions, raw cells are integer counts.
    fmt = '.2f' if normalize else 'd'
    # Cells above half of the maximum get white text so the number stays readable.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('Outcome label')
    plt.xlabel('Predicted label')

In [None]:
# Collect labels and predictions in a single pass: both columns then come from
# the same collected rows (so they stay row-aligned) and the prediction
# pipeline is evaluated once instead of twice.
labels_and_preds = predictions.select("Outcome", "prediction").toPandas()
y_true = labels_and_preds["Outcome"]
y_pred = labels_and_preds["prediction"]

# Rows/columns of the matrix follow the order given in class_names.
cnf_matrix = confusion_matrix(y_true, y_pred, labels=class_names)
plt.figure()
plot_confusion_matrix(cnf_matrix, classes=class_names,
                      title='Confusion matrix')
plt.show()