In [1]:
%env PYSPARK_PYTHON=python
%env PYSPARK_DRIVER_PYTHON=jupyter

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, BooleanType
from pyspark.ml import feature, classification, evaluation, Pipeline
schema = StructType([
    StructField("ph", DoubleType(), True),
    StructField("hardness", DoubleType(), True),
    StructField("solids", DoubleType(), True),
    StructField("chloramines", DoubleType(), True),
    StructField("sulfate", DoubleType(), True),
    StructField("conductivity", DoubleType(), True),
    StructField("organic_carbon", DoubleType(), True),
    StructField("trihalomethanes", DoubleType(), True),
    StructField("turbidity", DoubleType(), True),
    StructField("potability", IntegerType(), True),
])

spark = SparkSession.builder.appName("water").getOrCreate()
sc = spark.sparkContext
#df = spark.read.csv("./water_potability.csv", header=False, schema=schema)
df = spark.read.csv("./water_potability.csv", header=False, schema=schema)
df.createOrReplaceTempView("df")
df_train, df_eval = df.randomSplit([0.8,0.2], 42)




env: PYSPARK_PYTHON=python
env: PYSPARK_DRIVER_PYTHON=jupyter


In [2]:
df_train.show(3)
df_train = df_train.dropna()
df_train.show(3)

+----+------------------+-----------------+------------------+----------------+------------------+------------------+-----------------+-----------------+----------+
|  ph|          hardness|           solids|       chloramines|         sulfate|      conductivity|    organic_carbon|  trihalomethanes|        turbidity|potability|
+----+------------------+-----------------+------------------+----------------+------------------+------------------+-----------------+-----------------+----------+
|null|              null|             null|              null|            null|              null|              null|             null|             null|      null|
|null|  98.3679148956603|28415.57583214058|10.558949998467961|296.843207792478|505.24026927891407|12.882614472289333|85.32995534051292|4.119087300328971|         1|
|null|105.85926357195498|37928.14217716675| 5.609440345508508|            null|358.88876761151056|12.207108489369546|71.11989017420973|3.873853349593973|         0|
+----+----

In [3]:
print(df.columns[:-1])
vect = feature.VectorAssembler(inputCols=df.columns[:-1], outputCol="features_raw")
df_train_vectorized = vect.transform(df_train)
df_train_vectorized.show(1)
df_train_vectorized = df_train_vectorized.select("potability", "features_raw")

df_train_vectorized.show(3)



['ph', 'hardness', 'solids', 'chloramines', 'sulfate', 'conductivity', 'organic_carbon', 'trihalomethanes', 'turbidity']
+-------------------+-----------------+------------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+----------+--------------------+
|                 ph|         hardness|            solids|       chloramines|           sulfate|     conductivity|    organic_carbon|  trihalomethanes|        turbidity|potability|        features_raw|
+-------------------+-----------------+------------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+----------+--------------------+
|0.22749905020219874|152.5301111764229|39028.599340290755|3.4624920476792767|283.69378223429663|443.0292321286284|13.201943203829217|62.32271110691731|3.545741437567914|         1|[0.22749905020219...|
+-------------------+-----------------+------------------+-------------

In [4]:
scaler = feature.StandardScaler(inputCol="features_raw", outputCol="features")
scaler_t = scaler.fit(df_train_vectorized)
df_train_scaled = scaler_t.transform(df_train_vectorized)
df_train_scaled = df_train_scaled.select("potability", "features")
df_train_scaled.show(2)

+----------+--------------------+
|potability|            features|
+----------+--------------------+
|         1|[0.14545424324450...|
|         1|[0.63291223270976...|
+----------+--------------------+
only showing top 2 rows



In [5]:
forest = classification.RandomForestClassifier(featuresCol='features', labelCol='potability',maxDepth=12, minInstancesPerNode=2, seed=42)
forest_t = forest.fit(df_train_scaled)
prediction_train = forest_t.transform(df_train_scaled)
prediction_train.show(3)


+----------+--------------------+--------------------+--------------------+----------+
|potability|            features|       rawPrediction|         probability|prediction|
+----------+--------------------+--------------------+--------------------+----------+
|         1|[0.14545424324450...|[3.36587301587301...|[0.16829365079365...|       1.0|
|         1|[0.63291223270976...|[4.55929339477726...|[0.22796466973886...|       1.0|
|         1|[1.12338272955014...|[9.17977912373757...|[0.45898895618687...|       1.0|
+----------+--------------------+--------------------+--------------------+----------+
only showing top 3 rows



In [6]:
binaryevaluator = evaluation.BinaryClassificationEvaluator(
    labelCol='potability'
    
)
multievaluator = evaluation.MulticlassClassificationEvaluator(
    labelCol='potability',
    metricName="accuracy"
)
print(binaryevaluator.evaluate(prediction_train))
print(multievaluator.evaluate(prediction_train))

0.9927757069108063
0.935009196811772


In [7]:
pipe = Pipeline(stages=[vect, scaler, forest])
pipeline = pipe.fit(df_train)

In [8]:
import pandas as pd
pandas_df = pd.read_csv('water_potability.csv', header=0)
pandas_df.drop(labels='Potability', axis=1, inplace=True)
print(pandas_df.head(3))
features_limits = {}
for col in pandas_df.columns:
        features_limits[col.lower()] = (float(pandas_df[col].min()), float(pandas_df[col].max()))

         ph    Hardness        Solids  Chloramines     Sulfate  Conductivity  \
0       NaN  204.890455  20791.318981     7.300212  368.516441    564.308654   
1  3.716080  129.422921  18630.057858     6.635246         NaN    592.885359   
2  8.099124  224.236259  19909.541732     9.275884         NaN    418.606213   

   Organic_carbon  Trihalomethanes  Turbidity  
0       10.379783        86.990970   2.963135  
1       15.180013        56.329076   4.500656  
2       16.868637        66.420093   3.055934  


In [9]:
import random
uni = random.uniform
random_features = [random.uniform(*limits) for limits in features_limits.values()]
random_df = pd.DataFrame({k.lower():v for k,v in zip(pandas_df.columns, random_features) }, index=[0])
random_spark_df = spark.createDataFrame(random_df)
random_spark_df.show(1)

         ph   hardness       solids  chloramines     sulfate  conductivity  \
0  8.239299  58.111374  25380.40525    10.104995  213.342585    494.884828   

   organic_carbon  trihalomethanes  turbidity  
0       20.086055        20.911411   4.119006  


In [12]:
prediction = pipeline.transform(random_spark_df)
prediction.show(1)



+-----------------+-----------------+-----------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+--------------------+--------------------+--------------------+--------------------+----------+
|               ph|         hardness|           solids|       chloramines|           sulfate|     conductivity|   organic_carbon|   trihalomethanes|        turbidity|        features_raw|            features|       rawPrediction|         probability|prediction|
+-----------------+-----------------+-----------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+--------------------+--------------------+--------------------+--------------------+----------+
|8.239298883715975|58.11137403573491|25380.40525048162|10.104994522867138|213.34258476498928|494.8848275975486|20.08605533736839|20.911411383761894|4.119006300352495|[8.23929888371597...|[5.26789445024497...|[9.771

In [20]:
from flask import Flask, render_template
from dataclasses import dataclass, asdict
app = Flask(__name__)

@dataclass
class Feature():
    name:str
    min:float
    max:float

@app.route('/')
def index():
    features = [asdict(Feature(k, *v)) for k,v in features_limits.items()]
    return render_template('water.jinja', features=features)



In [21]:
app.run()

 * Serving Flask app '__main__' (lazy loading)
 * Environment: production
[2m   Use a production WSGI server instead.[0m
 * Debug mode: off


 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
[2022-01-23 20:17:26,608] ERROR in app: Exception on / [GET]
Traceback (most recent call last):
  File "C:\Users\sevenreek\AppData\Roaming\Python\Python39\site-packages\flask\app.py", line 2070, in wsgi_app
    response = self.full_dispatch_request()
  File "C:\Users\sevenreek\AppData\Roaming\Python\Python39\site-packages\flask\app.py", line 1515, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "C:\Users\sevenreek\AppData\Roaming\Python\Python39\site-packages\flask\app.py", line 1513, in full_dispatch_request
    rv = self.dispatch_request()
  File "C:\Users\sevenreek\AppData\Roaming\Python\Python39\site-packages\flask\app.py", line 1499, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**req.view_args)
  File "C:\Users\sevenreek\AppData\Local\Temp\ipykernel_23432\2411937443.py", line 13, in index
    features = [asdict(Feature(k, *v)) for k,v in features_limits]
  File