In [2]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#import numpy
# Load training data
from pyspark.ml.linalg import SparseVector
# from pyspark.python.pyspark.shell import spark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [3]:
data = spark.read.load("adult_data.csv", format="csv", header=True, delimiter=",")
data.show(5)

+---+------+---+---+----+---+---+
|  1|     3|  5| 10|  11| 12| 13|
+---+------+---+---+----+---+---+
| 39| 77516| 13|  1|2174|  0| 40|
| 50| 83311| 13|  1|   0|  0| 13|
| 38|215646|  9|  1|   0|  0| 40|
| 53|234721|  7|  1|   0|  0| 40|
| 28|338409| 13|  2|   0|  0| 40|
+---+------+---+---+----+---+---+
only showing top 5 rows



In [4]:
from pyspark.sql.types import *

# Change column type
data = data.withColumn("1", data["1"].cast(IntegerType()))
data = data.withColumn("3", data["3"].cast(IntegerType()))
data = data.withColumn("5", data["5"].cast(IntegerType()))
data = data.withColumn("10", data["10"].cast(IntegerType()))
data = data.withColumn("11", data["11"].cast(IntegerType()))
data = data.withColumn("12", data["12"].cast(IntegerType()))
data = data.withColumn("13", data["13"].cast(IntegerType()))

data.printSchema()

root
 |-- 1: integer (nullable = true)
 |-- 3: integer (nullable = true)
 |-- 5: integer (nullable = true)
 |-- 10: integer (nullable = true)
 |-- 11: integer (nullable = true)
 |-- 12: integer (nullable = true)
 |-- 13: integer (nullable = true)



In [5]:
data = data.withColumn("label", data['10'] - 0)
data.show(5)

+---+------+---+---+----+---+---+-----+
|  1|     3|  5| 10|  11| 12| 13|label|
+---+------+---+---+----+---+---+-----+
| 39| 77516| 13|  1|2174|  0| 40|    1|
| 50| 83311| 13|  1|   0|  0| 13|    1|
| 38|215646|  9|  1|   0|  0| 40|    1|
| 53|234721|  7|  1|   0|  0| 40|    1|
| 28|338409| 13|  2|   0|  0| 40|    2|
+---+------+---+---+----+---+---+-----+
only showing top 5 rows



In [6]:
assem = VectorAssembler(inputCols=data.columns[0:7], outputCol='features')
data = assem.transform(data)
data.show(5)

+---+------+---+---+----+---+---+-----+--------------------+
|  1|     3|  5| 10|  11| 12| 13|label|            features|
+---+------+---+---+----+---+---+-----+--------------------+
| 39| 77516| 13|  1|2174|  0| 40|    1|[39.0,77516.0,13....|
| 50| 83311| 13|  1|   0|  0| 13|    1|[50.0,83311.0,13....|
| 38|215646|  9|  1|   0|  0| 40|    1|[38.0,215646.0,9....|
| 53|234721|  7|  1|   0|  0| 40|    1|[53.0,234721.0,7....|
| 28|338409| 13|  2|   0|  0| 40|    2|[28.0,338409.0,13...|
+---+------+---+---+----+---+---+-----+--------------------+
only showing top 5 rows



In [7]:
# Split the data into train and test
train,test = data.randomSplit([0.6, 0.4], 1234)

In [8]:
# create the trainer and set its parameters
nb1 = NaiveBayes(smoothing=1.0, modelType="multinomial")

# train the model
model1 = nb1.fit(train)

# select example rows to display.
predictions = model1.transform(test)
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+---+-----+---+---+----+---+---+-----+--------------------+--------------------+--------------------+----------+
|  1|    3|  5| 10|  11| 12| 13|label|            features|       rawPrediction|         probability|prediction|
+---+-----+---+---+----+---+---+-----+--------------------+--------------------+--------------------+----------+
| 17|19752|  7|  2|   0|  0| 25|    2|[17.0,19752.0,7.0...|[-603.83024152897...|[3.55017080576790...|       1.0|
| 17|24090|  9|  2|   0|  0| 35|    2|[17.0,24090.0,9.0...|[-741.82504306404...|[1.30388265806149...|       1.0|
| 17|25051|  6|  1|   0|  0| 16|    1|[17.0,25051.0,6.0...|[-547.62221373539...|[8.55480582189268...|       1.0|
| 17|29571|  8|  1|   0|  0| 15|    1|[17.0,29571.0,8.0...|[-594.41135826275...|[4.22658921812652...|       1.0|
| 17|31007|  6|  2|   0|  0| 30|    2|[17.0,31007.0,6.0...|[-724.46084085428...|[6.78632746005661...|       1.0|
| 17|32607|  6|  1|   0|  0| 20|    1|[17.0,32607.0,6.0...|[-640.64253400357...|[1.1532817128748

In [9]:
# create the trainer and set its parameters
nb2 = NaiveBayes(smoothing=10.0, modelType="multinomial")

# train the model
model2 = nb2.fit(train)

# select example rows to display.
predictions = model2.transform(test)
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+---+-----+---+---+----+---+---+-----+--------------------+--------------------+--------------------+----------+
|  1|    3|  5| 10|  11| 12| 13|label|            features|       rawPrediction|         probability|prediction|
+---+-----+---+---+----+---+---+-----+--------------------+--------------------+--------------------+----------+
| 17|19752|  7|  2|   0|  0| 25|    2|[17.0,19752.0,7.0...|[-603.82833255900...|[3.54425060120333...|       1.0|
| 17|24090|  9|  2|   0|  0| 35|    2|[17.0,24090.0,9.0...|[-741.82292742258...|[1.30136607200351...|       1.0|
| 17|25051|  6|  1|   0|  0| 16|    1|[17.0,25051.0,6.0...|[-547.62132541304...|[8.54402615287241...|       1.0|
| 17|29571|  8|  1|   0|  0| 15|    1|[17.0,29571.0,8.0...|[-594.41044627901...|[4.22120922120825...|       1.0|
| 17|31007|  6|  2|   0|  0| 30|    2|[17.0,31007.0,6.0...|[-724.45916112928...|[6.77654354208634...|       1.0|
| 17|32607|  6|  1|   0|  0| 20|    1|[17.0,32607.0,6.0...|[-640.64174307001...|[1.1519318472885

In [10]:
from pyspark.ml.classification import DecisionTreeClassifier

# create the trainer and set its parameters
nb3 = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# train the model
model3 = nb3.fit(train)

# select example rows to display.
predictions = model3.transform(test)
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+---+-----+---+---+----+---+---+-----+--------------------+-----------------+-------------+----------+
|  1|    3|  5| 10|  11| 12| 13|label|            features|    rawPrediction|  probability|prediction|
+---+-----+---+---+----+---+---+-----+--------------------+-----------------+-------------+----------+
| 17|19752|  7|  2|   0|  0| 25|    2|[17.0,19752.0,7.0...| [0.0,0.0,6409.0]|[0.0,0.0,1.0]|       2.0|
| 17|24090|  9|  2|   0|  0| 35|    2|[17.0,24090.0,9.0...| [0.0,0.0,6409.0]|[0.0,0.0,1.0]|       2.0|
| 17|25051|  6|  1|   0|  0| 16|    1|[17.0,25051.0,6.0...|[0.0,13008.0,0.0]|[0.0,1.0,0.0]|       1.0|
| 17|29571|  8|  1|   0|  0| 15|    1|[17.0,29571.0,8.0...|[0.0,13008.0,0.0]|[0.0,1.0,0.0]|       1.0|
| 17|31007|  6|  2|   0|  0| 30|    2|[17.0,31007.0,6.0...| [0.0,0.0,6409.0]|[0.0,0.0,1.0]|       2.0|
| 17|32607|  6|  1|   0|  0| 20|    1|[17.0,32607.0,6.0...|[0.0,13008.0,0.0]|[0.0,1.0,0.0]|       1.0|
| 17|34019|  6|  1|   0|  0| 20|    1|[17.0,34019.0,6.0...|[0.0,13008.0,0

In [11]:
from pyspark.ml.classification import RandomForestClassifier

# create the trainer and set its parameters
nb3 = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)

# train the model
model3 = nb3.fit(train)

# select example rows to display.
predictions = model3.transform(test)
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+---+-----+---+---+----+---+---+-----+--------------------+--------------------+--------------------+----------+
|  1|    3|  5| 10|  11| 12| 13|label|            features|       rawPrediction|         probability|prediction|
+---+-----+---+---+----+---+---+-----+--------------------+--------------------+--------------------+----------+
| 17|19752|  7|  2|   0|  0| 25|    2|[17.0,19752.0,7.0...|[0.0,0.7914942528...|[0.0,0.0791494252...|       2.0|
| 17|24090|  9|  2|   0|  0| 35|    2|[17.0,24090.0,9.0...|[0.0,0.3448275862...|[0.0,0.0344827586...|       2.0|
| 17|25051|  6|  1|   0|  0| 16|    1|[17.0,25051.0,6.0...|[0.0,8.7914942528...|[0.0,0.8791494252...|       1.0|
| 17|29571|  8|  1|   0|  0| 15|    1|[17.0,29571.0,8.0...|[0.0,8.7914942528...|[0.0,0.8791494252...|       1.0|
| 17|31007|  6|  2|   0|  0| 30|    2|[17.0,31007.0,6.0...|[0.0,0.7914942528...|[0.0,0.0791494252...|       2.0|
| 17|32607|  6|  1|   0|  0| 20|    1|[17.0,32607.0,6.0...|[0.0,8.7914942528...|[0.0,0.879149425

In [12]:
from pyspark.ml.classification import RandomForestClassifier

# create the trainer and set its parameters
nb3 = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100)

# train the model
model3 = nb3.fit(train)

# select example rows to display.
predictions = model3.transform(test)
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+---+-----+---+---+----+---+---+-----+--------------------+--------------------+--------------------+----------+
|  1|    3|  5| 10|  11| 12| 13|label|            features|       rawPrediction|         probability|prediction|
+---+-----+---+---+----+---+---+-----+--------------------+--------------------+--------------------+----------+
| 17|19752|  7|  2|   0|  0| 25|    2|[17.0,19752.0,7.0...|[0.0,3.3481437437...|[0.0,0.0334814374...|       2.0|
| 17|24090|  9|  2|   0|  0| 35|    2|[17.0,24090.0,9.0...|[0.0,2.8062093469...|[0.0,0.0280620934...|       2.0|
| 17|25051|  6|  1|   0|  0| 16|    1|[17.0,25051.0,6.0...|[0.0,96.348143743...|[0.0,0.9634814374...|       1.0|
| 17|29571|  8|  1|   0|  0| 15|    1|[17.0,29571.0,8.0...|[0.0,96.223852213...|[0.0,0.9622385221...|       1.0|
| 17|31007|  6|  2|   0|  0| 30|    2|[17.0,31007.0,6.0...|[0.0,3.3481437437...|[0.0,0.0334814374...|       2.0|
| 17|32607|  6|  1|   0|  0| 20|    1|[17.0,32607.0,6.0...|[0.0,96.348143743...|[0.0,0.963481437

In [13]:
from pyspark.ml.classification import RandomForestClassifier

# create the trainer and set its parameters
nb3 = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=1)

# train the model
model3 = nb3.fit(train)

# select example rows to display.
predictions = model3.transform(test)
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+---+-----+---+---+----+---+---+-----+--------------------+-------------+-------------+----------+
|  1|    3|  5| 10|  11| 12| 13|label|            features|rawPrediction|  probability|prediction|
+---+-----+---+---+----+---+---+-----+--------------------+-------------+-------------+----------+
| 17|19752|  7|  2|   0|  0| 25|    2|[17.0,19752.0,7.0...|[0.0,0.0,1.0]|[0.0,0.0,1.0]|       2.0|
| 17|24090|  9|  2|   0|  0| 35|    2|[17.0,24090.0,9.0...|[0.0,0.0,1.0]|[0.0,0.0,1.0]|       2.0|
| 17|25051|  6|  1|   0|  0| 16|    1|[17.0,25051.0,6.0...|[0.0,1.0,0.0]|[0.0,1.0,0.0]|       1.0|
| 17|29571|  8|  1|   0|  0| 15|    1|[17.0,29571.0,8.0...|[0.0,1.0,0.0]|[0.0,1.0,0.0]|       1.0|
| 17|31007|  6|  2|   0|  0| 30|    2|[17.0,31007.0,6.0...|[0.0,0.0,1.0]|[0.0,0.0,1.0]|       2.0|
| 17|32607|  6|  1|   0|  0| 20|    1|[17.0,32607.0,6.0...|[0.0,1.0,0.0]|[0.0,1.0,0.0]|       1.0|
| 17|34019|  6|  1|   0|  0| 20|    1|[17.0,34019.0,6.0...|[0.0,1.0,0.0]|[0.0,1.0,0.0]|       1.0|
| 17|47199