In [2]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree
from pyspark import SparkConf, SparkContext , SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.tree import DecisionTree
from pyspark.mllib.regression import LabeledPoint
import numpy as np
import pandas as pd
import os
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, FloatType
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import when, lit, col






In [5]:
spark = SparkSession.builder.appName("Job1").master("local").getOrCreate()
sc = SparkContext
# conf = SparkConf().setAppName("Job1").setMaster("local[*]")
# sc = parkContext.getOrCreate(conf)



# Defining Schema of file

In [6]:
housingfileSchema = StructType([
    StructField('longitude', FloatType(), True),
    StructField('latitude', FloatType(), True),
    StructField('housing_median_age', FloatType(), True),
    StructField('total_rooms', FloatType(), True),
    StructField('total_bedrooms', FloatType(), True),
    StructField('population', FloatType(), True),
    StructField('households', FloatType(), True),
    StructField('median_income', FloatType(), True),
    StructField('median_house_value', FloatType(), True),
    StructField('ocean_proximity', StringType(), True)
])

# Reading Input data

In [8]:
DATA_PATH = os.path.join("/home/zeefu/Desktop")
FILE_NAME = "housing.csv"
FULL_PATH = os.path.join(DATA_PATH, FILE_NAME)


In [9]:
# df = spark.read\
#     .schema(housingfileSchema)\
#     .option("delimiter", ",")\
#     .csv(FULL_PATH)

df = spark.read.option("delimiter", ",").schema(housingfileSchema).csv(FULL_PATH, header=True)

#or
# #  df = spark.read.csv(FULL_PATH, header=True, inferSchmea=True)


# Quantile values 

In [7]:
df.describe("median_house_value").show()



+-------+------------------+
|summary|median_house_value|
+-------+------------------+
|  count|             20640|
|   mean|206855.81690891474|
| stddev|115395.61587441359|
|    min|           14999.0|
|    max|          500001.0|
+-------+------------------+



In [8]:
x5 = df.approxQuantile("total_rooms", [0.5], 0)
x25 = df.approxQuantile("total_rooms", [0.25], 0)
x75 = df.approxQuantile("total_rooms", [0.75], 0)

# in one variable...
quantile = df.approxQuantile("total_rooms", [0.25, 0.5, 0.75], 0)
quantile



[1447.0, 2127.0, 3148.0]

# Some Feature Engineering 

In [10]:
# replace null values with median value for all coloumns // Note: careful of categorical features such as ocean_proximity...
# in this case we do not have missing values in this column so no value is going to be replaced!
def replace(df, column, value, quantile = None):
    if quantile != None:
        quant = df.approxQuantile(column, [0.5], 0)
        return when(column !=value, column).otherwise(quant)
    else:
        return when(column !=value, column)

df_clean = df
for column_name in df.schema.names:
    df_clean.withColumn(column_name, replace(df_clean, col(column_name), lit(None)))

In [14]:
# df_test = sc.parallelize([(1, "foo"), (2, "bar")]).toDF(["x", "y"])

# def replace(column, value):
#     return when(column != value, column).otherwise(lit(None))

# df_test.withColumn("y", replace(col("y"), "bar")).show()
df_clean.withColumn?

[0;31mSignature:[0m [0mdf_clean[0m[0;34m.[0m[0mwithColumn[0m[0;34m([0m[0mcolName[0m[0;34m,[0m [0mcol[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Returns a new :class:`DataFrame` by adding a column or replacing the
existing column that has the same name.

The column expression must be an expression over this :class:`DataFrame`; attempting to add
a column from some other :class:`DataFrame` will raise an error.

.. versionadded:: 1.3.0

Parameters
----------
colName : str
    string, name of the new column.
col : :class:`Column`
    a :class:`Column` expression for the new column.

Notes
-----
This method introduces a projection internally. Therefore, calling it multiple
times, for instance, via loops in order to add multiple columns can generate big
plans which can cause performance issues and even `StackOverflowException`.
To avoid this, use :func:`select` with the multiple columns at once.

Examples
--------
>>> df.withColumn('age2', df.age + 2).colle

In [51]:

df_clean = df
for column_name in df.schema.names: 
    df_clean = df_clean.filter(F.col(column_name).isNotNull())

In [11]:
df_clean_rdd = df_clean.rdd

In [12]:
df.printSchema()

root
 |-- longitude: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- housing_median_age: float (nullable = true)
 |-- total_rooms: float (nullable = true)
 |-- total_bedrooms: float (nullable = true)
 |-- population: float (nullable = true)
 |-- households: float (nullable = true)
 |-- median_income: float (nullable = true)
 |-- median_house_value: float (nullable = true)
 |-- ocean_proximity: string (nullable = true)



# Split Data Into Training Data & Test Data 

In [13]:
training_data_rdd, test_data_rdd = df_clean_rdd.randomSplit(weights=[8.0,2.0], seed=1)  # raw_data_rdd.map(createLabeledPoint)


# If You Want To Convert Pyspark Dataframe To Pandas Dataframe

In [14]:
df_clean.toPandas().head()



Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,-122.230003,37.880001,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY
1,-122.220001,37.860001,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY
2,-122.239998,37.849998,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY
3,-122.25,37.849998,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY
4,-122.25,37.849998,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY


# Creating LabelPoint

In [18]:
def mapProximity(proximity):
    if (proximity == 'NEAR BAY'):
        return 1
    elif (proximity =='INLAND'):
        return 0
    
    elif (proximity == '<1H OCEAN'):
        return 1
    
    elif (proximity == 'NEAR OCEAN'):
        return 1

    elif (proximity == 'ISLAND'):
        return 0

    else:
        return 0


def createLabeledPoints(fields):

    longitude = fields[0]
    latitude = fields[1]
    housing_median_age = fields[2]
    total_rooms = fields[3]
    total_bedrooms = fields[4]
    population = fields[5]
    households = fields[6]
    median_income = fields[7]
    median_house_value = fields[8]
    ocean_proximity = mapProximity(fields[9])
    
    return LabeledPoint(ocean_proximity, np.array([longitude, 
                                                latitude, 
                                                housing_median_age, 
                                                total_rooms, 
                                                total_bedrooms,
                                                population,
                                                households,
                                                median_income,
                                                median_house_value,
                                                ocean_proximity]))



# Convert a list of raw fields from our CSV file to a
# LabeledPoint that MLLib can use. All data must be numerical...
# def createLabeledPoints1(fields, sl, label_name):

#     label_name = binary(fields[9])

#     sl.remove(label_name)

#     feature_array = np.array(sl)
    
#     return LabeledPoint(sl[label_name], feature_array)


# df_clean_rdd.map(lambda x: createLabeledPoints1(fields= x, sl=df_clean.schema.names, label_name='ocean_proximity'))

In [19]:
training_data, test_data = training_data_rdd.map(createLabeledPoints), test_data_rdd.map(createLabeledPoints)

In [17]:
test_data.collect()



[LabeledPoint(1.0, [-122.22000122070312,37.86000061035156,21.0,7099.0,1106.0,2401.0,1138.0,8.301400184631348,358500.0,1.0]),
 LabeledPoint(1.0, [-122.25,37.84000015258789,52.0,3104.0,687.0,1157.0,647.0,3.119999885559082,241400.0,1.0]),
 LabeledPoint(1.0, [-122.26000213623047,37.849998474121094,52.0,2643.0,626.0,1212.0,620.0,1.916700005531311,159200.0,1.0]),
 LabeledPoint(1.0, [-122.2699966430664,37.849998474121094,52.0,1966.0,347.0,793.0,331.0,2.7750000953674316,152500.0,1.0]),
 LabeledPoint(1.0, [-122.2699966430664,37.849998474121094,40.0,751.0,184.0,409.0,166.0,1.357800006866455,147500.0,1.0]),
 LabeledPoint(1.0, [-122.27999877929688,37.849998474121094,41.0,535.0,123.0,317.0,119.0,2.4038000106811523,107500.0,1.0]),
 LabeledPoint(1.0, [-122.27999877929688,37.849998474121094,49.0,1130.0,244.0,607.0,239.0,2.459700107574463,93800.0,1.0]),
 LabeledPoint(1.0, [-122.27999877929688,37.84000015258789,50.0,2082.0,492.0,1131.0,473.0,1.6424000263214111,108900.0,1.0]),
 LabeledPoint(1.0, [-122.27

In [20]:
example =  [np.array([-122.22000122070312,37.86000061035156,21.0,7099.0,1106.0,2401.0,1138.0,8.301400184631348,358500.0,0.0])]
test_example = spark.sparkContext.parallelize(example)

In [21]:
model = DecisionTree.trainClassifier(training_data, numClasses=2,
                                     categoricalFeaturesInfo={9:2},
                                     impurity='gini', maxDepth=5, maxBins=14)



In [22]:
model.predict(test_example).collect()

[0.0]

In [23]:
print('Learned classification tree model:')
print(model.toDebugString())

Learned classification tree model:
DecisionTreeModel classifier of depth 1 with 3 nodes
  If (feature 9 in {0.0})
   Predict: 0.0
  Else (feature 9 not in {0.0})
   Predict: 1.0



# DecsionTreeClassifier Approach 

In [24]:
df_2 = spark.read.csv(FULL_PATH, header = True, inferSchema = True)



In [28]:
list_names = df_2.schema.names

list_names.remove("ocean_proximity")

In [29]:
list_names

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value']

In [31]:
# # replace null values with median value for all coloumns // Note: careful of categorical features such as ocean_proximity...
# # in this case we do not have missing values in this column so no value is going to be replaced!
# def replace(df, column, value, quantile = None):
#     if quantile != None:
#         quant = df.approxQuantile(column, [0.5], 0)
#         return when(column !=value, column).otherwise(quant)
#     else:
#         return when(column !=value, column)

# for column_name in df.schema.names:
#     df_2.withColumn(column_name, replace(df_clean, col(column_name), lit(None)))

In [63]:
df_clean_2 = df_2
all_feature_names = [name for name in df_2.schema.names]
outcome_label = "ocean_proximity"
for column_name in all_feature_names:
    df_clean_2 = df_clean_2.filter(F.col(column_name).isNotNull())

In [34]:
df_clean_2 = df_2
all_feature_names = [name for name in df_2.schema.names]



In [35]:
data= df_clean_2.na.replace(['NEAR BAY', 'INLAND','<1H OCEAN', 'NEAR OCEAN', 'ISLAND'], ['1','0','1','1','0'], 'ocean_proximity')
data = data.withColumn("ocean_proximity", data.ocean_proximity.cast('int'))

In [41]:
#

In [68]:
train_features = [feature for feature in all_feature_names if feature !='ocean_proximity']
print(train_features)

['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income', 'median_house_value']


In [73]:
assembler = VectorAssembler(inputCols=train_features, outputCol='features')
# assembled = assembler.transform(data)

In [75]:
# temp = assembler.transform(data).show()

In [45]:
# # standarize the data

# scale = StandardScaler(inputCol='features',outputCol='standardized_train_features')

# scaled_data = scale.fit(assembled)


In [76]:
transformed_data = assembler.transform(data)
(training_data, test_data) = transformed_data.randomSplit([0.8,0.2])


In [61]:
# (training_data, test_data) = data.randomSplit([0.8,0.2])


In [79]:
rf = RandomForestClassifier(labelCol='ocean_proximity', 
                            featuresCol='features',
                            maxDepth=5)

In [80]:
evaluator = BinaryClassificationEvaluator()
model = rf.fit(training_data)


21/07/17 17:53:27 ERROR Executor: Exception in task 0.0 in stage 19.0 (TID 16)
org.apache.spark.SparkException: Failed to execute user defined function(VectorAssembler$$Lambda$3228/0x0000000801690040: (struct<longitude:double,latitude:double,housing_median_age:double,total_rooms:double,total_bedrooms:double,population:double,households:double,median_income:double,median_house_value:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at or

Py4JJavaError: An error occurred while calling o743.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 1 times, most recent failure: Lost task 0.0 in stage 19.0 (TID 16) (192.168.0.155 executor driver): org.apache.spark.SparkException: Failed to execute user defined function(VectorAssembler$$Lambda$3228/0x0000000801690040: (struct<longitude:double,latitude:double,housing_median_age:double,total_rooms:double,total_bedrooms:double,population:double,households:double,median_income:double,median_house_value:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 18 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:390)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
	at org.apache.spark.ml.classification.Classifier.getNumClasses(Classifier.scala:146)
	at org.apache.spark.ml.classification.RandomForestClassifier.$anonfun$train$1(RandomForestClassifier.scala:143)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:138)
	at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:46)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function(VectorAssembler$$Lambda$3228/0x0000000801690040: (struct<longitude:double,latitude:double,housing_median_age:double,total_rooms:double,total_bedrooms:double,population:double,households:double,median_income:double,median_house_value:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	... 1 more
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 18 more


In [64]:
# Predict with the test dataset
rf_predictions = model.transform(test_data)

AttributeError: 'DecisionTreeModel' object has no attribute 'transform'

# Evaluate DecisionTreeClassifier


In [76]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

multi_evaluator = MulticlassClassificationEvaluator(labelCol = 'ocean_proximity', metricName = 'accuracy')
print('Random Forest classifier Accuracy:', multi_evaluator.evaluate(rf_predictions))

Random Forest classifier Accuracy: 0.8980702622464126


