In [1]:
%%capture

!apt-get update
!apt-get install -y openjdk-8-jdk-headless -qq
!apt-get install maven -qq

!curl -L "https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz" > spark-2.4.5-bin-hadoop2.7.tgz
!tar -xvf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install pyspark py4j
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
from pyspark.sql import functions
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col
#from pyspark.sql.Column import isNull
from pyspark.sql.functions import lit
spark = SparkSession.builder.master("local[*]").config("spark.driver.memory", "16g").getOrCreate()

In [2]:

from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("Collaborative filtering").getOrCreate()

# from pyspark.sql.functions import col,lit
# from pyspark.sql.functions import sum,avg,max,min,mean,count



In [3]:
moviesDF=spark.read.options(header='True',inferSchema='True').csv('movies.csv')
ratingsDF=spark.read.options(header='True',inferSchema='True').csv('ratings.csv')

moviesDF.show()
ratingsDF.show()


+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [7]:
# join dataframes

display(moviesDF)    # to get a better view

DataFrame[movieId: int, title: string, genres: string]

In [8]:
ratingsDF.join(moviesDF,"movieid","left").show()   # left join i.e. left table is  ratings

+-------+------+------+---------+--------------------+--------------------+
|movieId|userId|rating|timestamp|               title|              genres|
+-------+------+------+---------+--------------------+--------------------+
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|      Comedy|Romance|
|      6|     1|   4.0|964982224|         Heat (1995)|Action|Crime|Thri...|
|     47|     1|   5.0|964983815|Seven (a.k.a. Se7...|    Mystery|Thriller|
|     50|     1|   5.0|964982931|Usual Suspects, T...|Crime|Mystery|Thr...|
|     70|     1|   3.0|964982400|From Dusk Till Da...|Action|Comedy|Hor...|
|    101|     1|   5.0|964980868|Bottle Rocket (1996)|Adventure|Comedy|...|
|    110|     1|   4.0|964982176|   Braveheart (1995)|    Action|Drama|War|
|    151|     1|   5.0|964984041|      Rob Roy (1995)|Action|Drama|Roma...|
|    157|     1|   5.0|964984100|Canadian Bacon (1...|          Comedy|War|
|    163|   

In [10]:
ratings=ratingsDF.join(moviesDF,"movieid","left")  # stroing in variable
(train,test)=ratings.randomSplit([0.8,0.2])   # 80% for training , 20% for testing , this will return 2 df

In [11]:
train.show()

+-------+------+------+----------+----------------+--------------------+
|movieId|userId|rating| timestamp|           title|              genres|
+-------+------+------+----------+----------------+--------------------+
|      1|     1|   4.0| 964982703|Toy Story (1995)|Adventure|Animati...|
|      1|     5|   4.0| 847434962|Toy Story (1995)|Adventure|Animati...|
|      1|     7|   4.5|1106635946|Toy Story (1995)|Adventure|Animati...|
|      1|    15|   2.5|1510577970|Toy Story (1995)|Adventure|Animati...|
|      1|    17|   4.5|1305696483|Toy Story (1995)|Adventure|Animati...|
|      1|    18|   3.5|1455209816|Toy Story (1995)|Adventure|Animati...|
|      1|    19|   4.0| 965705637|Toy Story (1995)|Adventure|Animati...|
|      1|    31|   5.0| 850466616|Toy Story (1995)|Adventure|Animati...|
|      1|    32|   3.0| 856736119|Toy Story (1995)|Adventure|Animati...|
|      1|    33|   3.0| 939647444|Toy Story (1995)|Adventure|Animati...|
|      1|    40|   5.0| 832058959|Toy Story (1995)|

In [12]:
# creating ALS Model

from pyspark.ml.recommendation import ALS

als=ALS(userCol="userId",itemCol="movieId",ratingCol="rating",nonnegative=True,implicitPrefs=False,coldStartStrategy="drop")
# userCol me , kisko recomend krna
# itemCol m , kya recomend krne
#nonnegative ko true because ratings are +ve
#implicitprefs m false because we using explicit
#coldstartstrategy means drop those users who not take part in ratings

In [13]:
# Hypertuning and cross validation (AI related - multiple models)

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder , CrossValidator

In [16]:
# creating models

param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()
           # 16 models will be created i.e. 4*4

In [17]:
# evaluator

evaluator = RegressionEvaluator(
           metricName="rmse",
           labelCol="rating",
           predictionCol="prediction")

In [18]:
# cross validator will give us best model

cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)  # numFolds=5 , to eptimize to 5 times

In [None]:
model=cv.fit(train)    # model with data
bestModel=model.bestModel


testPrediction=bestModel.transform(test)
RMSE=evaluator(testPrediction)
print(RMSE)    # to get accuracy percentage of prediction

In [None]:
# oral only

recommendations = best_model.recommendForAllUsers(5)

# COMMAND ----------

df = recommendations

# COMMAND ----------

display(df)

# COMMAND ----------

df2 = df.withColumn("movieid_rating", explode("recommendations"))

# COMMAND ----------

display(df2)

# COMMAND ----------

display(df2.select("userId", col("movieid_rating.movieId"), col("movieid_rating.rating")))

# COMMAND ----------
