# Connection with Azure Blob Storage

In [0]:
#storageAccount = "movieStorageAcc"
#storageContainer = "movieStorageContainer"
#storage_account_key = "moviestorageAccKey"

#dbutils.fs.mount(source = f"wasbs://{storageContainer}@{storageAccount}.blob.core.windows.net/",
#                mount_point = "/mnt/rawStore",
#                extra_configs = {f"fs.azure.account.key.{storageAccount}.blob.core.windows.net":storage_account_key})

In [0]:
# List the contents of the mounted raw-data directory so the available
# input files can be inspected.
raw_data_listing = dbutils.fs.ls("/mnt/rawStore/RawData")
display(raw_data_listing)

In [0]:
# JDBC connection settings for the SQL Server database.
# NOTE(review): these values look like inline placeholders; real credentials
# should be loaded from a secret store rather than written into the notebook.
jdbcHostName, jdbcPort = "jdbchostname", 1433
jdbcDatabaseName = "databaseName"
jdbcUserName = "userName"
jdbcPassword = "jdbcPass"
jdbcDriver = "jdbcDriver"

# Assemble the connection string piece by piece; user and password are
# embedded directly in the URL.
jdbcUrl = (
    f"jdbc:sqlserver://{jdbcHostName}:{jdbcPort};"
    f"databaseName={jdbcDatabaseName};"
    f"user={jdbcUserName};"
    f"password={jdbcPassword}"
)

In [0]:
import pyspark.sql.functions as f
from pyspark.sql.types import *

In [0]:
# Schema for the movies file: three nullable columns, declared in file order.
movie_schm = (
    StructType()
    .add("movieId", IntegerType(), True)
    .add("title", StringType(), True)
    .add("genres", StringType(), True)
)

In [0]:
# Load movies.csv from the mounted raw store with the explicit movie schema;
# the header option is enabled so the first line is not read as data.
movies_csv_path = "/mnt/rawStore/RawData/movies.csv"
movie_reader = spark.read.format("csv").option("header", True).schema(movie_schm)
movie_df = movie_reader.load(movies_csv_path)
display(movie_df)

In [0]:
# Schema for the ratings file: four nullable columns, declared in file order.
# The timestamp is kept as a string, exactly as it is read from the file.
rating_schm = (
    StructType()
    .add("userId", IntegerType(), True)
    .add("movieId", IntegerType(), True)
    .add("rating", FloatType(), True)
    .add("timestamp", StringType(), True)
)

In [0]:
# Load ratings.csv from the mounted raw store with the explicit rating schema
# and preview the first five rows.
ratings_csv_path = "/mnt/rawStore/RawData/ratings.csv"
rating_reader = spark.read.format("csv").option("header", True).schema(rating_schm)
rating_df = rating_reader.load(ratings_csv_path)
rating_df.show(5)

In [0]:
def getTitle(title_col):
    """Return a Column holding the title text that precedes the first " (".

    Built from native Spark column functions instead of a Python UDF, so a
    null title yields null rather than raising inside the UDF.

    Args:
        title_col: Column, or column name, containing the raw title.

    Returns:
        A string Column with everything before the first " (" (the whole
        value when " (" does not occur).
    """
    return f.split(title_col, r" \(").getItem(0)


# Add the cleaned title and the four-digit year found in parentheses. The year
# stays a string; regexp_extract yields "" when no "(dddd)" group is present.
movie_df = (
    movie_df
    .withColumn("title_mod", getTitle("title"))
    .withColumn("year", f.regexp_extract("title", r"\((\d{4})\)", 1))
)

# Refined movie data: the cleaned title is exposed under the name "title".
movie_df_refined = movie_df.select("movieId", f.col("title_mod").alias("title"), "year", "genres")

# Segregated genres: one row per (movie, genre). The pattern is a raw string so
# "\|" reaches Spark as a regex-escaped literal pipe instead of relying on an
# invalid Python string escape.
genres_segg = movie_df_refined.withColumn("genre", f.explode(f.split("genres", r"\|"))).drop("genres")
display(genres_segg)

In [0]:
# Count null values per column of the ratings data: each column's isNull flag
# is cast to an integer and summed, producing a single-row result.
null_count_exprs = []
for column_name in rating_df.columns:
    null_flag = f.col(column_name).isNull().cast("integer")
    null_count_exprs.append(f.sum(null_flag).alias(column_name))

null_counts = rating_df.agg(*null_count_exprs)
null_counts.show()

In [0]:
# write refined movie data and rating data to JDBC

#movie_df_refined.write.format("jdbc").options(url = jdbcUrl, dbtable = "dbo.movies").mode("append").save()
# rating_df.write.format("jdbc").options(url = jdbcUrl, dbtable = "dbo.ratings").mode("append").save()

In [0]:
# Read the dbo.movies table through JDBC into a DataFrame and preview it.
movies_jdbc_reader = (
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("dbtable", "dbo.movies")
)
moviedfJdbc = movies_jdbc_reader.load()
moviedfJdbc.show()

In [0]:
# Average rating per movie, exposed as "avg_rating".
mean_rating = f.avg("rating").alias("avg_rating")
ratings_by_movie = rating_df.groupBy("movieId")
av_ratings_df = ratings_by_movie.agg(mean_rating)
av_ratings_df.show()

In [0]:
# Top-rated movies: attach each movie's average rating, sort from highest to
# lowest, then drop the duplicate movieId contributed by the ratings side.
same_movie = movie_df_refined["movieId"] == av_ratings_df["movieId"]
moviesWithRating = (
    movie_df_refined
    .join(av_ratings_df, same_movie, "inner")
    .orderBy(f.col("avg_rating").desc())
    .drop(av_ratings_df["movieId"])
)
moviesWithRating.show()

In [0]:
# Top-rated genres: pair every rating with each genre of its movie, then
# average the ratings per genre and sort from highest to lowest.
rating_matches_genre = rating_df["movieId"] == genres_segg["movieId"]
ratingAsPerGenre = (
    genres_segg
    .join(rating_df, rating_matches_genre)
    .drop(rating_df["movieId"])
)
av_rating_pergenre = (
    ratingAsPerGenre
    .groupBy("genre")
    .agg(f.avg("rating").alias("rating"))
    .orderBy(f.col("rating").desc())
)
display(av_rating_pergenre)

In [0]:
# Ratings submitted by one specific user.
user_id = 100
is_selected_user = f.col("userId") == user_id
user_ratings = rating_df.filter(is_selected_user)
user_ratings.show()

In [0]:
# Join the selected user's ratings with the refined movie data.
# The join type "inner" is an argument of join(); drop() receives only the
# column to remove, so a single movieId column (the movie side) remains.
user_ratings_acc_movies = (
    user_ratings
    .join(movie_df_refined, movie_df_refined["movieId"] == user_ratings["movieId"], "inner")
    .drop(user_ratings["movieId"])
)
# Sort the user's rated movies from highest to lowest rating.
sorted_user_ratings = user_ratings_acc_movies.orderBy(f.col("rating").desc())
# Display the user's movies, highest rating first.
display(sorted_user_ratings)

In [0]:
# Pair the selected user's ratings with each genre of the rated movie,
# keeping only the genre-side movieId.
rated_movie_matches = user_ratings["movieId"] == genres_segg["movieId"]
user_rating_acc_genre = (
    user_ratings
    .join(genres_segg, rated_movie_matches, "inner")
    .drop(user_ratings["movieId"])
)

# Average the user's rating within each genre.
genre_mean_rating = f.avg("rating").alias("rating")
grp_acc_genre = user_rating_acc_genre.groupBy("genre", "userId").agg(genre_mean_rating)

# Reorder the columns and sort from highest to lowest average rating.
sorted_rating_acc_genre = (
    grp_acc_genre
    .select("userId", "genre", "rating")
    .orderBy(f.col("rating").desc())
)

# Display the per-genre averages for the user.
display(sorted_rating_acc_genre)