In [None]:
# installing pyspark python library
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
# Spark configuration: create (or reuse) the Spark session for this notebook
from pyspark.sql import SparkSession

# Local-mode session named "NETFLIX", with the Spark UI bound to port 4050
spark = (
    SparkSession.builder
    .master("local")
    .appName("NETFLIX")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

# Bare reference to the session (prints nothing because it is not the last
# expression of the cell), then keep a handle on the underlying SparkContext
spark
sc = spark.sparkContext

In [None]:
# Imports used to load the dataset and to clean up its header
from pyspark.sql.types import StructType,StructField, StringType
import re
# SparkFiles resolves the local path of a file registered with addFile()
from pyspark import SparkFiles

# Load the dataset: register the remote CSV with Spark, then read the local
# copy with a header row and inferred column types
url = "https://raw.githubusercontent.com/ravenusa/FlashKnight/main/NetflixShows.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv("file://" + SparkFiles.get("NetflixShows.csv"),
                    header=True, inferSchema=True)

# Show the inferred data type of every column (header names are still the
# original ones at this point, including embedded spaces)
df.printSchema()

# Strip every whitespace character from the header names so the columns can be
# referenced easily (e.g. "release year" -> "releaseyear").
# Only whitespace is removed: the previous pattern also consumed the word that
# followed a tab/non-breaking space, which would have truncated such names.
df = df.toDF(*[re.sub(r'\s+', '', name) for name in df.schema.names])

# Display the dataset
df.show(truncate=False)

root
 |-- title: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- ratingLevel: string (nullable = true)
 |-- ratingDescription: integer (nullable = true)
 |-- release year: integer (nullable = true)
 |-- user rating score: string (nullable = true)
 |-- user rating size: integer (nullable = true)

+---------------------+------+-----------------------------------------------------------------------------+-----------------+-----------+---------------+--------------+
|title                |rating|ratingLevel                                                                  |ratingDescription|releaseyear|userratingscore|userratingsize|
+---------------------+------+-----------------------------------------------------------------------------+-----------------+-----------+---------------+--------------+
|White Chicks         |PG-13 |crude and sexual humor, language and some drug content                       |80               |2004       |82             |80            |
|Lu

In [None]:
# Drop the columns that are not needed, then display the remaining data
unused_columns = ('rating', 'ratingLevel', 'userratingscore')
df_fill = df.drop(*unused_columns)
df_fill.show(truncate=False)

+---------------------+-----------------+-----------+--------------+
|title                |ratingDescription|releaseyear|userratingsize|
+---------------------+-----------------+-----------+--------------+
|White Chicks         |80               |2004       |80            |
|Lucky Number Slevin  |100              |2006       |82            |
|Grey's Anatomy       |90               |2016       |80            |
|Prison Break         |90               |2008       |80            |
|How I Met Your Mother|70               |2014       |80            |
|Supernatural         |90               |2016       |80            |
|Breaking Bad         |110              |2013       |80            |
|The Vampire Diaries  |90               |2017       |80            |
|The Walking Dead     |110              |2015       |80            |
|Pretty Little Liars  |90               |2016       |80            |
|Once Upon a Time     |70               |2016       |80            |
|Sherlock             |90         

In [None]:
# Remove the rows that contain a null value
df_fill = df_fill.dropna()

In [None]:
# Print the schema (column names and types) of df_fill, i.e. the data left
# after the unneeded columns and the rows with nulls have been removed
df_fill.printSchema()

root
 |-- title: string (nullable = true)
 |-- ratingDescription: integer (nullable = true)
 |-- releaseyear: integer (nullable = true)
 |-- userratingsize: integer (nullable = true)



In [None]:
# Import the ALS collaborative-filtering estimator and the regression evaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Split the data into training (80%) and test (20%) sets.
# A fixed seed keeps the split, and therefore the metrics computed below,
# stable across re-runs of the notebook.
(training, test) = df_fill.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Build the ALS model on the training data.
# ratingDescription plays the role of the user id, releaseyear the item id and
# userratingsize the rating being predicted.
als = ALS(
    maxIter=15,
    regParam=0.09,
    userCol="ratingDescription",
    itemCol="releaseyear",
    ratingCol="userratingsize",
    # drop rows whose prediction would be NaN (user/item unseen in training)
    coldStartStrategy="drop",
    # fixed seed so the factor initialisation is reproducible between runs
    seed=42,
)
model = als.fit(training)

In [None]:
# Score the test set with the fitted model (adds a "prediction" column)
# and display the result without truncating long values
predictions = model.transform(test)
predictions.show(truncate=False)

+-------------------------------------------------------------+-----------------+-----------+--------------+----------+
|title                                                        |ratingDescription|releaseyear|userratingsize|prediction|
+-------------------------------------------------------------+-----------------+-----------+--------------+----------+
|Animaniacs                                                   |41               |1997       |80            |79.58071  |
|Animaniacs                                                   |41               |1997       |80            |79.58071  |
|Dawn of the Croods                                           |41               |2017       |80            |81.268684 |
|Dino Squad                                                   |41               |2008       |82            |82.03045  |
|Lego DC Comics: Batman Be-Leaguered                          |41               |2014       |82            |82.12498  |
|The Deep                               

In [None]:
# Return the first 5 rows of the predictions DataFrame as a list of Row objects
predictions.take(5)

[Row(title='Animaniacs', ratingDescription=41, releaseyear=1997, userratingsize=80, prediction=79.5807113647461),
 Row(title='Animaniacs', ratingDescription=41, releaseyear=1997, userratingsize=80, prediction=79.5807113647461),
 Row(title='Dawn of the Croods', ratingDescription=41, releaseyear=2017, userratingsize=80, prediction=81.26868438720703),
 Row(title='Dino Squad', ratingDescription=41, releaseyear=2008, userratingsize=82, prediction=82.03044891357422),
 Row(title='Lego DC Comics: Batman Be-Leaguered', ratingDescription=41, releaseyear=2014, userratingsize=82, prediction=82.1249771118164)]

In [None]:
# Compute the RMSE between the actual userratingsize and the model prediction
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="userratingsize",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 2.2356122441196393


In [None]:
# Build the recommendation input for a single ratingDescription value (41):
# keep only the test rows with that value and the two id columns
recomSingle = (
    test
    .where(test.ratingDescription == 41)
    .select('releaseyear', 'ratingDescription')
)

recomSingle.show(truncate=False)

+-----------+-----------------+
|releaseyear|ratingDescription|
+-----------+-----------------+
|1997       |41               |
|1997       |41               |
|2017       |41               |
|2008       |41               |
|2014       |41               |
|2015       |41               |
|1989       |41               |
|2016       |41               |
+-----------+-----------------+



In [None]:
# Score the rows selected for the single ratingDescription value and list
# them from the highest to the lowest prediction
recomendations = model.transform(recomSingle)
recomendations.sort(recomendations['prediction'].desc()).show(truncate=False)

+-----------+-----------------+----------+
|releaseyear|ratingDescription|prediction|
+-----------+-----------------+----------+
|2014       |41               |82.12498  |
|2008       |41               |82.03045  |
|2015       |41               |81.87812  |
|2016       |41               |81.804306 |
|2017       |41               |81.268684 |
|1997       |41               |79.58071  |
|1997       |41               |79.58071  |
+-----------+-----------------+----------+



In [None]:
# Generate the top 5 recommendations for every user known to the model
# (here the "user" column is ratingDescription and the items are release years)
recomenAll = model.recommendForAllUsers(5)
recomenAll.show(truncate=False)

+-----------------+--------------------------------------------------------------------------------------------+
|ratingDescription|recommendations                                                                             |
+-----------------+--------------------------------------------------------------------------------------------+
|100              |[{2003, 82.622475}, {2008, 82.531845}, {2002, 82.4902}, {2011, 82.451096}, {1998, 82.11563}]|
|10               |[{2010, 82.18918}, {2012, 82.01984}, {2011, 81.954475}, {2014, 81.92988}, {2002, 81.77436}] |
|80               |[{2001, 81.462975}, {2011, 81.2135}, {2002, 80.81116}, {1982, 80.74129}, {2008, 80.52011}]  |
|70               |[{2011, 81.94851}, {2017, 81.46403}, {2012, 81.13612}, {2013, 80.82902}, {2008, 80.667175}] |
|60               |[{2011, 82.32549}, {2012, 81.90643}, {2013, 81.70754}, {2010, 81.24812}, {2001, 81.19743}]  |
|90               |[{2011, 81.17854}, {2017, 80.53702}, {2012, 80.53222}, {2016, 80.42053}, {201

In [None]:
# Reshape the recommendations above into an easy-to-read, flat format
from pyspark.sql.functions import col, explode

# One row per recommendation, then keep only the user column and the
# recommended releaseyear taken from the exploded struct
convertData = (
    recomenAll
    .withColumn("recommendations", explode("recommendations"))
    .select("ratingDescription", col("recommendations.releaseyear"))
)
convertData.show(truncate=False)

+-----------------+-----------+
|ratingDescription|releaseyear|
+-----------------+-----------+
|100              |2003       |
|100              |2008       |
|100              |2002       |
|100              |2011       |
|100              |1998       |
|10               |2010       |
|10               |2012       |
|10               |2011       |
|10               |2014       |
|10               |2002       |
|80               |2001       |
|80               |2011       |
|80               |2002       |
|80               |1982       |
|80               |2008       |
|70               |2011       |
|70               |2017       |
|70               |2012       |
|70               |2013       |
|70               |2008       |
+-----------------+-----------+
only showing top 20 rows

