In [2]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#import numpy
# Load training data
from pyspark.ml.linalg import SparseVector
# from pyspark.python.pyspark.shell import spark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [4]:
data = spark.read.load("adult.csv", format="csv", header=True, delimiter=",")
data.show(5)

+---+------+---+---+----+---+---+
|  1|     3|  5| 10|  11| 12| 13|
+---+------+---+---+----+---+---+
| 39| 77516| 13|  1|2174|  0| 40|
| 50| 83311| 13|  1|   0|  0| 13|
| 38|215646|  9|  1|   0|  0| 40|
| 53|234721|  7|  1|   0|  0| 40|
| 28|338409| 13|  2|   0|  0| 40|
+---+------+---+---+----+---+---+
only showing top 5 rows



In [5]:

from pyspark.sql.types import *

# Change column type
data = data.withColumn("1", data["1"].cast(IntegerType()))
data = data.withColumn("3", data["3"].cast(IntegerType()))
data = data.withColumn("5", data["5"].cast(IntegerType()))
data = data.withColumn("10", data["10"].cast(IntegerType()))
data = data.withColumn("11", data["11"].cast(IntegerType()))
data = data.withColumn("12", data["12"].cast(IntegerType()))
data = data.withColumn("13", data["13"].cast(IntegerType()))

data.printSchema()

root
 |-- 1: integer (nullable = true)
 |-- 3: integer (nullable = true)
 |-- 5: integer (nullable = true)
 |-- 10: integer (nullable = true)
 |-- 11: integer (nullable = true)
 |-- 12: integer (nullable = true)
 |-- 13: integer (nullable = true)



In [6]:
data = data.withColumn("label", data['10'] - 0)
data=data.drop('10')
data.show(5)

+---+------+---+----+---+---+-----+
|  1|     3|  5|  11| 12| 13|label|
+---+------+---+----+---+---+-----+
| 39| 77516| 13|2174|  0| 40|    1|
| 50| 83311| 13|   0|  0| 13|    1|
| 38|215646|  9|   0|  0| 40|    1|
| 53|234721|  7|   0|  0| 40|    1|
| 28|338409| 13|   0|  0| 40|    2|
+---+------+---+----+---+---+-----+
only showing top 5 rows



In [7]:
assem = VectorAssembler(inputCols=data.columns[0:5], outputCol='features')
data = assem.transform(data)
data.show(5,False)

+---+------+---+----+---+---+-----+------------------------------+
|1  |3     |5  |11  |12 |13 |label|features                      |
+---+------+---+----+---+---+-----+------------------------------+
|39 |77516 |13 |2174|0  |40 |1    |[39.0,77516.0,13.0,2174.0,0.0]|
|50 |83311 |13 |0   |0  |13 |1    |[50.0,83311.0,13.0,0.0,0.0]   |
|38 |215646|9  |0   |0  |40 |1    |[38.0,215646.0,9.0,0.0,0.0]   |
|53 |234721|7  |0   |0  |40 |1    |[53.0,234721.0,7.0,0.0,0.0]   |
|28 |338409|13 |0   |0  |40 |2    |[28.0,338409.0,13.0,0.0,0.0]  |
+---+------+---+----+---+---+-----+------------------------------+
only showing top 5 rows



In [8]:
# Split the data into train and test
train,test = data.randomSplit([0.6, 0.4], 1234)

In [9]:
nb1 = NaiveBayes(smoothing=1.0, modelType="multinomial")

# train the model
model1 = nb1.fit(train)

# select example rows to display.
predictions = model1.transform(test)
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+---+-----+---+----+---+---+-----+--------------------+--------------------+--------------------+----------+
|  1|    3|  5|  11| 12| 13|label|            features|       rawPrediction|         probability|prediction|
+---+-----+---+----+---+---+-----+--------------------+--------------------+--------------------+----------+
| 17|19752|  7|   0|  0| 25|    2|[17.0,19752.0,7.0...|[-364.50212722234...|[1.07683597450422...|       1.0|
| 17|24090|  9|   0|  0| 35|    2|[17.0,24090.0,9.0...|[-417.30549539174...|[1.27840195771681...|       1.0|
| 17|25051|  6|   0|  0| 16|    1|[17.0,25051.0,6.0...|[-395.06155607223...|[4.14942129598385...|       1.0|
| 17|29571|  8|   0|  0| 15|    1|[17.0,29571.0,8.0...|[-449.25310348026...|[2.52963334297250...|       1.0|
| 17|31007|  6|   0|  0| 30|    2|[17.0,31007.0,6.0...|[-440.49010302435...|[1.39871076585049...|       1.0|
| 17|32607|  6|   0|  0| 20|    1|[17.0,32607.0,6.0...|[-452.69387655078...|[3.99145877375918...|       1.0|
| 17|34019|  6|   0

In [10]:
from pyspark.ml.classification import DecisionTreeClassifier

# create the trainer and set its parameters
nb3 = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# train the model
model3 = nb3.fit(train)

# select example rows to display.
predictions = model3.transform(test)
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+---+-----+---+----+---+---+-----+--------------------+---------------+--------------------+----------+
|  1|    3|  5|  11| 12| 13|label|            features|  rawPrediction|         probability|prediction|
+---+-----+---+----+---+---+-----+--------------------+---------------+--------------------+----------+
| 17|19752|  7|   0|  0| 25|    2|[17.0,19752.0,7.0...|[0.0,67.0,76.0]|[0.0,0.4685314685...|       2.0|
| 17|24090|  9|   0|  0| 35|    2|[17.0,24090.0,9.0...|[0.0,67.0,76.0]|[0.0,0.4685314685...|       2.0|
| 17|25051|  6|   0|  0| 16|    1|[17.0,25051.0,6.0...|[0.0,67.0,76.0]|[0.0,0.4685314685...|       2.0|
| 17|29571|  8|   0|  0| 15|    1|[17.0,29571.0,8.0...|[0.0,67.0,76.0]|[0.0,0.4685314685...|       2.0|
| 17|31007|  6|   0|  0| 30|    2|[17.0,31007.0,6.0...|[0.0,67.0,76.0]|[0.0,0.4685314685...|       2.0|
| 17|32607|  6|   0|  0| 20|    1|[17.0,32607.0,6.0...|[0.0,67.0,76.0]|[0.0,0.4685314685...|       2.0|
| 17|34019|  6|   0|  0| 20|    1|[17.0,34019.0,6.0...|[0.0,67.0

In [11]:
from pyspark.ml.classification import RandomForestClassifier

# create the trainer and set its parameters
nb3 = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100)

# train the model
model3 = nb3.fit(train)

# select example rows to display.
predictions = model3.transform(test)
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))


+---+-----+---+----+---+---+-----+--------------------+--------------------+--------------------+----------+
|  1|    3|  5|  11| 12| 13|label|            features|       rawPrediction|         probability|prediction|
+---+-----+---+----+---+---+-----+--------------------+--------------------+--------------------+----------+
| 17|19752|  7|   0|  0| 25|    2|[17.0,19752.0,7.0...|[0.0,55.037599709...|[0.0,0.5503759970...|       1.0|
| 17|24090|  9|   0|  0| 35|    2|[17.0,24090.0,9.0...|[0.0,52.028434326...|[0.0,0.5202843432...|       1.0|
| 17|25051|  6|   0|  0| 16|    1|[17.0,25051.0,6.0...|[0.0,55.125359930...|[0.0,0.5512535993...|       1.0|
| 17|29571|  8|   0|  0| 15|    1|[17.0,29571.0,8.0...|[0.0,53.064942368...|[0.0,0.5306494236...|       1.0|
| 17|31007|  6|   0|  0| 30|    2|[17.0,31007.0,6.0...|[0.0,55.125359930...|[0.0,0.5512535993...|       1.0|
| 17|32607|  6|   0|  0| 20|    1|[17.0,32607.0,6.0...|[0.0,58.323007981...|[0.0,0.5832300798...|       1.0|
| 17|34019|  6|   0

In [12]:

from pyspark.ml.classification import RandomForestClassifier

# create the trainer and set its parameters
nb3 = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=1)

# train the model
model3 = nb3.fit(train)

# select example rows to display.
predictions = model3.transform(test)
predictions = predictions.withColumn("label", predictions["label"].cast(DoubleType()))
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

+---+-----+---+----+---+---+-----+--------------------+--------------------+--------------------+----------+
|  1|    3|  5|  11| 12| 13|label|            features|       rawPrediction|         probability|prediction|
+---+-----+---+----+---+---+-----+--------------------+--------------------+--------------------+----------+
| 17|19752|  7|   0|  0| 25|  2.0|[17.0,19752.0,7.0...|[0.0,0.5771704180...|[0.0,0.5771704180...|       1.0|
| 17|24090|  9|   0|  0| 35|  2.0|[17.0,24090.0,9.0...|[0.0,0.5771704180...|[0.0,0.5771704180...|       1.0|
| 17|25051|  6|   0|  0| 16|  1.0|[17.0,25051.0,6.0...|[0.0,0.5771704180...|[0.0,0.5771704180...|       1.0|
| 17|29571|  8|   0|  0| 15|  1.0|[17.0,29571.0,8.0...|[0.0,0.5771704180...|[0.0,0.5771704180...|       1.0|
| 17|31007|  6|   0|  0| 30|  2.0|[17.0,31007.0,6.0...|[0.0,0.5771704180...|[0.0,0.5771704180...|       1.0|
| 17|32607|  6|   0|  0| 20|  1.0|[17.0,32607.0,6.0...|[0.0,0.5771704180...|[0.0,0.5771704180...|       1.0|
| 17|34019|  6|   0