<a href="https://colab.research.google.com/github/kausar-vento/Big-Data/blob/main/Tugas%207/code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=ea6467dd83a06c176cc6d61e2de76e9cc14bfbc9df2ad932a702c5e52b1dd877
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


# Slide 30 Movie Lens Recomendation

In [3]:
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import math

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

class Rating:
    def __init__(self, userId, movieId, rating, timestamp):
        self.userId = userId
        self.movieId = movieId
        self.rating = rating
        self.timestamp = timestamp

def parseRating(line):
    fields = line.split("::")
    assert len(fields) == 4
    return Rating(int(fields[0]), int(fields[1]), float(fields[2]), int(fields[3]))

# Test it
parseRating("1::1193::5::978300760")

raw = sc.textFile("/content/drive/MyDrive/Big Data/ml-1m/ratings.dat")
# check one record. it should be res4: Array[String] = Array(1::1193::5::978300760)
# If this fails the location of the file is wrong.
raw.take(1)

ratings = raw.map(parseRating).toDF()
# check if everything is ok
ratings.show(5)

training, test = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Alternating Least Squares (ALS) matrix factorization.
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")

model = als.fit(training)
model.save("mymodel")

# Prepare the recommendations
predictions = model.transform(test)
squared_diff = predictions.select((col("rating").cast("float") - col("prediction").cast("float")).alias("squared_diff")).na.drop()
squared_diff_squared = squared_diff.select((col("squared_diff") ** 2).alias("squared_diff_squared")).na.drop()
mse = squared_diff_squared.agg({"squared_diff_squared": "mean"}).collect()[0][0]
rmse = math.sqrt(mse)

predictions.take(10)

predictions.write.format("com.databricks.spark.csv").save("ml-predictions.csv")

+-------+------+---------+------+
|movieId|rating|timestamp|userId|
+-------+------+---------+------+
|   1193|   5.0|978300760|     1|
|    661|   3.0|978302109|     1|
|    914|   3.0|978301968|     1|
|   3408|   4.0|978300275|     1|
|   2355|   5.0|978824291|     1|
+-------+------+---------+------+
only showing top 5 rows



# Slide 48 Movie Lens Reco (Ver 2.0)

In [8]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

raw = spark.read.text("/content/drive/MyDrive/Big Data/ml-1m/ratings.dat").rdd
mydata = [(2, 0.01)]
mydatardd = spark.sparkContext.parallelize(mydata).map(lambda x: (0, x[0], x[1]))

def parseRating(row):
  fields = row.value.split("::")
  return int(fields[0]), int(fields[1]), float(fields[2])

ratings = raw.map(parseRating)
ratings_df = spark.createDataFrame(ratings, ["userId", "movieId", "rating"])

mydatadf = spark.createDataFrame(mydatardd, ["userId", "movieId", "rating"])

combine_ratings = ratings_df.union(mydatadf)

als = ALS(rank = 8, maxIter=5, regParam=1, userCol="userId", itemCol="movieId", ratingCol="rating")
model = als.fit(combine_ratings)

products = model.recommendForUserSubset(spark.createDataFrame([(1,)], ["userId"]), 10)
products.show(5)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{3382, 4.4569645...|
+------+--------------------+

