In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=806aba1d92c50d2787767e04e03caba2d4fc29546ef8698ac3cb8ae103815721
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.recommendation import ALS

spark = SparkSession.builder.appName("Movie Lens").getOrCreate()

In [4]:
def parseRating(str):
    fields = str.split("::")
    assert len(fields) == 4
    return (int(fields[0]), int(fields[1]), float(fields[2]), int(fields[3]))

In [5]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

class Rating:
    def __init__(self, userId, movieId, rating, timestamp):
        self.userId = int(userId)
        self.movieId = int(movieId)
        self.rating = float(rating)
        self.timestamp = float(timestamp)

def parseRating(str):
    fields = str.split(",")
    assert(len(fields) == 4)
    return Rating(int(fields[0]), int(fields[1]), float(fields[2]), float(fields[3]))

In [6]:
# Test it
parseRating("1,1193,5,978300760")

<__main__.Rating at 0x7f9430ec9450>

In [7]:

ratings = spark.read.csv("/content/drive/MyDrive/Big Data/Spark/ratings.csv", header=True, inferSchema=True)
# Check if everything is ok
ratings.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [8]:
# Data Training 80% dan Test 20%
training, test = ratings.randomSplit([0.8, 0.2])

In [9]:
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
model = als.fit(training)
model.save("mymodel")


In [10]:
predictions = model.transform(test)
mse = predictions.withColumn("diff", col("rating") - col("prediction")).select((col("diff") ** 2).alias("squared_diff")).filter(~col("squared_diff").isNull()).agg({"squared_diff": "sum"}).collect()[0][0]
print("Mean Squared Error:", mse)

predictions.show(10)

Mean Squared Error: nan
+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|     1|    362|   5.0|964982588| 3.8978007|
|     1|    923|   5.0|964981529|  4.251715|
|     1|   1029|   5.0|964982855|  4.504368|
|     1|    553|   5.0|964984153|  5.373411|
|     1|    316|   3.0|964982310|  4.147068|
|     1|    231|   5.0|964981179| 3.8493934|
|     1|    733|   4.0|964982400| 3.8678486|
|     1|    648|   3.0|964982563| 4.2043037|
|     1|    527|   5.0|964984002|  4.748278|
|     1|   1049|   5.0|964982400| 3.7579348|
+------+-------+------+---------+----------+
only showing top 10 rows



In [11]:
# Menyimpan Hasil Prediksi
predictions.write.format("csv").save("ml-bigdata.csv")