In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ClassificationwithSpark").getOrCreate()

In [4]:

from itertools import chain
from pyspark.sql.functions import count, mean, when, lit, create_map, regexp_extract

In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [6]:

df1 = spark.read.csv('/content/drive/MyDrive/iot/dementia_dataset.csv',
                     header=True, inferSchema=True)

In [7]:

df1.printSchema()

root
 |-- Subject ID: string (nullable = true)
 |-- MRI ID: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Visit: integer (nullable = true)
 |-- MR Delay: integer (nullable = true)
 |-- M/F: string (nullable = true)
 |-- Hand: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- EDUC: integer (nullable = true)
 |-- SES: integer (nullable = true)
 |-- MMSE: integer (nullable = true)
 |-- CDR: double (nullable = true)
 |-- eTIV: integer (nullable = true)
 |-- nWBV: double (nullable = true)
 |-- ASF: double (nullable = true)



In [8]:

df1.show()

+----------+-------------+-----------+-----+--------+---+----+---+----+----+----+---+----+-----+-----+
|Subject ID|       MRI ID|      Group|Visit|MR Delay|M/F|Hand|Age|EDUC| SES|MMSE|CDR|eTIV| nWBV|  ASF|
+----------+-------------+-----------+-----+--------+---+----+---+----+----+----+---+----+-----+-----+
| OAS2_0001|OAS2_0001_MR1|Nondemented|    1|       0|  M|   R| 87|  14|   2|  27|0.0|1987|0.696|0.883|
| OAS2_0001|OAS2_0001_MR2|Nondemented|    2|     457|  M|   R| 88|  14|   2|  30|0.0|2004|0.681|0.876|
| OAS2_0002|OAS2_0002_MR1|   Demented|    1|       0|  M|   R| 75|  12|null|  23|0.5|1678|0.736|1.046|
| OAS2_0002|OAS2_0002_MR2|   Demented|    2|     560|  M|   R| 76|  12|null|  28|0.5|1738|0.713| 1.01|
| OAS2_0002|OAS2_0002_MR3|   Demented|    3|    1895|  M|   R| 80|  12|null|  22|0.5|1698|0.701|1.034|
| OAS2_0004|OAS2_0004_MR1|Nondemented|    1|       0|  F|   R| 88|  18|   3|  28|0.0|1215| 0.71|1.444|
| OAS2_0004|OAS2_0004_MR2|Nondemented|    2|     538|  F|   R| 90|  18|  

In [9]:
df1.limit(100).toPandas()

Unnamed: 0,Subject ID,MRI ID,Group,Visit,MR Delay,M/F,Hand,Age,EDUC,SES,MMSE,CDR,eTIV,nWBV,ASF
0,OAS2_0001,OAS2_0001_MR1,Nondemented,1,0,M,R,87,14,2.0,27,0.0,1987,0.696,0.883
1,OAS2_0001,OAS2_0001_MR2,Nondemented,2,457,M,R,88,14,2.0,30,0.0,2004,0.681,0.876
2,OAS2_0002,OAS2_0002_MR1,Demented,1,0,M,R,75,12,,23,0.5,1678,0.736,1.046
3,OAS2_0002,OAS2_0002_MR2,Demented,2,560,M,R,76,12,,28,0.5,1738,0.713,1.010
4,OAS2_0002,OAS2_0002_MR3,Demented,3,1895,M,R,80,12,,22,0.5,1698,0.701,1.034
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,OAS2_0047,OAS2_0047_MR1,Nondemented,1,0,F,R,77,16,2.0,29,0.0,1433,0.723,1.225
96,OAS2_0047,OAS2_0047_MR2,Nondemented,2,486,F,R,78,16,2.0,27,0.0,1414,0.727,1.242
97,OAS2_0048,OAS2_0048_MR1,Demented,1,0,M,R,66,16,1.0,19,1.0,1695,0.711,1.036
98,OAS2_0048,OAS2_0048_MR2,Demented,2,248,M,R,66,16,1.0,21,1.0,1708,0.703,1.028


In [10]:
df1 = df1.drop("Subject ID", "MRI ID")
df1.show()

+-----------+-----+--------+---+----+---+----+----+----+---+----+-----+-----+
|      Group|Visit|MR Delay|M/F|Hand|Age|EDUC| SES|MMSE|CDR|eTIV| nWBV|  ASF|
+-----------+-----+--------+---+----+---+----+----+----+---+----+-----+-----+
|Nondemented|    1|       0|  M|   R| 87|  14|   2|  27|0.0|1987|0.696|0.883|
|Nondemented|    2|     457|  M|   R| 88|  14|   2|  30|0.0|2004|0.681|0.876|
|   Demented|    1|       0|  M|   R| 75|  12|null|  23|0.5|1678|0.736|1.046|
|   Demented|    2|     560|  M|   R| 76|  12|null|  28|0.5|1738|0.713| 1.01|
|   Demented|    3|    1895|  M|   R| 80|  12|null|  22|0.5|1698|0.701|1.034|
|Nondemented|    1|       0|  F|   R| 88|  18|   3|  28|0.0|1215| 0.71|1.444|
|Nondemented|    2|     538|  F|   R| 90|  18|   3|  27|0.0|1200|0.718|1.462|
|Nondemented|    1|       0|  M|   R| 80|  12|   4|  28|0.0|1689|0.712|1.039|
|Nondemented|    2|    1010|  M|   R| 83|  12|   4|  29|0.5|1701|0.711|1.032|
|Nondemented|    3|    1603|  M|   R| 85|  12|   4|  30|0.0|1699

In [12]:

print('Number of rows: \t', df1.count())
print('Number of columns: \t', len(df1.columns))

Number of rows: 	 373
Number of columns: 	 13


In [13]:
#Grouping target cloumn according to result
df1.groupBy('Group').count().show()

+-----------+-----+
|      Group|count|
+-----------+-----+
|   Demented|  146|
|Nondemented|  190|
|  Converted|   37|
+-----------+-----+



In [14]:
for col in df1.columns:
    print(col.ljust(15), df1.filter(df1[col].isNull()).count())

Group           0
Visit           0
MR Delay        0
M/F             0
Hand            0
Age             0
EDUC            0
SES             19
MMSE            2
CDR             0
eTIV            0
nWBV            0
ASF             0


In [15]:
#Finding max and meaan value
df = df1.select('MMSE', 'MMSE').summary('max', "50%", "mean")
df.show()

+-------+------------------+------------------+
|summary|              MMSE|              MMSE|
+-------+------------------+------------------+
|    max|                30|                30|
|    50%|                29|                29|
|   mean|27.342318059299192|27.342318059299192|
+-------+------------------+------------------+



In [16]:
#Filling missing value
df1 = df1.fillna({'MMSE': '27', 'MMSE':14})

In [17]:
#Finding max and meaan value
df = df1.select('SES', 'SES').summary('max', "50%", "mean")
df.show()

+-------+------------------+------------------+
|summary|               SES|               SES|
+-------+------------------+------------------+
|    max|                 5|                 5|
|    50%|                 2|                 2|
|   mean|2.4604519774011298|2.4604519774011298|
+-------+------------------+------------------+



In [18]:
#Filling missing values
df1 = df1.fillna({'SES': '2', 'SES':2})

In [19]:
for col in df1.columns:
    print(col.ljust(15), df1.filter(df1[col].isNull()).count())

Group           0
Visit           0
MR Delay        0
M/F             0
Hand            0
Age             0
EDUC            0
SES             0
MMSE            0
CDR             0
eTIV            0
nWBV            0
ASF             0


IMPLEMENTATION

In [20]:

from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression,\
                    RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [21]:
stringIndex = StringIndexer(inputCols=['Group', 'M/F','Hand'],
                       outputCols=['Result', 'Gender','HAND'])

stringIndex_model = stringIndex.fit(df1)

df1_ = stringIndex_model.transform(df1).drop('Group', 'M/F','Hand')
df1_.show(10)

+-----+--------+---+----+---+----+---+----+-----+-----+------+------+
|Visit|MR Delay|Age|EDUC|SES|MMSE|CDR|eTIV| nWBV|  ASF|Result|Gender|
+-----+--------+---+----+---+----+---+----+-----+-----+------+------+
|    1|       0| 87|  14|  2|  27|0.0|1987|0.696|0.883|   0.0|   1.0|
|    2|     457| 88|  14|  2|  30|0.0|2004|0.681|0.876|   0.0|   1.0|
|    1|       0| 75|  12|  2|  23|0.5|1678|0.736|1.046|   1.0|   1.0|
|    2|     560| 76|  12|  2|  28|0.5|1738|0.713| 1.01|   1.0|   1.0|
|    3|    1895| 80|  12|  2|  22|0.5|1698|0.701|1.034|   1.0|   1.0|
|    1|       0| 88|  18|  3|  28|0.0|1215| 0.71|1.444|   0.0|   0.0|
|    2|     538| 90|  18|  3|  27|0.0|1200|0.718|1.462|   0.0|   0.0|
|    1|       0| 80|  12|  4|  28|0.0|1689|0.712|1.039|   0.0|   1.0|
|    2|    1010| 83|  12|  4|  29|0.5|1701|0.711|1.032|   0.0|   1.0|
|    3|    1603| 85|  12|  4|  30|0.0|1699|0.705|1.033|   0.0|   1.0|
+-----+--------+---+----+---+----+---+----+-----+-----+------+------+
only showing top 10 

In [22]:
#convert dataset to model
vec_asmbl = VectorAssembler(inputCols=df1_.columns[1:],
                           outputCol='features')

df1_ = vec_asmbl.transform(df1_).select('features', 'Result')
df1_.show(4, truncate=False)

+---------------------------------------------------------+------+
|features                                                 |Result|
+---------------------------------------------------------+------+
|[0.0,87.0,14.0,2.0,27.0,0.0,1987.0,0.696,0.883,0.0,1.0]  |0.0   |
|[457.0,88.0,14.0,2.0,30.0,0.0,2004.0,0.681,0.876,0.0,1.0]|0.0   |
|[0.0,75.0,12.0,2.0,23.0,0.5,1678.0,0.736,1.046,1.0,1.0]  |1.0   |
|[560.0,76.0,12.0,2.0,28.0,0.5,1738.0,0.713,1.01,1.0,1.0] |1.0   |
+---------------------------------------------------------+------+
only showing top 4 rows



In [23]:
# Spliting train and validation set
train_df, valid_df = df1_.randomSplit([0.7, 0.3])

In [24]:
evaluator = MulticlassClassificationEvaluator(labelCol='Result',
                                          metricName='accuracy')

In [25]:
#Ridge Regression model
ridge = LogisticRegression(labelCol='Result',
                        maxIter=200,
                        elasticNetParam=0,
                        regParam=0.05)

model = ridge.fit(train_df)
pred = model.transform(valid_df)
evaluator.evaluate(pred)

0.9847328244274809

In [26]:
#Lasso Regression model
lasso = LogisticRegression(labelCol='Result',
                           maxIter=150,
                           elasticNetParam=1,
                           regParam=0.0003)

model = lasso.fit(train_df)
pred = model.transform(valid_df)
evaluator.evaluate(pred)

0.9847328244274809