# Spark ML: Overview

Spark ML (Machine Learning) is Apache Spark's machine learning library. It is designed to make it easier to develop, apply, and manage machine learning algorithms on large-scale data.

In [1]:
from pyspark.sql import SparkSession

In [2]:
# Create (or reuse) a SparkSession; its SparkContext is exposed as `sc` for the RDD-based cells below
spark = SparkSession.builder.appName("Python Spark ML basic example").getOrCreate()
sc = spark.sparkContext

## Diagnostic Analytics

Diagnostic analytics is useful when an institution, organization, or company wants insight into a specific problem. The analysis works by retrieving, expanding, and drilling down into the data, so it typically draws on larger and more varied data.

* Goal: Analyze data to understand the causes behind the events or patterns that were observed.
* Method: Use deeper analysis techniques such as cause-and-effect analysis and regression.

Example: Investigating a drop in sales performance by identifying its causes, such as a change in marketing strategy or external factors.

### Correlation

In [3]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation

In [4]:
data = [(Vectors.sparse(4, [(0, 1.0), (3, -2.0)]),),
        (Vectors.dense([4.0, 5.0, 0.0, 3.0]),),
        (Vectors.dense([6.0, 7.0, 0.0, 8.0]),),
        (Vectors.sparse(4, [(0, 9.0), (3, 1.0)]),)]
data

[(SparseVector(4, {0: 1.0, 3: -2.0}),),
 (DenseVector([4.0, 5.0, 0.0, 3.0]),),
 (DenseVector([6.0, 7.0, 0.0, 8.0]),),
 (SparseVector(4, {0: 9.0, 3: 1.0}),)]

In [5]:
df = spark.createDataFrame(data, ["features"])
df.show()

+--------------------+
|            features|
+--------------------+
|(4,[0,3],[1.0,-2.0])|
|   [4.0,5.0,0.0,3.0]|
|   [6.0,7.0,0.0,8.0]|
| (4,[0,3],[9.0,1.0])|
+--------------------+



In [6]:
r1 = Correlation.corr(df, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))



Pearson correlation matrix:
DenseMatrix([[1.        , 0.05564149,        nan, 0.40047142],
             [0.05564149, 1.        ,        nan, 0.91359586],
             [       nan,        nan, 1.        ,        nan],
             [0.40047142, 0.91359586,        nan, 1.        ]])
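As a sanity check, the Pearson matrix above can be reproduced with plain NumPy. This is a minimal sketch that assumes the four vectors are densified by hand (the sparse entries expanded with zeros). The diagonal is fixed at 1.0 to mirror Spark's output, and any off-diagonal pair involving the constant third column yields NaN because that column has zero variance. The helper name `pearson` is illustrative only.

```python
import numpy as np

# The four vectors from the cell above, with the sparse ones expanded to dense form
X = np.array([
    [1.0, 0.0, 0.0, -2.0],
    [4.0, 5.0, 0.0, 3.0],
    [6.0, 7.0, 0.0, 8.0],
    [9.0, 0.0, 0.0, 1.0],
])

def pearson(a, b):
    """Pearson r of two 1-D arrays; NaN if either one has zero variance."""
    da, db = a - a.mean(), b - b.mean()
    denom = np.sqrt((da ** 2).sum() * (db ** 2).sum())
    return float("nan") if denom == 0 else float((da * db).sum() / denom)

n = X.shape[1]
# Diagonal fixed at 1.0 to match Spark's output, even for the constant column
pearson_matrix = np.array([[1.0 if i == j else pearson(X[:, i], X[:, j])
                            for j in range(n)] for i in range(n)])
print(np.round(pearson_matrix, 8))
```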


In [7]:
r2 = Correlation.corr(df, "features", "spearman").head()
print("Spearman correlation matrix:\n" + str(r2[0]))

Spearman correlation matrix:
DenseMatrix([[1.        , 0.10540926,        nan, 0.4       ],
             [0.10540926, 1.        ,        nan, 0.9486833 ],
             [       nan,        nan, 1.        ,        nan],
             [0.4       , 0.9486833 ,        nan, 1.        ]])
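Spearman's coefficient is Pearson's coefficient computed on ranks instead of raw values. The sketch below ranks each column and reuses the Pearson formula; it assumes tied values receive the average of the ranks they span, which is what reproduces the 0.10540926 entry above (the second column contains two tied zeros). The helper names are illustrative, not part of Spark.

```python
import numpy as np

# Same dense data as the correlation cell above
X = np.array([
    [1.0, 0.0, 0.0, -2.0],
    [4.0, 5.0, 0.0, 3.0],
    [6.0, 7.0, 0.0, 8.0],
    [9.0, 0.0, 0.0, 1.0],
])

def average_ranks(col):
    """1-based ranks; tied values share the mean of the ranks they occupy."""
    order = np.argsort(col, kind="mergesort")
    sorted_vals = col[order]
    ranks = np.empty(len(col))
    i = 0
    while i < len(col):
        j = i
        # Extend j over the run of values equal to sorted_vals[i]
        while j + 1 < len(col) and sorted_vals[j + 1] == sorted_vals[i]:
            j += 1
        ranks[order[i:j + 1]] = (i + j) / 2.0 + 1.0
        i = j + 1
    return ranks

def pearson(a, b):
    """Pearson r of two 1-D arrays; NaN if either one has zero variance."""
    da, db = a - a.mean(), b - b.mean()
    denom = np.sqrt((da ** 2).sum() * (db ** 2).sum())
    return float("nan") if denom == 0 else float((da * db).sum() / denom)

# Rank every column, then correlate the ranks
R = np.column_stack([average_ranks(X[:, k]) for k in range(X.shape[1])])
n = R.shape[1]
spearman_matrix = np.array([[1.0 if i == j else pearson(R[:, i], R[:, j])
                             for j in range(n)] for i in range(n)])
print(np.round(spearman_matrix, 8))
```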


### Chi Square

In [8]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import ChiSquareTest

data = [(0.0, Vectors.dense(0.5, 10.0)),
        (0.0, Vectors.dense(1.5, 20.0)),
        (1.0, Vectors.dense(1.5, 30.0)),
        (0.0, Vectors.dense(3.5, 30.0)),
        (0.0, Vectors.dense(3.5, 40.0)),
        (1.0, Vectors.dense(3.5, 40.0))]
df = spark.createDataFrame(data, ["label", "features"])

r = ChiSquareTest.test(df, "features", "label").head()
print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))

pValues: [0.6872892787909721,0.6822703303362126]
degreesOfFreedom: [2, 3]
statistics: [0.75,1.5]
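`ChiSquareTest` runs Pearson's independence test of each feature against the label, treating every distinct feature value as a category. The plain-Python sketch below (the helper name is hypothetical) builds the contingency counts and reproduces the statistics and degrees of freedom above. For 2 degrees of freedom the chi-squared survival function reduces to exp(-x/2), which also recovers the first p-value; other degrees of freedom would need a general chi-squared CDF such as `scipy.stats.chi2.sf`.

```python
import math
from collections import Counter

# Same rows as the ChiSquareTest cell above
labels = [0.0, 0.0, 1.0, 0.0, 0.0, 1.0]
features = [(0.5, 10.0), (1.5, 20.0), (1.5, 30.0),
            (3.5, 30.0), (3.5, 40.0), (3.5, 40.0)]

def chi_square_independence(xs, ys):
    """Pearson chi-squared statistic and degrees of freedom for two categorical lists."""
    n = len(xs)
    observed = Counter(zip(xs, ys))
    x_totals, y_totals = Counter(xs), Counter(ys)
    stat = 0.0
    for x in x_totals:
        for y in y_totals:
            # Expected count under independence: row total * column total / n
            expected = x_totals[x] * y_totals[y] / n
            stat += (observed[(x, y)] - expected) ** 2 / expected
    dof = (len(x_totals) - 1) * (len(y_totals) - 1)
    return stat, dof

results = [chi_square_independence([f[k] for f in features], labels)
           for k in range(len(features[0]))]
print(results)

# With 2 degrees of freedom the chi-squared survival function is exp(-x / 2)
p_value_feature0 = math.exp(-results[0][0] / 2)
print(p_value_feature0)
```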


### Summarizer

In [9]:
from pyspark.ml.stat import Summarizer
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

df = sc.parallelize([Row(weight=1.0, features=Vectors.dense(1.0, 1.0, 1.0)),
                     Row(weight=0.0, features=Vectors.dense(1.0, 2.0, 3.0))]).toDF()
df.show()

+------+-------------+
|weight|     features|
+------+-------------+
|   1.0|[1.0,1.0,1.0]|
|   0.0|[1.0,2.0,3.0]|
+------+-------------+



In [10]:

# create summarizer for multiple metrics "mean" and "count"
summarizer = Summarizer.metrics("mean", "count")


In [11]:

# compute statistics for multiple metrics with weight
df.select(summarizer.summary(df.features, df.weight)).show(truncate=False)

# compute statistics for multiple metrics without weight
df.select(summarizer.summary(df.features)).show(truncate=False)

# compute statistics for single metric "mean" with weight
df.select(Summarizer.mean(df.features, df.weight)).show(truncate=False)

# compute statistics for single metric "mean" without weight
df.select(Summarizer.mean(df.features)).show(truncate=False)

+-----------------------------------+
|aggregate_metrics(features, weight)|
+-----------------------------------+
|{[1.0,1.0,1.0], 1}                 |
+-----------------------------------+

+--------------------------------+
|aggregate_metrics(features, 1.0)|
+--------------------------------+
|{[1.0,1.5,2.0], 2}              |
+--------------------------------+

+--------------+
|mean(features)|
+--------------+
|[1.0,1.0,1.0] |
+--------------+

+--------------+
|mean(features)|
+--------------+
|[1.0,1.5,2.0] |
+--------------+
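The weighted mean is sum(w * x) / sum(w), so a row with weight 0.0 drops out entirely, which is why the weighted results equal the first row. A NumPy sketch of the same two rows follows. The weighted count of 1 above is consistent with Spark skipping zero-weight rows; that is an assumption drawn from the output, which the sketch mimics by counting nonzero weights.

```python
import numpy as np

# The two rows from the Summarizer cell above
features = np.array([[1.0, 1.0, 1.0],
                     [1.0, 2.0, 3.0]])
weights = np.array([1.0, 0.0])

# Weighted mean: sum(w * x) / sum(w); the zero-weight row contributes nothing
weighted_mean = np.average(features, axis=0, weights=weights)
unweighted_mean = features.mean(axis=0)
# Rows that actually contribute to the weighted summary
nonzero_weight_rows = int(np.count_nonzero(weights))
print(weighted_mean, unweighted_mean, nonzero_weight_rows)
```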



### CountVectorizer and TF-IDF

In [12]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0, "Python python Spark Spark"),
    (1, "Python SQL")],
    ["document", "sentence"])

# Tokenizer -> raw term counts (CountVectorizer) -> IDF weighting
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
vectorizer = CountVectorizer(inputCol="words", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")

pipeline = Pipeline(stages=[tokenizer, vectorizer, idf])

model = pipeline.fit(sentenceData)
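The first two pipeline stages can be mimicked in plain Python to see where the vocabulary order comes from. This sketch assumes Spark's `Tokenizer` lowercases and splits on whitespace, and that `CountVectorizer` orders its vocabulary by total term count across the corpus; tie-breaking between equally frequent terms is not modeled here (there are no ties in this data).

```python
from collections import Counter

sentences = ["Python python Spark Spark", "Python SQL"]

# Tokenizer step: lowercase, then split on whitespace
tokenized = [s.lower().split() for s in sentences]

# CountVectorizer step: vocabulary ordered by total count across the corpus
corpus_counts = Counter(token for doc in tokenized for token in doc)
vocabulary = [term for term, _ in corpus_counts.most_common()]

# Dense equivalent of the rawFeatures column: per-document term counts
raw_counts = [[float(doc.count(term)) for term in vocabulary] for doc in tokenized]
print(vocabulary)
print(raw_counts)
```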

In [13]:
# Sum the raw term-count vectors across all documents (toArray() returns NumPy arrays)
total_counts = model.transform(sentenceData)\
                    .select('rawFeatures').rdd\
                    .map(lambda row: row['rawFeatures'].toArray())\
                    .reduce(lambda x, y: x + y)

# The CountVectorizerModel is the second pipeline stage; its vocabulary maps index -> term
vocabList = model.stages[1].vocabulary

spark.createDataFrame(
    [(term, float(count)) for term, count in zip(vocabList, total_counts)],
    ["vocabList", "counts"]).show()

+---------+------+
|vocabList|counts|
+---------+------+
|   python|   3.0|
|    spark|   2.0|
|      sql|   1.0|
+---------+------+



In [14]:
model.transform(sentenceData).show(truncate=True)


+--------+--------------------+--------------------+-------------------+--------------------+
|document|            sentence|               words|        rawFeatures|            features|
+--------+--------------------+--------------------+-------------------+--------------------+
|       0|Python python Spa...|[python, python, ...|(3,[0,1],[2.0,2.0])|(3,[0,1],[0.0,0.8...|
|       1|          Python SQL|       [python, sql]|(3,[0,2],[1.0,1.0])|(3,[0,2],[0.0,0.4...|
+--------+--------------------+--------------------+-------------------+--------------------+
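Spark's `IDF` uses the smoothed formula idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents and df(t) is the number of documents containing t; each feature value is the raw count multiplied by idf(t). The sketch below applies that formula to the counts in `rawFeatures`. Because "python" appears in both documents, its weight is log(3/3) = 0, which is why position 0 is 0.0 in both rows of `features`.

```python
import math

vocabulary = ["python", "spark", "sql"]
# Term counts per document, read off the rawFeatures column above
term_counts = [
    {"python": 2.0, "spark": 2.0},   # document 0
    {"python": 1.0, "sql": 1.0},     # document 1
]

m = len(term_counts)  # number of documents
doc_freq = {t: sum(1 for doc in term_counts if doc.get(t, 0.0) > 0) for t in vocabulary}
idf = {t: math.log((m + 1) / (doc_freq[t] + 1)) for t in vocabulary}

# TF-IDF value = raw count * idf(term)
tfidf = [[doc.get(t, 0.0) * idf[t] for t in vocabulary] for doc in term_counts]
for row in tfidf:
    print([round(v, 4) for v in row])
```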



In [15]:
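from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

def termsIdx2Term(vocabulary):
    """Build a UDF that maps an array of term indices to the matching vocabulary terms."""
    def indices_to_terms(termIndices):
        return [vocabulary[int(index)] for index in termIndices]
    return udf(indices_to_terms, ArrayType(StringType()))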

vectorizerModel = model.stages[1]
vocabList = vectorizerModel.vocabulary
vocabList

['python', 'spark', 'sql']

In [16]:
rawFeatures = model.transform(sentenceData).select('rawFeatures')
rawFeatures.show()

+-------------------+
|        rawFeatures|
+-------------------+
|(3,[0,1],[2.0,2.0])|
|(3,[0,2],[1.0,1.0])|
+-------------------+
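Each `rawFeatures` entry is a sparse vector printed as (size, [indices], [values]), and the indices point into `vocabList`. Decoding a row into term counts is therefore a lookup, the same mapping `termsIdx2Term` wraps in a UDF. A plain-Python sketch with the vectors copied from the output above (the `decode` helper is hypothetical):

```python
vocabList = ['python', 'spark', 'sql']
# (size, indices, values) copied from the rawFeatures column above
raw_rows = [(3, [0, 1], [2.0, 2.0]), (3, [0, 2], [1.0, 1.0])]

def decode(size, indices, values, vocabulary):
    """Turn one sparse count vector into a {term: count} dict."""
    assert size == len(vocabulary)
    return {vocabulary[i]: v for i, v in zip(indices, values)}

decoded = [decode(*row, vocabList) for row in raw_rows]
print(decoded)
```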



## Predictive Analytics

### Regression

In [19]:
# Load the space-separated housing data through the SparkSession reader
# (SQLContext is deprecated since Spark 3.0)
house_df = spark.read.load('housing.csv', sep=" ", format="csv", inferSchema="true", header="true")
house_df.take(5)

[Row(CRIM=0.00632, ZN=18.0, INDUS=2.31, CHAS=0, nox=0.538, RM=6.575, AGE=65.2, DIS=4.09, RAD=1, TAX=296.0, PT=15.3, B=396.9, LSTAT=4.98, MV=24.0),
 Row(CRIM=0.02731, ZN=0.0, INDUS=7.07, CHAS=0, nox=0.469, RM=6.421, AGE=78.9, DIS=4.9671, RAD=2, TAX=242.0, PT=17.8, B=396.9, LSTAT=9.14, MV=21.6),
 Row(CRIM=0.02729, ZN=0.0, INDUS=7.07, CHAS=0, nox=0.469, RM=7.185, AGE=61.1, DIS=4.9671, RAD=2, TAX=242.0, PT=17.8, B=392.83, LSTAT=4.03, MV=34.7),
 Row(CRIM=0.03237, ZN=0.0, INDUS=2.18, CHAS=0, nox=0.458, RM=6.998, AGE=45.8, DIS=6.0622, RAD=3, TAX=222.0, PT=18.7, B=394.63, LSTAT=2.94, MV=33.4),
 Row(CRIM=0.06905, ZN=0.0, INDUS=2.18, CHAS=0, nox=0.458, RM=7.147, AGE=54.2, DIS=6.0622, RAD=3, TAX=222.0, PT=18.7, B=396.9, LSTAT=5.33, MV=36.2)]

In [20]:
house_df.cache()
house_df.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: integer (nullable = true)
 |-- nox: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: integer (nullable = true)
 |-- TAX: double (nullable = true)
 |-- PT: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- MV: double (nullable = true)



In [21]:
house_df.describe().toPandas().transpose()

| | count | mean | stddev | min | max |
|---|---|---|---|---|---|
| CRIM | 506 | 3.6135235573122535 | 8.601545105332491 | 0.00632 | 88.9762 |
| ZN | 506 | 11.363636363636363 | 23.32245299451514 | 0.0 | 100.0 |
| INDUS | 506 | 11.136778656126504 | 6.860352940897589 | 0.46 | 27.74 |
| CHAS | 506 | 0.0691699604743083 | 0.2539940413404101 | 0 | 1 |
| nox | 506 | 0.5546950592885372 | 0.11587767566755584 | 0.385 | 0.871 |
| RM | 506 | 6.284634387351787 | 0.7026171434153232 | 3.561 | 8.78 |
| AGE | 506 | 68.57490118577078 | 28.148861406903595 | 2.9 | 100.0 |
| DIS | 506 | 3.795042687747034 | 2.10571012662761 | 1.1296 | 12.1265 |
| RAD | 506 | 9.549407114624506 | 8.707259384239366 | 1 | 24 |


In [22]:
type(house_df)

pyspark.sql.dataframe.DataFrame

In [23]:
import pandas as pd
numeric_features = [t[0] for t in house_df.dtypes if t[1] == 'int' or t[1] == 'double']
sampled_data = house_df.select(numeric_features).sample(False, 0.8).toPandas()
type(sampled_data)

pandas.core.frame.DataFrame

Once the data is a pandas DataFrame, it can be analyzed with the usual Python data-analysis tools.
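For example, `describe()` and `corr()` work as usual on the converted frame. The sketch below uses only the five rows printed earlier by `take(5)` (three columns), so its numbers are illustrative and say nothing about the full 506-row dataset; in the notebook the same methods would be called on `sampled_data`.

```python
import pandas as pd

# Three columns from the five rows printed by house_df.take(5) above
sample = pd.DataFrame({
    "RM":    [6.575, 6.421, 7.185, 6.998, 7.147],
    "LSTAT": [4.98, 9.14, 4.03, 2.94, 5.33],
    "MV":    [24.0, 21.6, 34.7, 33.4, 36.2],
})

print(sample.describe())   # count, mean, std, min, quartiles, max per column
print(sample.corr())       # pairwise Pearson correlations
```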

In [24]:
house_df.columns

['CRIM',
 'ZN',
 'INDUS',
 'CHAS',
 'nox',
 'RM',
 'AGE',
 'DIS',
 'RAD',
 'TAX',
 'PT',
 'B',
 'LSTAT',
 'MV']

In [25]:
house_df.show(3)

+-------+----+-----+----+-----+-----+----+------+---+-----+----+------+-----+----+
|   CRIM|  ZN|INDUS|CHAS|  nox|   RM| AGE|   DIS|RAD|  TAX|  PT|     B|LSTAT|  MV|
+-------+----+-----+----+-----+-----+----+------+---+-----+----+------+-----+----+
|0.00632|18.0| 2.31|   0|0.538|6.575|65.2|  4.09|  1|296.0|15.3| 396.9| 4.98|24.0|
|0.02731| 0.0| 7.07|   0|0.469|6.421|78.9|4.9671|  2|242.0|17.8| 396.9| 9.14|21.6|
|0.02729| 0.0| 7.07|   0|0.469|7.185|61.1|4.9671|  2|242.0|17.8|392.83| 4.03|34.7|
+-------+----+-----+----+-----+-----+----+------+---+-----+----+------+-----+----+
only showing top 3 rows



In [26]:
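from pyspark.ml.feature import VectorAssembler

# Every column except 'nox' (the regression target) goes into the feature vector
inputCols = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'RM', 'AGE', 'DIS',
             'RAD', 'TAX', 'PT', 'B', 'LSTAT', 'MV']
outputCol = "features"
df_va = VectorAssembler(inputCols=inputCols, outputCol=outputCol)
df = df_va.transform(house_df)
# Keep only the target and the assembled vector
df = df.select(['nox', 'features'])
# Preview with the target shown as 'label' (df itself keeps the 'nox' column name)
newcolumns = ['label', 'features']
df.toDF(*newcolumns).show(truncate=False)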

+-----+---------------------------------------------------------------------------+
|label|features                                                                   |
+-----+---------------------------------------------------------------------------+
|0.538|[0.00632,18.0,2.31,0.0,6.575,65.2,4.09,1.0,296.0,15.3,396.9,4.98,24.0]     |
|0.469|[0.02731,0.0,7.07,0.0,6.421,78.9,4.9671,2.0,242.0,17.8,396.9,9.14,21.6]    |
|0.469|[0.02729,0.0,7.07,0.0,7.185,61.1,4.9671,2.0,242.0,17.8,392.83,4.03,34.7]   |
|0.458|[0.03237,0.0,2.18,0.0,6.998,45.8,6.0622,3.0,222.0,18.7,394.63,2.94,33.4]   |
|0.458|[0.06905,0.0,2.18,0.0,7.147,54.2,6.0622,3.0,222.0,18.7,396.9,5.33,36.2]    |
|0.458|[0.02985,0.0,2.18,0.0,6.43,58.7,6.0622,3.0,222.0,18.7,394.12,5.21,28.7]    |
|0.524|[0.08829,12.5,7.87,0.0,6.012,66.6,5.5605,5.0,311.0,15.2,395.6,12.43,22.9]  |
|0.524|[0.14455,12.5,7.87,0.0,6.172,96.1,5.9505,5.0,311.0,15.2,396.9,19.15,27.1]  |
|0.524|[0.21124,12.5,7.87,0.0,5.631,100.0,6.0821,5.0,311.0,15.2,386.63,29.93

In [28]:
# Split the data 70/30 into training and test sets
(trainingData, testData) = df.randomSplit([0.7, 0.3])

In [30]:
trainingData.show()

+-----+--------------------+
|  nox|            features|
+-----+--------------------+
|0.392|[0.03548,80.0,3.6...|
|0.392|[0.04819,80.0,3.6...|
|0.394|[0.01538,90.0,3.7...|
|0.398|[0.04379,80.0,3.3...|
|  0.4|[0.00906,90.0,2.9...|
|  0.4|[0.05561,70.0,2.2...|
|0.401|[0.01439,60.0,2.9...|
|0.401|[0.01501,90.0,1.2...|
|0.401|[0.02187,60.0,2.9...|
|0.403|[0.01778,95.0,1.4...|
|0.403|[0.0315,95.0,1.47...|
|0.404|[0.03768,80.0,1.5...|
|0.404|[0.04011,80.0,1.5...|
|0.404|[0.04666,80.0,1.5...|
|0.405|[0.03871,52.5,5.3...|
|0.405|[0.04297,52.5,5.3...|
|0.409|[0.05789,12.5,6.0...|
| 0.41|[0.0136,75.0,4.0,...|
|0.411|[0.03615,80.0,4.9...|
|0.411|[0.07244,60.0,1.6...|
+-----+--------------------+
only showing top 20 rows



In [31]:
testData.show()

+------+--------------------+
|   nox|            features|
+------+--------------------+
| 0.385|[0.01965,80.0,1.7...|
| 0.389|[0.01096,55.0,2.2...|
| 0.398|[0.03584,80.0,3.3...|
|   0.4|[0.04417,70.0,2.2...|
|   0.4|[0.06466,70.0,2.2...|
| 0.403|[0.01311,90.0,1.2...|
| 0.405|[0.0459,52.5,5.32...|
| 0.409|[0.12816,12.5,6.0...|
| 0.409|[0.13554,12.5,6.0...|
|  0.41|[0.01709,90.0,2.0...|
|  0.41|[0.02055,85.0,0.7...|
| 0.411|[0.01432,100.0,1....|
| 0.411|[0.03502,80.0,4.9...|
| 0.413|[0.04301,80.0,1.9...|
|0.4161|[0.02009,95.0,2.6...|
|0.4161|[0.0351,95.0,2.68...|
| 0.426|[0.04462,25.0,4.8...|
| 0.428|[0.09252,30.0,4.9...|
| 0.428|[0.1029,30.0,4.93...|
| 0.431|[0.1403,22.0,5.86...|
+------+--------------------+
only showing top 20 rows



In [34]:
from pyspark.ml.regression import LinearRegression
# Train the model
lr = LinearRegression(featuresCol="features", labelCol="nox")
model = lr.fit(trainingData)

In [35]:
# Inspect the model coefficients (one per input feature, in inputCols order)
print("Coefficients: " + str(model.coefficients))

# Inspect the model intercept
print("Intercept: " + str(model.intercept))

Coefficients: [-0.000565736235050674,-3.1996235109388945e-05,0.0038575593627519274,-0.0073594935050072165,-0.00786112398153221,0.000809958080738571,-0.018822829856001692,0.0024933259358736323,7.841173656238722e-05,-0.01256680744065956,2.191882928070262e-06,0.0002655770991284851,-0.001395396443715132]
Intercept: 0.7832464233929658
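A fitted linear regression predicts intercept + sum of coefficient times feature, so the numbers above are enough to score a row by hand. The sketch applies them to the first row of `house_df` as printed by the `VectorAssembler` cell. Because `randomSplit` is unseeded, the coefficients change from run to run, and this row may have landed in either split.

```python
# Coefficients and intercept copied from the output above
coefficients = [-0.000565736235050674, -3.1996235109388945e-05, 0.0038575593627519274,
                -0.0073594935050072165, -0.00786112398153221, 0.000809958080738571,
                -0.018822829856001692, 0.0024933259358736323, 7.841173656238722e-05,
                -0.01256680744065956, 2.191882928070262e-06, 0.0002655770991284851,
                -0.001395396443715132]
intercept = 0.7832464233929658

# Features of the first house_df row, in inputCols order:
# CRIM, ZN, INDUS, CHAS, RM, AGE, DIS, RAD, TAX, PT, B, LSTAT, MV
x = [0.00632, 18.0, 2.31, 0.0, 6.575, 65.2, 4.09, 1.0, 296.0, 15.3, 396.9, 4.98, 24.0]

# Linear model: prediction = intercept + sum_i coefficient_i * x_i
prediction = intercept + sum(c * v for c, v in zip(coefficients, x))
print(prediction)  # compare with this row's actual nox value, 0.538
```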


In [41]:
# Get the training summary
trainingSummary = model.summary
trainingSummary


<pyspark.ml.regression.LinearRegressionTrainingSummary at 0x7f752e624450>

In [42]:
# Predict on the test set
predictions = model.transform(testData)

In [43]:
predictions.show()

+------+--------------------+-------------------+
|   nox|            features|         prediction|
+------+--------------------+-------------------+
| 0.385|[0.01965,80.0,1.7...|0.36172917396547893|
| 0.389|[0.01096,55.0,2.2...|0.43382255453093793|
| 0.398|[0.03584,80.0,3.3...|0.43758091418055384|
|   0.4|[0.04417,70.0,2.2...|0.44907173573724257|
|   0.4|[0.06466,70.0,2.2...|0.43395093603157986|
| 0.403|[0.01311,90.0,1.2...| 0.3401137928556448|
| 0.405|[0.0459,52.5,5.32...| 0.4527201439880943|
| 0.409|[0.12816,12.5,6.0...|  0.437897792249772|
| 0.409|[0.13554,12.5,6.0...| 0.4492849139810452|
|  0.41|[0.01709,90.0,2.0...| 0.3097744152578325|
|  0.41|[0.02055,85.0,0.7...| 0.3692308317094424|
| 0.411|[0.01432,100.0,1....|0.40825319873249727|
| 0.411|[0.03502,80.0,4.9...|0.42200072440697856|
| 0.413|[0.04301,80.0,1.9...| 0.2992718307237377|
|0.4161|[0.02009,95.0,2.6...|0.43153636710676585|
|0.4161|[0.0351,95.0,2.68...|  0.436348653618187|
| 0.426|[0.04462,25.0,4.8...|  0.467173151553065|


In [57]:
from pyspark.ml.evaluation import RegressionEvaluator
# Evaluate with RMSE (also the evaluator's default metric)
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="nox", metricName="rmse")
evaluator.evaluate(predictions)

0.05801174055311912
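The RMSE reported above is sqrt(mean((label - prediction)^2)) over the whole test set. The sketch applies the formula to the 17 `(nox, prediction)` pairs displayed earlier; since those rows are only a subset of the test set, the result will be close to, but not equal to, 0.058.

```python
import math

# (nox, prediction) pairs copied from the rows displayed above
pairs = [
    (0.385, 0.36172917396547893), (0.389, 0.43382255453093793),
    (0.398, 0.43758091418055384), (0.4, 0.44907173573724257),
    (0.4, 0.43395093603157986), (0.403, 0.3401137928556448),
    (0.405, 0.4527201439880943), (0.409, 0.437897792249772),
    (0.409, 0.4492849139810452), (0.41, 0.3097744152578325),
    (0.41, 0.3692308317094424), (0.411, 0.40825319873249727),
    (0.411, 0.42200072440697856), (0.413, 0.2992718307237377),
    (0.4161, 0.43153636710676585), (0.4161, 0.436348653618187),
    (0.426, 0.467173151553065),
]

# RMSE = sqrt(mean((label - prediction)^2))
rmse = math.sqrt(sum((y - p) ** 2 for y, p in pairs) / len(pairs))
print(rmse)
```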

### Decision Tree Classification

In [59]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer

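# Reuse the assembled features, with 'nox' renamed to 'label'.
# Note: nox is continuous, so StringIndexer turns each distinct value into its own class
# (numClasses=81 in the model summary below), which largely explains the low accuracy.
data = df.toDF(*newcolumns)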

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

In [60]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

In [61]:
# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

In [62]:
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

In [63]:
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

In [64]:
# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       2.0|        75.0|[0.01965,80.0,1.7...|
|       2.0|        76.0|[0.01096,55.0,2.2...|
|       2.0|        77.0|[0.01538,90.0,3.7...|
|       2.0|        68.0|[0.03584,80.0,3.3...|
|       2.0|        43.0|[0.04417,70.0,2.2...|
+----------+------------+--------------------+
only showing top 5 rows



In [65]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g " % accuracy)
print("Test Error = %g " % (1.0 - accuracy))

Accuracy = 0.295302 
Test Error = 0.704698 


In [66]:
treeModel = model.stages[2]
# summary only
print(treeModel)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_251a26391ccb, depth=5, numNodes=27, numClasses=81, numFeatures=13


### K Means

In [69]:
from pyspark.ml.clustering import KMeans

In [70]:
# Loads data.
dataset = spark.read.format("libsvm").load("sample_kmeans_data.txt")
dataset.show(truncate=False)

+-----+-------------------------+
|label|features                 |
+-----+-------------------------+
|0.0  |(3,[],[])                |
|1.0  |(3,[0,1,2],[0.1,0.1,0.1])|
|2.0  |(3,[0,1,2],[0.2,0.2,0.2])|
|3.0  |(3,[0,1,2],[9.0,9.0,9.0])|
|4.0  |(3,[0,1,2],[9.1,9.1,9.1])|
|5.0  |(3,[0,1,2],[9.2,9.2,9.2])|
+-----+-------------------------+



In [71]:
# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

In [72]:
# Make predictions
predictions = model.transform(dataset)

In [73]:
from pyspark.ml.evaluation import ClusteringEvaluator
# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.9997530305375207


In [74]:
# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[9.1 9.1 9.1]
[0.1 0.1 0.1]
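The result can be cross-checked with a small NumPy sketch: plain Lloyd iterations for k = 2, followed by the silhouette computed with squared Euclidean distances. Two assumptions to flag: the initial centers are hand-picked (the first and last points), not Spark's k-means|| initialization, so the cluster order may differ from Spark's; and each point's own distance is excluded when averaging within its cluster. With those choices the silhouette matches the 0.99975 value above.

```python
import numpy as np

# The six points from sample_kmeans_data.txt as printed above (dense form)
X = np.array([[0.0] * 3, [0.1] * 3, [0.2] * 3, [9.0] * 3, [9.1] * 3, [9.2] * 3])

def sq_dists(A, B):
    """Pairwise squared Euclidean distances, shape (len(A), len(B))."""
    return ((A[:, None, :] - B[None, :, :]) ** 2).sum(axis=2)

# Plain Lloyd iterations from a hand-picked init (not Spark's k-means|| init)
centers = X[[0, -1]].copy()
for _ in range(10):
    labels = sq_dists(X, centers).argmin(axis=1)
    new_centers = np.array([X[labels == k].mean(axis=0) for k in range(2)])
    if np.allclose(new_centers, centers):
        break
    centers = new_centers

def silhouette_sq(X, labels):
    """Mean silhouette using squared Euclidean distance as the dissimilarity."""
    D = sq_dists(X, X)
    scores = []
    for i in range(len(X)):
        same = labels == labels[i]
        same[i] = False  # exclude the point itself from its own cluster average
        a = D[i, same].mean()
        b = min(D[i, labels == k].mean() for k in set(labels.tolist()) if k != labels[i])
        scores.append((b - a) / max(a, b))
    return float(np.mean(scores))

sil = silhouette_sq(X, labels)
print(centers)
print(sil)
```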
