# **Setting Up Spark**

In [None]:
# Install the headless OpenJDK 8 package; stdout is discarded to keep the cell output quiet.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget --no-check-certificate \
    https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz \
    -O /content/spark-2.4.4-bin-hadoop2.7.tgz

--2021-01-17 06:55:28--  https://downloads.apache.org/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
Resolving downloads.apache.org (downloads.apache.org)... 88.99.95.219, 2a01:4f8:10a:201a::2
Connecting to downloads.apache.org (downloads.apache.org)|88.99.95.219|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 219929956 (210M) [application/x-gzip]
Saving to: ‘/content/spark-2.4.4-bin-hadoop2.7.tgz’


2021-01-17 06:55:38 (23.6 MB/s) - ‘/content/spark-2.4.4-bin-hadoop2.7.tgz’ saved [219929956/219929956]



In [None]:
!tar xf spark-2.4.4-bin-hadoop2.7.tgz

In [None]:
# Install findspark quietly; it is used below to make the unpacked Spark distribution importable.
!pip install -q findspark

In [None]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 76kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 46.7MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612242 sha256=6779294e50a1778db23505df529d60fd72e9b309a4744abab92a107b532f9e87
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [None]:
import os

# Tell the Spark tooling where the Java 8 JDK and the unpacked
# Spark 3.0.1 distribution live.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.0.1-bin-hadoop2.7",
)

In [None]:
import findspark

# Use the absolute location of the distribution so this cell keeps working
# when it is re-run after the working directory has changed (a later cell
# switches into the Google Drive folder).
findspark.init("/content/spark-3.0.1-bin-hadoop2.7")

In [None]:
from pyspark import SparkContext
# Reuse the active SparkContext if one exists, otherwise start a new one.
sparkContext = SparkContext.getOrCreate()

# **Input & Pre-Processing Data**

In [None]:
from __future__ import print_function

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.datasets.samples_generator import make_blobs
from pyspark import SparkContext
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SQLContext
import pyspark

%matplotlib inline

print (sparkContext.version)

3.0.1


In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive so the dataset stored there is reachable.
drive.mount('/content/drive')
# List the current directory to confirm the mount point and the unpacked Spark folder.
!ls

Mounted at /content/drive
drive  sample_data  spark-2.4.4-bin-hadoop2.7.tgz  spark-3.0.1-bin-hadoop2.7


In [None]:
# Switch the notebook's working directory to the Drive folder that holds the dataset,
# then list it to confirm dataset.csv is present.
%cd /content/drive/MyDrive/ABD UAS/
!ls

/content/drive/MyDrive/ABD UAS
dataset.csv


In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Load the CSV from the current working directory into pandas and
# preview the first five rows.
data_dir = "dataset.csv"
df = pd.read_csv(filepath_or_buffer=data_dir)

df.head(5)

Unnamed: 0,id,Number of times pregnant,Plasma glucose concentration a 2 hours in an oral glucose tolerance test,Diastolic blood pressure (mm Hg),Triceps skin fold thickness (mm),2-Hour serum insulin (mu U/ml),Body mass index (weight in kg/(height in m)^2),Diabetes pedigree function,Age (years),label
0,1,6,148,72,35,0,33.6,0.627,50,1
1,2,1,85,66,29,0,26.6,0.351,31,0
2,3,8,183,64,0,0,23.3,0.672,32,1
3,4,1,89,66,23,94,28.1,0.167,21,0
4,5,0,137,40,35,168,43.1,2.288,33,1


In [None]:
# List the working directory again to confirm the dataset file is still visible.
!ls

dataset.csv


# **Read CSV to Spark data frame**

In [None]:
# Read the same CSV into a second pandas frame named `data`.
# NOTE(review): `data` is rebound to an RDD in a later cell before this pandas
# frame is read anywhere visible, so this load looks redundant — confirm.
data_dir = "dataset.csv"
data = pd.read_csv(data_dir)

In [None]:
from pyspark.sql import SparkSession

# SQLContext has been deprecated since Spark 3.0 in favour of SparkSession.
# getOrCreate() attaches to the SparkContext that is already running.
# The variable keeps its historical name so later cells that call
# sqlContext.read.csv(...) continue to work unchanged.
sqlContext = SparkSession.builder.getOrCreate()

In [None]:
# Absolute location of the dataset. A relative path would be resolved against
# whichever working directory happens to be in effect, and the notebook
# changes directory after Spark has been started.
path = '/content/drive/MyDrive/ABD UAS/dataset.csv'

# Take the feature names straight from the CSV header (nrows=0 reads the
# header only). This keeps the cell re-runnable: it no longer depends on `df`
# still being a pandas frame, which stops being true once `df` is rebound to
# a Spark DataFrame further down.
# Positions 1..7 are the columns between `id` and 'Age (years)'.
# NOTE(review): the 1:8 slice leaves out 'Age (years)' (position 8) — confirm
# that excluding it from the model is intended.
FEATURES_COL = pd.read_csv(path, nrows=0).columns[1:8].to_list()

In [None]:
# Load the CSV through Spark, using the first line as column names, and
# rebind `df` (previously a pandas frame) to the resulting Spark DataFrame.
# NOTE(review): no schema/inferSchema is given, so the columns are presumably
# all strings — confirm with df.printSchema().
df = sqlContext.read.csv(path, header=True) 
df.show()

+---+------------------------+------------------------------------------------------------------------+--------------------------------+---------------------------------+------------------------------+----------------------------------------------+--------------------------+-----------+-----+
| id|Number of times pregnant|Plasma glucose concentration a 2 hours in an oral glucose tolerance test|Diastolic blood pressure (mm Hg)| Triceps skin fold thickness (mm)|2-Hour serum insulin (mu U/ml)|Body mass index (weight in kg/(height in m)^2)|Diabetes pedigree function|Age (years)|label|
+---+------------------------+------------------------------------------------------------------------+--------------------------------+---------------------------------+------------------------------+----------------------------------------------+--------------------------+-----------+-----+
|  1|                       6|                                                                     148|               

In [None]:
# Read the file as plain text and break each line into its comma-separated
# fields. The header line is not skipped, so it comes through as the first
# record (see the preview below).
lines = sparkContext.textFile(path)
data = lines.map(lambda row: row.split(","))
data.take(2)

[['id',
  'Number of times pregnant',
  'Plasma glucose concentration a 2 hours in an oral glucose tolerance test',
  'Diastolic blood pressure (mm Hg)',
  ' Triceps skin fold thickness (mm)',
  '2-Hour serum insulin (mu U/ml)',
  'Body mass index (weight in kg/(height in m)^2)',
  'Diabetes pedigree function',
  'Age (years)',
  'label'],
 ['1', '6', '148', '72', '35', '0', '33.6', '0.627', '50', '1']]

In [None]:
# Turn the RDD of string lists into a DataFrame, reusing the column names of
# the DataFrame currently bound to `df`.
column_names = df.columns
df = data.toDF(column_names)
print(df)
df.show()

DataFrame[id: string, Number of times pregnant: string, Plasma glucose concentration a 2 hours in an oral glucose tolerance test: string, Diastolic blood pressure (mm Hg): string,  Triceps skin fold thickness (mm): string, 2-Hour serum insulin (mu U/ml): string, Body mass index (weight in kg/(height in m)^2): string, Diabetes pedigree function: string, Age (years): string, label: string]
+---+------------------------+------------------------------------------------------------------------+--------------------------------+---------------------------------+------------------------------+----------------------------------------------+--------------------------+-----------+-----+
| id|Number of times pregnant|Plasma glucose concentration a 2 hours in an oral glucose tolerance test|Diastolic blood pressure (mm Hg)| Triceps skin fold thickness (mm)|2-Hour serum insulin (mu U/ml)|Body mass index (weight in kg/(height in m)^2)|Diabetes pedigree function|Age (years)|label|
+---+----------------

# **Convert data (toFloat)**

In [None]:
# Build a float-typed view of columns 1..7, keeping the original names.
# Values that cannot be parsed as numbers (e.g. the header row that was read
# as data) show up as null.
float_columns = [df[name].cast("float").alias(name) for name in df.columns[1:8]]
df_feat = df.select(float_columns)
df_feat.show()

+------------------------+------------------------------------------------------------------------+--------------------------------+---------------------------------+------------------------------+----------------------------------------------+--------------------------+
|Number of times pregnant|Plasma glucose concentration a 2 hours in an oral glucose tolerance test|Diastolic blood pressure (mm Hg)| Triceps skin fold thickness (mm)|2-Hour serum insulin (mu U/ml)|Body mass index (weight in kg/(height in m)^2)|Diabetes pedigree function|
+------------------------+------------------------------------------------------------------------+--------------------------------+---------------------------------+------------------------------+----------------------------------------------+--------------------------+
|                    null|                                                                    null|                            null|                             null|                  

In [None]:
# Cast every feature column from string to float so that VectorAssembler,
# which only accepts numeric/boolean/vector inputs, can consume them.
# Previously the loop never touched the feature columns and instead re-cast
# `label` once per feature.
feature_names = [name for name in df.columns if name in FEATURES_COL]
for name in feature_names:
    df = df.withColumn(name, df[name].cast('float'))

# The label only needs converting once. Non-numeric values (such as the
# header line that was parsed as a data row) become null here and are
# removed by the null-dropping step that follows.
df = df.withColumn("label", df.label.cast('numeric'))
df.show()

+---+------------------------+------------------------------------------------------------------------+--------------------------------+---------------------------------+------------------------------+----------------------------------------------+--------------------------+-----------+-----+
| id|Number of times pregnant|Plasma glucose concentration a 2 hours in an oral glucose tolerance test|Diastolic blood pressure (mm Hg)| Triceps skin fold thickness (mm)|2-Hour serum insulin (mu U/ml)|Body mass index (weight in kg/(height in m)^2)|Diabetes pedigree function|Age (years)|label|
+---+------------------------+------------------------------------------------------------------------+--------------------------------+---------------------------------+------------------------------+----------------------------------------------+--------------------------+-----------+-----+
|  1|                       6|                                                                     148|               

# **Drop Null Values**

In [None]:
# Discard every row that contains at least one null value.
df = df.na.drop(how="any")
df.show()

+---+------------------------+------------------------------------------------------------------------+--------------------------------+---------------------------------+------------------------------+----------------------------------------------+--------------------------+-----------+-----+
| id|Number of times pregnant|Plasma glucose concentration a 2 hours in an oral glucose tolerance test|Diastolic blood pressure (mm Hg)| Triceps skin fold thickness (mm)|2-Hour serum insulin (mu U/ml)|Body mass index (weight in kg/(height in m)^2)|Diabetes pedigree function|Age (years)|label|
+---+------------------------+------------------------------------------------------------------------+--------------------------------+---------------------------------+------------------------------+----------------------------------------------+--------------------------+-----------+-----+
|  1|                       6|                                                                     148|               

In [None]:
# Pack the feature columns into a single vector column called "features",
# then keep only the two columns the classifier needs.
vecAssembler = VectorAssembler(outputCol="features", inputCols=FEATURES_COL)
assembled = vecAssembler.transform(df)
df_logistic = assembled.select('label', 'features')
df_logistic.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    1|[6.0,148.0,72.0,3...|
|    0|[1.0,85.0,66.0,29...|
|    1|[8.0,183.0,64.0,0...|
|    0|[1.0,89.0,66.0,23...|
|    1|[0.0,137.0,40.0,3...|
|    0|[5.0,116.0,74.0,0...|
|    1|[3.0,78.0,50.0,32...|
|    0|(7,[0,1,5],[10.0,...|
|    1|[2.0,197.0,70.0,4...|
|    1|(7,[0,1,2],[8.0,1...|
|    0|[4.0,110.0,92.0,0...|
|    1|[10.0,168.0,74.0,...|
|    0|[10.0,139.0,80.0,...|
|    1|[1.0,189.0,60.0,2...|
|    1|[5.0,166.0,72.0,1...|
|    1|(7,[0,1,5],[7.0,1...|
|    1|[0.0,118.0,84.0,4...|
|    1|[7.0,107.0,74.0,0...|
|    0|[1.0,103.0,30.0,3...|
|    1|[1.0,115.0,70.0,3...|
+-----+--------------------+
only showing top 20 rows



# **Splitting the Dataset**

In [None]:
# 80/20 train/test split with a fixed seed so the partition is repeatable.
data_train, data_test = df_logistic.randomSplit(weights=[0.8, 0.2], seed=24)

# **Modeling**

In [None]:
from pyspark.ml.classification import LogisticRegression

In [None]:
# Fit a logistic-regression classifier on the training split.
lr = LogisticRegression(labelCol='label', featuresCol='features')
lr_model = lr.fit(data_train)

# Score the held-out test split with the fitted model.
lr_result = lr_model.transform(data_test)

In [None]:
# Show the true label, predicted class and class probabilities for the first 5 scored rows
lr_result.select('label', 'prediction', 'probability').show(5)

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|    0|       0.0|[0.93331400166612...|
|    0|       0.0|[0.91797488997918...|
|    0|       0.0|[0.67829077120461...|
|    0|       0.0|[0.93443022463795...|
|    0|       1.0|[0.39032984046243...|
+-----+----------+--------------------+
only showing top 5 rows



# **Evaluation**

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [None]:
# Score the test-set predictions: AUC from the binary evaluator (fed the
# probability column) and accuracy from the multiclass evaluator.
lr_eval = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="probability")
lr_eval2 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
lr_ACC = lr_eval2.evaluate(lr_result, {lr_eval2.metricName: "accuracy"})
lr_AUC = lr_eval.evaluate(lr_result)

report = [
    "Logistic Regression Performance Measure",
    "Accuracy = %0.2f" % lr_ACC,
    "AUC = %.2f" % lr_AUC,
]
print("\n".join(report))

Logistic Regression Performance Measure
Accuracy = 0.75
AUC = 0.87
