In [0]:

dbutils.fs.unmount("/mnt/movieLens/")
configs = {"fs.azure.account.auth.type": "OAuth",
          "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
          "fs.azure.account.oauth2.client.id": "ff63fa47-78e9-47ca-b9bb-0f08b411e5f6",
          "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="app-secret",key="service-principle-secret"),
          "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/7f77ffeb-3a63-436d-85f5-46b045efa8d7/oauth2/token"}

dbutils.fs.mount(
  source = "abfss://landingarea@mrdlsg2.dfs.core.windows.net/",
  mount_point = "/mnt/movieLens/",
  extra_configs = configs)

dbutils.fs.ls("/mnt/movieLens/movie_related_information")



/mnt/movieLens/ has been unmounted.


[FileInfo(path='dbfs:/mnt/movieLens/movie_related_information/genome-scores.parquet', name='genome-scores.parquet', size=72309528, modificationTime=1711275446000),
 FileInfo(path='dbfs:/mnt/movieLens/movie_related_information/genome-tags.parquet', name='genome-tags.parquet', size=16314, modificationTime=1711275296000),
 FileInfo(path='dbfs:/mnt/movieLens/movie_related_information/links.parquet', name='links.parquet', size=1138717, modificationTime=1711275248000),
 FileInfo(path='dbfs:/mnt/movieLens/movie_related_information/movies.parquet', name='movies.parquet', size=1745860, modificationTime=1711275295000),
 FileInfo(path='dbfs:/mnt/movieLens/movie_related_information/ratings.parquet', name='ratings.parquet', size=425618265, modificationTime=1711275635000),
 FileInfo(path='dbfs:/mnt/movieLens/movie_related_information/tags.parquet', name='tags.parquet', size=19641860, modificationTime=1711275293000)]

In [0]:


genomeScores = spark.read.option("header","true").option("inferSchema","true").parquet("/mnt/movieLens/movie_related_information/genome-scores.parquet")
genomeTags = spark.read.option("header","true").parquet("/mnt/movieLens/movie_related_information/genome-tags.parquet")
links = spark.read.option("header","true").parquet("/mnt/movieLens/movie_related_information/links.parquet")
movies = spark.read.option("header","true").option("inferSchema","true").parquet("/mnt/movieLens/movie_related_information/movies.parquet")
ratings = spark.read.option("header","true").parquet("/mnt/movieLens/movie_related_information/ratings.parquet")
tags = spark.read.option("header","true").parquet("/mnt/movieLens/movie_related_information/tags.parquet")





In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType,DoubleType,TimestampNTZType

movies.printSchema()

movies=movies.withColumn("movieId",col("movieId").cast(IntegerType()))

movies.printSchema()




root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [0]:
ratings.printSchema()

ratings=ratings.withColumn("userId",col("userId").cast(IntegerType()))\
        .withColumn("movieId",col("movieId").cast(IntegerType()))\
        .withColumn("rating",col("rating").cast(DoubleType()))

ratings.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: string (nullable = true)



In [0]:
genomeTags.printSchema()

genomeTags=genomeTags.withColumn("tagId",col("tagId").cast(IntegerType()))

genomeTags.printSchema()

root
 |-- tagId: string (nullable = true)
 |-- tag: string (nullable = true)

root
 |-- tagId: integer (nullable = true)
 |-- tag: string (nullable = true)



In [0]:
genomeScores.printSchema()

genomeScores=genomeScores.withColumn("movieId",col("movieId").cast(IntegerType()))\
        .withColumn("tagId",col("tagId").cast(IntegerType()))

genomeScores.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- tagId: string (nullable = true)
 |-- relevance: string (nullable = true)

root
 |-- movieId: integer (nullable = true)
 |-- tagId: integer (nullable = true)
 |-- relevance: string (nullable = true)



In [0]:
tags.printSchema()

tags=tags.withColumn("movieId",col("movieId").cast(IntegerType()))\
        .withColumn("userId",col("userId").cast(IntegerType()))

tags.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: string (nullable = true)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [0]:
sqlstr = "create database if not exists movie_lens"
spark.sql(sqlstr)

genomeScores.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("movie_lens.genomeScores")
genomeTags.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("movie_lens.genomeTags")
links.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("movie_lens.links")
movies.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("movie_lens.movies")
ratings.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("movie_lens.ratings")
tags.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("movie_lens.tags")

In [0]:
import pyspark.sql.functions as sf

mean_ratings = ratings.groupBy("movieId").agg(sf.count("*").alias("tot_count"),sf.avg("rating").alias("avg_rating"))
mean_ratings.show(5)
mean_ratings_nm = mean_ratings.join(movies,"movieId").select("movieId","title","tot_count","avg_rating")

min_rating = mean_ratings_nm.agg(sf.min("avg_rating")).collect()[0]["min(avg_rating)"]
print(min_rating)
mean_ratings_nm.select("*").filter(mean_ratings_nm.avg_rating==min_rating).show(3)

max_rating = mean_ratings_nm.agg(sf.max("avg_rating")).collect()[0]["max(avg_rating)"]
print(max_rating)
mean_ratings_nm.select("*").filter(mean_ratings_nm.avg_rating==max_rating).show(3)




+-------+---------+------------------+
|movieId|tot_count|        avg_rating|
+-------+---------+------------------+
|    148|      335| 2.908955223880597|
|   1088|    11935|  3.25002094679514|
|  68135|     2676| 3.062219730941704|
|   3175|    14659|3.6077836141619484|
|   1645|    13496| 3.547347362181387|
+-------+---------+------------------+
only showing top 5 rows

0.5
+-------+--------------------+---------+----------+
|movieId|               title|tot_count|avg_rating|
+-------+--------------------+---------+----------+
| 205371|Black Sabbath: Th...|        1|       0.5|
| 155959|White Bondage (1937)|        1|       0.5|
| 202437|    The Climb (2019)|        2|       0.5|
+-------+--------------------+---------+----------+
only showing top 3 rows

5.0
+-------+--------------------+---------+----------+
|movieId|               title|tot_count|avg_rating|
+-------+--------------------+---------+----------+
| 167666|The Capture of th...|        1|       5.0|
| 164278|Going Attr

In [0]:
from pyspark.sql import functions as sf
from pyspark.sql.functions import pandas_udf, PandasUDFType

tot_avg_cnt = mean_ratings_nm.agg(sf.avg("tot_count")).collect()[0]["avg(tot_count)"]
print(tot_avg_cnt)

tot_avg_rat = mean_ratings_nm.agg(sf.avg("avg_rating")).collect()[0]["avg(avg_rating)"]
print(tot_avg_rat)

@pandas_udf('double', PandasUDFType.GROUPED_AGG)  
def Bayesian_avg(rating):
    return (float)(tot_avg_cnt*tot_avg_rat+rating.sum())/(tot_avg_cnt+rating.count())

bay_avg_ratings = ratings.groupBy("movieId").agg(Bayesian_avg(ratings.rating).alias("bay_rating"))

423.3931444442563
3.071373920951132




In [0]:
bay_avg_ratings_nm = bay_avg_ratings.join(movies,"movieId").select("movieId","title","bay_rating","genres")

min_rating = bay_avg_ratings_nm.agg(sf.min("bay_rating")).collect()[0]["min(bay_rating)"]
print(min_rating)
bay_avg_ratings_nm.select("*").filter(bay_avg_ratings_nm.bay_rating==min_rating).show(3)

max_rating = bay_avg_ratings_nm.agg(sf.max("bay_rating")).collect()[0]["max(bay_rating)"]
print(max_rating)
bay_avg_ratings_nm.select("*").filter(bay_avg_ratings_nm.bay_rating==max_rating).show(3)

bay_avg_ratings_nm.sort("bay_rating", ascending=False).show(5)

bay_avg_ratings_nm.sort("bay_rating", ascending=True).show(5)

1.7131155745651407
+-------+--------------------+------------------+-------------+
|movieId|               title|        bay_rating|       genres|
+-------+--------------------+------------------+-------------+
|   3593|Battlefield Earth...|1.7131155745651407|Action|Sci-Fi|
+-------+--------------------+------------------+-------------+

4.406637765911728
+-------+--------------------+-----------------+-----------+
|movieId|               title|       bay_rating|     genres|
+-------+--------------------+-----------------+-----------+
|    318|Shawshank Redempt...|4.406637765911728|Crime|Drama|
+-------+--------------------+-----------------+-----------+

+-------+--------------------+-----------------+--------------------+
|movieId|               title|       bay_rating|              genres|
+-------+--------------------+-----------------+--------------------+
|    318|Shawshank Redempt...|4.406637765911728|         Crime|Drama|
|    858|Godfather, The (1...|4.314311946380134|        

In [0]:
bay_avg_ratings_gn = bay_avg_ratings_nm.select("movieId","title","bay_rating",sf.split(col("genres"),'\|').alias("genres_arr"))
bay_avg_ratings_gn.show(5)

genre_count = bay_avg_ratings_gn.select("movieId","title","bay_rating",sf.explode("genres_arr").alias("genre")).groupBy("genre").agg(sf.count("*").alias("movie_cnt")).sort("movie_cnt", ascending=False)
genre_count.show()

bay_avg_ratings_gn.select("title","bay_rating","genres_arr").filter(sf.array_contains(bay_avg_ratings_gn.genres_arr, "Thriller")).sort("bay_rating", ascending=False).show(5)



+-------+--------------------+------------------+--------------------+
|movieId|               title|        bay_rating|          genres_arr|
+-------+--------------------+------------------+--------------------+
|      1|    Toy Story (1995)| 3.887677029091847|[Adventure, Anima...|
|     12|Dracula: Dead and...| 2.669992547891253|    [Comedy, Horror]|
|     22|      Copycat (1995)|  3.30911984472808|[Crime, Drama, Ho...|
|     26|      Othello (1995)| 3.530454469581147|             [Drama]|
|     27| Now and Then (1995)|3.3335440489165347|   [Children, Drama]|
+-------+--------------------+------------------+--------------------+
only showing top 5 rows

+------------------+---------+
|             genre|movie_cnt|
+------------------+---------+
|             Drama|    24465|
|            Comedy|    16051|
|          Thriller|     8330|
|           Romance|     7305|
|            Action|     6913|
|            Horror|     5746|
|       Documentary|     5453|
|             Crime|     5

In [0]:
dbutils.fs.ls("/mnt/movieLens/")

bay_avg_ratings_gn.write.mode("overwrite").option("header","true").parquet("/mnt/movieLens/transformed_data/Bayesian_avg_ratings_genre")
genre_count.write.mode("overwrite").option("header","true").parquet("/mnt/movieLens/transformed_data/genre_count")
bay_avg_ratings_nm.write.mode("overwrite").option("header","true").parquet("/mnt/movieLens/transformed_data/Bayesian_avg_ratings_movie")
#dbutils.fs.ls("/mnt/movieLens/transformed_data/")

#dbutils. fs. refreshMounts()

In [0]:
tags.show(3)
genomeScores.show(3)
genomeTags.show(3)

+------+-------+--------------+----------+
|userId|movieId|           tag| timestamp|
+------+-------+--------------+----------+
|145919|    551|       musical|1312330643|
| 14116|     89|child in peril|1309883418|
|  6550| 146992|     virginity|1527140669|
+------+-------+--------------+----------+
only showing top 3 rows

+-------+-----+--------------------+
|movieId|tagId|           relevance|
+-------+-----+--------------------+
|    621|  538| 0.04425000000000001|
|    923|  215|  0.9039999999999999|
|   6659| 1058|0.034499999999999975|
+-------+-----+--------------------+
only showing top 3 rows

+-----+-----------------+
|tagId|              tag|
+-----+-----------------+
|  356|entirely dialogue|
|    2|     007 (series)|
|  263|  courtroom drama|
+-----+-----------------+
only showing top 3 rows



In [0]:
from pyspark.sql.window import Window

movie_tag_rel = genomeScores.join(genomeTags,"tagId").select("movieId","tag","relevance")
movie_tag_rel.show(5)


movie_rel = tags.join(movie_tag_rel,["movieId","tag"]).select("movieId","tag","relevance").distinct()
movie_rel = movie_rel.withColumn("row_num", sf.row_number().over(Window.partitionBy(["movieId"]).orderBy(sf.desc("relevance"))))
movie_rel.show(5)
movie_rel.printSchema()
top_five_tags = movie_rel.filter(movie_rel.row_num<5).groupBy("movieId").agg(sf.collect_list("tag").alias("top_tags"))

top_movie_tags = top_five_tags.join(movies,"movieId").select("movieId","title","top_tags")

top_movie_tags.show(10)


+-------+--------------------+--------------------+
|movieId|                 tag|           relevance|
+-------+--------------------+--------------------+
|    621|         immortality| 0.04425000000000001|
|    923|             classic|  0.9039999999999999|
|   6659|unintentionally f...|0.034499999999999975|
| 110194|           not funny|               0.267|
|  40723|  exceptional acting| 0.11275000000000002|
+-------+--------------------+--------------------+
only showing top 5 rows

+-------+------------------+------------------+-------+
|movieId|               tag|         relevance|row_num|
+-------+------------------+------------------+-------+
|      1|              toys|           0.99925|      1|
|      1|computer animation|           0.99875|      2|
|      1|   kids and family|0.9857499999999999|      3|
|      1|         animation|0.9842500000000001|      4|
|      1|              kids|              0.98|      5|
+-------+------------------+------------------+-------+
onl

In [0]:
top_movie_tags.write.mode("overwrite").option("header","true").parquet("/mnt/movieLens/transformed_data/top_movie_tags")