In [83]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer, StandardScaler
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.ml.pipeline import Pipeline
from pyspark.sql.types import DoubleType
import warnings
warnings.filterwarnings("ignore")

In [85]:
# Start (or reuse) a local Spark session restricted to a single worker thread.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName('Tubes ITBD')
    .getOrCreate()
)

# Read the ratings CSV using its first line as the header. No schema is given,
# so every column (including Rating) is loaded as a string.
csv_reader = spark.read.format("csv").option("header", "true")
df = csv_reader.load("/home/ubuntu/codingan/movies_dataset.csv")

24/01/01 11:52:28 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [69]:
# Peek at the first row to sanity-check the column names and raw values.
df.first()

24/01/01 11:41:30 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , User_Id, Movie_Name, Rating, Genre
 Schema: _c0, User_Id, Movie_Name, Rating, Genre
Expected: _c0 but found: 
CSV file: file:///home/ubuntu/codingan/movies_dataset.csv


Row(_c0='0', User_Id='1', Movie_Name='Pulp Fiction (1994)', Rating='5.0', Genre='Comedy|Crime|Drama|Thriller')

In [70]:
# Rating was read from the CSV as a string; convert it to a double so it can
# be used as a numeric feature.
df = df.withColumn("Rating", df.Rating.cast("double"))

In [71]:
# Confirm the cast: Rating should now be double, the other columns remain strings.
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- User_Id: string (nullable = true)
 |-- Movie_Name: string (nullable = true)
 |-- Rating: double (nullable = true)
 |-- Genre: string (nullable = true)



In [72]:
# Columns fed into the feature vector; clustering uses the rating alone.
input_col = ["Rating"]

In [73]:
# Pack the selected column(s) into a single vector column named "features",
# which is the column the KMeans estimator is configured to read.
vec_assembler = VectorAssembler(inputCols=input_col, outputCol="features")

In [74]:
# Append the assembled "features" vector column to the ratings DataFrame.
final_df = vec_assembler.transform(df)

In [75]:
# Preview the first 20 rows (long strings truncated) including the new "features" column.
final_df.show(n=20, truncate=True)

+---+-------+--------------------+------+--------------------+--------+
|_c0|User_Id|          Movie_Name|Rating|               Genre|features|
+---+-------+--------------------+------+--------------------+--------+
|  0|      1| Pulp Fiction (1994)|   5.0|Comedy|Crime|Dram...|   [5.0]|
|  1|      1|Three Colors: Red...|   3.5|               Drama|   [3.5]|
|  2|      1|Three Colors: Blu...|   5.0|               Drama|   [5.0]|
|  3|      1|  Underground (1995)|   5.0|    Comedy|Drama|War|   [5.0]|
|  4|      1|Singin' in the Ra...|   3.5|Comedy|Musical|Ro...|   [3.5]|
|  5|      1|Dirty Dancing (1987)|   4.0|Drama|Musical|Rom...|   [4.0]|
|  6|      1| Delicatessen (1991)|   3.5|Comedy|Drama|Romance|   [3.5]|
|  7|      1|          Ran (1985)|   3.5|           Drama|War|   [3.5]|
|  8|      1|Seventh Seal, The...|   5.0|               Drama|   [5.0]|
|  9|      1|Bridge on the Riv...|   4.0| Adventure|Drama|War|   [4.0]|
| 10|      1|            M (1931)|   3.5|Crime|Film-Noir|T...|  

24/01/01 11:41:30 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , User_Id, Movie_Name, Rating, Genre
 Schema: _c0, User_Id, Movie_Name, Rating, Genre
Expected: _c0 but found: 
CSV file: file:///home/ubuntu/codingan/movies_dataset.csv


In [76]:
# Configure the k-means estimator: 5 clusters over the "features" vector column.
# A fixed seed keeps centroid initialisation, and therefore the cluster ids shown
# in later cells, repeatable across kernel restarts. Without it PySpark falls back
# to a default seed derived from a Python string hash, which can differ between
# interpreter sessions.
model = KMeans(featuresCol = "features", k=5, seed=42)

In [82]:
# Fit k-means on the assembled feature vectors.
# `model` is rebound here from the KMeans estimator to the fitted KMeansModel.
# A fitted model has no .fit(), so re-running this cell used to fail. Keep a
# handle on the estimator the first time through so the cell can be re-executed.
if isinstance(model, KMeans):
    kmeans_estimator = model
model = kmeans_estimator.fit(final_df)

24/01/01 11:42:30 WARN MemoryStore: Not enough space to cache rdd_191_1 in memory! (computed 98.1 MiB so far)
24/01/01 11:42:30 WARN BlockManager: Persisting block rdd_191_1 to disk instead.
24/01/01 11:42:37 WARN MemoryStore: Not enough space to cache rdd_191_1 in memory! (computed 43.6 MiB so far)
24/01/01 11:42:38 WARN MemoryStore: Not enough space to cache rdd_191_5 in memory! (computed 3.4 MiB so far)
24/01/01 11:42:38 WARN BlockManager: Persisting block rdd_191_5 to disk instead.
24/01/01 11:42:39 WARN MemoryStore: Not enough space to cache rdd_191_3 in memory! (computed 65.4 MiB so far)
24/01/01 11:42:39 WARN BlockManager: Persisting block rdd_191_3 to disk instead.
24/01/01 11:42:39 WARN MemoryStore: Not enough space to cache rdd_191_4 in memory! (computed 65.4 MiB so far)
24/01/01 11:42:39 WARN BlockManager: Persisting block rdd_191_4 to disk instead.
24/01/01 11:42:47 WARN MemoryStore: Not enough space to cache rdd_191_3 in memory! (computed 65.4 MiB so far)
24/01/01 11:42:48

In [86]:
# Display the fitted model's summary (uid, k, distance measure, number of features).
model

KMeansModel: uid=KMeans_535cd5a63084, k=5, distanceMeasure=euclidean, numFeatures=1

In [89]:
# Assign every row to a cluster, then show how many rows landed in each one.
clustered = model.transform(final_df)
clustered.groupBy("prediction").count().show()



+----------+-------+
|prediction|  count|
+----------+-------+
|         1|5813013|
|         3|1569373|
|         4|2903665|
|         2|6639798|
|         0|8074246|
+----------+-------+



                                                                                

In [90]:
# Keep the clustered DataFrame (input columns plus "prediction") for the cells below.
predictions = model.transform(final_df)

In [92]:
# Preview the first 20 rows with their assigned cluster id.
predictions.show(n=20, truncate=True)

+---+-------+--------------------+------+--------------------+--------+----------+
|_c0|User_Id|          Movie_Name|Rating|               Genre|features|prediction|
+---+-------+--------------------+------+--------------------+--------+----------+
|  0|      1| Pulp Fiction (1994)|   5.0|Comedy|Crime|Dram...|   [5.0]|         1|
|  1|      1|Three Colors: Red...|   3.5|               Drama|   [3.5]|         0|
|  2|      1|Three Colors: Blu...|   5.0|               Drama|   [5.0]|         1|
|  3|      1|  Underground (1995)|   5.0|    Comedy|Drama|War|   [5.0]|         1|
|  4|      1|Singin' in the Ra...|   3.5|Comedy|Musical|Ro...|   [3.5]|         0|
|  5|      1|Dirty Dancing (1987)|   4.0|Drama|Musical|Rom...|   [4.0]|         2|
|  6|      1| Delicatessen (1991)|   3.5|Comedy|Drama|Romance|   [3.5]|         0|
|  7|      1|          Ran (1985)|   3.5|           Drama|War|   [3.5]|         0|
|  8|      1|Seventh Seal, The...|   5.0|               Drama|   [5.0]|         1|
|  9

24/01/01 11:58:15 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , User_Id, Movie_Name, Rating, Genre
 Schema: _c0, User_Id, Movie_Name, Rating, Genre
Expected: _c0 but found: 
CSV file: file:///home/ubuntu/codingan/movies_dataset.csv


In [94]:
# Count rows per (Rating, cluster) pair to see which rating values each cluster covers.
rating_by_cluster = predictions.groupBy("Rating", "prediction").count()
rating_by_cluster.show()



+------+----------+-------+
|Rating|prediction|  count|
+------+----------+-------+
|   2.5|         4|1262797|
|   4.5|         1|2200539|
|   1.0|         3| 776815|
|   1.5|         3| 399490|
|   3.5|         0|3177318|
|   4.0|         2|6639798|
|   0.5|         3| 393068|
|   2.0|         4|1640868|
|   5.0|         1|3612474|
|   3.0|         0|4896928|
+------+----------+-------+



                                                                                

In [77]:
# movie_name_indexer = StringIndexer(inputCol="Movie_Name", outputCol="Movie_Name_index")
# genre_indexer = StringIndexer(inputCol="Genre", outputCol="Genre_index")

# indexed_df = movie_name_indexer.fit(df).transform(genre_indexer.fit(df).transform(df))

In [78]:
# indexed_df = df.withColumn('Rating', ('Rating').cast(DoubleType()))

In [79]:
# scaler = StandardScaler(inputCol="Rating", outputCol="scaled_rating")

# scaled_df = scaler.fit(indexed_df).transform(indexed_df)

In [80]:
# kmeans = KMeans(featuresCol="features", predictionCol="cluster")

In [81]:
# pipeline = Pipeline(stages=[kmeans])
# model = pipeline.fit(scaled_df)

# model.write().overwrite().save("path/to/save/model")

# # To get the centroids:
# centroids = model.stages[-1].clusterCenters()
# print(centroids)