# Imports and Drive Input Path

In [1]:
# Refresh the apt package index, then install a headless Java 8 JDK (output discarded).
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
# Download and unpack the Spark 3.2.1 / Hadoop 3.2 binary distribution into the working directory.
# NOTE(review): `wget -q` prints nothing on failure; if this release is no longer served from
# dlcdn.apache.org the following `tar` step is what will fail — confirm the URL still resolves.
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
# Python-side packages; pyspark and py4j are installed without a version pin.
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys
# NOTE(review): both environment variables are left commented out, so the tarball unpacked
# above is never pointed to explicitly; presumably findspark locates the pip-installed
# pyspark instead — confirm before relying on the 3.2.1 download.
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
# Locate a Spark installation and make pyspark importable in this kernel.
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F


[33m0% [Working][0m            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
[33m0% [Waiting for headers] [Connected to cloud.r-project.org (52.85.151.8)] [Conn[0m                                                                               Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
                                                                               Hit:3 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
                                                                               Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
                                                                               Get:5 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://p

In [2]:
from google.colab import drive

# Mount Google Drive at /content/drive so files under MyDrive are reachable by path.
drive.mount('/content/drive')

# CSV file passed to the functions that are called with `input_path`.
input_path = "/content/drive/MyDrive/student_files/data/TA_restaurants_curated_cleaned.csv"


Mounted at /content/drive


# Part 1

In [3]:
import sys
from pyspark import SparkContext, SparkConf

# sc.stop() # uncomment this during debugging to restart your context in case execution stopped mid-way this cell.
def show_schema(input_path):
  """Show the leading rows and the schema of a CSV file that has a header row.

  A SparkContext named "Part 1" is created for this call only and is stopped
  again before the function returns, whether or not reading succeeded.

  Args:
    input_path: Location of the CSV file to read.
  """
  # Bind `sc` before entering the try block: if building the configuration or
  # the context raises, the cleanup below must not fail with UnboundLocalError
  # and hide the real error.
  sc = None
  try:
    conf = SparkConf().setAppName("Part 1")
    sc = SparkContext(conf=conf)

    spark = SparkSession(sc)

    df = spark.read.option("header", True).csv(input_path)
    df.show()
    df.printSchema()
  finally:
    # Only stop a context that was actually created.
    if sc is not None:
      sc.stop()

# Preview the file referenced by `input_path` and print its schema.
show_schema(input_path)

+---+--------------------+---------+--------------------+-------+------+-----------+-----------------+--------------------+--------------------+---------+
|_c0|                Name|     City|       Cuisine Style|Ranking|Rating|Price Range|Number of Reviews|             Reviews|              URL_TA|    ID_TA|
+---+--------------------+---------+--------------------+-------+------+-----------+-----------------+--------------------+--------------------+---------+
|  0|Martine of Martin...|Amsterdam|[ 'French', 'Dutc...|    1.0|   5.0|   $$ - $$$|            136.0|[ [ 'Just like ho...|/Restaurant_Revie...|d11752080|
|  1| De Silveren Spiegel|Amsterdam|[ 'Dutch', 'Europ...|    2.0|   4.5|       $$$$|            812.0|[ [ 'Great food a...|/Restaurant_Revie...|  d693419|
|  2|             La Rive|Amsterdam|[ 'Mediterranean'...|    3.0|   4.5|       $$$$|            567.0|[ [ 'Satisfaction...|/Restaurant_Revie...|  d696959|
|  3|            Vinkeles|Amsterdam|[ 'French', 'Euro...|    4.0|   5.

QUESTION 1

In [4]:
import sys
from pyspark import SparkContext, SparkConf

def cleanup_csv(input_path):
  """Filter out rows without a usable rating or without reviews and report counts.

  Steps, in order:
    1. Print the number of rows in the CSV.
    2. Show how often each distinct "Reviews" value occurs, most frequent first.
    3. Keep rows whose "Rating" casts to a float >= 1.0 and whose "Reviews"
       is not null; print the remaining row count and show the result.

  The SparkContext created here is always stopped before returning.

  Args:
    input_path: Location of the CSV file (first line is the header).
  """
  # Bind `sc` up front so the cleanup in `finally` cannot raise
  # UnboundLocalError when context creation itself fails.
  sc = None
  try:
    conf = SparkConf().setAppName("Part 1 Question 1")
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)

    df = spark.read.option("header", True).csv(input_path)
    print(f"Original DataFrame count: {df.count()}")

    # Show the count of "Reviews" groups and their frequencies, then sort by count in descending order
    reviews_freq = (
      df.groupBy(F.col("Reviews"))
      .agg(F.count("Reviews").alias("count_Reviews"))
      .sort(F.desc("count_Reviews"))
    )
    reviews_freq.show()

    # TODO(review): decide whether the empty-list placeholder counts as "no reviews".
    # The previously disabled condition compared against "[ [ ], [ ] ]"; the frequency
    # table above appears to render the placeholder with two spaces inside each bracket
    # pair, so that literal would need checking against the data before being enabled.
    has_valid_rating = df['Rating'].cast('float') >= 1.0
    has_reviews = df['Reviews'].isNotNull()
    df_filtered = df.filter(has_valid_rating & has_reviews)

    print(f"Filtered DataFrame count: {df_filtered.count()}")
    df_filtered.show()

  finally:
    if sc is not None:
      sc.stop()

# Run the Question 1 clean-up on the file referenced by `input_path`.
cleanup_csv(input_path)

Original DataFrame count: 85562
+--------------------+-------------+
|             Reviews|count_Reviews|
+--------------------+-------------+
|      [ [  ], [  ] ]|         9679|
|[ [ 'Great Asian ...|            2|
|[ [ 'Sad unfriend...|            2|
|[ [ 'Fresh, tasty...|            2|
|[ [ 'Exceptional ...|            2|
|[ [ 'Excellent fo...|            2|
|[ [ 'Very average...|            2|
|[ [ 'Excellent' ]...|            2|
|[ [ 'Great Tapas ...|            2|
|[ [ 'Improve your...|            2|
|[ [ 'Amazing food...|            2|
|[ [ 'SPAIN Tour',...|            2|
|[ [ 'Superb', 'Ni...|            2|
|[ [ 'Enjoyable re...|            2|
|[ [ 'Great Dining...|            2|
|[ [ 'Nice restaur...|            2|
|[ [ 'good quality...|            2|
|[ [ 'Nice' ], [ '...|            2|
|[ [ 'Great experi...|            2|
|[ [ 'Cheep, cheer...|            2|
+--------------------+-------------+
only showing top 20 rows

Filtered DataFrame count: 85514
+---+-----------------

QUESTION 2

In [5]:
import sys
from pyspark import SparkContext, SparkConf

def worst_and_best_restaurant(input_path):
  """Show one best-rated and one worst-rated restaurant per city and price range.

  Rows without a "Price Range" are ignored. For every (City, Price Range)
  group the maximum and minimum float "Rating" are computed, the matching
  restaurant rows are looked up, and one row per (City, Price Range, Rating)
  is kept. The result is shown sorted by city, price range and descending
  rating.

  When several restaurants tie on the extreme rating, which one survives
  `dropDuplicates` is left to Spark.

  The SparkContext created here is always stopped before returning.

  Args:
    input_path: Location of the CSV file (first line is the header).
  """
  # Bind `sc` up front so the cleanup in `finally` cannot raise
  # UnboundLocalError when context creation itself fails.
  sc = None
  try:
    conf = SparkConf().setAppName("Part 1 Question 2")
    sc = SparkContext(conf=conf)

    spark = SparkSession(sc)

    df = spark.read.option("header", True).csv(input_path) # Replace with hdfs input_path

    df = df.filter(df['Price Range'].isNotNull())
    df = df.withColumn("Rating", df["Rating"].cast('float'))

    group_cols = ["City", "Price Range"]

    # Alias the aggregate straight to "Rating" so it can serve as a join key;
    # no intermediate sort is needed because the final result is sorted below.
    best_df = df.groupBy(*group_cols).agg(F.max("Rating").alias("Rating"))
    worst_df = df.groupBy(*group_cols).agg(F.min("Rating").alias("Rating"))

    # One (City, Price Range, Rating) key per extreme; groups whose best and
    # worst ratings coincide are collapsed by dropDuplicates further down.
    union_df = best_df.union(worst_df)

    # Recover the full restaurant rows that carry those extreme ratings.
    combined_df = union_df.join(df, on=["City", "Price Range", "Rating"], how="inner")
    combined_df = (
      combined_df.dropDuplicates(["Price Range", "City", "Rating"])
      .select(
          "_c0",
          "Name",
          "City",
          "Cuisine Style",
          "Ranking",
          "Rating",
          "Price Range",
          "Number of Reviews",
          "Reviews",
          "URL_TA",
          "ID_TA",
      )
      .sort(F.col("City").asc(), F.col("Price Range").asc(), F.col("Rating").desc())
    )

    combined_df.show()

  finally:
    if sc is not None:
      sc.stop()



# Run the Question 2 best/worst lookup on the file referenced by `input_path`.
worst_and_best_restaurant(input_path)


+----+--------------------+---------+--------------------+-------+------+-----------+-----------------+--------------------+--------------------+---------+
| _c0|                Name|     City|       Cuisine Style|Ranking|Rating|Price Range|Number of Reviews|             Reviews|              URL_TA|    ID_TA|
+----+--------------------+---------+--------------------+-------+------+-----------+-----------------+--------------------+--------------------+---------+
| 163|          Sir Hummus|Amsterdam|[ 'Healthy', 'Mid...|  164.0|   5.0|          $|            126.0|[ [ 'Great servic...|/Restaurant_Revie...| d7607446|
|2932|     Grillroom Sabba|Amsterdam|[ 'Middle Eastern' ]| 2942.0|   2.5|          $|             12.0|[ [ 'This is a gr...|/Restaurant_Revie...| d6464568|
|   0|Martine of Martin...|Amsterdam|[ 'French', 'Dutc...|    1.0|   5.0|   $$ - $$$|            136.0|[ [ 'Just like ho...|/Restaurant_Revie...|d11752080|
|3239|       Reggae Rita's|Amsterdam|[ 'Caribbean', 'J...|   NUL

QUESTION 3

In [6]:
import sys
from pyspark import SparkContext, SparkConf

def average_rating(input_path):
  """Show the three cities with the highest and the lowest average rating.

  "Rating" is cast to float and averaged per "City". The three highest
  averages are labelled "Top" and the three lowest "Bottom" in a
  "RatingGroup" column; both sets are shown in one table.

  Cities whose average is null (no rating could be read as a number) are
  excluded from the ranking.

  The SparkContext created here is always stopped before returning.

  Args:
    input_path: Location of the CSV file (first line is the header).
  """
  # Bind `sc` up front so the cleanup in `finally` cannot raise
  # UnboundLocalError when context creation itself fails.
  sc = None
  try:
    conf = SparkConf().setAppName("Part 1 Question 3")
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)

    df = spark.read.option("header", True).csv(input_path)
    df = df.withColumn("Rating", df["Rating"].cast('float'))

    avg_df = (
        df
        .groupBy("City")
        .agg(F.avg("Rating").alias("AverageRating"))
        # An ascending sort places nulls first, so a city without any numeric
        # rating would otherwise be reported as one of the "Bottom" three.
        .filter(F.col("AverageRating").isNotNull())
    )

    top3_df = (
        avg_df
        .orderBy(F.desc("AverageRating"))
        .limit(3)
        .withColumn("RatingGroup", F.lit("Top"))
    )

    bot3_df = (
        avg_df
        .orderBy(F.asc("AverageRating"))
        .limit(3)
        .withColumn("RatingGroup", F.lit("Bottom"))
    )

    union_df = top3_df.union(bot3_df)
    union_df.show()

  finally:
    if sc is not None:
      sc.stop()

# Run the Question 3 top/bottom city ranking on the file referenced by `input_path`.
average_rating(input_path)

+---------+------------------+-----------+
|     City|     AverageRating|RatingGroup|
+---------+------------------+-----------+
|   Athens| 4.241316931982634|        Top|
|     Rome|  4.19908330011957|        Top|
|   Oporto| 4.164118705035971|        Top|
|   Madrid|3.8445586975149957|     Bottom|
|    Milan| 3.846076458752515|     Bottom|
|Stockholm|3.8947530864197533|     Bottom|
+---------+------------------+-----------+



QUESTION 4

In [None]:
import sys
from pyspark import SparkContext, SparkConf

def count_restaurant(input_path):
  """Show how many restaurants offer each cuisine in each city.

  The "Cuisine Style" column is handled as text: a leading "[" and a
  trailing "]" are removed, the remainder is split on commas, and each piece
  becomes its own row with single quotes and surrounding whitespace
  stripped. Rows are then counted per (City, Cuisine) and shown ordered by
  city and descending count.

  The SparkContext created here is always stopped before returning.

  Args:
    input_path: Location of the CSV file (first line is the header).
  """
  # Bind `sc` up front so the cleanup in `finally` cannot raise
  # UnboundLocalError when context creation itself fails.
  sc = None
  try:
    conf = SparkConf().setAppName("Part 1 Question 4")
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)

    df = spark.read.option("header", True).csv(input_path)

    # Turn the bracketed list text into an array of raw cuisine tokens.
    # Raw strings keep the regex escapes intact without relying on Python
    # passing an unrecognised "\s" escape through unchanged.
    without_brackets = F.regexp_replace("Cuisine Style", r"^\[|\]$", "")
    df = df.withColumn("Cuisine Style", F.split(without_brackets, r",\s*"))

    # One row per (restaurant, cuisine), with quotes and padding removed.
    df_exploded = df.withColumn("Cuisine", F.explode("Cuisine Style"))
    df_exploded = df_exploded.withColumn(
        "Cuisine", F.trim(F.regexp_replace("Cuisine", "'", ""))
    )

    # groupBy + count already yields exactly the columns City, Cuisine, count.
    result_df = (
        df_exploded.groupBy("City", "Cuisine")
        .count()
        .orderBy("City", F.col("count").desc())
    )

    result_df.show()

  finally:
    if sc is not None:
      sc.stop()

# Run the Question 4 per-city cuisine count on the file referenced by `input_path`.
count_restaurant(input_path)

+---------+-------------------+-----+
|     City|            Cuisine|count|
+---------+-------------------+-----+
|Amsterdam|           European| 1418|
|Amsterdam|Vegetarian Friendly| 1307|
|Amsterdam|              Dutch|  805|
|Amsterdam|      Vegan Options|  524|
|Amsterdam|Gluten Free Options|  497|
|Amsterdam|                Bar|  497|
|Amsterdam|            Italian|  364|
|Amsterdam|              Asian|  352|
|Amsterdam|      International|  345|
|Amsterdam|      Mediterranean|  337|
|Amsterdam|               Cafe|  317|
|Amsterdam|                Pub|  292|
|Amsterdam|             French|  204|
|Amsterdam|              Pizza|  182|
|Amsterdam|           American|  142|
|Amsterdam|            Seafood|  125|
|Amsterdam|          Fast Food|  117|
|Amsterdam|           Japanese|  113|
|Amsterdam|         Steakhouse|  112|
|Amsterdam|            Chinese|   92|
+---------+-------------------+-----+
only showing top 20 rows



# PART 2

In [None]:
# Parquet file used by the Part 2 cells; kept separate from `input_path`.
input_path2 = "/content/drive/MyDrive/student_files/data/tmdb_5000_credits.parquet"

In [None]:
def show_schema_parquet(input_path):
  """Show the leading rows and the schema of a Parquet dataset.

  A SparkContext named "Part 2" is created for this call only and is stopped
  again before the function returns, whether or not reading succeeded.

  Args:
    input_path: Location of the Parquet file or directory to read.
  """
  # Bind `sc` up front so the cleanup in `finally` cannot raise
  # UnboundLocalError when context creation itself fails.
  sc = None
  try:
    # This cell belongs to Part 2; the earlier "Part 1" label was a leftover.
    conf = SparkConf().setAppName("Part 2")
    sc = SparkContext(conf=conf)

    spark = SparkSession(sc)

    # Parquet files carry their own schema, so no CSV-style "header" option
    # is passed to the reader.
    df = spark.read.parquet(input_path)
    df.show()
    df.printSchema()
  finally:
    if sc is not None:
      sc.stop()

# Preview the Parquet file referenced by `input_path2` and print its schema.
show_schema_parquet(input_path2)

+--------+--------------------+--------------------+--------------------+
|movie_id|               title|                cast|                crew|
+--------+--------------------+--------------------+--------------------+
|   19995|              Avatar|[{"cast_id": 242,...|[{"credit_id": "5...|
|     285|Pirates of the Ca...|[{"cast_id": 4, "...|[{"credit_id": "5...|
|  206647|             Spectre|[{"cast_id": 1, "...|[{"credit_id": "5...|
|   49026|The Dark Knight R...|[{"cast_id": 2, "...|[{"credit_id": "5...|
|   49529|         John Carter|[{"cast_id": 5, "...|[{"credit_id": "5...|
|     559|        Spider-Man 3|[{"cast_id": 30, ...|[{"credit_id": "5...|
|   38757|             Tangled|[{"cast_id": 34, ...|[{"credit_id": "5...|
|   99861|Avengers: Age of ...|[{"cast_id": 76, ...|[{"credit_id": "5...|
|     767|Harry Potter and ...|[{"cast_id": 3, "...|[{"credit_id": "5...|
|  209112|Batman v Superman...|[{"cast_id": 18, ...|[{"credit_id": "5...|
|    1452|    Superman Returns|[{"cast

QUESTION 5

In [None]:
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import ArrayType, StringType, StructField, StructType
from pyspark.sql.functions import from_json, col, explode, array, array_sort, count
import itertools

# `hdfs_nn` is not defined anywhere in this notebook (presumably it is the HDFS
# namenode host supplied when the code runs on a cluster — confirm). Only build
# the HDFS path when it exists, so a fresh-kernel run does not stop here with
# NameError and the Drive-based `input_path` is not overwritten.
if "hdfs_nn" in globals():
  input_path = "hdfs://%s:9000/assignment2/part1/input/" % (hdfs_nn)

def find_pairs(input_path):
  """Show the pairs of cast members who appear together in at least two movies.

  The "cast" column is parsed as a JSON array and only each entry's "name"
  is used. Within every movie, each cast member is paired with every other
  member of a different name; pairs occurring in two or more movies are kept
  and shown with the movie id and title of each shared movie.

  NOTE(review): both orderings of a pair, (A, B) and (B, A), are emitted as
  separate rows, as in the original implementation — confirm that this is
  the intended output format.

  The SparkContext created here is always stopped before returning.

  Args:
    input_path: Location of the Parquet dataset with the columns
      "movie_id", "title" and "cast".
  """
  # Bind `sc` up front so the cleanup in `finally` cannot raise
  # UnboundLocalError when context creation itself fails.
  sc = None
  try:
    # This is Part 2 / Question 5; the earlier label was copied from Question 4.
    conf = SparkConf().setAppName("Part 2 Question 5")
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)

    # Schema for from_json: an array of objects from which only "name" is read.
    json_schema = ArrayType(StructType([
        StructField("name", StringType(), nullable=False)
    ]))

    # Parquet carries its own schema, so no CSV-style "header" option is used.
    df = spark.read.parquet(input_path)

    # One row per (movie, cast member); parsed once and reused for both sides
    # of the self-join below.
    cast_df = (
        df.select(
            "movie_id",
            "title",
            F.explode(F.from_json(F.col("cast"), json_schema)).alias("member"),
        )
        .select("movie_id", "title", F.col("member.name").alias("actor"))
    )

    same_movie = F.col("df1.movie_id") == F.col("df2.movie_id")
    different_person = F.col("df1.actor") != F.col("df2.actor")

    paired_actors_df = (
        cast_df.alias("df1")
        .join(cast_df.alias("df2"), same_movie & different_person)
        .select(
            F.col("df1.movie_id"),
            F.col("df1.title"),
            F.col("df1.actor").alias("actor1"),
            F.col("df2.actor").alias("actor2"),
        )
        # A name listed more than once in a movie's cast must count only once.
        .distinct()
        .orderBy("movie_id", "actor1", "actor2")
    )

    # Pairs that share at least two distinct movies.
    df_counts = (
        paired_actors_df.groupBy("actor1", "actor2")
        .agg(F.count("*").alias("pair_count"))
        .filter(F.col("pair_count") >= 2)
        .orderBy("actor1", "actor2")
    )

    valid_pairs = df_counts.select("actor1", "actor2")

    # Join back to get all occurrences of these pairs along with movie details
    final_df = paired_actors_df.join(valid_pairs, ["actor1", "actor2"], "inner")
    final_df = final_df.select("movie_id", "title", "actor1", "actor2")
    final_df.show()

  finally:
    if sc is not None:
      sc.stop()

# Run the Question 5 pair search on the Parquet file referenced by `input_path2`.
find_pairs(input_path2)

+--------+--------------------+--------------+--------------------+
|movie_id|               title|        actor1|              actor2|
+--------+--------------------+--------------+--------------------+
|  241254|          The Prince|       50 Cent|         John Cusack|
|  199373|   The Frozen Ground|       50 Cent|         John Cusack|
|   22821|The Boondock Sain...|A. Frank Ruffo|           Joe Parro|
|    2976|           Hairspray|A. Frank Ruffo|           Joe Parro|
|  114150|       Pitch Perfect| Aakomon Jones|          C.J. Perry|
|  254470|     Pitch Perfect 2| Aakomon Jones|          C.J. Perry|
|  114150|       Pitch Perfect| Aakomon Jones|      Donald Watkins|
|  239566|           Get on Up| Aakomon Jones|      Donald Watkins|
|  254470|     Pitch Perfect 2| Aakomon Jones|     Elizabeth Banks|
|  114150|       Pitch Perfect| Aakomon Jones|     Elizabeth Banks|
|  114150|       Pitch Perfect| Aakomon Jones|John Benjamin Hickey|
|  239566|           Get on Up| Aakomon Jones|Jo