In [0]:
%run /Repos/ruddys@sandiego.edu/Movie-Recommendation-system-using-Azure-and-Databricks/Analysis/Authorization

In [0]:
#import required libraries
import datetime
import pyspark.sql.functions as f
import pyspark.sql.types
import pandas as pd

from pyspark.sql.functions import year, month, dayofmonth
from pyspark.sql.functions import unix_timestamp, from_unixtime
from pyspark.sql import Window
from pyspark.sql.functions import rank, min

In [0]:
# Smoke test: list the root of the storage container to confirm that it can be
# accessed from this notebook before any data is copied or read.
dbutils.fs.ls("abfss://containerdatalake@samovieanalysis.dfs.core.windows.net")

In [0]:
# List all mount points currently defined, to verify that the mount used by
# the later cells (/mnt/movieanalysis) is present.
dbutils.fs.mounts()

In [0]:
# Unzip the dataset archive, step 1 of 2:
# copy the zip file from the DBFS mount to the local file system (file:/tmp),
# which is where the `unzip` shell command in a later cell reads it from.
dbutils.fs.cp("dbfs:/mnt/movieanalysis/ml_latest_small.zip", "file:/tmp/ml_latest_small.zip")

In [0]:
%sh
# Optional check (left disabled): confirm that the archive was copied to the
# local file system. Uncomment the next line to run it.
# ls -l /tmp/ml_latest_small.zip

In [0]:
# %sh
# rm -r /tmp/ml_latest_small.zip/

In [0]:
%sh
# Unzip the dataset archive, step 2 of 2: extract it into /tmp
# (the files end up under /tmp/ml-latest-small/).
# -o overwrites already-extracted files without prompting, so this cell can be
# re-run; without it unzip asks "replace?" and a notebook shell cell cannot answer.
unzip -o /tmp/ml_latest_small.zip -d /tmp

In [0]:
# Listing the extraction directory shows all the files that were unzipped
# into the local file system.
dbutils.fs.ls("file:/tmp/ml-latest-small/")

In [0]:
# Copy the extracted CSV files from the local file system back into the storage
# container. The container URI and the local directory are written once and the
# files are copied in a loop, so adding a file or changing the destination is a
# one-line edit instead of four copy-pasted calls.
container_root = "abfss://containerdatalake@samovieanalysis.dfs.core.windows.net"
local_extract_dir = "file:/tmp/ml-latest-small"

for csv_name in ("movies.csv", "ratings.csv", "tags.csv", "links.csv"):
    dbutils.fs.cp(f"{local_extract_dir}/{csv_name}", f"{container_root}/{csv_name}")


In [0]:
# Note: a magic command (a line starting with '%') must be the first line of its input cell, so in a cell that starts with '%' do not place any comment or note above it.

In [0]:
# List the contents of the storage container root and render them as a table
# (the dbutils form of the `%fs ls` magic).
display(dbutils.fs.ls("abfss://containerdatalake@samovieanalysis.dfs.core.windows.net/"))

In [0]:
# Reader settings; the cells that load links, tags and ratings reuse them.
file_type = "csv"
infer_schema = "true"           # ask Spark to infer the column types
first_row_is_header = "true"    # first line of the file holds the column names
delimiter = ","

# Location of the "Movie" dataset on the mount.
file_location_movie = "/mnt/movieanalysis/movies.csv"

# The options below apply to CSV files; for other file types they are ignored.
df_movies = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("inferSchema", infer_schema)
    .option("sep", delimiter)
    .load(file_location_movie)
)

# Show the data, then report the number of movies as the cell result.
display(df_movies)
df_movies.count()

In [0]:
# Load the "links" dataset with the same CSV reader settings as the movies file.
file_location_link = "/mnt/movieanalysis/links.csv"

df_links = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("inferSchema", infer_schema)
    .option("sep", delimiter)
    .load(file_location_link)
)

display(df_links)

In [0]:
# Load the "tags" dataset with the same CSV reader settings as the movies file.
file_location_tags = "/mnt/movieanalysis/tags.csv"

df_tags = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("inferSchema", infer_schema)
    .option("sep", delimiter)
    .load(file_location_tags)
)

display(df_tags)

In [0]:
# Load the "ratings" dataset with the same CSV reader settings as the movies file.
file_location_ratings = "/mnt/movieanalysis/ratings.csv"

df_ratings = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("inferSchema", infer_schema)
    .option("sep", delimiter)
    .load(file_location_ratings)
)

display(df_ratings)

In [0]:
# Attach the ratings to the movies. A left join keeps every movie row,
# including movies that have no rating.
movies_rating_df = df_movies.join(df_ratings, on='movieId', how='left')

display(movies_rating_df)

In [0]:
# Join movies, ratings and tags: only (movie, user) pairs that have both a
# rating and a tag survive the inner join.
# The frame is rebuilt from the source DataFrames instead of from
# `movies_rating_df` itself, so re-running this cell gives the same result
# rather than joining the tags a second time.
# NOTE(review): if ratings and tags both carry a `timestamp` column the result
# has two columns with that name - confirm and rename one if it is needed later.
movies_rating_df = (
    df_movies
    .join(df_ratings, 'movieId', 'left')
    .join(df_tags, ['movieID', 'userId'], 'inner')
)

display(movies_rating_df)

In [0]:
# Join ratings with tags on the movie id (inner join: only movies that have
# both at least one rating and at least one tag are kept).
df_ratings_tags = df_ratings.join(df_tags, ['movieID'], 'inner')

# Display the frame built in this cell; the previous version displayed
# `movies_rating_df` by mistake, so the result of this join was never shown.
display(df_ratings_tags)

In [0]:
# Add `tsdate`: the Unix-epoch `timestamp` column rendered as a date-time string.
ts_as_text = f.from_unixtime(f.col("timestamp"))
df_ratings_add_tsdate = df_ratings.withColumn("tsdate", ts_as_text)

display(df_ratings_add_tsdate)

In [0]:
# Keep the rating facts and reduce `tsdate` to a calendar date (`rating_date`):
# parse the string to epoch seconds, cast to a timestamp, then truncate to a date.
rating_ts = f.unix_timestamp('tsdate', 'yyyy-MM-dd HH:mm:ss').cast('timestamp')

df_selected_rating_date = df_ratings_add_tsdate.select(
    'userid',
    'movieid',
    'rating',
    f.to_date(rating_ts).alias('rating_date'),
)

display(df_selected_rating_date)

In [0]:
# Number of ratings submitted per calendar year.
# The previous version grouped on the full `rating_date`, which yields one row
# per day and does not match what the name `df_rating_year` says it holds.
# NOTE(review): confirm that yearly counts are the intended result.
df_rating_year = (
    df_selected_rating_date
    .groupBy(f.year('rating_date').alias('rating_year'))
    .count()
    .orderBy('rating_year')
)

df_rating_year.display()

In [0]:
# Mean rating of each movie. Spark names the aggregate column `avg(rating)`.
ratings_by_movie = df_selected_rating_date.groupBy('movieid')
df_avg_ratings = ratings_by_movie.avg('rating')

display(df_avg_ratings)

In [0]:
# Enrich the per-movie averages with the movie attributes and give the
# aggregate column a plain name (`avg_rating`).
df = (
    df_avg_ratings
    .join(df_movies, on='movieid', how='inner')
    .withColumnRenamed('avg(rating)', 'avg_rating')
)

display(df)

In [0]:
# How many ratings each movie received (result column is named `count`).
ratings_by_movie = df_selected_rating_date.groupBy('movieid')
df_total_rating = ratings_by_movie.count()

display(df_total_rating)

In [0]:
# Keep only movies with more than 50 ratings, then restrict the rating rows to
# those movies; the inner join also adds the movie's `count` column to each row.
df_total_rating = df_total_rating.filter(f.col('count') > 50)

df_ratings_filtered = df_selected_rating_date.join(
    df_total_rating,
    on='movieid',
    how='inner',
)

display(df_ratings_filtered)

In [0]:
# Highest rating each user gave to each movie ...
df_rating_per_user = (
    df_ratings_filtered
    .select('userid', 'movieid', 'rating')
    .groupBy('userid', 'movieid')
    .max('rating')
)

# ... joined with the movie attributes, with the aggregate column renamed
# from `max(rating)` to `max_rating`.
df_rating_per_user_movie = (
    df_rating_per_user
    .join(df_movies, 'movieid', 'inner')
    .withColumnRenamed('max(rating)', 'max_rating')
)

display(df_rating_per_user_movie)

In [0]:
# Re-aggregate with title and genres in the grouping key so those columns are
# carried along, and keep the value column named `max_rating`.
df_rating_max_rating = (
    df_rating_per_user_movie
    .groupBy('userid', 'movieid', 'title', 'genres')
    .max('max_rating')
    .withColumnRenamed('max(max_rating)', 'max_rating')
)

display(df_rating_max_rating)

In [0]:
# Keep only the user/movie rows whose rating is 4 or higher.
df_rating_max_rating = df_rating_max_rating.filter(f.col('max_rating') >= 4)

display(df_rating_max_rating)

In [0]:
# Identify the best movies per genre: for every (genres, title) pair, count the
# rows that survived the "rating of 4 or higher" filter.
high_ratings_by_genre_title = df_rating_max_rating.groupBy('genres', 'title')
df_movies_per_genere = high_ratings_by_genre_title.count()

display(df_movies_per_genere)

In [0]:
# Identify the genres of each user: number of highly rated titles per
# (userid, genres) combination.
df_ratings_genre = (
    df_rating_max_rating
    .select('userid', 'title', 'genres')
    .groupBy('userid', 'genres')
    .count()
)

display(df_ratings_genre)

In [0]:
# Latest trending movies overall: most recent rating date for each
# (userid, movieid) pair. The result column is named `max(rating_date)`.
ratings_by_user_movie = df_ratings_filtered.groupBy('userid', 'movieid')
df_recent_movie = ratings_by_user_movie.agg(f.max('rating_date'))

display(df_recent_movie)

In [0]:
# Average of the per-movie average ratings within each genres value.
# NOTE(review): this reuses the name `df_movies_per_genere`, replacing the
# per-title counts computed in an earlier cell - consider a distinct name.
df_movies_per_genere = df.groupBy('genres').agg(f.avg('avg_rating'))

display(df_movies_per_genere)