In [33]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('water_protability').getOrCreate()
spark

In [34]:
data = spark.read.csv("C:\\ML\\Machine-Learning-Projects-master\\Water Quality Classification\\water_potability.csv",header = True,inferSchema = True)
data.show(5)

+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+----------+
|               ph|          Hardness|            Solids|      Chloramines|           Sulfate|      Conductivity|    Organic_carbon|   Trihalomethanes|         Turbidity|Potability|
+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+----------+
|             null| 204.8904554713363|20791.318980747026|7.300211873184757|368.51644134980336| 564.3086541722439|  10.3797830780847|  86.9909704615088|2.9631353806316407|         0|
| 3.71608007538699|129.42292051494425|18630.057857970347|   6.635245883862|              null| 592.8853591348523|15.180013116357259| 56.32907628451764| 4.500656274942408|         0|
|8.099124189298397|224.23625939355776|19909.541732292393|9.275883602694089|              n

In [35]:
data.printSchema()

root
 |-- ph: double (nullable = true)
 |-- Hardness: double (nullable = true)
 |-- Solids: double (nullable = true)
 |-- Chloramines: double (nullable = true)
 |-- Sulfate: double (nullable = true)
 |-- Conductivity: double (nullable = true)
 |-- Organic_carbon: double (nullable = true)
 |-- Trihalomethanes: double (nullable = true)
 |-- Turbidity: double (nullable = true)
 |-- Potability: integer (nullable = true)



In [36]:
from pyspark.sql.functions import isnan,count,when,col
data.select([count(when(col(c).isNull(),c)).alias(c) for c in data.columns]).show()

+---+--------+------+-----------+-------+------------+--------------+---------------+---------+----------+
| ph|Hardness|Solids|Chloramines|Sulfate|Conductivity|Organic_carbon|Trihalomethanes|Turbidity|Potability|
+---+--------+------+-----------+-------+------------+--------------+---------------+---------+----------+
|491|       0|     0|          0|    781|           0|             0|            162|        0|         0|
+---+--------+------+-----------+-------+------------+--------------+---------------+---------+----------+



In [37]:
#TO fill null values 
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols = ['ph','Sulfate','Trihalomethanes'],
                 outputCols =['{}_imputed'.format(c) for c in ['ph','Sulfate','Trihalomethanes']]).setStrategy('mean')

In [38]:
data = imputer.fit(data).transform(data)
data.show(5)

+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+----------+-----------------+------------------+-----------------------+
|               ph|          Hardness|            Solids|      Chloramines|           Sulfate|      Conductivity|    Organic_carbon|   Trihalomethanes|         Turbidity|Potability|       ph_imputed|   Sulfate_imputed|Trihalomethanes_imputed|
+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+----------+-----------------+------------------+-----------------------+
|             null| 204.8904554713363|20791.318980747026|7.300211873184757|368.51644134980336| 564.3086541722439|  10.3797830780847|  86.9909704615088|2.9631353806316407|         0|7.080794504276819|368.51644134980336|       86.9909704615088|
| 3.71608007538699|129.42292

In [39]:
data = data.drop('ph','Sulfate','Trihalomethanes')

In [40]:
data.show(5)

+------------------+------------------+-----------------+------------------+------------------+------------------+----------+-----------------+------------------+-----------------------+
|          Hardness|            Solids|      Chloramines|      Conductivity|    Organic_carbon|         Turbidity|Potability|       ph_imputed|   Sulfate_imputed|Trihalomethanes_imputed|
+------------------+------------------+-----------------+------------------+------------------+------------------+----------+-----------------+------------------+-----------------------+
| 204.8904554713363|20791.318980747026|7.300211873184757| 564.3086541722439|  10.3797830780847|2.9631353806316407|         0|7.080794504276819|368.51644134980336|       86.9909704615088|
|129.42292051494425|18630.057857970347|   6.635245883862| 592.8853591348523|15.180013116357259| 4.500656274942408|         0| 3.71608007538699| 333.7757766108134|      56.32907628451764|
|224.23625939355776|19909.541732292393|9.275883602694089| 418.606

In [41]:
len(data.columns)

10

In [42]:
cols = data.columns
cols = cols[:9]
cols

['Hardness',
 'Solids',
 'Chloramines',
 'Conductivity',
 'Organic_carbon',
 'Turbidity',
 'Potability',
 'ph_imputed',
 'Sulfate_imputed']

In [43]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols = cols,outputCol = 'Independent Features')
output = assembler.transform(data)
output.show(5)

+------------------+------------------+-----------------+------------------+------------------+------------------+----------+-----------------+------------------+-----------------------+--------------------+
|          Hardness|            Solids|      Chloramines|      Conductivity|    Organic_carbon|         Turbidity|Potability|       ph_imputed|   Sulfate_imputed|Trihalomethanes_imputed|Independent Features|
+------------------+------------------+-----------------+------------------+------------------+------------------+----------+-----------------+------------------+-----------------------+--------------------+
| 204.8904554713363|20791.318980747026|7.300211873184757| 564.3086541722439|  10.3797830780847|2.9631353806316407|         0|7.080794504276819|368.51644134980336|       86.9909704615088|[204.890455471336...|
|129.42292051494425|18630.057857970347|   6.635245883862| 592.8853591348523|15.180013116357259| 4.500656274942408|         0| 3.71608007538699| 333.7757766108134|      

In [44]:
finilized_data = output.select('Independent Features','Potability')
finilized_data.show(5)

+--------------------+----------+
|Independent Features|Potability|
+--------------------+----------+
|[204.890455471336...|         0|
|[129.422920514944...|         0|
|[224.236259393557...|         0|
|[214.373394085622...|         0|
|[181.101509236125...|         0|
+--------------------+----------+
only showing top 5 rows



In [46]:
#model
from pyspark.ml.classification import LogisticRegression

#train_test_split
train_data,test_data = finilized_data.randomSplit([0.75,0.25])
LR = LogisticRegression(labelCol = 'Potability',featuresCol = 'Independent Features')
LR  = LR.fit(train_data)
pred_test = LR.transform(test_data)
pred_test.show(10)

+--------------------+----------+--------------------+--------------------+----------+
|Independent Features|Potability|       rawPrediction|         probability|prediction|
+--------------------+----------+--------------------+--------------------+----------+
|[47.432,19237.949...|         1|[-18.877288446527...|[6.33428779796262...|       1.0|
|[77.4595861004437...|         1|[-18.905443034270...|[6.15843567552977...|       1.0|
|[81.7108952702466...|         1|[-18.895179981135...|[6.22196547492403...|       1.0|
|[98.4529305095862...|         0|[19.0168288298188...|[0.99999999449070...|       0.0|
|[103.173586978107...|         0|[19.0188399256736...|[0.99999999450177...|       0.0|
|[107.341981878690...|         1|[-18.874831426403...|[6.34987040600098...|       1.0|
|[111.246411596634...|         1|[-18.859525321967...|[6.44780980825371...|       1.0|
|[112.820253975762...|         0|[19.0905208822833...|[0.99999999488209...|       0.0|
|[113.024471731305...|         0|[19.017136

In [47]:
pred_test.select('Potability','prediction').show(10)

+----------+----------+
|Potability|prediction|
+----------+----------+
|         1|       1.0|
|         1|       1.0|
|         1|       1.0|
|         0|       0.0|
|         0|       0.0|
|         1|       1.0|
|         1|       1.0|
|         0|       0.0|
|         0|       0.0|
|         1|       1.0|
+----------+----------+
only showing top 10 rows



In [48]:
lr_summary = LR.summary

In [49]:
#overall accuracy of classification model
print(lr_summary.accuracy*100)

100.0


In [50]:
print('areaUnderROC:',lr_summary.areaUnderROC*100)

areaUnderROC: 100.0


In [51]:
#precision of both classes
print('precision:',lr_summary.precisionByLabel)

precision: [1.0, 1.0]


In [52]:
#recall of both classes
print('recall:',lr_summary.recallByLabel)

recall: [1.0, 1.0]


In [53]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol = 'Potability',predictionCol = 'prediction',metricName = 'accuracy')
accuracy = evaluator.evaluate(pred_test)
print('accuracy:',accuracy*100)

accuracy: 100.0
