In [81]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('INFOSYS722_Lteration4_zzho921').getOrCreate()

In [82]:
#Use spark to read data.
data_ilp = spark.read.csv('indian_liver_patient.csv', header=True, inferSchema=True)
data_ilp.show()

+---+------+---------------+----------------+--------------------+------------------------+--------------------------+--------------+-------+--------------------------+-------+
|Age|Gender|Total_Bilirubin|Direct_Bilirubin|Alkaline_Phosphotase|Alamine_Aminotransferase|Aspartate_Aminotransferase|Total_Protiens|Albumin|Albumin_and_Globulin_Ratio|Dataset|
+---+------+---------------+----------------+--------------------+------------------------+--------------------------+--------------+-------+--------------------------+-------+
| 65|Female|            0.7|             0.1|                 187|                      16|                        18|           6.8|    3.3|                       0.9|      1|
| 62|  Male|           10.9|             5.5|                 699|                      64|                       100|           7.5|    3.2|                      0.74|      1|
| 62|  Male|            7.3|             4.1|                 490|                      60|                        

In [83]:
#View the columns of the data.
data_ilp.columns

['Age',
 'Gender',
 'Total_Bilirubin',
 'Direct_Bilirubin',
 'Alkaline_Phosphotase',
 'Alamine_Aminotransferase',
 'Aspartate_Aminotransferase',
 'Total_Protiens',
 'Albumin',
 'Albumin_and_Globulin_Ratio',
 'Dataset']

In [84]:
#View the size of the data.
data_ilp.count(),len(data_ilp.columns)

(583, 11)

In [85]:
#View the format of each attribute column of the data.
data_ilp.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Total_Bilirubin: double (nullable = true)
 |-- Direct_Bilirubin: double (nullable = true)
 |-- Alkaline_Phosphotase: integer (nullable = true)
 |-- Alamine_Aminotransferase: integer (nullable = true)
 |-- Aspartate_Aminotransferase: integer (nullable = true)
 |-- Total_Protiens: double (nullable = true)
 |-- Albumin: double (nullable = true)
 |-- Albumin_and_Globulin_Ratio: double (nullable = true)
 |-- Dataset: integer (nullable = true)



In [86]:
#View the data mean value of each attribute of the data, min, max, etc.
data_ilp.describe().show()

+-------+------------------+------+-----------------+------------------+--------------------+------------------------+--------------------------+------------------+-----------------+--------------------------+------------------+
|summary|               Age|Gender|  Total_Bilirubin|  Direct_Bilirubin|Alkaline_Phosphotase|Alamine_Aminotransferase|Aspartate_Aminotransferase|    Total_Protiens|          Albumin|Albumin_and_Globulin_Ratio|           Dataset|
+-------+------------------+------+-----------------+------------------+--------------------+------------------------+--------------------------+------------------+-----------------+--------------------------+------------------+
|  count|               583|   583|              583|               583|                 583|                     583|                       583|               583|              583|                       579|               583|
|   mean| 44.74614065180103|  null|3.298799313893652|1.4861063464837074|  290.576329

In [87]:
#View summary of age attributes.
data_ilp.select('Age').describe().show()

+-------+------------------+
|summary|               Age|
+-------+------------------+
|  count|               583|
|   mean| 44.74614065180103|
| stddev|16.189833304694375|
|    min|                 4|
|    max|                90|
+-------+------------------+



In [88]:
#View the proportion of missing values in the data.
import pyspark.sql.functions as fn
data_ilp.agg(*[(1-(fn.count(c) /fn.count('*'))).alias(c+'_missing') for c in data_ilp.columns]).show()

+-----------+--------------+-----------------------+------------------------+----------------------------+--------------------------------+----------------------------------+----------------------+---------------+----------------------------------+---------------+
|Age_missing|Gender_missing|Total_Bilirubin_missing|Direct_Bilirubin_missing|Alkaline_Phosphotase_missing|Alamine_Aminotransferase_missing|Aspartate_Aminotransferase_missing|Total_Protiens_missing|Albumin_missing|Albumin_and_Globulin_Ratio_missing|Dataset_missing|
+-----------+--------------+-----------------------+------------------------+----------------------------+--------------------------------+----------------------------------+----------------------+---------------+----------------------------------+---------------+
|        0.0|           0.0|                    0.0|                     0.0|                         0.0|                             0.0|                               0.0|                   0.0|        

In [89]:
missing_num = data_ilp[data_ilp['Albumin_and_Globulin_Ratio'].isNull()]
missing_num.count()

4

In [91]:
#Convert data to pandas dataframe format
pandas_df = data_ilp.toPandas()

In [92]:
#Change the male and female of gender attribute to 0 and 1.
import numpy as np
#Label gender attributes and change data types.
class_mapping1 = {label:idx for idx,label in enumerate(np.unique(pandas_df['Gender']))}
pandas_df['Gender'] = pandas_df['Gender'].map(class_mapping1)
print(pandas_df['Gender'].value_counts())

1    441
0    142
Name: Gender, dtype: int64


In [93]:
#Convert data back to spark dataframe format.
values = pandas_df.values.tolist()
columns = pandas_df.columns.tolist()
data_ilp_temp = spark.createDataFrame(values, columns)
data_ilp_temp.printSchema()

root
 |-- Age: double (nullable = true)
 |-- Gender: double (nullable = true)
 |-- Total_Bilirubin: double (nullable = true)
 |-- Direct_Bilirubin: double (nullable = true)
 |-- Alkaline_Phosphotase: double (nullable = true)
 |-- Alamine_Aminotransferase: double (nullable = true)
 |-- Aspartate_Aminotransferase: double (nullable = true)
 |-- Total_Protiens: double (nullable = true)
 |-- Albumin: double (nullable = true)
 |-- Albumin_and_Globulin_Ratio: double (nullable = true)
 |-- Dataset: double (nullable = true)



In [94]:
#Deal with missing values in data.
data_ilp_temp.count()

583

In [95]:
data_ilp_temp.na.drop().count()

579

In [96]:
data_ilp_c = data_ilp_temp.na.drop()
data_ilp_c.agg(*[(1-(fn.count(c) /fn.count('*'))).alias(c+'_missing') for c in data_ilp_c.columns]).show()

+-----------+--------------+-----------------------+------------------------+----------------------------+--------------------------------+----------------------------------+----------------------+---------------+----------------------------------+---------------+
|Age_missing|Gender_missing|Total_Bilirubin_missing|Direct_Bilirubin_missing|Alkaline_Phosphotase_missing|Alamine_Aminotransferase_missing|Aspartate_Aminotransferase_missing|Total_Protiens_missing|Albumin_missing|Albumin_and_Globulin_Ratio_missing|Dataset_missing|
+-----------+--------------+-----------------------+------------------------+----------------------------+--------------------------------+----------------------------------+----------------------+---------------+----------------------------------+---------------+
|        0.0|           0.0|                    0.0|                     0.0|                         0.0|                             0.0|                               0.0|                   0.0|        

In [97]:
data_1 = spark.read.csv("indian_liver_patient_2.csv",inferSchema =True,header=True)
#View the size of the data.
data_1.count(),len(data_1.columns)

(221, 11)

In [98]:
data_2 = spark.read.csv("indian_liver_patient_3.csv",inferSchema =True,header=True)
#View the size of the data.
data_2.count(),len(data_2.columns)

(362, 11)

In [99]:
data_union = data_1.union(data_2)
#View the size of the data.
data_union.count(),len(data_union.columns)

(583, 11)

In [100]:
data_ilp_2 = data_ilp_c.orderBy('Age',ascending=True)
data_ilp_2.show()

+----+------+---------------+----------------+--------------------+------------------------+--------------------------+--------------+-------+--------------------------+-------+
| Age|Gender|Total_Bilirubin|Direct_Bilirubin|Alkaline_Phosphotase|Alamine_Aminotransferase|Aspartate_Aminotransferase|Total_Protiens|Albumin|Albumin_and_Globulin_Ratio|Dataset|
+----+------+---------------+----------------+--------------------+------------------------+--------------------------+--------------+-------+--------------------------+-------+
| 4.0|   1.0|            0.9|             0.2|               348.0|                    30.0|                      34.0|           8.0|    4.0|                       1.0|    2.0|
| 4.0|   1.0|            0.8|             0.2|               460.0|                   152.0|                     231.0|           6.5|    3.2|                       0.9|    2.0|
| 6.0|   1.0|            0.6|             0.1|               289.0|                    38.0|                  

In [102]:
#Convert data to pandas dataframe format
pandas_df_2 = data_ilp_2.toPandas()

In [103]:
print(pandas_df_2.corr()["Dataset"])

Age                          -0.133164
Gender                       -0.081349
Total_Bilirubin              -0.220218
Direct_Bilirubin             -0.246273
Alkaline_Phosphotase         -0.183363
Alamine_Aminotransferase     -0.163117
Aspartate_Aminotransferase   -0.151834
Total_Protiens                0.033614
Albumin                       0.159770
Albumin_and_Globulin_Ratio    0.163131
Dataset                       1.000000
Name: Dataset, dtype: float64


In [104]:
final_data_temp = pandas_df_2.drop(["Total_Bilirubin","Direct_Bilirubin","Alkaline_Phosphotase","Alamine_Aminotransferase"],axis = 1)

In [105]:
#Convert data back to spark dataframe format.
values = final_data_temp.values.tolist()
columns = final_data_temp.columns.tolist()
data_ilp_temp_2 = spark.createDataFrame(values, columns)
data_ilp_temp_2.printSchema()

root
 |-- Age: double (nullable = true)
 |-- Gender: double (nullable = true)
 |-- Aspartate_Aminotransferase: double (nullable = true)
 |-- Total_Protiens: double (nullable = true)
 |-- Albumin: double (nullable = true)
 |-- Albumin_and_Globulin_Ratio: double (nullable = true)
 |-- Dataset: double (nullable = true)



In [106]:
#Convert data to pandas dataframe format
pandas_df_3 = data_ilp_temp_2.toPandas()

In [107]:
final_data = 1.0*(pandas_df_3 - pandas_df_3.mean())/pandas_df_3.std()

In [108]:
#Convert data back to spark dataframe format.
values = final_data.values.tolist()
columns = final_data.columns.tolist()
final_data_ilp = spark.createDataFrame(values, columns)

In [109]:
final_data_ilp.show()

+-------------------+-------------------+--------------------------+--------------------+-------------------+--------------------------+------------------+
|                Age|             Gender|Aspartate_Aminotransferase|      Total_Protiens|            Albumin|Albumin_and_Globulin_Ratio|           Dataset|
+-------------------+-------------------+--------------------------+--------------------+-------------------+--------------------------+------------------+
|-2.5140501831170847| 0.5642302874813497|       -0.2636346341671258|  1.3998247524124001| 1.0844004016934279|        0.1656364329645698| 1.582642542947409|
|-2.5140501831170847| 0.5642302874813497|       0.41602717934442085|0.016878787823424297| 0.0773950567367384|       -0.1472624077743717| 1.582642542947409|
| -2.390759194591714| 0.5642302874813497|      -0.27743487403538053|  -1.550459972044082|-1.4331129606982962|        -0.773060089252255| 1.582642542947409|
|-2.3291137003290285|-1.7692649728879466|         3.241626292369

In [110]:
import findspark

In [111]:
# A few things we need to do before Spark can accept the data!
# It needs to be in the form of two columns: "label" and "features".
# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [112]:
final_data_ilp.columns

['Age',
 'Gender',
 'Aspartate_Aminotransferase',
 'Total_Protiens',
 'Albumin',
 'Albumin_and_Globulin_Ratio',
 'Dataset']

In [113]:
# Combine all features into one vector named features.
assembler = VectorAssembler(
  inputCols=[
      'Age',
      'Gender',
      'Aspartate_Aminotransferase',
      'Total_Protiens',
      'Albumin',
      'Albumin_and_Globulin_Ratio'],
       outputCol="features")

In [114]:
#transform the data. 
output = assembler.transform(data_ilp_temp_2)

In [115]:
# select the two columns we want. Features (which contains vectors), and the predictor.
final_data = output.select("features",'Dataset')

In [116]:
# Split the training and testing set.
train_data,test_data = final_data.randomSplit([0.7,0.3])

In [117]:
# import the relevant classifiers. 
from pyspark.ml.classification import DecisionTreeClassifier,GBTClassifier,RandomForestClassifier
from pyspark.ml import Pipeline

In [118]:
# Use defaults to make the comparison "fair". This simplifies the comparison process.
dtc = DecisionTreeClassifier(labelCol='Dataset',featuresCol='features')
rfc = RandomForestClassifier(labelCol='Dataset',featuresCol='features')

In [119]:
# Train the models (it's three models, so it might take some time).
dtc_model = dtc.fit(train_data)
rfc_model = rfc.fit(train_data)

In [120]:
dtc_predictions = dtc_model.transform(test_data)
rfc_predictions = rfc_model.transform(test_data)

In [121]:
# Let's start off with binary classification.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Note that the label column isn't named label, it's named PrivateIndex in this case.
my_binary_eval = BinaryClassificationEvaluator(labelCol = 'Dataset')

In [127]:
# This is the area under the curve. This indicates that the data is highly seperable.
print("DTC")
print(my_binary_eval.evaluate(dtc_predictions))
# RFC improves accuracy but also model complexity. RFC outperforms DTC in nearly every situation.
print("RFC")
print(my_binary_eval.evaluate(rfc_predictions))

DTC
1.0
RFC
1.0


In [128]:
# import the evaluator.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [129]:
# Select (prediction, true label) and compute test error. 
acc_evaluator = MulticlassClassificationEvaluator(labelCol="Dataset", predictionCol="prediction", metricName="accuracy")

In [130]:
dtc_acc = acc_evaluator.evaluate(dtc_predictions)
rfc_acc = acc_evaluator.evaluate(rfc_predictions)

In [131]:
# Let's do something a bit more complex in terms of printing, just so it's formatted nicer. 
print("Here are the results!")
print('-'*40)
print('A single decision tree has an accuracy of: {0:2.2f}%'.format(dtc_acc*100))
print('-'*40)
print('A random forest ensemble has an accuracy of: {0:2.2f}%'.format(rfc_acc*100))

Here are the results!
----------------------------------------
A single decision tree has an accuracy of: 67.02%
----------------------------------------
A random forest ensemble has an accuracy of: 69.68%


In [132]:
#The establishment of logistic regression model.
from pyspark.ml.classification import LogisticRegression

In [133]:
from pyspark.ml import Pipeline
log_reg_fet = LogisticRegression(featuresCol='features',labelCol='Dataset')
pipeline = Pipeline(stages=[assembler, log_reg_fet])
train_fet_data, test_fet_data = data_ilp_temp_2.randomSplit([0.7,.3])
fit_model = pipeline.fit(train_fet_data)
results = fit_model.transform(data_ilp_temp_2)

In [134]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [135]:
eva = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='Dataset')

In [136]:
AUC_num = eva.evaluate(results)
AUC_num

1.0

In [137]:
total_res = results.select('Dataset','prediction')

corr_res = total_res.filter(total_res['Dataset'] == total_res['prediction'])

countTR = total_res.count()
print("Correct: " + str(countTR))

countTC = corr_res.count()
print("Total Correct: " + str(countTC)) 

Correct: 579
Total Correct: 417


In [138]:
#Forecast accuracy.
countTC/countTR

0.7202072538860104