In [1]:
#BASICS DATA FRAME ALGORITHM

# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('basics').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/24 03:25:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
df = spark.read.csv('water_potability.1.csv')

In [3]:
df.head()

Row(_c0='ph', _c1='Hardness', _c2='Solids', _c3='Chloramines', _c4='Sulfate', _c5='Conductivity', _c6='Organic_carbon', _c7='Trihalomethanes', _c8='Turbidity', _c9='Potability')

In [4]:
# The show method allows you visualise DataFrames. We can see that there are two columns. 
df.show()

# You could also try this. 
df.columns

+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
|  _c0|     _c1|     _c2|        _c3|    _c4|         _c5|           _c6|            _c7|      _c8|       _c9|
+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
|   ph|Hardness|  Solids|Chloramines|Sulfate|Conductivity|Organic_carbon|Trihalomethanes|Turbidity|Potability|
| null|  204.89|20791.32|       7.30| 368.52|      564.31|         10.38|          86.99|     2.96|      0.00|
| 3.72|  129.42|18630.06|       6.64|   null|      592.89|         15.18|          56.33|     4.50|      0.00|
| 8.10|  224.24|19909.54|       9.28|   null|      418.61|         16.87|          66.42|     3.06|      0.00|
| 8.32|  214.37|22018.42|       8.06| 356.89|      363.27|         18.44|         100.34|     4.63|      0.00|
| 9.09|  181.10|17978.99|       6.55| 310.14|      398.41|         11.56|          32.00|     4.08|      0.00|
|

['_c0', '_c1', '_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9']

In [5]:
df.columns

['_c0', '_c1', '_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9']

In [6]:
print("Column names:", df.columns)

Column names: ['_c0', '_c1', '_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9']


In [7]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)



In [43]:
#DATA CLEANING


# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('missing').getOrCreate()

In [44]:
# Importing data which has a header. Schema is automatically configured.
df = spark.read.csv('water_potability.1.csv', header=True, inferSchema=True)

# Let's see the data. You'll notice nulls.
df.show()

+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
|   ph|Hardness|  Solids|Chloramines|Sulfate|Conductivity|Organic_carbon|Trihalomethanes|Turbidity|Potability|
+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
| null|  204.89|20791.32|        7.3| 368.52|      564.31|         10.38|          86.99|     2.96|       0.0|
| 3.72|  129.42|18630.06|       6.64|   null|      592.89|         15.18|          56.33|      4.5|       0.0|
|  8.1|  224.24|19909.54|       9.28|   null|      418.61|         16.87|          66.42|     3.06|       0.0|
| 8.32|  214.37|22018.42|       8.06| 356.89|      363.27|         18.44|         100.34|     4.63|       0.0|
| 9.09|   181.1|17978.99|       6.55| 310.14|      398.41|         11.56|           32.0|     4.08|       0.0|
| 5.58|  188.31|28748.69|       7.54| 326.68|      280.47|           8.4|          54.92|     2.56|       0.0|
|

In [45]:
# Using this syntax, we can drop any row with missing data. Three rows are dropped.
df.na.drop().show()

+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
|   ph|Hardness|  Solids|Chloramines|Sulfate|Conductivity|Organic_carbon|Trihalomethanes|Turbidity|Potability|
+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
| 8.32|  214.37|22018.42|       8.06| 356.89|      363.27|         18.44|         100.34|     4.63|       0.0|
| 9.09|   181.1|17978.99|       6.55| 310.14|      398.41|         11.56|           32.0|     4.08|       0.0|
| 5.58|  188.31|28748.69|       7.54| 326.68|      280.47|           8.4|          54.92|     2.56|       0.0|
|10.22|  248.07|28749.72|       7.51| 393.66|      283.65|         13.79|           84.6|     2.67|       0.0|
| 8.64|  203.36|13672.09|       4.56| 303.31|      474.61|         12.36|           62.8|      4.4|       0.0|
|11.18|  227.23|25484.51|       9.08| 404.04|      563.89|         17.93|          71.98|     4.37|       0.0|
|

In [46]:
# Requires a certain amount of non-null values. Row two was dropped, as there's only one non-null value.
df.na.drop(thresh=2).show()

+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
|   ph|Hardness|  Solids|Chloramines|Sulfate|Conductivity|Organic_carbon|Trihalomethanes|Turbidity|Potability|
+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
| null|  204.89|20791.32|        7.3| 368.52|      564.31|         10.38|          86.99|     2.96|       0.0|
| 3.72|  129.42|18630.06|       6.64|   null|      592.89|         15.18|          56.33|      4.5|       0.0|
|  8.1|  224.24|19909.54|       9.28|   null|      418.61|         16.87|          66.42|     3.06|       0.0|
| 8.32|  214.37|22018.42|       8.06| 356.89|      363.27|         18.44|         100.34|     4.63|       0.0|
| 9.09|   181.1|17978.99|       6.55| 310.14|      398.41|         11.56|           32.0|     4.08|       0.0|
| 5.58|  188.31|28748.69|       7.54| 326.68|      280.47|           8.4|          54.92|     2.56|       0.0|
|

In [47]:
# Drops a row if all values are missing. Zero rows are dropped. 
df.na.drop(how="all").show()

+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
|   ph|Hardness|  Solids|Chloramines|Sulfate|Conductivity|Organic_carbon|Trihalomethanes|Turbidity|Potability|
+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
| null|  204.89|20791.32|        7.3| 368.52|      564.31|         10.38|          86.99|     2.96|       0.0|
| 3.72|  129.42|18630.06|       6.64|   null|      592.89|         15.18|          56.33|      4.5|       0.0|
|  8.1|  224.24|19909.54|       9.28|   null|      418.61|         16.87|          66.42|     3.06|       0.0|
| 8.32|  214.37|22018.42|       8.06| 356.89|      363.27|         18.44|         100.34|     4.63|       0.0|
| 9.09|   181.1|17978.99|       6.55| 310.14|      398.41|         11.56|           32.0|     4.08|       0.0|
| 5.58|  188.31|28748.69|       7.54| 326.68|      280.47|           8.4|          54.92|     2.56|       0.0|
|

In [48]:
# Drops a row if a value from a particular row is missing. Two rows are dropped.
df.na.drop(subset="ph",).show()

+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
|   ph|Hardness|  Solids|Chloramines|Sulfate|Conductivity|Organic_carbon|Trihalomethanes|Turbidity|Potability|
+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
| 3.72|  129.42|18630.06|       6.64|   null|      592.89|         15.18|          56.33|      4.5|       0.0|
|  8.1|  224.24|19909.54|       9.28|   null|      418.61|         16.87|          66.42|     3.06|       0.0|
| 8.32|  214.37|22018.42|       8.06| 356.89|      363.27|         18.44|         100.34|     4.63|       0.0|
| 9.09|   181.1|17978.99|       6.55| 310.14|      398.41|         11.56|           32.0|     4.08|       0.0|
| 5.58|  188.31|28748.69|       7.54| 326.68|      280.47|           8.4|          54.92|     2.56|       0.0|
|10.22|  248.07|28749.72|       7.51| 393.66|      283.65|         13.79|           84.6|     2.67|       0.0|
|

In [49]:
df.na.drop(subset="Sulfate",).show()

+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
|   ph|Hardness|  Solids|Chloramines|Sulfate|Conductivity|Organic_carbon|Trihalomethanes|Turbidity|Potability|
+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
| null|  204.89|20791.32|        7.3| 368.52|      564.31|         10.38|          86.99|     2.96|       0.0|
| 8.32|  214.37|22018.42|       8.06| 356.89|      363.27|         18.44|         100.34|     4.63|       0.0|
| 9.09|   181.1|17978.99|       6.55| 310.14|      398.41|         11.56|           32.0|     4.08|       0.0|
| 5.58|  188.31|28748.69|       7.54| 326.68|      280.47|           8.4|          54.92|     2.56|       0.0|
|10.22|  248.07|28749.72|       7.51| 393.66|      283.65|         13.79|           84.6|     2.67|       0.0|
| 8.64|  203.36|13672.09|       4.56| 303.31|      474.61|         12.36|           62.8|      4.4|       0.0|
|

In [50]:
# Instead of dropping the row, this fills null string types with FILL VALUE. 
df.na.fill("FILL VALUE").show()

# Spark will only apply numbers to number data types, and strings to string data types.
df.na.fill(0).show()

# However, it's good practice to specify the row you want to fill using subset. 
df.na.fill('FILL NAME', subset=['ph']).show()

+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
|   ph|Hardness|  Solids|Chloramines|Sulfate|Conductivity|Organic_carbon|Trihalomethanes|Turbidity|Potability|
+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
| null|  204.89|20791.32|        7.3| 368.52|      564.31|         10.38|          86.99|     2.96|       0.0|
| 3.72|  129.42|18630.06|       6.64|   null|      592.89|         15.18|          56.33|      4.5|       0.0|
|  8.1|  224.24|19909.54|       9.28|   null|      418.61|         16.87|          66.42|     3.06|       0.0|
| 8.32|  214.37|22018.42|       8.06| 356.89|      363.27|         18.44|         100.34|     4.63|       0.0|
| 9.09|   181.1|17978.99|       6.55| 310.14|      398.41|         11.56|           32.0|     4.08|       0.0|
| 5.58|  188.31|28748.69|       7.54| 326.68|      280.47|           8.4|          54.92|     2.56|       0.0|
|

In [51]:
# Instead of dropping the row, this fills null string types with FILL VALUE. 
df.na.fill("FILL VALUE").show()

# Spark will only apply numbers to number data types, and strings to string data types.
df.na.fill(0).show()

# However, it's good practice to specify the row you want to fill using subset. 
df.na.fill('FILL NAME', subset=['Sulfate']).show()

+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
|   ph|Hardness|  Solids|Chloramines|Sulfate|Conductivity|Organic_carbon|Trihalomethanes|Turbidity|Potability|
+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
| null|  204.89|20791.32|        7.3| 368.52|      564.31|         10.38|          86.99|     2.96|       0.0|
| 3.72|  129.42|18630.06|       6.64|   null|      592.89|         15.18|          56.33|      4.5|       0.0|
|  8.1|  224.24|19909.54|       9.28|   null|      418.61|         16.87|          66.42|     3.06|       0.0|
| 8.32|  214.37|22018.42|       8.06| 356.89|      363.27|         18.44|         100.34|     4.63|       0.0|
| 9.09|   181.1|17978.99|       6.55| 310.14|      398.41|         11.56|           32.0|     4.08|       0.0|
| 5.58|  188.31|28748.69|       7.54| 326.68|      280.47|           8.4|          54.92|     2.56|       0.0|
|

In [52]:
# Instead of dropping the row, this fills null string types with FILL VALUE. 
df.na.fill("FILL VALUE").show()

# Spark will only apply numbers to number data types, and strings to string data types.
df.na.fill(0).show()

# However, it's good practice to specify the row you want to fill using subset. 
df.na.fill('FILL NAME', subset=['ph']).show()

+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
|   ph|Hardness|  Solids|Chloramines|Sulfate|Conductivity|Organic_carbon|Trihalomethanes|Turbidity|Potability|
+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
| null|  204.89|20791.32|        7.3| 368.52|      564.31|         10.38|          86.99|     2.96|       0.0|
| 3.72|  129.42|18630.06|       6.64|   null|      592.89|         15.18|          56.33|      4.5|       0.0|
|  8.1|  224.24|19909.54|       9.28|   null|      418.61|         16.87|          66.42|     3.06|       0.0|
| 8.32|  214.37|22018.42|       8.06| 356.89|      363.27|         18.44|         100.34|     4.63|       0.0|
| 9.09|   181.1|17978.99|       6.55| 310.14|      398.41|         11.56|           32.0|     4.08|       0.0|
| 5.58|  188.31|28748.69|       7.54| 326.68|      280.47|           8.4|          54.92|     2.56|       0.0|
|

In [53]:
# Also, it's good practice to use your ph average to fill missing data. 
from pyspark.sql.functions import mean

In [54]:
mean_ph = df.select(mean(df['ph'])).collect()
mean_ph

[Row(avg(ph)=7.08079381023912)]

In [55]:
mean_Sulfate = df.select(mean(df['Sulfate'])).collect()
mean_Sulfate

[Row(avg(Sulfate)=333.77578249731533)]

In [56]:
# Looks like we need to go one level deeper. Perfect! Let's assign that value to a variable.
mean_ph[0][0]
mean_ph_val = mean_ph[0][0]

mean_Sulfate[0][0]
mean_Sulfate_val = mean_Sulfate[0][0]

In [57]:
# And finally, fill the missing values with the mean.
df.na.fill(mean_ph_val, subset=['ph']).show()
df.na.fill(mean_Sulfate_val, subset=['Sulfate']).show()

+----------------+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
|              ph|Hardness|  Solids|Chloramines|Sulfate|Conductivity|Organic_carbon|Trihalomethanes|Turbidity|Potability|
+----------------+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
|7.08079381023912|  204.89|20791.32|        7.3| 368.52|      564.31|         10.38|          86.99|     2.96|       0.0|
|            3.72|  129.42|18630.06|       6.64|   null|      592.89|         15.18|          56.33|      4.5|       0.0|
|             8.1|  224.24|19909.54|       9.28|   null|      418.61|         16.87|          66.42|     3.06|       0.0|
|            8.32|  214.37|22018.42|       8.06| 356.89|      363.27|         18.44|         100.34|     4.63|       0.0|
|            9.09|   181.1|17978.99|       6.55| 310.14|      398.41|         11.56|           32.0|     4.08|       0.0|
|            5.58|  188.

In [58]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('water_potability.1.csv').getOrCreate()

In [59]:
# Schemas can only be inferred for CSV files. 
df = spark.read.csv('water_potability.1.csv', inferSchema=True, header=True)
df.printSchema()

root
 |-- ph: double (nullable = true)
 |-- Hardness: double (nullable = true)
 |-- Solids: double (nullable = true)
 |-- Chloramines: double (nullable = true)
 |-- Sulfate: double (nullable = true)
 |-- Conductivity: double (nullable = true)
 |-- Organic_carbon: double (nullable = true)
 |-- Trihalomethanes: double (nullable = true)
 |-- Turbidity: double (nullable = true)
 |-- Potability: double (nullable = true)



In [60]:
# Let's get a better look at the data.
# We know that we can show a DataFrame, but that's resulted in a mess! 
df.show()

+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
|   ph|Hardness|  Solids|Chloramines|Sulfate|Conductivity|Organic_carbon|Trihalomethanes|Turbidity|Potability|
+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
| null|  204.89|20791.32|        7.3| 368.52|      564.31|         10.38|          86.99|     2.96|       0.0|
| 3.72|  129.42|18630.06|       6.64|   null|      592.89|         15.18|          56.33|      4.5|       0.0|
|  8.1|  224.24|19909.54|       9.28|   null|      418.61|         16.87|          66.42|     3.06|       0.0|
| 8.32|  214.37|22018.42|       8.06| 356.89|      363.27|         18.44|         100.34|     4.63|       0.0|
| 9.09|   181.1|17978.99|       6.55| 310.14|      398.41|         11.56|           32.0|     4.08|       0.0|
| 5.58|  188.31|28748.69|       7.54| 326.68|      280.47|           8.4|          54.92|     2.56|       0.0|
|

In [61]:
# Instead, let's just grab the first row. Much neater! 
df.head(1)

[Row(ph=None, Hardness=204.89, Solids=20791.32, Chloramines=7.3, Sulfate=368.52, Conductivity=564.31, Organic_carbon=10.38, Trihalomethanes=86.99, Turbidity=2.96, Potability=0.0)]

In [62]:
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('water_potability.1.csv').getOrCreate()

In [63]:
# Reads schema and accepts that the data has a header.
df = spark.read.csv('water_potability.1.csv', inferSchema=True, header=True)

# Let's see the data.
df.show()

# And the data schema.
df.printSchema()

+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
|   ph|Hardness|  Solids|Chloramines|Sulfate|Conductivity|Organic_carbon|Trihalomethanes|Turbidity|Potability|
+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
| null|  204.89|20791.32|        7.3| 368.52|      564.31|         10.38|          86.99|     2.96|       0.0|
| 3.72|  129.42|18630.06|       6.64|   null|      592.89|         15.18|          56.33|      4.5|       0.0|
|  8.1|  224.24|19909.54|       9.28|   null|      418.61|         16.87|          66.42|     3.06|       0.0|
| 8.32|  214.37|22018.42|       8.06| 356.89|      363.27|         18.44|         100.34|     4.63|       0.0|
| 9.09|   181.1|17978.99|       6.55| 310.14|      398.41|         11.56|           32.0|     4.08|       0.0|
| 5.58|  188.31|28748.69|       7.54| 326.68|      280.47|           8.4|          54.92|     2.56|       0.0|
|

In [64]:
# We can also group, and show the averages of each group.
df.groupBy('Potability').mean().show()

+----------+-----------------+------------------+------------------+-----------------+-----------------+------------------+-------------------+--------------------+-----------------+---------------+
|Potability|          avg(ph)|     avg(Hardness)|       avg(Solids)| avg(Chloramines)|     avg(Sulfate)| avg(Conductivity)|avg(Organic_carbon)|avg(Trihalomethanes)|   avg(Turbidity)|avg(Potability)|
+----------+-----------------+------------------+------------------+-----------------+-----------------+------------------+-------------------+--------------------+-----------------+---------------+
|       0.0|7.085377288204867|196.73328431532053|21777.490781777895|7.092156204221223|334.5642993056295|426.73046905325344| 14.364347931442447|   66.30354620526703|3.965802536707207|            0.0|
|       1.0|7.073783295348782|195.80074426245721|22383.991017830645| 7.16933802623318|332.5669902327923|425.38379997840417|   14.1608925211244|   66.53968374061084|3.968328289134596|            1.0|
+----

In [65]:
# Instead of grouping, you can use aggregation which represents the entire dataset. 
df.agg({"Potability":"mean"}).show()

+------------------+
|   avg(Potability)|
+------------------+
|0.3901098901098901|
+------------------+



In [66]:
# We can also import SQL functions. 
from pyspark.sql.functions import countDistinct,avg,stddev,format_number
df.select(avg('Potability').alias('Average Potability')).show()

+------------------+
|Average Potability|
+------------------+
|0.3901098901098901|
+------------------+



24/05/24 03:39:12 ERROR Executor: Exception in task 0.0 in stage 101.0 (TID 91)
org.apache.spark.SparkException: Failed to execute user defined function (VectorAssembler$$Lambda$3463/0x00000008413df840: (struct<ph:double,Hardness:double,Solids:double,Chloramines:double,Sulfate:double,Conductivity:double,Organic_carbon:double,Trihalomethanes:double,Turbidity:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:136)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scal

Py4JJavaError: An error occurred while calling o455.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 101.0 failed 1 times, most recent failure: Lost task 0.0 in stage 101.0 (TID 91) (ip-172-31-55-62.ec2.internal executor driver): org.apache.spark.SparkException: Failed to execute user defined function (VectorAssembler$$Lambda$3463/0x00000008413df840: (struct<ph:double,Hardness:double,Solids:double,Chloramines:double,Sulfate:double,Conductivity:double,Organic_carbon:double,Trihalomethanes:double,Turbidity:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:136)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 17 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:394)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3538)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3535)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function (VectorAssembler$$Lambda$3463/0x00000008413df840: (struct<ph:double,Hardness:double,Solids:double,Chloramines:double,Sulfate:double,Conductivity:double,Organic_carbon:double,Trihalomethanes:double,Turbidity:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:136)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 17 more


In [67]:
 #LINEAR REGRESSION

# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('linear_regression_adv').getOrCreate()

# If you're getting an error with numpy, please type 'sudo pip install numpy --user' into the EC2 console.
from pyspark.ml.regression import LinearRegression

In [68]:
data = spark.read.csv("water_potability.1.csv",inferSchema=True,header=True)


In [69]:
# Print the schema of the DataFrame. You can see potential features as well as the predictor.
data.printSchema()

root
 |-- ph: double (nullable = true)
 |-- Hardness: double (nullable = true)
 |-- Solids: double (nullable = true)
 |-- Chloramines: double (nullable = true)
 |-- Sulfate: double (nullable = true)
 |-- Conductivity: double (nullable = true)
 |-- Organic_carbon: double (nullable = true)
 |-- Trihalomethanes: double (nullable = true)
 |-- Turbidity: double (nullable = true)
 |-- Potability: double (nullable = true)



In [70]:
# Let's focus on one row to make it easier to read.
data.head()

Row(ph=None, Hardness=204.89, Solids=20791.32, Chloramines=7.3, Sulfate=368.52, Conductivity=564.31, Organic_carbon=10.38, Trihalomethanes=86.99, Turbidity=2.96, Potability=0.0)

In [71]:
# A simple for loop allows us to make it even clearer. 
for item in data.head():
    print(item)

None
204.89
20791.32
7.3
368.52
564.31
10.38
86.99
2.96
0.0


In [79]:
  #TREE

# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('water_potability.1.csv').getOrCreate()

In [80]:
# Load training data. 
data = spark.read.csv('water_potability.1.csv',inferSchema=True,header=True)


In [81]:
# Let's get an idea of what the data looks like. 
data.printSchema()

root
 |-- ph: double (nullable = true)
 |-- Hardness: double (nullable = true)
 |-- Solids: double (nullable = true)
 |-- Chloramines: double (nullable = true)
 |-- Sulfate: double (nullable = true)
 |-- Conductivity: double (nullable = true)
 |-- Organic_carbon: double (nullable = true)
 |-- Trihalomethanes: double (nullable = true)
 |-- Turbidity: double (nullable = true)
 |-- Potability: double (nullable = true)



In [82]:
data.head()

Row(ph=None, Hardness=204.89, Solids=20791.32, Chloramines=7.3, Sulfate=368.52, Conductivity=564.31, Organic_carbon=10.38, Trihalomethanes=86.99, Turbidity=2.96, Potability=0.0)

In [83]:
# Must be included at the beginning of each new notebook. Remember to change the app name.
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('water_potability.1.csv').getOrCreate()

from pyspark.ml import Pipeline
from pyspark.ml.classification import (RandomForestClassifier, GBTClassifier, DecisionTreeClassifier)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [84]:
# Load and parse the data file, converting it to a DataFrame.
data = spark.read.csv('water_potability.1.csv')

In [85]:
# Let's get a better look at the data.
data.show()

data.printSchema()

+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
|  _c0|     _c1|     _c2|        _c3|    _c4|         _c5|           _c6|            _c7|      _c8|       _c9|
+-----+--------+--------+-----------+-------+------------+--------------+---------------+---------+----------+
|   ph|Hardness|  Solids|Chloramines|Sulfate|Conductivity|Organic_carbon|Trihalomethanes|Turbidity|Potability|
| null|  204.89|20791.32|       7.30| 368.52|      564.31|         10.38|          86.99|     2.96|      0.00|
| 3.72|  129.42|18630.06|       6.64|   null|      592.89|         15.18|          56.33|     4.50|      0.00|
| 8.10|  224.24|19909.54|       9.28|   null|      418.61|         16.87|          66.42|     3.06|      0.00|
| 8.32|  214.37|22018.42|       8.06| 356.89|      363.27|         18.44|         100.34|     4.63|      0.00|
| 9.09|  181.10|17978.99|       6.55| 310.14|      398.41|         11.56|          32.00|     4.08|      0.00|
|

In [104]:
# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [105]:
data.columns

['_c0', '_c1', '_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9']

In [106]:
feature_columns = [col for col in df.columns if col != 'Potability']

In [107]:
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [109]:
# Import the relevant Python libraries.
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
in BDASS 