In [1]:
!pip install pyspark==3.5.0 findspark

Collecting pyspark==3.5.0
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=86c22e3f5f6b071d555cacc0c5a0d337e16a541124c4af20756b01815b38538e
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: findspark, pyspark
Successfully installed findspark-2.0.1 pyspark-3.5.0


In [3]:
import findspark
# Run findspark's setup before anything from pyspark is imported in the next cell.
# NOTE(review): pyspark was pip-installed in the first cell, so this step may be
# redundant — confirm before removing.
findspark.init()

In [4]:
from pyspark.sql import SparkSession
# Obtain the Spark session for this notebook, registered under the app name "Day14".
spark = (
    SparkSession.builder
    .appName("Day14")
    .getOrCreate()
)

In [5]:
# Load the movie metadata: the first row supplies the column names and Spark
# infers each column's type from the data.
# NOTE(review): if any text field contains embedded newlines or quotes, the
# default CSV options may split rows — check whether multiLine/escape are needed.
movies_df = spark.read.csv(
    "sample_data/movies_metadata.csv",
    header=True,
    inferSchema=True,
)

In [7]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

# Explicit schema for ratings.csv so the column types are declared up front.
# `timestamp` is declared as a plain integer here, not a timestamp type.
ratings_schema = StructType([
    StructField(column_name, column_type, True)  # every column is nullable
    for column_name, column_type in (
        ("userId", IntegerType()),
        ("movieId", IntegerType()),
        ("rating", DoubleType()),
        ("timestamp", IntegerType()),
    )
])


In [8]:
# Read the ratings using the predeclared schema; the file's first row is a header.
ratings_df = spark.read.schema(ratings_schema).csv("sample_data/ratings.csv", header=True)
# Preview the first five rows.
ratings_df.show(n=5)


+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    110|   1.0|1425941529|
|     1|    147|   4.5|1425942435|
|     1|    858|   5.0|1425941523|
|     1|   1221|   5.0|1425941546|
|     1|   1246|   5.0|1425941556|
+------+-------+------+----------+
only showing top 5 rows



In [73]:
# Print the column names, types and nullability of the loaded ratings frame.
ratings_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [77]:
# Keep only movie/rating pairs whose identifiers match (inner join).
# NOTE(review): this assumes movies_df.id and ratings_df.movieId use the same
# ID space and comparable types (movies_df's schema is inferred) — confirm.
movies_and_ratings = movies_df.join(
    ratings_df,
    on=movies_df["id"] == ratings_df["movieId"],
    how="inner",
)


In [81]:
from pyspark.sql.functions import desc

# Mean rating for every distinct value of the `genres` column, highest first.
# NOTE(review): the displayed values look like serialized lists of genre dicts,
# so this ranks whole genre combinations rather than individual genres —
# confirm that is the intended grouping.
best_rated_by_genre = (
    movies_and_ratings
    .groupBy("genres")
    .agg({"rating": "avg"})  # resulting column is named "avg(rating)"
    .orderBy(desc("avg(rating)"))
)

best_rated_by_genre.show()

+--------------------+------------------+
|              genres|       avg(rating)|
+--------------------+------------------+
|[{'id': 12, 'name...|               4.5|
|[{'id': 10749, 'n...|               4.5|
|[{'id': 99, 'name...|              4.25|
|[{'id': 53, 'name...|              4.25|
|[{'id': 28, 'name...|              4.25|
|[{'id': 10749, 'n...| 4.213932576813326|
|[{'id': 18, 'name...|4.2131118634504165|
|[{'id': 18, 'name...| 4.199924604171902|
|[{'id': 14, 'name...|4.1806621880998085|
|[{'id': 18, 'name...| 4.166666666666667|
|[{'id': 18, 'name...| 4.152347687892122|
|[{'id': 16, 'name...| 4.150013762730525|
|[{'id': 18, 'name...| 4.135950817381585|
|[{'id': 10770, 'n...|4.1306878306878305|
|[{'id': 28, 'name...| 4.130216013452335|
|[{'id': 18, 'name...| 4.114169927333706|
|[{'id': 9648, 'na...|  4.11226428884666|
|[{'id': 18, 'name...| 4.109830097087379|
|[{'id': 28, 'name...| 4.104114395999649|
|[{'id': 28, 'name...| 4.086393827822901|
+--------------------+------------------+

In [13]:
from pyspark.sql.functions import to_timestamp, date_format, year, month
import pyspark.sql.functions as F

from pyspark.sql.functions import from_unixtime, col

# Convert the `timestamp` column to a real timestamp type.
# The column is loaded as an integer holding Unix epoch seconds (for example
# 1425941529), so it is cast rather than parsed: a pattern such as
# 'yyyy-MM-dd HH:mm:ss' describes text like "2015-03-09 22:52:09" and does not
# match an epoch integer.
# Casting is also safe to re-run: this cell reassigns `ratings_df`, and casting
# an already-converted timestamp column leaves it unchanged.
ratings_df = ratings_df.withColumn("timestamp", F.col("timestamp").cast("timestamp"))

# Extract the calendar year and month from the converted timestamp.
ratings_df = (
    ratings_df
    .withColumn("year", year(F.col("timestamp")))
    .withColumn("month", month(F.col("timestamp")))
)

# Total number of ratings per year, earliest year first.
ratings_by_year = ratings_df.groupBy("year").count().orderBy("year")
ratings_by_year.show()

+----+-----+
|year|count|
+----+-----+
|1996|23147|
|1997| 9170|
|1998| 3700|
|1999|13457|
|2000|23399|
|2001|13582|
|2002|11849|
|2003|13309|
|2004|14835|
|2005|19152|
|2006|15971|
|2007|22723|
|2008|14243|
|2009|14097|
|2010|15200|
|2011|11311|
|2012| 9242|
|2013| 6811|
|2014| 5868|
|2015|19147|
+----+-----+
only showing top 20 rows

