In [1]:
import findspark
findspark.init()  # make the local Spark installation importable as `pyspark`
from pyspark.sql import SparkSession

# Create the notebook's single SparkSession here, including its application
# name. Any later SparkSession.builder...getOrCreate() call returns this same
# session, so an appName given there would not rename the running application.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("PySpark Naive Bayes Classifier")
    .getOrCreate()
)

In [2]:
# Display the active SparkSession (Jupyter renders the last expression of a cell).
spark

In [3]:
from pyspark import SparkContext
# Handle to the SparkContext that backs the session.
# NOTE(review): neither `sc` nor the `SparkContext` import is used in the cells shown here.
sc = spark.sparkContext

**Naive Bayes**

Referensi: https://runawayhorse001.github.io/LearningApacheSpark/classification.html

1. Menyiapkan SparkContext dan SparkSession

In [4]:
from pyspark.sql import SparkSession
# A SparkSession was already created at the top of the notebook, so
# getOrCreate() hands back that existing session instead of building a new one.
# Because its SparkContext already exists, appName here is unlikely to change
# the running application's name. The placeholder option
# "spark.some.config.option" (copied from the Spark docs) is not set any more:
# nothing in this notebook reads it.
spark = SparkSession \
        .builder \
        .appName("PySpark Naive Bayes Classifier") \
        .getOrCreate()

2. Load dataset

In [5]:
# Read the raw comma-separated file with Spark's built-in CSV reader
# ('com.databricks.spark.csv' is the legacy spark-csv package alias).
# header=False, so the columns are auto-named _c0 ... _c6; no schema inference
# is requested, so the values are read as strings.
df = spark.read.csv("dataset.txt", header=False)
df.show(5)

+-----+-----+---+---+-----+----+-----+
|  _c0|  _c1|_c2|_c3|  _c4| _c5|  _c6|
+-----+-----+---+---+-----+----+-----+
|vhigh|vhigh|  2|  2|small| low|unacc|
|vhigh|vhigh|  2|  2|small| med|unacc|
|vhigh|vhigh|  2|  2|small|high|unacc|
|vhigh|vhigh|  2|  2|  med| low|unacc|
|vhigh|vhigh|  2|  2|  med| med|unacc|
+-----+-----+---+---+-----+----+-----+
only showing top 5 rows



3. Fungsi untuk mengubah fitur kategorikal menjadi vektor *one-hot* numerik, lalu menggabungkannya menjadi satu kolom vektor fitur `features` (pada output ditampilkan sebagai *Sparse Vector*)

In [6]:
def get_dummy(df,categoricalCols,continuousCols,labelCol):
    """Index and one-hot encode categorical columns, then assemble one feature vector.

    Each column in ``categoricalCols`` is passed through a StringIndexer
    (output ``<col>_indexed``) and then a OneHotEncoder (output
    ``<col>_indexed_encoded``, default setting dropLast=True). The encoded
    columns, followed by ``continuousCols``, are combined by a VectorAssembler
    into a single ``features`` column.

    Args:
        df: Spark DataFrame holding the raw columns.
        categoricalCols: names of the categorical columns to encode.
        continuousCols: list of column names appended to the vector as-is
            (must be a list, since it is concatenated with ``+``).
        labelCol: name of the target column; it is copied into ``label``.

    Returns:
        A DataFrame with exactly two columns: ``features`` and ``label``.

    Note:
        The encoding pipeline is fit on ``df`` and applied to that same ``df``.
    """
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import col

    indexed_names = ["{0}_indexed".format(name) for name in categoricalCols]
    encoded_names = ["{0}_encoded".format(name) for name in indexed_names]

    string_indexers = [StringIndexer(inputCol=src, outputCol=dst)
                       for src, dst in zip(categoricalCols, indexed_names)]
    # OneHotEncoder keeps its default setting: dropLast=True
    one_hot_encoders = [OneHotEncoder(inputCol=src, outputCol=dst)
                        for src, dst in zip(indexed_names, encoded_names)]
    vector_assembler = VectorAssembler(inputCols=encoded_names + continuousCols,
                                       outputCol="features")

    # All indexers run first, then all encoders, then the assembler.
    all_stages = string_indexers + one_hot_encoders + [vector_assembler]
    fitted_pipeline = Pipeline(stages=all_stages).fit(df)

    return (fitted_pipeline.transform(df)
            .withColumn('label', col(labelCol))
            .select('features','label'))

In [7]:
# Column roles for this dataset: _c0.._c5 are categorical attributes,
# there are no numeric attributes, and _c6 is the class to predict.
class_output = '_c6'
numeric = []
categorical = [f"_c{i}" for i in range(6)]
categorical, numeric, class_output

(['_c0', '_c1', '_c2', '_c3', '_c4', '_c5'], [], '_c6')

In [8]:
# Encode the categorical attributes into a `features` vector and copy the
# class column into `label`.
data = get_dummy(df,
                 categoricalCols=categorical,
                 continuousCols=numeric,
                 labelCol=class_output)
data.show(n=5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(15,[6,9,14],[1.0...|unacc|
|(15,[6,9],[1.0,1.0])|unacc|
|(15,[6,9,13],[1.0...|unacc|
|(15,[6,9,12,14],[...|unacc|
|(15,[6,9,12],[1.0...|unacc|
+--------------------+-----+
only showing top 5 rows



4. Mengubah label kategorikal menjadi indeks numerik

In [9]:
from pyspark.ml.feature import StringIndexer

# Map the string class labels in `label` to numeric indices in `indexedLabel`.
# The fitted model keeps the label order (labelIndexer.labels), which is later
# used to turn numeric predictions back into class names.
labelIndexer = StringIndexer(
    inputCol='label',
    outputCol='indexedLabel',
).fit(data)

# Preview the first 5 rows (truncated cell display).
labelIndexer.transform(data).show(n=5, truncate=True)

+--------------------+-----+------------+
|            features|label|indexedLabel|
+--------------------+-----+------------+
|(15,[6,9,14],[1.0...|unacc|         0.0|
|(15,[6,9],[1.0,1.0])|unacc|         0.0|
|(15,[6,9,13],[1.0...|unacc|         0.0|
|(15,[6,9,12,14],[...|unacc|         0.0|
|(15,[6,9,12],[1.0...|unacc|         0.0|
+--------------------+-----+------------+
only showing top 5 rows



5. Fitur yang memiliki lebih dari 4 nilai unik (*unique value*) akan dianggap sebagai fitur kontinu

In [10]:
from pyspark.ml.feature import VectorIndexer
# Automatically identify categorical features, and index them.
# VectorIndexer declares a feature continuous when it has MORE than
# maxCategories distinct values, so maxCategories=4 implements the intended
# rule "> 4 distinct values => continuous".
# Here every feature is a 0/1 one-hot indicator, so all of them stay categorical.
featureIndexer = VectorIndexer(inputCol="features",
                               outputCol="indexedFeatures",
                               maxCategories=4).fit(data)
featureIndexer.transform(data).show(5, True)

+--------------------+-----+--------------------+
|            features|label|     indexedFeatures|
+--------------------+-----+--------------------+
|(15,[6,9,14],[1.0...|unacc|(15,[6,9,14],[1.0...|
|(15,[6,9],[1.0,1.0])|unacc|(15,[6,9],[1.0,1.0])|
|(15,[6,9,13],[1.0...|unacc|(15,[6,9,13],[1.0...|
|(15,[6,9,12,14],[...|unacc|(15,[6,9,12,14],[...|
|(15,[6,9,12],[1.0...|unacc|(15,[6,9,12],[1.0...|
+--------------------+-----+--------------------+
only showing top 5 rows



6. Membagi data menjadi data latih (*training*) dan data uji (*test*) dengan perbandingan 60:40

In [11]:
# Split the data into training and test sets (40% held out for testing).
# A fixed seed makes the split, and therefore the reported test error,
# reproducible after Restart & Run All.
SPLIT_SEED = 42
(trainingData, testData) = data.randomSplit([0.6, 0.4], seed=SPLIT_SEED)

trainingData.show(5,False)
testData.show(5,False)

+----------------------------------------------+-----+
|features                                      |label|
+----------------------------------------------+-----+
|(15,[0],[1.0])                                |unacc|
|(15,[0,3],[1.0,1.0])                          |unacc|
|(15,[0,3,6],[1.0,1.0,1.0])                    |unacc|
|(15,[0,3,6,9,11],[1.0,1.0,1.0,1.0,1.0])       |unacc|
|(15,[0,3,6,9,11,13],[1.0,1.0,1.0,1.0,1.0,1.0])|unacc|
+----------------------------------------------+-----+
only showing top 5 rows

+----------------------------------------------+-----+
|features                                      |label|
+----------------------------------------------+-----+
|(15,[],[])                                    |unacc|
|(15,[0,3,6,9],[1.0,1.0,1.0,1.0])              |unacc|
|(15,[0,3,6,9,12,13],[1.0,1.0,1.0,1.0,1.0,1.0])|unacc|
|(15,[0,3,6,9,12,14],[1.0,1.0,1.0,1.0,1.0,1.0])|unacc|
|(15,[0,3,6,9,13],[1.0,1.0,1.0,1.0,1.0])       |unacc|
+----------------------------------------------+-----+

In [12]:
from pyspark.ml.linalg import Vectors # !!!!caution: not from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer, VectorIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

7. Mendefinisikan model Naive Bayes (model baru dilatih ketika *pipeline* di-*fit*)

In [13]:
from pyspark.ml.classification import NaiveBayes

# Naive Bayes classifier that reads the VectorIndexer output and the indexed label.
# No modelType or smoothing is passed, so Spark's defaults apply.
# NOTE(review): every feature is a 0/1 one-hot indicator; modelType="bernoulli"
# may be worth comparing against the default.
nb = NaiveBayes(labelCol='indexedLabel', featuresCol='indexedFeatures')

8. Arsitektur *pipeline*

In [14]:
# Turn the numeric `prediction` back into the original class name, using the
# label order learned by labelIndexer.
labelConverter = IndexToString(
    labels=labelIndexer.labels,
    inputCol="prediction",
    outputCol="predictedLabel",
)

In [15]:
# Chain the label indexer, feature indexer, Naive Bayes classifier and label
# converter in a Pipeline. labelIndexer and featureIndexer are already-fitted
# models (fit on `data` earlier), so the Pipeline applies them as-is; only `nb`
# is trained when the pipeline is fit.
# NOTE(review): both indexers were fit on all of `data`, i.e. before the
# train/test split, so they have also seen the test rows.
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, nb,labelConverter])

In [16]:
# Train the model on the training split. The pre-fitted indexer stages only
# transform the data here; the Naive Bayes stage is the one being fit.
model = pipeline.fit(trainingData)

9. Membuat prediksi menggunakan data uji

In [17]:
# Run the fitted pipeline on the held-out rows.
predictions = model.transform(testData)

# Show a few predictions next to the true labels.
preview_cols = ["features", "label", "predictedLabel"]
predictions.select(*preview_cols).show(5)

+--------------------+-----+--------------+
|            features|label|predictedLabel|
+--------------------+-----+--------------+
|          (15,[],[])|unacc|         unacc|
|(15,[0,3,6,9],[1....|unacc|         unacc|
|(15,[0,3,6,9,12,1...|unacc|         unacc|
|(15,[0,3,6,9,12,1...|unacc|         unacc|
|(15,[0,3,6,9,13],...|unacc|         unacc|
+--------------------+-----+--------------+
only showing top 5 rows



10. Evaluasi

In [18]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Compare the numeric prediction with the indexed true label and report the
# test error as 1 - accuracy.
# NOTE(review): compare this against the majority-class baseline (the label
# indexed 0.0 by StringIndexer's default frequency ordering) before reading
# much into the number.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="indexedLabel",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
test_error = 1.0 - accuracy
print("Test Error = %g" % test_error)

Test Error = 0.20208
