In [1]:
 # Connect Google Drive Untuk Ambil Data
 from google.colab import drive
 drive.mount('/content/drive')

 # Install PySpark
 !pip install pyspark

Mounted at /content/drive
Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=ce0815be1abed97aa9a00ce0a6b9b1eda962acba63d2b272551c62011ff9960a
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [15]:
# Import Library
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("Movie Lens").getOrCreate()

 # Parse String Menjadi Objek Rating
def parseRating(str):
    fields = str.split(",")
    assert len(fields) == 4
    return (int(fields[0]), int(fields[1]), float(fields[2]), int(fields[3]))


 # Baca File
raw = spark.read.text("/content/drive/MyDrive/S6/big data/ml-latest-small/ratings.dat").rdd.map(lambda x: x[0])
header = raw.first()
data = raw.filter(lambda x: x != header)
ratings = data.map(parseRating).toDF(["userId", "movieId", "rating", "timestamp"])


 # Data Training 80% dan Test 20%
training, test = ratings.randomSplit([0.8, 0.2])

 # Membuat Model
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
model = als.fit(training)
model.save("mymodel")

 # Prediksi Data
predictions = model.transform(test)
mse = predictions.withColumn("diff", col("rating") - col("prediction")).select((col("diff") ** 2).alias("squared_diff")).filter(~col("squared_diff").isNull()).agg({"squared_diff": "sum"}).collect()[0][0]
print("Mean Squared Error:", mse)

predictions.show(10)

 # Menyimpan Hasil Prediksi
predictions.write.format("csv").save("ml-predictions.csv")

Mean Squared Error: nan
+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|     1|     47|   5.0|964983815|   4.54771|
|     1|    101|   5.0|964980868|  3.902245|
|     1|    216|   5.0|964981208|  3.605606|
|     1|    356|   4.0|964980962| 4.8947506|
|     1|    423|   3.0|964982363|  3.355478|
|     1|    553|   5.0|964984153| 4.1617055|
|     1|    593|   4.0|964983793| 4.9087496|
|     1|    596|   5.0|964982838| 4.0692987|
|     1|    804|   4.0|964980499| 2.7491038|
|     1|    943|   4.0|964983614| 2.4962883|
+------+-------+------+---------+----------+
only showing top 10 rows



In [18]:
# Import Library
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("Movie Lens").getOrCreate()
sc = spark.sparkContext

# Parse String Menjadi Objek Rating
def parseRating(str):
    fields = str.split(",")
    assert len(fields) == 4
    return (int(fields[0]), int(fields[1]), float(fields[2]), int(fields[3]))

# Baca File
raw = spark.read.text("/content/drive/MyDrive/S6/big data/ml-latest-small/ratings.csv").rdd.map(lambda x: x[0])
header = raw.first()
data = raw.filter(lambda x: x != header)
ratings = data.map(parseRating).toDF(["userId", "movieId", "rating", "timestamp"])
class Rating:
    def __init__(self, userId, movieId, rating):
        self.userId = userId
        self.movieId = movieId
        self.rating = rating

ratings_df = ratings.select(["userId", "movieId", "rating"])

# Build the recommendation model using ALS
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")
model = als.fit(ratings_df)

# Generate product recommendations for user ID 1
products = model.recommendForUserSubset(spark.createDataFrame([(1,)]).toDF("userId"), 10)

products.show()


+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{33649, 5.555023...|
+------+--------------------+



In [26]:
# Import Library
from pyspark.mllib.stat import Statistics
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Movie Lens").getOrCreate()
sc = spark.sparkContext

# Baca File
raw = spark.read.csv("/content/drive/MyDrive/S6/big data/ml-latest-small/ratings.csv", header=True)
ratings = raw.selectExpr("cast(userId as int) userId", "cast(movieId as int) movieId", "cast(rating as float) rating", "cast(timestamp as int) timestamp")

# Diambil dari ratings.csv
mat = ratings.select("rating").rdd.map(lambda x: [x[0]])

# Perhitungan Statistics
summary = Statistics.colStats(mat)
print("Mean:", summary.mean()[0])
print("Variance:", summary.variance()[0])
print("Number of Non-Zeros:", summary.numNonzeros()[0])


Mean: 3.5015569836169593
Variance: 1.086867214296345
Number of Non-Zeros: 100836.0
