**Importing general libraries and defining the DataFrame schemas**

In [None]:
from pyspark.sql.types import *
import pyspark.sql.functions as func

# Explicit schemas are declared up front so that Spark does not need to
# scan the CSV files to infer column types when the DataFrames are created.

# movies.csv: movie id, title, and a '|'-separated genre string.
movies_df_schema = (
    StructType()
    .add('movieId', IntegerType())
    .add('title', StringType())
    .add('genres', StringType())
)

# ratings.csv: one rating per (user, movie). The timestamp column is kept
# (read as a string); later cells pass it to from_unixtime.
ratings_df_schema = (
    StructType()
    .add('userId', IntegerType())
    .add('movieId', IntegerType())
    .add('rating', DoubleType())
    .add('timestamp', StringType())
)

# tags.csv: one free-text tag per (user, movie), with its timestamp as a string.
tags_df_schema = (
    StructType()
    .add('userId', IntegerType())
    .add('movieId', IntegerType())
    .add('tag', StringType())
    .add('timestamp', StringType())
)


**Setting the dataset file paths and loading the DataFrames**

In [None]:
# Locations of the three CSV files, relative to the working directory.
movies_path = "DataSet/movies.csv"
ratings_path = "DataSet/ratings.csv"
tags_path = "DataSet/tags.csv"


def load_csv(path, schema):
    """Read a CSV file that has a header row into a DataFrame.

    Uses the SparkSession (`spark`) and the built-in CSV reader, which is the
    same entry point the Spark SQL cells later in this notebook rely on,
    instead of the legacy `sqlContext` / 'com.databricks.spark.csv' pair.

    Args:
        path: Path of the CSV file to read.
        schema: StructType applied to the file; because it is given
            explicitly, Spark does not infer column types.

    Returns:
        The loaded DataFrame.
    """
    return spark.read.csv(path, header=True, schema=schema)


# Creating the dataframes
Movies_DF = load_csv(movies_path, movies_df_schema)
Ratings_DF = load_csv(ratings_path, ratings_df_schema)
Tags_DF = load_csv(tags_path, tags_df_schema)

In [None]:
# Print the column names and types of each loaded DataFrame.
for frame in (Movies_DF, Ratings_DF, Tags_DF):
    frame.printSchema()

In [None]:
# Preview the first five rows of each DataFrame.
for frame in (Movies_DF, Ratings_DF, Tags_DF):
    frame.show(5)

In [None]:
# Report the size of each DataFrame as (name, ('rows', n), ('columns', m)).
# count() is a Spark action, so each iteration triggers a job over that file.
for label, frame in (
    ('Movies_DF', Movies_DF),
    ('Ratings_DF', Ratings_DF),
    ('Tags_DF', Tags_DF),
):
    # The label was previously misspelled as 'colums'.
    print((label, ('rows', frame.count()), ('columns', len(frame.columns))))

# Part A


**Please solve the following questions using Spark DataFrames.**

1- Find out all the movies that have ratings greater than 4.


In [None]:

# Q1: movies joined with their individual ratings, keeping only ratings
# strictly greater than 4. The filter was previously `>= 4`, which also let
# through ratings of exactly 4 and disagreed with the Spark SQL version.
(
    Movies_DF
    .join(Ratings_DF, Movies_DF.movieId == Ratings_DF.movieId, 'inner')
    .where(Ratings_DF['rating'] > 4)
    .show()
)

**2- Find out the average ratings of all movies and sort it in descending order**

In [None]:
# Q2: average rating per movie, highest average first.
(
    Movies_DF
    .join(Ratings_DF, Movies_DF.movieId == Ratings_DF.movieId, 'inner')
    .groupBy(Movies_DF['movieId'])
    .agg(func.avg("rating").alias("rating_average"))
    .orderBy(func.desc('rating_average'))
    .show()
)

**3-Find out the top 10 users that have rated the maximum number of movies.**

In [None]:
# Q3: number of ratings per user, showing the ten most active users.
(
    Ratings_DF
    .groupBy('userId')
    .agg(func.count("movieId").alias("movies_rated_by_user"))
    .orderBy(func.desc('movies_rated_by_user'))
    .show(10)
)

**4- Find out the top 5 genres with respect to movie counts. (Remember: Genres are concatenated by ‘|’)**

In [None]:
# Q4: split the '|'-separated genre string into one row per genre, then
# count movies per genre and show the five largest.
(
    Movies_DF
    .withColumn("Genres", func.explode(func.split("genres", "[|]")))
    .groupBy('Genres')
    .agg(func.count("movieId").alias("movies"))
    .orderBy(func.desc('movies'))
    .show(5)
)

**5- Find out the movie id, movie name and all the tags given to that movie by all users. (Hint: Joins, Group by)**

In [None]:
# Q5, step 1: collapse the tags of each movie into one comma-separated
# string; collect_set keeps each distinct tag once.
Tags_DF_Grouped = (
    Tags_DF
    .groupBy(func.col('movieId'))
    .agg(func.concat_ws(",", func.collect_set('tag')).alias("tags"))
)

# Q5, step 2: attach the movie title. The inner join keeps only movies that
# have at least one tag.
(
    Movies_DF
    .join(Tags_DF_Grouped, Movies_DF.movieId == Tags_DF_Grouped.movieId, 'inner')
    .select(Movies_DF.movieId, Movies_DF.title, Tags_DF_Grouped.tags)
    .show()
)

**6- Find out the top 5 tags and their counts given by the users.**

In [None]:
# Q6: how often each tag was applied, showing the five most frequent tags.
(
    Tags_DF
    .groupBy('tag')
    .agg(func.count("userID").alias("tag_counts"))
    .orderBy(func.desc('tag_counts'))
    .show(5)
)

**7- In Ratings_DF, convert timestamp format to “YYYY-MM-DD HH:MM:SS” by creating a new column named “Tmstmp_Normalized”**

In [None]:
# Q7: add a readable "yyyy-MM-dd HH:mm:ss" rendering of the timestamp column.
(
    Ratings_DF
    .withColumn(
        "Tmstmp_Normalized",
        func.from_unixtime("timestamp", "yyyy-MM-dd HH:mm:ss"),
    )
    .show()
)

**8- Find out the min and max date over this newly created column i.e. “Tmstmp_Normalized”.**

In [None]:
# Q8: earliest and latest rating date across the whole ratings table.
# The timestamp is rendered as a date-only string here ("yyyy-MM-dd").
(
    Ratings_DF
    .withColumn("Tmstmp_Normalized", func.from_unixtime("timestamp", "yyyy-MM-dd"))
    .select(
        func.min("Tmstmp_Normalized").alias('min date'),
        func.max("Tmstmp_Normalized").alias('max date'),
    )
    .show()
)

**9- Find out the min and max date of rating for each movie. (Hint: Group by over movies, then min max date)**

In [None]:
# Q9: earliest and latest rating timestamp for each movie.
(
    Ratings_DF
    .withColumn(
        "Tmstmp_Normalized",
        func.from_unixtime("timestamp", "yyyy-MM-dd HH:mm:ss"),
    )
    .groupBy('movieID')
    .agg(
        func.min("Tmstmp_Normalized").alias('min date'),
        func.max("Tmstmp_Normalized").alias('max date'),
    )
    .show()
)

**10- Find out the top 10 movies that have the maximum rating count given by all users.**

In [None]:
# Q10: number of ratings per movie, showing the ten most-rated movies.
(
    Ratings_DF
    .groupBy(Ratings_DF.movieId)
    .agg(func.count("rating").alias('rating_counts'))
    .orderBy(func.desc('rating_counts'))
    .show(10)
)

**11- Find out the movies, their average ratings, the genres each movie belongs to, all tags given by the users, and the maximum and minimum time of ratings. (Joins, Group By, Sum, Min, Max)**

In [None]:
# Q11: one row per movie with its title, genres, distinct tags, average
# rating, and the first and last time it was rated.

# Distinct tags of each movie, joined into one comma-separated string.
Tags_DF_Grouped = (
    Tags_DF
    .groupBy(func.col('movieId'))
    .agg(func.concat_ws(",", func.collect_set('tag')).alias("tags"))
)

# Movie attributes plus tags. The inner join drops movies that have no tags.
Movies_DF_First = (
    Movies_DF
    .join(Tags_DF_Grouped, Movies_DF.movieId == Tags_DF_Grouped.movieId, 'inner')
    .select(Movies_DF.movieId, Movies_DF.title, Movies_DF.genres, Tags_DF_Grouped.tags)
)

# Per-movie rating statistics.
Ratings_DF_Grouped = (
    Ratings_DF
    .withColumn(
        "Tmstmp_Normalized",
        func.from_unixtime("timestamp", "yyyy-MM-dd HH:mm:ss"),
    )
    .groupby('movieId')
    .agg(
        func.min("Tmstmp_Normalized").alias('min_date'),
        func.max("Tmstmp_Normalized").alias('max_date'),
        func.avg("rating").alias('avg_rating'),
    )
)

# Build the result first and display it separately: show() returns None, so
# chaining it onto the assignment left Movies_DF_Final bound to None.
Movies_DF_Final = (
    Movies_DF_First
    .join(Ratings_DF_Grouped, Movies_DF_First.movieId == Ratings_DF_Grouped.movieId, 'inner')
    .select(
        Movies_DF_First.movieId,
        Movies_DF_First.title,
        Movies_DF_First.genres,
        Movies_DF_First.tags,
        Ratings_DF_Grouped.min_date,
        Ratings_DF_Grouped.max_date,
        Ratings_DF_Grouped.avg_rating,
    )
)
Movies_DF_Final.show()

# Part B

**Solve the above questions using spark SQL.**

In [None]:
# Register each DataFrame as a temporary view so it can be queried by name
# from spark.sql in the cells below.
for frame, view_name in (
    (Movies_DF, "Movies_DF_View"),
    (Ratings_DF, "Ratings_DF_View"),
    (Tags_DF, "Tags_DF_View"),
):
    frame.createOrReplaceTempView(view_name)

## Part B questions


**The questions from Part A are repeated below; solve each of them using Spark SQL.**


1- Find out all the movies that have ratings greater than 4.


In [None]:
# Q1 (SQL): every rating strictly greater than 4, with the rated movie's
# title and genres.
query = "Select r.*,m.title,m.genres from Ratings_DF_View r inner join Movies_DF_View m on r.movieId=m.movieId where r.rating > 4"
df = spark.sql(query)
df.show()

**2- Find out the average ratings of all movies and sort it in descending order.**

In [None]:
# Q2 (SQL): average rating per movie, highest average first.
query = "Select r.movieID,AVG(r.rating) as rating_average from Ratings_DF_View r group by r.movieID  order by rating_average desc"
df = spark.sql(query)
df.show()

**3-Find out the top 10 users that have rated the maximum number of movies.**


DataFrame version from Part A, for reference: `Ratings_DF.groupBy('userId').agg(func.count("movieId").alias("movies_rated_by_user")).sort(func.col('movies_rated_by_user').desc()).show(10)`


In [None]:
# Q3 (SQL): ratings per user, most active first. The query itself has no
# LIMIT; show(10) is what restricts the display to ten users.
query = "Select r.userId,COUNT(r.movieId) as movies_rated_by_user from Ratings_DF_View r group by r.userId  order by movies_rated_by_user desc"
df = spark.sql(query)
df.show(10)

**4- Find out the top 5 genres with respect to movie counts. (Remember: Genres are concatenated by ‘|’)**

In [None]:
# Q4 (SQL): the inner query explodes the '|'-separated genre string into one
# row per (movie, genre); the outer query counts movies per genre and keeps
# the five largest.
query = "SELECT subtable.genres ,count(subtable.movieID) as movies from (SELECT m.movieId,explode(split(m.genres,'[|]')) as genres from Movies_DF_View m) subtable group by subtable.genres order by movies desc limit 5"
df = spark.sql(query)
df.show()

**5- Find out the movie id, movie name and all the tags given to that movie by all users. (Hint: Joins, Group by)**

In [None]:
# Q5 (SQL), step 1: one comma-separated tag string per movie.
# collect_set (rather than collect_list) keeps each distinct tag once, which
# matches the DataFrame solution in Part A and avoids repeating a tag that
# several users applied to the same movie.
DF_tags_temp = spark.sql("Select t.movieId, concat_ws(',', collect_set(t.tag)) as tags from Tags_DF_View t group by t.movieId")
DF_tags_temp.createOrReplaceTempView("DF_tags_temp_view")

# Q5 (SQL), step 2: attach the movie title. The inner join keeps only movies
# that have at least one tag.
df = spark.sql("Select m.movieId,m.title,tv.tags from Movies_DF_View m inner join DF_tags_temp_view tv on m.movieId=tv.movieId")
df.show()

**6- Find out the top 5 tags and their counts given by the users.**

In [None]:

# Q6 (SQL): the five most frequently applied tags and their counts.
query = "Select t.tag,count(t.userID) as tag_counts from Tags_DF_View t group by t.tag order by tag_counts desc limit 5"
df = spark.sql(query)
df.show()

**7- In Ratings_DF, convert timestamp format to “YYYY-MM-DD HH:MM:SS” by creating a new column named “Tmstmp_Normalized”**

In [None]:
# Q7 (SQL): add Tmstmp_Normalized, the timestamp rendered as
# "yyyy-MM-dd HH:mm:ss". The pattern was previously 'H:m:s', which does not
# zero-pad hours, minutes and seconds (e.g. "9:5:3" instead of "09:05:03")
# and so did not produce the requested format or match Part A.
df = spark.sql("Select r.*,from_unixtime(r.timestamp, 'yyyy-MM-dd HH:mm:ss') as Tmstmp_Normalized  from Ratings_DF_View r")
df.show()

**8- Find out the min and max date over this newly created column i.e. “Tmstmp_Normalized”.**

In [None]:
# Q8 (SQL): earliest and latest rating date over the whole ratings view,
# with the timestamp rendered as a date-only string.
query = "Select min(from_unixtime(r.timestamp, 'yyyy-MM-dd')) as min_date , max(from_unixtime(r.timestamp, 'yyyy-MM-dd')) as max_date  from Ratings_DF_View r"
df = spark.sql(query)
df.show()

**9- Find out the min and max date of rating for each movie. (Hint: Group by over movies, then min max date)**

In [None]:
# Q9 (SQL): earliest and latest rating date for each movie (date-only strings).
query = "Select r.movieID, min(from_unixtime(r.timestamp, 'yyyy-MM-dd')) as min_date , max(from_unixtime(r.timestamp, 'yyyy-MM-dd')) as max_date  from Ratings_DF_View r group by r.movieID "
df = spark.sql(query)
df.show()

**10- Find out the top 10 movies that have the maximum rating count given by all users.**

In [None]:
# Q10 (SQL): the ten movies with the largest number of ratings.
query = "Select r.movieID,count(r.rating) as rating_counts from Ratings_DF_View r  group by r.movieID order by rating_counts desc limit 10 "
df = spark.sql(query)
df.show()

**11- Find out the movies, their average ratings, the genres each movie belongs to, all tags given by the users, and the maximum and minimum time of ratings. (Joins, Group By, Sum, Min, Max)**

In [None]:
# Q11 (SQL): one row per movie with its title, genres, tags, average rating,
# and the first and last time it was rated.

# DF_tags_temp is not built in this cell; it must already exist from the
# question 5 cell (per-movie comma-separated tags).
DF_tags_temp.createOrReplaceTempView("DF_tags_temp_view")

# Movie attributes plus tags. The inner join drops movies that have no tags.
Movies_DF_First = spark.sql("Select m.movieId,m.title,m.genres,tv.tags from Movies_DF_View m inner join DF_tags_temp_view tv on m.movieId=tv.movieId")
Movies_DF_First.createOrReplaceTempView("Movies_DF_First_view")

# Per-movie rating statistics. The full 'yyyy-MM-dd HH:mm:ss' pattern is used
# because the question asks for the min/max *time* of ratings, and it matches
# the DataFrame solution in Part A (a date-only pattern was used before).
# The stray chained `= df =` assignment was removed so this cell no longer
# overwrites the `df` variable used by the other cells.
Ratings_DF_Grouped = spark.sql("Select r.movieID, min(from_unixtime(r.timestamp, 'yyyy-MM-dd HH:mm:ss')) as min_date , max(from_unixtime(r.timestamp, 'yyyy-MM-dd HH:mm:ss')) as max_date, avg(r.rating) as avg_rating  from Ratings_DF_View r group by r.movieID ")
Ratings_DF_Grouped.createOrReplaceTempView("Ratings_DF_Grouped_view")

# Build the result first and display it separately: show() returns None, so
# chaining it onto the assignment left Movies_DF_Final bound to None.
Movies_DF_Final = spark.sql("Select m.movieId,m.title,m.genres,m.tags,r.min_date,r.max_date,r.avg_rating from Movies_DF_First_view m inner join Ratings_DF_Grouped_view r on m.movieId=r.movieId")
Movies_DF_Final.show()