In [139]:
pip install notebook

Note: you may need to restart the kernel to use updated packages.


In [140]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [141]:
from pyspark.sql import SparkSession

In [142]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("PySpark-jupyter-demo") \
    .getOrCreate()

In [143]:
# Test the setup
data = [("Kia", "Niro", 2025), ("Toyota", "Rav4", 2019), ("BYD", "Atto3", 2024)]
df = spark.createDataFrame(data, ["Car", "Model", "Year"])
df.show()

+------+-----+----+
|   Car|Model|Year|
+------+-----+----+
|   Kia| Niro|2025|
|Toyota| Rav4|2019|
|   BYD|Atto3|2024|
+------+-----+----+



In [144]:
df_lines = spark.read.format("com.databricks.spark.csv").options(header='true') \
 .option("delimiter", ",").option("quote", '"') \
 .option("escape", '"').option("multiLine", True) \
 .load("/Users/stavhecht/pyspark/simpsons/simpsons_script_lines.csv")
df_lines.createOrReplaceTempView("lines_table")
## print lines file schema
df_lines.printSchema()
## print all fields for first 5 rows
spark.sql("SELECT * from lines_table").show(5)
## print the shortest 10 script lines:
spark.sql("""SELECT int(word_count) AS wc, raw_text FROM lines_table WHERE TRY_CAST(word_count
AS INT) > 0 ORDER BY wc """).show(10,False)
## count the scenes that took more than 10 minutes
spark.sql("""SELECT count(*) FROM lines_table WHERE TRY_CAST(timestamp_in_ms AS INT) > (10 * 60
* 1000)""").show()


root
 |-- id: string (nullable = true)
 |-- episode_id: string (nullable = true)
 |-- number: string (nullable = true)
 |-- raw_text: string (nullable = true)
 |-- timestamp_in_ms: string (nullable = true)
 |-- speaking_line: string (nullable = true)
 |-- character_id: string (nullable = true)
 |-- location_id: string (nullable = true)
 |-- raw_character_text: string (nullable = true)
 |-- raw_location_text: string (nullable = true)
 |-- spoken_words: string (nullable = true)
 |-- normalized_text: string (nullable = true)
 |-- word_count: string (nullable = true)

+----+----------+------+--------------------+---------------+-------------+------------+-----------+--------------------+--------------------+--------------------+--------------------+----------+
|  id|episode_id|number|            raw_text|timestamp_in_ms|speaking_line|character_id|location_id|  raw_character_text|   raw_location_text|        spoken_words|     normalized_text|word_count|
+----+----------+------+-------------

In [145]:
## print the first 50 locations that has the word "Springfield" in them , ignoring letters case.
## ## write your code here ## Q1

spark.sql("""SELECT DISTINCT raw_location_text AS springfield_locations
FROM lines_table
WHERE raw_location_text LIKE '%Springfield%'""").show(50, False)

+-------------------------------------------+
|springfield_locations                      |
+-------------------------------------------+
|Springfield Police Station                 |
|Springfield YMCA                           |
|Springfield                                |
|Springfield Pond                           |
|Springfield Mall                           |
|Springfield Community Theater              |
|Springfield Elementary School Band Room    |
|Springfield Opry House                     |
|Springfield Swap Meet                      |
|Springfield Air and Space Museum           |
|Springfield Trade Center                   |
|Lake Springfield                           |
|Springfield Coast Highway                  |
|Springfield Wall of Fame                   |
|Springfield Shopper                        |
|Springfield Prison                         |
|Springfield Buddhist Temple                |
|Springfield - Homer's Boyhood              |
|Springfield Public Beach         

In [146]:
## print 20 quotes that are located in any place that has Jerusalem in its name.
## Note that Jerusalem may appear in any case and in any part of the location name.
## Use JOIN for this query on another table
## ## write your code here ## Q2
df_locations = spark.read.format("com.databricks.spark.csv").options(header='true') \
 .option("delimiter", ",").option("quote", '"') \
 .option("escape", '"').option("multiLine", True) \
 .load("/Users/stavhecht/pyspark/simpsons/simpsons_locations.csv")
df_locations.createOrReplaceTempView("locations_table")
df_locations.printSchema()


spark.sql("""
    SELECT lines_table.raw_text AS quetes
    FROM lines_table
    JOIN locations_table
    ON lines_table.location_id = locations_table.id
    WHERE LOWER(locations_table.name) LIKE '%jerusalem%'
""").show(20, False)

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- normalized_name: string (nullable = true)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|quetes                                                                                                                                                                                    |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(JERUSALEM ROYAL PALACE: ext. jerusalem royal palace - plaza - day)                                                                                                                       |
|Bart Simpson: I'm bored. Send in my jester.                                                                             

In [147]:
## print first 20 most used locations with the count of lines spoken in them.
## use GROUP BY for that
## ## write your code here ## Q3

spark.sql("""SELECT raw_location_text AS locations, COUNT(*) speaking_line
             FROM lines_table
             WHERE raw_location_text IS NOT NULL
             GROUP BY raw_location_text
             ORDER BY speaking_line DESC
             """).show(20, False)

+-------------------------------+-------------+
|locations                      |speaking_line|
+-------------------------------+-------------+
|Simpson Home                   |35059        |
|Springfield Elementary School  |7092         |
|Moe's Tavern                   |4628         |
|Springfield Nuclear Power Plant|3594         |
|Kwik-E-Mart                    |1476         |
|First Church of Springfield    |1416         |
|Simpson Living Room            |1378         |
|Springfield Street             |1334         |
|Springfield                    |1314         |
|Simpson Car                    |1239         |
|Flanders Home                  |1166         |
|Street                         |1124         |
|Springfield Town Hall          |1103         |
|Springfield Retirement Castle  |1049         |
|Burns Manor                    |998          |
|Springfield Mall               |833          |
|Simpson Kitchen                |816          |
|Courtroom                      |813    

In [148]:
## find the seasons in which the average imdb rating was the highest.
## Print the seasons number, the number of episodes in each one and the average rating
## in a descending order from highest average rating to lowest.
## ## write your code here ## Q4

df_episodes = spark.read.format("com.databricks.spark.csv").options(header='true') \
 .option("delimiter", ",").option("quote", '"') \
 .option("escape", '"').option("multiLine", True) \
 .load("/Users/stavhecht/pyspark/simpsons/simpsons_episodes.csv")
df_episodes.createOrReplaceTempView("episodes_table")
df_episodes.printSchema()

spark.sql("""SELECT season, COUNT(*) AS num_episodes,
             ROUND(AVG(CAST(imdb_rating AS FLOAT)),2) AS avg_rating
             FROM episodes_table
             WHERE imdb_rating IS NOT NULL
             GROUP BY season
             ORDER BY avg_rating DESC""").show()

root
 |-- id: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- imdb_rating: string (nullable = true)
 |-- imdb_votes: string (nullable = true)
 |-- number_in_season: string (nullable = true)
 |-- number_in_series: string (nullable = true)
 |-- original_air_date: string (nullable = true)
 |-- original_air_year: string (nullable = true)
 |-- production_code: string (nullable = true)
 |-- season: string (nullable = true)
 |-- title: string (nullable = true)
 |-- us_viewers_in_millions: string (nullable = true)
 |-- video_url: string (nullable = true)
 |-- views: string (nullable = true)

+------+------------+----------+
|season|num_episodes|avg_rating|
+------+------------+----------+
|     5|          22|      8.34|
|     7|          25|      8.32|
|     6|          25|      8.31|
|     4|          22|      8.27|
|     8|          25|      8.22|
|     3|          24|      8.15|
|     2|          22|      8.04|
|     9|          25|      7.84|
|     1|          13|  

In [149]:
#my query : print all the places the simpsons visit in Jerusalem

spark.sql("""
    SELECT DISTINCT lines_table.raw_location_text AS location
    FROM lines_table
    JOIN locations_table
      ON lines_table.location_id = locations_table.id
    WHERE LOWER(locations_table.name) LIKE '%jerusalem%'
    ORDER BY location
""").show(50, False)


+------------------------+
|location                |
+------------------------+
|JERUSALEM ROYAL PALACE  |
|Jerusalem               |
|STREETS OF OLD JERUSALEM|
+------------------------+

