In [1]:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
    .appName("simpsons3") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/02 23:34:51 WARN Utils: Your hostname, DESKTOP-LOSA1DI, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/01/02 23:34:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/02 23:34:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
df_lines = spark.read.format("com.databricks.spark.csv").options(header='true') \
 .option("delimiter", ",").option("quote", '"') \
 .option("escape", '"').option("multiLine", True) \
 .load("simpsons/simpsons_script_lines.csv")
df_lines.createOrReplaceTempView("lines_table")
## print lines file schema
df_lines.printSchema()


                                                                                

root
 |-- id: string (nullable = true)
 |-- episode_id: string (nullable = true)
 |-- number: string (nullable = true)
 |-- raw_text: string (nullable = true)
 |-- timestamp_in_ms: string (nullable = true)
 |-- speaking_line: string (nullable = true)
 |-- character_id: string (nullable = true)
 |-- location_id: string (nullable = true)
 |-- raw_character_text: string (nullable = true)
 |-- raw_location_text: string (nullable = true)
 |-- spoken_words: string (nullable = true)
 |-- normalized_text: string (nullable = true)
 |-- word_count: string (nullable = true)



In [3]:
df_locations = spark.read.option("header","true").csv("simpsons/simpsons_locations.csv")
df_locations.createOrReplaceTempView("locations_table")

                                                                                

In [4]:
df_episodes = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv("simpsons/simpsons_episodes.csv")

df_episodes.createOrReplaceTempView("episodes_table")
df_episodes.printSchema()


root
 |-- id: integer (nullable = true)
 |-- image_url: string (nullable = true)
 |-- imdb_rating: double (nullable = true)
 |-- imdb_votes: double (nullable = true)
 |-- number_in_season: integer (nullable = true)
 |-- number_in_series: integer (nullable = true)
 |-- original_air_date: date (nullable = true)
 |-- original_air_year: integer (nullable = true)
 |-- production_code: string (nullable = true)
 |-- season: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- us_viewers_in_millions: double (nullable = true)
 |-- video_url: string (nullable = true)
 |-- views: double (nullable = true)



In [5]:
## print all fields for first 5 rows
spark.sql("SELECT * from lines_table").show(5)


+----+----------+------+--------------------+---------------+-------------+------------+-----------+--------------------+--------------------+--------------------+--------------------+----------+
|  id|episode_id|number|            raw_text|timestamp_in_ms|speaking_line|character_id|location_id|  raw_character_text|   raw_location_text|        spoken_words|     normalized_text|word_count|
+----+----------+------+--------------------+---------------+-------------+------------+-----------+--------------------+--------------------+--------------------+--------------------+----------+
|9549|        32|   209|Miss Hoover: No, ...|         848000|         true|         464|          3|         Miss Hoover|Springfield Eleme...|No, actually, it ...|no actually it wa...|        31|
|9550|        32|   210|Lisa Simpson: (NE...|         856000|         true|           9|          3|        Lisa Simpson|Springfield Eleme...|Where's Mr. Bergs...| wheres mr bergstrom|         3|
|9551|        32|   

In [6]:
## print the shortest 10 script lines:
spark.sql("SELECT int(word_count) AS wc, raw_text FROM lines_table WHERE TRY_CAST(word_count AS INT) > 0 ORDER BY wc ").show(10,False)

[Stage 5:>                                                          (0 + 1) / 1]

+---+---------------------------------+
|wc |raw_text                         |
+---+---------------------------------+
|1  |Homer Simpson: Oh.               |
|1  |Homer Simpson: And?              |
|1  |Bart Simpson: Lewis?             |
|1  |Homer Simpson: (SHOCKED) Me?     |
|1  |Wendell Borton: Yayyyyyyyyyyyyyy!|
|1  |Lisa Simpson: Baboon!            |
|1  |Lisa Simpson: Yeah.              |
|1  |Lisa Simpson: No!                |
|1  |Homer Simpson: Oh.               |
|1  |Homer Simpson: Nuts.             |
+---+---------------------------------+
only showing top 10 rows


                                                                                

In [7]:
## count the scenes that took more than 10 minutes
spark.sql("SELECT count(*) FROM lines_table WHERE TRY_CAST(timestamp_in_ms AS INT) > (10 * 60 * 1000)").show()

[Stage 6:>                                                          (0 + 1) / 1]

+--------+
|count(1)|
+--------+
|   86678|
+--------+



                                                                                

In [8]:
## print the first 50 locations that has the word "Springfield" in them , ignoring letters case.

spark.sql("""
    SELECT DISTINCT raw_location_text 
    FROM lines_table 
    WHERE lower(raw_location_text) LIKE '%springfield%' 
    LIMIT 50
""").show(50, False)

[Stage 9:>                                                          (0 + 1) / 1]

+---------------------------------------+
|raw_location_text                      |
+---------------------------------------+
|Springfield Police Station             |
|Springfield YMCA                       |
|SPRINGFIELD REPUBLICAN PARTY CASTLE    |
|Springfield                            |
|Springfield Pond                       |
|Springfield Mall                       |
|SPRINGFIELD ANIMAL REFUGE              |
|Springfield Community Theater          |
|SPRINGFIELD - THE PAST                 |
|PRIMORDIAL SPRINGFIELD                 |
|SPRINGFIELD HOSPITAL HALLWAY           |
|SPRINGFIELD ELEMENTARY SCHOOL BUS      |
|Springfield Elementary School Band Room|
|Springfield Opry House                 |
|Springfield Swap Meet                  |
|SPRINGFIELD PIER                       |
|Springfield Air and Space Museum       |
|Springfield Trade Center               |
|DREAMLIKE SUBURBAN SPRINGFIELD STREET  |
|SPRINGFIELD MONUMENT PARK              |
|Lake Springfield                 

                                                                                

In [9]:
## print 20 quotes that are located in any place that has Jerusalem in its name.
## Note that Jerusalem may appear in any case and in any part of the location name.
## Use JOIN for this query on another table

df_locations = spark.read.option("header","true").csv("simpsons/simpsons_locations.csv")
df_locations.createOrReplaceTempView("locations_table")

spark.sql("""
    SELECT l.raw_text 
    FROM lines_table l 
    JOIN locations_table loc ON l.location_id = loc.id 
    WHERE lower(loc.name) LIKE '%jerusalem%' 
    LIMIT 20
""").show(20, False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|raw_text                                                                                                                                                                                  |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(JERUSALEM ROYAL PALACE: ext. jerusalem royal palace - plaza - day)                                                                                                                       |
|Bart Simpson: I'm bored. Send in my jester.                                                                                                                                               |
|Krusty the Clown: Hey, hey, King David! (GOES INTO ROU

In [10]:
## print first 20 most used locations with the count of lines spoken in them.
## use GROUP BY for that

spark.sql("""
    SELECT loc.name, COUNT(*) AS lines_count 
    FROM lines_table l 
    JOIN locations_table loc ON l.location_id = loc.id 
    GROUP BY loc.name 
    ORDER BY lines_count DESC 
    LIMIT 20
""").show(20, False)

[Stage 16:>                                                         (0 + 1) / 1]

+-------------------------------+-----------+
|name                           |lines_count|
+-------------------------------+-----------+
|Simpson Home                   |35059      |
|Springfield Elementary School  |7092       |
|Moe's Tavern                   |4628       |
|Springfield Nuclear Power Plant|3594       |
|Kwik-E-Mart                    |1476       |
|First Church of Springfield    |1416       |
|Simpson Living Room            |1378       |
|Springfield Street             |1334       |
|Springfield                    |1314       |
|Simpson Car                    |1239       |
|Flanders Home                  |1166       |
|Street                         |1124       |
|Springfield Town Hall          |1103       |
|Springfield Retirement Castle  |1049       |
|Burns Manor                    |998        |
|Springfield Mall               |833        |
|Simpson Kitchen                |816        |
|Courtroom                      |813        |
|Bart's Treehouse               |7

                                                                                

In [11]:
## find the seasons in which the average imdb rating was the highest.
## Print the seasons number, the number of episodes in each one and the average rating
## in a descending order from highest average rating to lowest.

spark.sql("""
    SELECT season, COUNT(*) AS episodes_count, AVG(imdb_rating) AS avg_rating 
    FROM episodes_table 
    WHERE imdb_rating IS NOT NULL 
    GROUP BY season 
    ORDER BY avg_rating DESC
""").show(50, False)

+------+--------------+------------------+
|season|episodes_count|avg_rating        |
+------+--------------+------------------+
|5     |22            |8.336363636363636 |
|7     |25            |8.324             |
|6     |25            |8.312             |
|4     |22            |8.268181818181818 |
|8     |25            |8.219999999999999 |
|3     |24            |8.154166666666665 |
|2     |22            |8.04090909090909  |
|9     |25            |7.8439999999999985|
|1     |13            |7.807692307692307 |
|10    |23            |7.569565217391306 |
|12    |21            |7.361904761904761 |
|11    |22            |7.290909090909093 |
|13    |22            |7.140909090909091 |
|14    |22            |7.077272727272727 |
|15    |22            |7.045454545454546 |
|16    |21            |7.042857142857143 |
|18    |22            |7.0               |
|19    |20            |6.935             |
|20    |21            |6.895238095238096 |
|17    |22            |6.863636363636362 |
|25    |22 

In [12]:
## Additional Question:
## Who are the top 10 characters with the most lines in Moe's Tavern?

spark.sql("""
    SELECT raw_character_text, COUNT(*) AS lines_count 
    FROM lines_table 
    WHERE raw_location_text = "Moe's Tavern"
    GROUP BY raw_character_text 
    ORDER BY lines_count DESC 
    LIMIT 10
""").show()

[Stage 22:>                                                         (0 + 1) / 1]

+------------------+-----------+
|raw_character_text|lines_count|
+------------------+-----------+
|       Moe Szyslak|       1201|
|     Homer Simpson|       1025|
|              NULL|        450|
|     Lenny Leonard|        252|
|     Barney Gumble|        228|
|      Carl Carlson|        201|
|     Marge Simpson|        125|
|      Lisa Simpson|         58|
|      Bart Simpson|         57|
|      Chief Wiggum|         49|
+------------------+-----------+



                                                                                