In [85]:
# ! pip install pyspark

In [11]:
# IMporting the libraries
import pyspark


# Use 3 features:  'Type', 'Age', 'Breed1'

## Using pyspark to read the data and process it

In [10]:
# To work with spark we need to create a spark session
# Need to instal java
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local=[*]").appName('petfinder').getOrCreate()

########### For the train dataset
# Read a dataset with spark
df_spark = spark.read.csv('./train_balanced_corr.csv', header=True, inferSchema=True)
# Header = True, inferSchema = True, means that the first row is the header and the schema is inferred (if schema is not inferred, all columns will be read as string)
# Convert the column "AdoptionSpeed" to integer
df_spark = df_spark.withColumn("AdoptionSpeed", df_spark["AdoptionSpeed"].cast("integer"))


############ For the test dataset
# Read a dataset with spark
df_spark_test = spark.read.csv('./test_split_corr.csv', header=True, inferSchema=True)
# Header = True, inferSchema = True, means that the first row is the header and the schema is inferred (if schema is not inferred, all columns will be read as string)
# Convert the column "AdoptionSpeed" to integer
df_spark_test = df_spark_test.withColumn("AdoptionSpeed", df_spark_test["AdoptionSpeed"].cast("integer"))


## Print size of the data
print("Size of the training data: ", df_spark.count())
print("Size of the test data: ", df_spark_test.count())

Size of the training data:  16870
Size of the test data:  2999


## Data cleaning

In [2]:

########### For the train dataset

## Drop rows with missing values
# df_spark.na.drop(how='all', thresh=10).show() 
    ### how='any' means drop rows with any missing value, how='all' means drop rows whose all values are missing
    ### thresh=10 means drop rows whose number of missing values is greater than 10
    ### subset=['Age'] means drop rows whose 'Age' value is missing
df_spark = df_spark.na.drop(how= 'any' , subset=['AdoptionSpeed'])
## Fill missing values with mean
from pyspark.sql.functions import mean
mean_val = df_spark.select(mean(df_spark['Age'])).collect()
mean_age = mean_val[0][0]
df_spark.na.fill(mean_age, subset=['Age']).show()

########### For the test dataset

df_spark_test = df_spark_test.na.drop(how= 'any' , subset=['AdoptionSpeed'])
## Fill missing values with mean
from pyspark.sql.functions import mean
mean_val = df_spark_test.select(mean(df_spark_test['Age'])).collect()
mean_age = mean_val[0][0]
df_spark_test.na.fill(mean_age, subset=['Age']).show()




+------+---+----+-------------+
|Breed1|Age|Type|AdoptionSpeed|
+------+---+----+-------------+
|   307|  2|   1|            1|
|   307| 36|   1|            4|
|   179|  2|   1|            1|
|   265| 27|   2|            4|
|   307|  2|   1|            1|
|   266| 29|   2|            4|
|    83| 36|   1|            1|
|   307| 24|   1|            3|
|   307| 21|   1|            4|
|   307| 29|   1|            3|
|   307|  3|   1|            2|
|    60|120|   1|            2|
|   145| 18|   1|            2|
|   254| 24|   2|            3|
|   205| 36|   1|            2|
|   307|  2|   1|            3|
|   266| 12|   2|            3|
|   307|  3|   1|            2|
|   283| 48|   2|            4|
|   265|  3|   2|            2|
+------+---+----+-------------+
only showing top 20 rows

+------+---+----+-------------+
|Breed1|Age|Type|AdoptionSpeed|
+------+---+----+-------------+
|   265|  7|   2|            4|
|   266| 24|   2|            4|
|   266| 12|   2|            2|
|   195| 60|  

# Using PySpark MLlib to build the model

In [3]:
# First, collect the features in a single column

from pyspark.ml.feature import VectorAssembler

#### For the train dataset
featureassemble = VectorAssembler(inputCols=['Breed1','Age','Type'], outputCol='features')
output = featureassemble.transform(df_spark) # This will create a new column called 'features' which is a vector of the selected columns (Type, Age2, Breed1) by the VectorAssembler
output.show()

#### For the test dataset
testfeatureassemble = VectorAssembler(inputCols=['Breed1','Age','Type'], outputCol='features')
testoutput = testfeatureassemble.transform(df_spark_test) # This will create a new column called 'features' which is a vector of the selected columns (Type, Age2, Breed1) by the VectorAssembler
testoutput.show()


+------+---+----+-------------+----------------+
|Breed1|Age|Type|AdoptionSpeed|        features|
+------+---+----+-------------+----------------+
|   307|  2|   1|            1| [307.0,2.0,1.0]|
|   307| 36|   1|            4|[307.0,36.0,1.0]|
|   179|  2|   1|            1| [179.0,2.0,1.0]|
|   265| 27|   2|            4|[265.0,27.0,2.0]|
|   307|  2|   1|            1| [307.0,2.0,1.0]|
|   266| 29|   2|            4|[266.0,29.0,2.0]|
|    83| 36|   1|            1| [83.0,36.0,1.0]|
|   307| 24|   1|            3|[307.0,24.0,1.0]|
|   307| 21|   1|            4|[307.0,21.0,1.0]|
|   307| 29|   1|            3|[307.0,29.0,1.0]|
|   307|  3|   1|            2| [307.0,3.0,1.0]|
|    60|120|   1|            2|[60.0,120.0,1.0]|
|   145| 18|   1|            2|[145.0,18.0,1.0]|
|   254| 24|   2|            3|[254.0,24.0,2.0]|
|   205| 36|   1|            2|[205.0,36.0,1.0]|
|   307|  2|   1|            3| [307.0,2.0,1.0]|
|   266| 12|   2|            3|[266.0,12.0,2.0]|
|   307|  3|   1|   

In [4]:
# Select the features and the target column

#### For the train dataset
finalized_data = output.select('features', 'AdoptionSpeed') # Select the features and the target column
finalized_data.show()

#### For the test dataset
testfinalized_data = testoutput.select('features', 'AdoptionSpeed') # Select the features and the target column
testfinalized_data.show()


+----------------+-------------+
|        features|AdoptionSpeed|
+----------------+-------------+
| [307.0,2.0,1.0]|            1|
|[307.0,36.0,1.0]|            4|
| [179.0,2.0,1.0]|            1|
|[265.0,27.0,2.0]|            4|
| [307.0,2.0,1.0]|            1|
|[266.0,29.0,2.0]|            4|
| [83.0,36.0,1.0]|            1|
|[307.0,24.0,1.0]|            3|
|[307.0,21.0,1.0]|            4|
|[307.0,29.0,1.0]|            3|
| [307.0,3.0,1.0]|            2|
|[60.0,120.0,1.0]|            2|
|[145.0,18.0,1.0]|            2|
|[254.0,24.0,2.0]|            3|
|[205.0,36.0,1.0]|            2|
| [307.0,2.0,1.0]|            3|
|[266.0,12.0,2.0]|            3|
| [307.0,3.0,1.0]|            2|
|[283.0,48.0,2.0]|            4|
| [265.0,3.0,2.0]|            2|
+----------------+-------------+
only showing top 20 rows

+----------------+-------------+
|        features|AdoptionSpeed|
+----------------+-------------+
| [265.0,7.0,2.0]|            4|
|[266.0,24.0,2.0]|            4|
|[266.0,12.0,2.0]

### 1. Logistic Regression

In [6]:
from pyspark.ml.classification import LogisticRegression
# Split the data into training and validation data
train_data = finalized_data
classifier = LogisticRegression
classifier = LogisticRegression(labelCol='AdoptionSpeed').fit(train_data) # Fit the model


test_data = testfinalized_data
results = classifier.evaluate(test_data) # Evaluate the model on the validation data
results.predictions.show() # Show the predictions
results.predictions.select('AdoptionSpeed', 'prediction').show() # Show the target and the prediction

# Want to show the f1 score and confusion matrix
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="AdoptionSpeed", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(results.predictions)
print("F1 score: %.3f" % f1_score)

# Confusion matrix
results.predictions.crosstab('AdoptionSpeed', 'prediction').show()



+----------------+-------------+--------------------+--------------------+----------+
|        features|AdoptionSpeed|       rawPrediction|         probability|prediction|
+----------------+-------------+--------------------+--------------------+----------+
| [265.0,7.0,2.0]|            4|[0.21852222769436...|[0.24594501729175...|       0.0|
|[266.0,24.0,2.0]|            4|[0.21091546173330...|[0.24452052550620...|       0.0|
|[266.0,12.0,2.0]|            2|[0.21328101316232...|[0.24531022067589...|       0.0|
|[195.0,60.0,1.0]|            1|[0.02040640887454...|[0.19880089651182...|       4.0|
| [266.0,3.0,2.0]|            4|[0.21505517673408...|[0.24453461523033...|       0.0|
|[307.0,12.0,1.0]|            2|[-0.4467550129775...|[0.12267623478828...|       4.0|
|[218.0,16.0,1.0]|            4|[-0.0687979689279...|[0.18565850870324...|       3.0|
|[285.0,24.0,2.0]|            3|[0.13005966777085...|[0.22530280619895...|       4.0|
|[266.0,36.0,2.0]|            4|[0.20854991030428...|[

### 2. Decision Tree

In [7]:
from pyspark.ml.classification import DecisionTreeClassifier
# Split the data into training and validation data
train_data = finalized_data
classifier = DecisionTreeClassifier( labelCol='AdoptionSpeed', featuresCol='features')
classifier = classifier.fit(train_data) # Fit the model


test_data = testfinalized_data
results = classifier.transform(test_data) # Evaluate the model on the validation data
results.show() # Show the predictions
results.select('AdoptionSpeed', 'prediction').show() # Show the target and the prediction

# Want to show the f1 score and confusion matrix
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="AdoptionSpeed", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(results)
print("F1 score: %.3f" % f1_score)

# Confusion matrix
results.crosstab('AdoptionSpeed', 'prediction').show()



+----------------+-------------+--------------------+--------------------+----------+
|        features|AdoptionSpeed|       rawPrediction|         probability|prediction|
+----------------+-------------+--------------------+--------------------+----------+
| [265.0,7.0,2.0]|            4|[388.0,258.0,317....|[0.19715447154471...|       4.0|
|[266.0,24.0,2.0]|            4|[388.0,258.0,317....|[0.19715447154471...|       4.0|
|[266.0,12.0,2.0]|            2|[388.0,258.0,317....|[0.19715447154471...|       4.0|
|[195.0,60.0,1.0]|            1|[380.0,195.0,147....|[0.39915966386554...|       0.0|
| [266.0,3.0,2.0]|            4|[475.0,657.0,581....|[0.19467213114754...|       1.0|
|[307.0,12.0,1.0]|            2|[163.0,156.0,291....|[0.08101391650099...|       4.0|
|[218.0,16.0,1.0]|            4|[388.0,258.0,317....|[0.19715447154471...|       4.0|
|[285.0,24.0,2.0]|            3|[214.0,128.0,98.0...|[0.31845238095238...|       0.0|
|[266.0,36.0,2.0]|            4|[388.0,258.0,317....|[

### 3. Random Forest

In [8]:
from pyspark.ml.classification import RandomForestClassifier

train_data = finalized_data
classifier = RandomForestClassifier(labelCol='AdoptionSpeed', featuresCol='features')
classifier = classifier.fit(train_data) # Fit the model


test_data = testfinalized_data
results = classifier.transform(test_data) # Evaluate the model on the validation data
results.show() # Show the predictions
results.select('AdoptionSpeed', 'prediction').show() # Show the target and the prediction

# Want to show the f1 score and confusion matrix
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="AdoptionSpeed", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(results)
print("F1 score: %.3f" % f1_score)

# Confusion matrix
results.crosstab('AdoptionSpeed', 'prediction').show()



+----------------+-------------+--------------------+--------------------+----------+
|        features|AdoptionSpeed|       rawPrediction|         probability|prediction|
+----------------+-------------+--------------------+--------------------+----------+
| [265.0,7.0,2.0]|            4|[4.66462762344093...|[0.23323138117204...|       4.0|
|[266.0,24.0,2.0]|            4|[3.90832354413122...|[0.19541617720656...|       4.0|
|[266.0,12.0,2.0]|            2|[3.80982603011887...|[0.19049130150594...|       4.0|
|[195.0,60.0,1.0]|            1|[6.90513437153496...|[0.34525671857674...|       0.0|
| [266.0,3.0,2.0]|            4|[4.14015478728268...|[0.20700773936413...|       1.0|
|[307.0,12.0,1.0]|            2|[1.87071354752533...|[0.09353567737626...|       4.0|
|[218.0,16.0,1.0]|            4|[4.40701208522221...|[0.22035060426111...|       4.0|
|[285.0,24.0,2.0]|            3|[4.99401987424813...|[0.24970099371240...|       0.0|
|[266.0,36.0,2.0]|            4|[3.63240680226321...|[

### 4. Naive Bayes

In [9]:
from pyspark.ml.classification import NaiveBayes

train_data = finalized_data
classifier = NaiveBayes(labelCol='AdoptionSpeed', featuresCol='features')
classifier = classifier.fit(train_data) # Fit the model


test_data = testfinalized_data
results = classifier.transform(test_data) # Evaluate the model on the validation data
results.show() # Show the predictions
results.select('AdoptionSpeed', 'prediction').show() # Show the target and the prediction

# Want to show the f1 score and confusion matrix
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="AdoptionSpeed", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(results)
print("F1 score: %.3f" % f1_score)

# Confusion matrix
results.crosstab('AdoptionSpeed', 'prediction').show()


+----------------+-------------+--------------------+--------------------+----------+
|        features|AdoptionSpeed|       rawPrediction|         probability|prediction|
+----------------+-------------+--------------------+--------------------+----------+
| [265.0,7.0,2.0]|            4|[-46.974540850384...|[0.14402667045828...|       2.0|
|[266.0,24.0,2.0]|            4|[-101.24812790298...|[0.17873348172516...|       4.0|
|[266.0,12.0,2.0]|            2|[-62.971521419971...|[0.25457037136800...|       0.0|
|[195.0,60.0,1.0]|            1|[-207.53567000969...|[0.00110149909586...|       4.0|
| [266.0,3.0,2.0]|            4|[-34.264066557712...|[0.06978615073014...|       2.0|
|[307.0,12.0,1.0]|            2|[-59.849431997550...|[0.19599104897496...|       3.0|
|[218.0,16.0,1.0]|            4|[-68.301187210301...|[0.24240190122342...|       4.0|
|[285.0,24.0,2.0]|            3|[-102.16762406796...|[0.19066671534541...|       4.0|
|[266.0,36.0,2.0]|            4|[-139.52473438599...|[

## Using Map Reduce to build Naive Bayes model

In [29]:
# To work with spark we need to create a spark session
# Need to instal java
from pyspark.sql import SparkSession
rdd_spark_session = SparkSession.builder.master("local=[*]").config("spark.driver.memory", "15g").appName('petfinderMapReduce').getOrCreate()

# Read the data
df_spark = rdd_spark_session.read.csv('./train_balanced_corr.csv', header=True, inferSchema=True)

# Cast the column "AdoptionSpeed" to integer
df_spark = df_spark.withColumn("AdoptionSpeed", df_spark["AdoptionSpeed"].cast("integer"))

df_spark = df_spark.na.drop(how= 'any' , subset=['AdoptionSpeed'])

# Convert the data to RDD
rdd_spark = df_spark.rdd

# Map the data to count the number of each class
rdd_spark_map = rdd_spark.map(lambda x: (x[3], 1))
# To work with spark we need to create a spark session

In [32]:

# Print the result
rdd_spark_map.collectAsMap()


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 219.0 failed 1 times, most recent failure: Lost task 0.0 in stage 219.0 (TID 171) (LAPTOP-0IJ6E4KR executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:192)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:179)
	... 15 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:192)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:179)
	... 15 more
