<a href="https://colab.research.google.com/github/shohidu-zzaman/GA_Sessions/blob/master/Water_Quality_Prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Configaration cell
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [2]:
# findspark initialazation cell
import findspark
findspark.init()

In [3]:
# Spark Session creation
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Water_Quality_Prediction").getOrCreate()

In [19]:
# Mounting drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Data load

In [20]:
record_data_path = '/content/drive/MyDrive/big_data_project_water_prediction/water_potability.csv'

In [21]:
record_dataset = spark.read.csv(record_data_path, header=True, inferSchema=True)
record_dataset.show(5)

+-----------+-----------+-----------+-----------+-----------+------------+--------------+---------------+-----------+----------+
|         ph|   Hardness|     Solids|Chloramines|    Sulfate|Conductivity|Organic_carbon|Trihalomethanes|  Turbidity|Potability|
+-----------+-----------+-----------+-----------+-----------+------------+--------------+---------------+-----------+----------+
|       null|204.8904555|20791.31898|7.300211873|368.5164413| 564.3086542|   10.37978308|    86.99097046|2.963135381|         0|
|3.716080075|129.4229205|18630.05786|6.635245884|       null| 592.8853591|   15.18001312|    56.32907628|4.500656275|         0|
|8.099124189|224.2362594|19909.54173|9.275883603|       null| 418.6062131|   16.86863693|    66.42009251| 3.05593375|         0|
|8.316765884|214.3733941|22018.41744|8.059332377|356.8861356| 363.2665162|    18.4365245|    100.3416744|4.628770537|         0|
|9.092223456|181.1015092|17978.98634|6.546599974|310.1357375| 398.4108134|   11.55827944|    31.9

In [22]:
# Data Shape
record_row_count = record_dataset.count()
record_col_count = len(record_dataset.columns)

print(f"Record Data Shape: ({record_row_count}, {record_col_count})")

Record Data Shape: (3276, 10)


# Data preprocessing

Check null value

In [23]:

from pyspark.sql import functions as F
from pyspark.sql.functions import col,isnan, when, count
record_null = record_dataset.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in record_dataset.columns])
record_null.show()


+---+--------+------+-----------+-------+------------+--------------+---------------+---------+----------+
| ph|Hardness|Solids|Chloramines|Sulfate|Conductivity|Organic_carbon|Trihalomethanes|Turbidity|Potability|
+---+--------+------+-----------+-------+------------+--------------+---------------+---------+----------+
|491|       0|     0|          0|    781|           0|             0|            162|        0|         0|
+---+--------+------+-----------+-------+------------+--------------+---------------+---------+----------+



Delete Null values

In [24]:
from pyspark.sql.functions import col, when, count

numerical_columns = ["ph", "Hardness", "Solids", "Chloramines", "Sulfate", "Conductivity", "Organic_carbon", "Trihalomethanes", "Turbidity"]

empty_counts = record_dataset.agg(*[count(when(col(c) == "", c)).alias(c) for c in numerical_columns])

empty_counts.show()

record_dataset_no_empty_rows = record_dataset.replace("", None).na.drop()

record_dataset_no_empty_rows.show(5)


+---+--------+------+-----------+-------+------------+--------------+---------------+---------+
| ph|Hardness|Solids|Chloramines|Sulfate|Conductivity|Organic_carbon|Trihalomethanes|Turbidity|
+---+--------+------+-----------+-------+------------+--------------+---------------+---------+
|  0|       0|     0|          0|      0|           0|             0|              0|        0|
+---+--------+------+-----------+-------+------------+--------------+---------------+---------+

+-----------+-----------+-----------+-----------+-----------+------------+--------------+---------------+-----------+----------+
|         ph|   Hardness|     Solids|Chloramines|    Sulfate|Conductivity|Organic_carbon|Trihalomethanes|  Turbidity|Potability|
+-----------+-----------+-----------+-----------+-----------+------------+--------------+---------------+-----------+----------+
|8.316765884|214.3733941|22018.41744|8.059332377|356.8861356| 363.2665162|    18.4365245|    100.3416744|4.628770537|         0|
|9.

Update data shape

In [25]:
record_row_count_after = record_dataset_no_empty_rows.count()
record_col_count_after = len(record_dataset_no_empty_rows.columns)

print(f"Record Data Shape After Deletion: ({record_row_count_after}, {record_col_count_after})")


Record Data Shape After Deletion: (2011, 10)


Apply the imputer to the DataFrame

In [26]:
from pyspark.ml.feature import Imputer

# Define the input columns
input_cols = ["ph", "Hardness", "Solids", "Chloramines", "Sulfate", "Conductivity", "Organic_carbon", "Trihalomethanes", "Turbidity"]

# Create an imputer object
imputer = Imputer(inputCols=input_cols, outputCols=input_cols)

# Fit and transform the imputer on the dataset
record_dataset_imputed = imputer.fit(record_dataset_no_empty_rows).transform(record_dataset_no_empty_rows)

# Show the first few rows of the DataFrame after imputation
record_dataset_imputed.show(5)


+-----------+-----------+-----------+-----------+-----------+------------+--------------+---------------+-----------+----------+
|         ph|   Hardness|     Solids|Chloramines|    Sulfate|Conductivity|Organic_carbon|Trihalomethanes|  Turbidity|Potability|
+-----------+-----------+-----------+-----------+-----------+------------+--------------+---------------+-----------+----------+
|8.316765884|214.3733941|22018.41744|8.059332377|356.8861356| 363.2665162|    18.4365245|    100.3416744|4.628770537|         0|
|9.092223456|181.1015092|17978.98634|6.546599974|310.1357375| 398.4108134|   11.55827944|    31.99799273|4.075075425|         0|
|5.584086638|188.3133238|28748.68774|7.544868789|326.6783629| 280.4679159|    8.39973464|    54.91786184|2.559708228|         0|
|10.22386216|248.0717353|28749.71654|7.513408466|393.6633955| 283.6516335|   13.78969532|    84.60355617|2.672988737|         0|
|8.635848719|203.3615226|13672.09176|4.563008686|303.3097712| 474.6076449|    12.3638167|    62.7

Apply VectorAssembler

In [28]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

assembler = VectorAssembler(
    inputCols=["ph", "Hardness", "Solids", "Chloramines", "Sulfate", "Conductivity", "Organic_carbon", "Trihalomethanes", "Turbidity"],
    outputCol="features")

data_assembled = assembler.transform(record_dataset_imputed)
final_data = data_assembled.select("features", "Potability")

final_data.show(5, truncate=False)


+-------------------------------------------------------------------------------------------------------------+----------+
|features                                                                                                     |Potability|
+-------------------------------------------------------------------------------------------------------------+----------+
|[8.316765884,214.3733941,22018.41744,8.059332377,356.8861356,363.2665162,18.4365245,100.3416744,4.628770537] |0         |
|[9.092223456,181.1015092,17978.98634,6.546599974,310.1357375,398.4108134,11.55827944,31.99799273,4.075075425]|0         |
|[5.584086638,188.3133238,28748.68774,7.544868789,326.6783629,280.4679159,8.39973464,54.91786184,2.559708228] |0         |
|[10.22386216,248.0717353,28749.71654,7.513408466,393.6633955,283.6516335,13.78969532,84.60355617,2.672988737]|0         |
|[8.635848719,203.3615226,13672.09176,4.563008686,303.3097712,474.6076449,12.3638167,62.79830896,4.401424715] |0         |
+---------------

Split Dataset

In [29]:
# Split the data into training and testing sets
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=42)

print("Training Data Count: ", train_data.count())
print("Testing Data Count: ", test_data.count())

Training Data Count:  1651
Testing Data Count:  360


Prepare The Model

In [30]:
# Create a Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="Potability" , predictionCol="predicted_potability")

# Fit the model on the training data
model = lr.fit(train_data)

Evaluate Performance

In [31]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.stat import Summarizer
from pyspark.sql import functions as F

predictions = model.transform(test_data)

# Update the column name to the one generated during the model transformation
evaluator = RegressionEvaluator(labelCol="Potability", predictionCol="predicted_potability", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data: {:.3f}".format(rmse))

evaluator_mse = RegressionEvaluator(labelCol="Potability", predictionCol="predicted_potability", metricName="mse")
mse = evaluator_mse.evaluate(predictions)
print("MSE on test data: {:.3f}".format(mse))



Root Mean Squared Error (RMSE) on test data: 0.492
MSE on test data: 0.243


Accurecy Result

In [32]:
# If you want to calculate accuracy for classification, you can use a binary classification evaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Assuming 'Potability' is a binary class (0 or 1)
binary_evaluator = BinaryClassificationEvaluator(labelCol="Potability", rawPredictionCol="predicted_potability")
accuracy = binary_evaluator.evaluate(predictions)
print(f"Accuracy on test data: {accuracy}")


Accuracy on test data: 0.5025709814442209


Save Data

In [34]:
model.save("/content/drive/MyDrive/big_data_project_water_prediction/load_data")

# Load the model
from pyspark.ml.regression import LinearRegressionModel
loaded_model = LinearRegressionModel.load("/content/drive/MyDrive/big_data_project_water_prediction/load_data")

#User Interface part

In [35]:
from IPython.display import HTML
shell = get_ipython()

def adjust_font_size():
  display(HTML('''<style>
    body {
      font-size: 20px;
    }
  '''))

if adjust_font_size not in shell.events.callbacks['pre_execute']:
  shell.events.register('pre_execute', adjust_font_size)

In [36]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegressionModel
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType
import ipywidgets as widgets
from IPython.display import display, HTML, clear_output

# Initialize Spark session
spark = SparkSession.builder.appName("Water_Quality_Prediction_UI").getOrCreate()

# Load the saved model
model_path = "/content/drive/MyDrive/big_data_project_water_prediction/load_data"
loaded_model = LinearRegressionModel.load(model_path)

# Create input widgets
ph_input = widgets.FloatText(description="pH:")
hardness_input = widgets.FloatText(description="Hardness:")
solids_input = widgets.FloatText(description="Solids:")
chloramines_input = widgets.FloatText(description="Chloramines:")
sulfate_input = widgets.FloatText(description="Sulfate:")
conductivity_input = widgets.FloatText(description="Conductivity:")
organic_carbon_input = widgets.FloatText(description="Organic Carbon:")
trihalomethanes_input = widgets.FloatText(description="Trihalomethanes:")
turbidity_input = widgets.FloatText(description="Turbidity:")

# Create submit button and result output
submit_button = widgets.Button(description="Submit")
output_result = widgets.Output()

# Define UI layout
ui = widgets.VBox([ph_input, hardness_input, solids_input, chloramines_input, sulfate_input,
                   conductivity_input, organic_carbon_input, trihalomethanes_input, turbidity_input,
                   submit_button, output_result])

# Display UI
display(ui)

# Define on_submit_button_click function
def on_submit_button_click(b):
    # Get input values
    attributes = {
        "ph": ph_input.value,
        "Hardness": hardness_input.value,
        "Solids": solids_input.value,
        "Chloramines": chloramines_input.value,
        "Sulfate": sulfate_input.value,
        "Conductivity": conductivity_input.value,
        "Organic_carbon": organic_carbon_input.value,
        "Trihalomethanes": trihalomethanes_input.value,
        "Turbidity": turbidity_input.value
    }

    # Create a DataFrame with the provided attributes using the defined schema
    input_schema = StructType([
        StructField("ph", FloatType(), True),
        StructField("Hardness", FloatType(), True),
        StructField("Solids", FloatType(), True),
        StructField("Chloramines", FloatType(), True),
        StructField("Sulfate", FloatType(), True),
        StructField("Conductivity", FloatType(), True),
        StructField("Organic_carbon", FloatType(), True),
        StructField("Trihalomethanes", FloatType(), True),
        StructField("Turbidity", FloatType(), True)
    ])

    input_data = spark.createDataFrame([(attributes["ph"], attributes["Hardness"], attributes["Solids"],
                                         attributes["Chloramines"], attributes["Sulfate"],
                                         attributes["Conductivity"], attributes["Organic_carbon"],
                                         attributes["Trihalomethanes"], attributes["Turbidity"])],
                                       schema=input_schema)

    # Assemble features using VectorAssembler
    assembler = VectorAssembler(inputCols=input_data.columns, outputCol="features")
    assembled_data = assembler.transform(input_data)

    # Make predictions
    prediction_col = loaded_model.getPredictionCol()
    prediction = loaded_model.transform(assembled_data).select(prediction_col).collect()[0][prediction_col]

    with output_result:
        output_result.clear_output()
        result_message = f"The water is {'Potable' if prediction == 1.0 else 'Not Potable'}"
        print(result_message)

# Attach on_submit_button_click function to the submit button
submit_button.on_click(on_submit_button_click)


VBox(children=(FloatText(value=0.0, description='pH:'), FloatText(value=0.0, description='Hardness:'), FloatTe…