In [0]:
adlsAccountName="sskstoragelearning"
adlsContainername="validated"
adlsFolderName="Data"
mountpoint="/mnt/Files/validated"
applicationId=dbutils.secrets.get(scope="moverecscp1",key="clientappid")
authenticationKey=dbutils.secrets.get(scope="moverecscp1",key="clientsecretval")
tenantId=dbutils.secrets.get(scope="moverecscp1",key="clienttenantid")
endpoint="https://login.microsoftonline.com/"+tenantId+"/oauth2/token"
source="abfss://"+adlsContainername+"@"+adlsAccountName+".dfs.core.windows.net/"+adlsFolderName
configs = {"fs.azure.account.auth.type": "OAuth",
          "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
          "fs.azure.account.oauth2.client.id": applicationId,
          "fs.azure.account.oauth2.client.secret": authenticationKey,
          "fs.azure.account.oauth2.client.endpoint": endpoint}
#Mounting ADLS storage to DBFS
#mount if the directory is not already mounted 
if not any(mount.mountPoint==mountpoint for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source=source,
        mount_point=mountpoint,
        extra_configs=configs)

In [0]:
dbutils.fs.ls('/mnt/Files/validated')

[FileInfo(path='dbfs:/mnt/Files/validated/movies.csv', name='movies.csv', size=3038099, modificationTime=1707818626000),
 FileInfo(path='dbfs:/mnt/Files/validated/ratings.csv', name='ratings.csv', size=678260987, modificationTime=1708000847000),
 FileInfo(path='dbfs:/mnt/Files/validated/tags.csv', name='tags.csv', size=38810332, modificationTime=1708000826000)]

In [0]:
movies_filename='dbfs:/mnt/Files/validated/movies.csv'
rating_filename='dbfs:/mnt/Files/validated/ratings.csv'

In [0]:
%fs 
ls /mnt/Files/validated/

path,name,size,modificationTime
dbfs:/mnt/Files/validated/movies.csv,movies.csv,3038099,1707818626000
dbfs:/mnt/Files/validated/ratings.csv,ratings.csv,678260987,1708000847000
dbfs:/mnt/Files/validated/tags.csv,tags.csv,38810332,1708000826000


In [0]:
from pyspark.sql.types import *
#working on movies.csv file
movies_with_genres_df_schema = StructType(
[StructField('ID',IntegerType()),
 StructField('title',StringType()),
 StructField('genres',StringType())]
)

movies_df_schema = StructType(
   [StructField('ID',IntegerType()),
    StructField('title',StringType())]
)


In [0]:
movies_df = sqlContext.read.format('com.databricks.spark.csv').options(header=True,inferSchema=False).schema(movies_df_schema).load(movies_filename)
movies_with_genres_df = sqlContext.read.format('com.databricks.spark.csv').options(header=True,inferSchema=False).schema(movies_with_genres_df_schema).load(movies_filename)

#inspecting the dataframe before the transformation

In [0]:
movies_df.show(4,truncate=False)
movies_with_genres_df.show(4,truncate=False)

+---+------------------------+
|ID |title                   |
+---+------------------------+
|1  |Toy Story (1995)        |
|2  |Jumanji (1995)          |
|3  |Grumpier Old Men (1995) |
|4  |Waiting to Exhale (1995)|
+---+------------------------+
only showing top 4 rows

+---+------------------------+-------------------------------------------+
|ID |title                   |genres                                     |
+---+------------------------+-------------------------------------------+
|1  |Toy Story (1995)        |Adventure|Animation|Children|Comedy|Fantasy|
|2  |Jumanji (1995)          |Adventure|Children|Fantasy                 |
|3  |Grumpier Old Men (1995) |Comedy|Romance                             |
|4  |Waiting to Exhale (1995)|Comedy|Drama|Romance                       |
+---+------------------------+-------------------------------------------+
only showing top 4 rows



#Rating data Analysis

In [0]:
from pyspark.sql.types import *

ratings_df_schema = StructType(
[StructField('userId',IntegerType()),
 StructField('moveId',IntegerType()),
 StructField('rating',DoubleType())]
)

ratings_df = sqlContext.read.format('com.databricks.spark.csv').options(header=True,inferSchema=False).schema(ratings_df_schema).load(rating_filename)
ratings_df.show(4,truncate=False)

+------+------+------+
|userId|moveId|rating|
+------+------+------+
|1     |296   |5.0   |
|1     |306   |3.5   |
|1     |307   |5.0   |
|1     |665   |5.0   |
+------+------+------+
only showing top 4 rows



In [0]:
movies_df.cache()
ratings_df.cache()

DataFrame[userId: int, moveId: int, rating: double]

In [0]:
from pyspark.sql import functions as F
movie_ids_with_avg_ratings_df=ratings_df.groupBy('moveId').agg(F.count(ratings_df.rating).alias("count"),F.avg(ratings_df.rating).alias("average"))
print("movie_ids_with_avg_ratings_df:")
movie_ids_with_avg_ratings_df.show(4,truncate=False)

movie_ids_with_avg_ratings_df:
+------+-----+------------------+
|moveId|count|average           |
+------+-----+------------------+
|1088  |11935|3.25002094679514  |
|1580  |40308|3.5817083457378187|
|3175  |14659|3.6077836141619484|
|44022 |4833 |3.2593627146699773|
+------+-----+------------------+
only showing top 4 rows



In [0]:
movie_names_with_avg_rating_df=movie_ids_with_avg_ratings_df.join(movies_df,F.col("MoveId") == F.col("ID")).drop('ID')
movie_names_with_avg_rating_df.show(4,truncate=False)

+------+-----+------------------+--------------------------------+
|moveId|count|average           |title                           |
+------+-----+------------------+--------------------------------+
|1088  |11935|3.25002094679514  |Dirty Dancing (1987)            |
|1580  |40308|3.5817083457378187|Men in Black (a.k.a. MIB) (1997)|
|3175  |14659|3.6077836141619484|Galaxy Quest (1999)             |
|44022 |4833 |3.2593627146699773|Ice Age 2: The Meltdown (2006)  |
+------+-----+------------------+--------------------------------+
only showing top 4 rows



In [0]:
movies_with_500_ratings_or_more = movie_names_with_avg_rating_df.filter(movie_names_with_avg_rating_df['count'] >= 500).orderBy('average',acending=False)
movies_with_500_ratings_or_more.show(4,truncate=False)


+------+-----+------------------+----------------------+
|moveId|count|average           |title                 |
+------+-----+------------------+----------------------+
|4775  |669  |1.1255605381165918|Glitter (2001)        |
|61348 |557  |1.2055655296229804|Disaster Movie (2008) |
|6587  |758  |1.2143799472295516|Gigli (2003)          |
|31698 |633  |1.2322274881516588|Son of the Mask (2005)|
+------+-----+------------------+----------------------+
only showing top 4 rows



In [0]:
from pyspark.sql import functions as F
seed=4
(split_60_df,split_a_20_df,split_b_20_df) = ratings_df.randomSplit([0.6,0.2,0.2],seed)

training_df = split_60_df.cache()
validation_df = split_a_20_df.cache()
test_df = split_b_20_df.cache()

print('Training: {0},Validation: {1},test: {2}\n'.format(
    training_df.count(), validation_df.count(), test_df.count())
)

training_df.show(4,truncate=False)
validation_df.show(4,truncate=False)
test_df.show(4,truncate=False)

Training: 14999112,Validation: 4999908,test: 5001075

+------+------+------+
|userId|moveId|rating|
+------+------+------+
|1     |306   |3.5   |
|1     |307   |5.0   |
|1     |665   |5.0   |
|1     |899   |3.5   |
+------+------+------+
only showing top 4 rows

+------+------+------+
|userId|moveId|rating|
+------+------+------+
|1     |1250  |4.0   |
|1     |2011  |2.5   |
|1     |2161  |3.5   |
|1     |2351  |4.5   |
+------+------+------+
only showing top 4 rows

+------+------+------+
|userId|moveId|rating|
+------+------+------+
|1     |296   |5.0   |
|1     |1217  |3.5   |
|1     |2068  |2.5   |
|1     |2843  |4.5   |
+------+------+------+
only showing top 4 rows



In [0]:
from pyspark.ml.recommendation import ALS
als = ALS()

als.setPredictionCol("prediction")\
    .setMaxIter(5)\
    .setSeed(seed)\
    .setRegParam(0.1)\
    .setUserCol("userId")\
    .setItemCol("moveId")\
    .setRatingCol("rating")\
    .setRank(8)

my_ratings_model = als.fit(training_df)

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

reg_eval = RegressionEvaluator(predictionCol="prediction",labelCol="rating",metricName="rmse")
my_predict_df=my_ratings_model.transform(test_df)

predicated_test_my_ratings = my_predict_df.filter(my_predict_df.prediction != float('nan'))

test_RMSE_my_ratings = reg_eval.evaluate(predicated_test_my_ratings)
print('The model had a RMSE on the test set of {0}'.format(test_RMSE_my_ratings))

dbutils.widgets.text("input","5","")
ins = dbutils.widgets.get("input")
uid=int(ins)
ll=predicated_test_my_ratings.filter(col("userId")==uid)


The model had a RMSE on the test set of 0.8137715461272659


In [0]:
MovieRec = ll.join(movies_df,F.col("moveId") == F.col("ID")).drop('ID').select('title').take(10)
l=dbutils.notebook.exit(MovieRec)