# Inisialisasi SparkSession

In [284]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, isnull, when, count, col

In [285]:
# Create the Spark session for this notebook (getOrCreate reuses an existing one if present).
spark = (
    SparkSession.builder
    .appName("filmtv_movies")
    .getOrCreate()
)

# Load and Read Data

In [286]:
# Read the raw CSV: first row is the header, column types are inferred from the data.
DATA_PATH = "datasets/filmtv_movies_ENG.csv"
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(DATA_PATH)
)

In [287]:
# Keep only the rating/metadata columns used below, plus the title for reference.
df = df.select('duration', 'avg_vote', 'critics_vote', 'public_vote', 'total_votes', 'year', 'title')

In [288]:
# Count missing values per column before cleaning.
# NaN can only occur in floating-point columns, so isnan() is applied only there.
# On string/integer columns it would force an implicit cast to double, which is
# meaningless and may raise an error for non-numeric text (e.g. titles) when
# Spark's ANSI SQL mode is enabled. Other columns are checked for null only.
float_cols = {name for name, dtype in df.dtypes if dtype in ('double', 'float')}
missing_counts = [
    count(when(isnan(c) | col(c).isNull(), c)).alias(c) if c in float_cols
    else count(when(col(c).isNull(), c)).alias(c)
    for c in df.columns
]
# The aggregate is a single row, so no row limit is needed.
df.select(missing_counts).show()
c1 = df.count()
print("data awal : ", c1)

+--------+--------+------------+-----------+-----------+----+-----+
|duration|avg_vote|critics_vote|public_vote|total_votes|year|title|
+--------+--------+------------+-----------+-----------+----+-----+
|       0|       0|        4085|        219|          0|   0|    0|
+--------+--------+------------+-----------+-----------+----+-----+

data awal :  37711


# Hapus Missing Data

In [289]:
# Drop every row that has a null/NaN in any of the selected columns, then report the remaining size.
df = df.na.drop()
c2 = df.count()
print("data setelah dihapus : ", c2)

data setelah dihapus :  33407


# Hapus Duplicate Data

In [290]:
# Remove exact duplicate rows and report how many were removed (c2 is the pre-dedup count).
df = df.drop_duplicates()
rows_after_dedup = df.count()
c3 = c2 - rows_after_dedup
print("total data duplicate yang dihapus : ", c3)


total data duplicate yang dihapus :  1


In [291]:
# Distribution summary (count, mean, stddev, min, quartiles, max) of total_votes.
df.select('total_votes').summary().show()



+-------+------------------+
|summary|       total_votes|
+-------+------------------+
|  count|             33406|
|   mean|39.306142609112136|
| stddev| 69.04621102554131|
|    min|                 2|
|    25%|                 6|
|    50%|                14|
|    75%|                40|
|    max|              1010|
+-------+------------------+



                                                                                

# K-Means Clustering

Import package yang dibutuhkan

In [337]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [338]:
# Quick look at the cleaned data.
df.show(n=2)

+--------+--------+------------+-----------+-----------+----+------------------+
|duration|avg_vote|critics_vote|public_vote|total_votes|year|             title|
+--------+--------+------------+-----------+-----------+----+------------------+
|      80|     5.6|         4.2|        7.0|          8|1942|Acque di primavera|
|      89|     4.8|         5.0|        5.0|         18|1986|     Assassination|
+--------+--------+------------+-----------+-----------+----+------------------+
only showing top 2 rows



Memilih kolom numerik yang akan digunakan sebagai fitur clustering


In [294]:
# Numeric columns fed to the clustering model; 'title' is kept out because it is not a feature.
feat_cols = [
    'duration',
    'avg_vote',
    'critics_vote',
    'public_vote',
    'total_votes',
    'year',
]

In [295]:
# Pack the feature columns into a single vector column named 'features'.
assembler = VectorAssembler(
    outputCol='features',
    inputCols=feat_cols,
)

In [296]:
# Original columns plus the assembled 'features' vector.
final_data = assembler.transform(dataset=df)

In [297]:
from pyspark.ml.feature import StandardScaler

In [298]:
# Standardize each feature (subtract mean, divide by std) so no single column dominates the distances.
scaler = StandardScaler(
    withMean=True,
    withStd=True,
    inputCol='features',
    outputCol='scaledFeatures',
)

In [299]:
# Learn per-feature mean and std from the assembled data.
scalerModel = scaler.fit(dataset=final_data)

In [300]:
# Add the standardized 'scaledFeatures' column and preview it.
cluster_final_data = scalerModel.transform(dataset=final_data)
cluster_final_data.show(n=5)

+--------+--------+------------+-----------+-----------+----+--------------------+--------------------+--------------------+
|duration|avg_vote|critics_vote|public_vote|total_votes|year|               title|            features|      scaledFeatures|
+--------+--------+------------+-----------+-----------+----+--------------------+--------------------+--------------------+
|      80|     5.6|         4.2|        7.0|          8|1942|  Acque di primavera|[80.0,5.6,4.2,7.0...|[-0.7972114086041...|
|      89|     4.8|         5.0|        5.0|         18|1986|       Assassination|[89.0,4.8,5.0,5.0...|[-0.4591486594416...|
|     100|     5.0|        4.33|        6.0|         15|1953|              Attila|[100.0,5.0,4.33,6...|[-0.0459608549096...|
|     123|     6.4|        5.71|        7.0|         20|1972|L'aventure, c'est...|[123.0,6.4,5.71,7...|[0.81797728183890...|
|      98|     6.5|         7.0|        6.0|         15|1985|Baby: Secret of t...|[98.0,6.5,7.0,6.0...|[-0.1210859102791...|


In [301]:
# Two K-Means estimators on the standardized features, for k=3 and k=2.
# An explicit seed makes centroid initialisation, and therefore the cluster
# labels, cluster sizes and silhouette scores below, reproducible when the
# notebook is re-run on a fresh kernel. Without it, results are not guaranteed
# to be stable across sessions.
KMEANS_SEED = 42
kmeans3 = KMeans(featuresCol='scaledFeatures', k=3, seed=KMEANS_SEED)
kmeans2 = KMeans(featuresCol='scaledFeatures', k=2, seed=KMEANS_SEED)

In [302]:
# Fit each estimator (k=3 first, then k=2) on the standardized data.
model_k3, model_k2 = (estimator.fit(cluster_final_data) for estimator in (kmeans3, kmeans2))

                                                                                

In [307]:
# Append a 'prediction' column (assigned cluster id) to every row, per model.
prediction_3 = model_k3.transform(dataset=cluster_final_data)
prediction_2 = model_k2.transform(dataset=cluster_final_data)

In [318]:
# Preview the first rows of the k=3 and then the k=2 assignments.
for preds in (prediction_3, prediction_2):
    preds.show(10)

+--------+--------+------------+-----------+-----------+----+--------------------+--------------------+--------------------+----------+
|duration|avg_vote|critics_vote|public_vote|total_votes|year|               title|            features|      scaledFeatures|prediction|
+--------+--------+------------+-----------+-----------+----+--------------------+--------------------+--------------------+----------+
|      80|     5.6|         4.2|        7.0|          8|1942|  Acque di primavera|[80.0,5.6,4.2,7.0...|[-0.7972114086041...|         1|
|      89|     4.8|         5.0|        5.0|         18|1986|       Assassination|[89.0,4.8,5.0,5.0...|[-0.4591486594416...|         0|
|     100|     5.0|        4.33|        6.0|         15|1953|              Attila|[100.0,5.0,4.33,6...|[-0.0459608549096...|         0|
|     123|     6.4|        5.71|        7.0|         20|1972|L'aventure, c'est...|[123.0,6.4,5.71,7...|[0.81797728183890...|         1|
|      98|     6.5|         7.0|        6.0|    

In [336]:
# Number of films per cluster for each model.
# groupBy() does not guarantee row order (the saved output lists 1, 2, 0), so
# sort by cluster id to get stable, readable output that lines up with the
# centroid listing below.
prediction_3.groupBy('prediction').count().orderBy('prediction').show()
prediction_2.groupBy('prediction').count().orderBy('prediction').show()


                                                                                

+----------+-----+
|prediction|count|
+----------+-----+
|         1|17691|
|         2| 2375|
|         0|13340|
+----------+-----+



                                                                                

+----------+-----+
|prediction|count|
+----------+-----+
|         1|14015|
|         0|19391|
+----------+-----+



Menampilkan pusat cluster (centroid) dari setiap model; nilainya masih dalam skala fitur yang sudah distandarisasi (scaledFeatures)

In [314]:
# Centroids of the k=3 model. They are in standardized units (the model was fit
# on 'scaledFeatures'), in the same order as feat_cols.
# The list position is the cluster id used in the 'prediction' column, so it is
# printed alongside each center to make the centroids and cluster sizes comparable.
center_3 = model_k3.clusterCenters()
print('Cluster centers : ')
for cluster_id, center in enumerate(center_3):
    print(f'cluster {cluster_id}: {center}')

Cluster centers : 
[-0.12723276 -0.99376918 -0.91328808 -0.87207236 -0.24815632  0.24187258]
[-0.05079976  0.63242297  0.57951123  0.55895279 -0.18730137 -0.21986635]
[1.0930457  0.87102573 0.81310731 0.73474168 2.78903318 0.27918966]


In [319]:
# Centroids of the k=2 model. They are in standardized units (the model was fit
# on 'scaledFeatures'), in the same order as feat_cols.
# The list position is the cluster id used in the 'prediction' column, so it is
# printed alongside each center to make the centroids and cluster sizes comparable.
center_2 = model_k2.clusterCenters()
print('Cluster centers : ')
for cluster_id, center in enumerate(center_2):
    print(f'cluster {cluster_id}: {center}')

Cluster centers : 
[ 0.10356183  0.69107253  0.63363525  0.60723272  0.18311434 -0.15641466]
[-0.14281236 -0.95299306 -0.87378671 -0.83737746 -0.25251574  0.21569672]


Evaluasi model dengan silhouette score


In [330]:
# Silhouette evaluator over the same standardized features the models were trained on.
evaluator = ClusteringEvaluator(
    metricName='silhouette',
    featuresCol='scaledFeatures',
    predictionCol='prediction',
)

In [331]:
# Silhouette score for each clustering (k=3 first, then k=2); higher means better-separated clusters.
silhouette3, silhouette2 = (evaluator.evaluate(preds) for preds in (prediction_3, prediction_2))


                                                                                

In [339]:
# Report both scores side by side for model comparison.
print(f'silhouette with squared euclidean distance 3 cluster : {silhouette3}')
print(f'silhouette with squared euclidean distance 2 cluster : {silhouette2}')

silhouette with squared euclidean distance 3 cluster : 0.4542696284341163
silhouette with squared euclidean distance 2 cluster : 0.413957165407153
