In [2]:
import pandas as pd
from google.colab import drive
drive.mount('/content/drive')

# Define the file paths
movies_path = '/content/drive/MyDrive/Dataset/Movie Analytics/movies.dat'
ratings_path = '/content/drive/MyDrive/Dataset/Movie Analytics/ratings.dat'
users_path = '/content/drive/MyDrive/Dataset/Movie Analytics/users.dat'

# Load the data with the specified encoding
movies = pd.read_csv(movies_path, delimiter='::', engine='python', header=None, names=['MovieID', 'Title', 'Genres'], encoding='ISO-8859-1')
ratings = pd.read_csv(ratings_path, delimiter='::', engine='python', header=None, names=['UserID', 'MovieID', 'Rating', 'Timestamp'], encoding='ISO-8859-1')
users = pd.read_csv(users_path, delimiter='::', engine='python', header=None, names=['UserID', 'Gender', 'Age', 'Occupation', 'Zip-code'], encoding='ISO-8859-1')

# Display the first few rows of each dataframe
print("Movies Data:")
print(movies.head())

print("\nRatings Data:")
print(ratings.head())

print("\nUsers Data:")
print(users.head())


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Movies Data:
   MovieID                               Title                        Genres
0        1                    Toy Story (1995)   Animation|Children's|Comedy
1        2                      Jumanji (1995)  Adventure|Children's|Fantasy
2        3             Grumpier Old Men (1995)                Comedy|Romance
3        4            Waiting to Exhale (1995)                  Comedy|Drama
4        5  Father of the Bride Part II (1995)                        Comedy

Ratings Data:
   UserID  MovieID  Rating  Timestamp
0       1     1193       5  978300760
1       1      661       3  978302109
2       1      914       3  978301968
3       1     3408       4  978300275
4       1     2355       5  978824291

Users Data:
   UserID Gender  Age  Occupation Zip-code
0       1      F    1          10    48067
1       2      M   56          16    70072
2       3  

#**Install PySpark**

In [3]:
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=9eb863abe0de89374b89a1af95b4602a2dc04dd2732595b27ef49edf2a6200c9
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


**Set Up Spark Session**

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MovieLensAnalysis") \
    .getOrCreate()


**Convert Pandas DataFrames to Spark DataFrames**

In [5]:

movies_spark_df = spark.createDataFrame(movies)
ratings_spark_df = spark.createDataFrame(ratings)
users_spark_df = spark.createDataFrame(users)


**Register DataFrames as temporary views for Spark SQL**

In [6]:
movies_spark_df.createOrReplaceTempView("movies")
ratings_spark_df.createOrReplaceTempView("ratings")
users_spark_df.createOrReplaceTempView("users")

# **Analytical Queries:**

1. Top 10 Most Viewed Movies

In [7]:
top_viewed_movies = spark.sql("""
    SELECT m.Title, COUNT(r.MovieID) as view_count
    FROM ratings r
    JOIN movies m ON r.MovieID = m.MovieID
    GROUP BY m.Title
    ORDER BY view_count DESC
    LIMIT 10
""")
top_viewed_movies.show()


+--------------------+----------+
|               Title|view_count|
+--------------------+----------+
|American Beauty (...|      3428|
|Star Wars: Episod...|      2991|
|Star Wars: Episod...|      2990|
|Star Wars: Episod...|      2883|
|Jurassic Park (1993)|      2672|
|Saving Private Ry...|      2653|
|Terminator 2: Jud...|      2649|
|  Matrix, The (1999)|      2590|
|Back to the Futur...|      2583|
|Silence of the La...|      2578|
+--------------------+----------+



2. Distinct List of Genres

In [8]:
import pyspark.sql.functions as F

# Ensure genres are properly split and exploded
movies_spark_df = movies_spark_df.withColumn("Genre", F.explode(F.split(F.col("Genres"), "\\|")))

# Register the updated DataFrame as a temporary view
movies_spark_df.createOrReplaceTempView("movies")

# Extract distinct genres
distinct_genres = spark.sql("""
    SELECT DISTINCT Genre
    FROM movies
""")

# Show distinct genres
distinct_genres.show(truncate=False)



+-----------+
|Genre      |
+-----------+
|Crime      |
|Romance    |
|Thriller   |
|Adventure  |
|Children's |
|Drama      |
|War        |
|Documentary|
|Fantasy    |
|Mystery    |
|Musical    |
|Animation  |
|Film-Noir  |
|Horror     |
|Western    |
|Comedy     |
|Action     |
|Sci-Fi     |
+-----------+



3. Number of Movies for Each Genre

In [9]:
movies_per_rating = spark.sql("""
    SELECT Rating, COUNT(*) as count
    FROM ratings
    GROUP BY Rating
    ORDER BY count DESC
""")
movies_per_rating.show()


+------+------+
|Rating| count|
+------+------+
|     4|348971|
|     3|261197|
|     5|226310|
|     2|107557|
|     1| 56174|
+------+------+



4. Number of Movies Starting with Numbers or Letters

In [10]:
users_per_movie = spark.sql("""
    SELECT MovieID, COUNT(DISTINCT UserID) as num_users
    FROM ratings
    GROUP BY MovieID
    ORDER BY num_users DESC
""")
users_per_movie.show()


+-------+---------+
|MovieID|num_users|
+-------+---------+
|   2858|     3428|
|    260|     2991|
|   1196|     2990|
|   1210|     2883|
|    480|     2672|
|   2028|     2653|
|    589|     2649|
|   2571|     2590|
|   1270|     2583|
|    593|     2578|
|   1580|     2538|
|   1198|     2514|
|    608|     2513|
|   2762|     2459|
|    110|     2443|
|   2396|     2369|
|   1197|     2318|
|    527|     2304|
|   1617|     2288|
|   1265|     2278|
+-------+---------+
only showing top 20 rows



5. List of Latest Released Movies

In [11]:
total_rating_per_movie = spark.sql("""
    SELECT MovieID, SUM(Rating) as total_rating
    FROM ratings
    GROUP BY MovieID
    ORDER BY total_rating DESC
""")
total_rating_per_movie.show()


+-------+------------+
|MovieID|total_rating|
+-------+------------+
|   2858|       14800|
|    260|       13321|
|   1196|       12836|
|   1210|       11598|
|   2028|       11507|
|   1198|       11257|
|    593|       11219|
|   2571|       11178|
|   2762|       10835|
|    589|       10751|
|    608|       10692|
|    527|       10392|
|    110|       10346|
|   1270|       10307|
|    318|       10143|
|    858|       10059|
|    480|       10057|
|   1197|        9976|
|   2396|        9778|
|   1617|        9654|
+-------+------------+
only showing top 20 rows



# **Spark SQL**

1. Create tables for movies.dat, users.dat, and ratings.dat in Spark SQL

In [18]:
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("MovieLensAnalysis").getOrCreate()

# Load Data
movies_path = "/content/drive/MyDrive/Dataset/Movie Analytics/movies.dat"
ratings_path = "/content/drive/MyDrive/Dataset/Movie Analytics/ratings.dat"
users_path = "/content/drive/MyDrive/Dataset/Movie Analytics/users.dat"

# Load Movies Data
movies = spark.read.csv(movies_path, sep='::', header=False, inferSchema=True).toDF("MovieID", "Title", "Genres")
movies.createOrReplaceTempView("movies")

# Load Ratings Data
ratings = spark.read.csv(ratings_path, sep='::', header=False, inferSchema=True).toDF("UserID", "MovieID", "Rating", "Timestamp")
ratings.createOrReplaceTempView("ratings")

# Load Users Data
users = spark.read.csv(users_path, sep='::', header=False, inferSchema=True).toDF("UserID", "Gender", "Age", "Occupation", "Zip-code")
users.createOrReplaceTempView("users")

# Save tables in parquet format (optional)
movies.write.parquet("data/movies_table.parquet")
ratings.write.parquet("data/ratings_table.parquet")
users.write.parquet("data/users_table.parquet")


2. Find the list of the oldest released movies

In [22]:
from pyspark.sql.functions import regexp_extract

# Extract Year from the Title and create a new DataFrame
movies_with_year = movies_spark_df.withColumn("Year", regexp_extract("Title", r"\((\d{4})\)", 1))

# Register the new DataFrame with Year as a temporary SQL table
movies_with_year.createOrReplaceTempView("movies_with_year")

oldest_movies = spark.sql("""
    SELECT Title, Year
    FROM movies_with_year
    WHERE Year IS NOT NULL AND Year != ''
    ORDER BY Year ASC
    LIMIT 10
""")
oldest_movies.show()


+--------------------+----+
|               Title|Year|
+--------------------+----+
|Daddy Long Legs (...|1919|
|Male and Female (...|1919|
|Male and Female (...|1919|
|Spiders, The (Die...|1919|
|Spiders, The (Die...|1919|
| Saphead, The (1920)|1920|
|Dog's Life, A (1920)|1920|
|     Kid, The (1921)|1921|
|Nosferatu (Nosfer...|1922|
|Tess of the Storm...|1922|
+--------------------+----+



3. How many movies are released each year?

In [25]:
from pyspark.sql.functions import regexp_extract

# Extract Year from Title and store it in a new DataFrame
movies_with_year = movies_spark_df.withColumn("Year", regexp_extract("Title", r"\((\d{4})\)", 1))

# Register this new DataFrame as a table
movies_with_year.createOrReplaceTempView("movies_with_year")

movies_per_year = spark.sql("""
    SELECT Year, COUNT(*) as MovieCount
    FROM movies_with_year
    WHERE Year IS NOT NULL AND Year != ''
    GROUP BY Year
    ORDER BY Year ASC
""")
movies_per_year.show()


+----+----------+
|Year|MovieCount|
+----+----------+
|1919|         5|
|1920|         2|
|1921|         1|
|1922|         2|
|1923|         3|
|1925|         7|
|1926|         9|
|1927|         9|
|1928|         3|
|1929|         3|
|1930|        11|
|1931|        11|
|1932|        11|
|1933|        13|
|1934|         9|
|1935|         8|
|1936|         8|
|1937|        21|
|1938|        11|
|1939|        21|
+----+----------+
only showing top 20 rows



4. How many movies are there for each rating?

In [27]:
movies_per_rating = spark.sql("""
    SELECT Rating, COUNT(*) as MovieCount
    FROM ratings
    GROUP BY Rating
    ORDER BY MovieCount DESC
""")
movies_per_rating.show()


+------+----------+
|Rating|MovieCount|
+------+----------+
|     4|    348971|
|     3|    261197|
|     5|    226310|
|     2|    107557|
|     1|     56174|
+------+----------+



5. How many users have rated each movie?

In [28]:
users_per_movie = spark.sql("""
    SELECT MovieID, COUNT(DISTINCT UserID) as UserCount
    FROM ratings
    GROUP BY MovieID
    ORDER BY UserCount DESC
""")
users_per_movie.show()


+-------+---------+
|MovieID|UserCount|
+-------+---------+
|   2858|     3428|
|    260|     2991|
|   1196|     2990|
|   1210|     2883|
|    480|     2672|
|   2028|     2653|
|    589|     2649|
|   2571|     2590|
|   1270|     2583|
|    593|     2578|
|   1580|     2538|
|   1198|     2514|
|    608|     2513|
|   2762|     2459|
|    110|     2443|
|   2396|     2369|
|   1197|     2318|
|    527|     2304|
|   1617|     2288|
|   1265|     2278|
+-------+---------+
only showing top 20 rows



6. What is the total rating for each movie?

In [29]:
total_ratings_per_movie = spark.sql("""
    SELECT r.MovieID, m.Title, SUM(r.Rating) as TotalRating
    FROM ratings r
    JOIN movies m ON r.MovieID = m.MovieID
    GROUP BY r.MovieID, m.Title
    ORDER BY TotalRating DESC
""")
total_ratings_per_movie.show()


+-------+--------------------+-----------+
|MovieID|               Title|TotalRating|
+-------+--------------------+-----------+
|   2858|American Beauty (...|      14800|
|    260|Star Wars: Episod...|      13321|
|   1196|Star Wars: Episod...|      12836|
|   1210|Star Wars: Episod...|      11598|
|   2028|Saving Private Ry...|      11507|
|   1198|Raiders of the Lo...|      11257|
|    593|Silence of the La...|      11219|
|   2571|  Matrix, The (1999)|      11178|
|   2762|Sixth Sense, The ...|      10835|
|    589|Terminator 2: Jud...|      10751|
|    608|        Fargo (1996)|      10692|
|    527|Schindler's List ...|      10392|
|    110|   Braveheart (1995)|      10346|
|   1270|Back to the Futur...|      10307|
|    318|Shawshank Redempt...|      10143|
|    858|Godfather, The (1...|      10059|
|    480|Jurassic Park (1993)|      10057|
|   1197|Princess Bride, T...|       9976|
|   2396|Shakespeare in Lo...|       9778|
|   1617|L.A. Confidential...|       9654|
+-------+--

7. What is the average rating for each movie?

In [30]:
average_ratings_per_movie = spark.sql("""
    SELECT r.MovieID, m.Title, AVG(r.Rating) as AvgRating
    FROM ratings r
    JOIN movies m ON r.MovieID = m.MovieID
    GROUP BY r.MovieID, m.Title
    ORDER BY AvgRating DESC
""")
average_ratings_per_movie.show()


+-------+--------------------+-----------------+
|MovieID|               Title|        AvgRating|
+-------+--------------------+-----------------+
|   3280|    Baby, The (1973)|              5.0|
|    989|Schlafes Bruder (...|              5.0|
|   1830|Follow the Bitch ...|              5.0|
|   3881|Bittersweet Motel...|              5.0|
|    787|Gate of Heavenly ...|              5.0|
|   3233|Smashing Time (1967)|              5.0|
|   3172|Ulysses (Ulisse) ...|              5.0|
|   3607|One Little Indian...|              5.0|
|   3656|        Lured (1947)|              5.0|
|   3382|Song of Freedom (...|              5.0|
|   3245|I Am Cuba (Soy Cu...|              4.8|
|     53|     Lamerica (1994)|             4.75|
|   2503|Apple, The (Sib) ...|4.666666666666667|
|   2905|      Sanjuro (1962)|4.608695652173913|
|   2019|Seven Samurai (Th...|4.560509554140127|
|    318|Shawshank Redempt...|4.554557700942973|
|    858|Godfather, The (1...|4.524966261808367|
|    745|Close Shave

# **Spark Data Frames**

1. Prepare Movies Data: Extracting the Year and Genre from the Text

In [31]:
movies_df = spark.read.csv(movies_path, sep='::', header=False, inferSchema=True)\
    .toDF('MovieID', 'Title', 'Genres')

# Extract year from title
movies_df = movies_df.withColumn('Year', F.regexp_extract(F.col('Title'), r'\((\d{4})\)$', 1).cast('int'))

# Split genres into an array
movies_df = movies_df.withColumn('Genres', F.split(F.col('Genres'), '\\|'))

movies_df.show(truncate=False)

+-------+-------------------------------------+--------------------------------+----+
|MovieID|Title                                |Genres                          |Year|
+-------+-------------------------------------+--------------------------------+----+
|1      |Toy Story (1995)                     |[Animation, Children's, Comedy] |1995|
|2      |Jumanji (1995)                       |[Adventure, Children's, Fantasy]|1995|
|3      |Grumpier Old Men (1995)              |[Comedy, Romance]               |1995|
|4      |Waiting to Exhale (1995)             |[Comedy, Drama]                 |1995|
|5      |Father of the Bride Part II (1995)   |[Comedy]                        |1995|
|6      |Heat (1995)                          |[Action, Crime, Thriller]       |1995|
|7      |Sabrina (1995)                       |[Comedy, Romance]               |1995|
|8      |Tom and Huck (1995)                  |[Adventure, Children's]         |1995|
|9      |Sudden Death (1995)                  |[Action

2. Prepare Users Data: Loading a Double Delimited CSV File

In [32]:
users_df = spark.read.csv(users_path, sep='::', header=False, inferSchema=True)\
    .toDF('UserID', 'Gender', 'Age', 'Occupation', 'Zip-code')

users_df.show(truncate=False)


+------+------+---+----------+--------+
|UserID|Gender|Age|Occupation|Zip-code|
+------+------+---+----------+--------+
|1     |F     |1  |10        |48067   |
|2     |M     |56 |16        |70072   |
|3     |M     |25 |15        |55117   |
|4     |M     |45 |7         |02460   |
|5     |M     |25 |20        |55455   |
|6     |F     |50 |9         |55117   |
|7     |M     |35 |1         |06810   |
|8     |M     |25 |12        |11413   |
|9     |M     |25 |17        |61614   |
|10    |F     |35 |1         |95370   |
|11    |F     |25 |1         |04093   |
|12    |M     |25 |12        |32793   |
|13    |M     |45 |1         |93304   |
|14    |M     |35 |0         |60126   |
|15    |M     |25 |7         |22903   |
|16    |F     |35 |0         |20670   |
|17    |M     |50 |1         |95350   |
|18    |F     |18 |3         |95825   |
|19    |M     |1  |10        |48073   |
|20    |M     |25 |14        |55113   |
+------+------+---+----------+--------+
only showing top 20 rows



3. Prepare Ratings Data: Programmatically Specifying a Schema for the Data Frame

In [33]:
from pyspark.sql.types import StructType, StructField, IntegerType, LongType

# Define schema
schema = StructType([
    StructField("UserID", IntegerType(), True),
    StructField("MovieID", IntegerType(), True),
    StructField("Rating", IntegerType(), True),
    StructField("Timestamp", LongType(), True)
])

# Load ratings data
ratings_df = spark.read.csv(ratings_path, sep='::', header=False, schema=schema)

ratings_df.show(truncate=False)


+------+-------+------+---------+
|UserID|MovieID|Rating|Timestamp|
+------+-------+------+---------+
|1     |1193   |5     |978300760|
|1     |661    |3     |978302109|
|1     |914    |3     |978301968|
|1     |3408   |4     |978300275|
|1     |2355   |5     |978824291|
|1     |1197   |3     |978302268|
|1     |1287   |5     |978302039|
|1     |2804   |5     |978300719|
|1     |594    |4     |978302268|
|1     |919    |4     |978301368|
|1     |595    |5     |978824268|
|1     |938    |4     |978301752|
|1     |2398   |4     |978302281|
|1     |2918   |4     |978302124|
|1     |1035   |5     |978301753|
|1     |2791   |4     |978302188|
|1     |2687   |3     |978824268|
|1     |2018   |4     |978301777|
|1     |3105   |5     |978301713|
|1     |2797   |4     |978302039|
+------+-------+------+---------+
only showing top 20 rows



5. Save Table without Defining DDL in Hive

In [34]:
# Assuming Hive is set up and configured
movies_df.write.saveAsTable("movies")


6. Broadcast Variable Example

In [35]:
from pyspark.sql import Row

# Create a broadcast variable
movie_titles = {row['MovieID']: row['Title'] for row in movies_df.collect()}
movie_titles_broadcast = spark.sparkContext.broadcast(movie_titles)

# Example of using the broadcast variable in a UDF
def get_movie_title(movie_id):
    return movie_titles_broadcast.value.get(movie_id, 'Unknown')

from pyspark.sql.functions import udf
get_movie_title_udf = udf(get_movie_title)

# Apply the UDF to add a new column with movie titles
ratings_with_titles_df = ratings_df.withColumn('Title', get_movie_title_udf(F.col('MovieID')))
ratings_with_titles_df.show()


+------+-------+------+---------+--------------------+
|UserID|MovieID|Rating|Timestamp|               Title|
+------+-------+------+---------+--------------------+
|     1|   1193|     5|978300760|One Flew Over the...|
|     1|    661|     3|978302109|James and the Gia...|
|     1|    914|     3|978301968| My Fair Lady (1964)|
|     1|   3408|     4|978300275|Erin Brockovich (...|
|     1|   2355|     5|978824291|Bug's Life, A (1998)|
|     1|   1197|     3|978302268|Princess Bride, T...|
|     1|   1287|     5|978302039|      Ben-Hur (1959)|
|     1|   2804|     5|978300719|Christmas Story, ...|
|     1|    594|     4|978302268|Snow White and th...|
|     1|    919|     4|978301368|Wizard of Oz, The...|
|     1|    595|     5|978824268|Beauty and the Be...|
|     1|    938|     4|978301752|         Gigi (1958)|
|     1|   2398|     4|978302281|Miracle on 34th S...|
|     1|   2918|     4|978302124|Ferris Bueller's ...|
|     1|   1035|     5|978301753|Sound of Music, T...|
|     1|  

7. Accumulator Example

In [36]:
# Example accumulator
accumulator = spark.sparkContext.accumulator(0)

def increment_accumulator(rating):
    global accumulator
    if rating > 4:
        accumulator += 1

# Apply the function to each row
ratings_df.foreach(lambda row: increment_accumulator(row['Rating']))

print(f"Number of high ratings: {accumulator.value}")


Number of high ratings: 226310
