In [None]:
pip install pyspark

In [49]:
from pyspark.sql import SparkSession

In [50]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("PySpark-jupyter-demo") \
    .getOrCreate()

In [51]:
df_lines = spark.read.format("com.databricks.spark.csv").options(header='true') \
.option("delimiter", ",").option("quote", '"') \
.option("escape", '"').option("multiLine", True) \
.load("simpsons/simpsons_script_lines.csv")
df_lines.createOrReplaceTempView("lines_table")

## print lines file schema
df_lines.printSchema()

## print all fields for first 5 rows
spark.sql("SELECT * from lines_table").show(5)

## print the shortest 10 script lines:
spark.sql("""
SELECT int(word_count) AS wc, raw_text 
FROM lines_table 
WHERE TRY_CAST(word_count
AS INT) > 0 
ORDER BY wc 
""").show(10,False)

## count the scenes that took more than 10 minutes
spark.sql("""
SELECT count(*) 
FROM lines_table
WHERE TRY_CAST(timestamp_in_ms AS INT) > (10 * 60 * 1000)
""").show()

root
 |-- id: string (nullable = true)
 |-- episode_id: string (nullable = true)
 |-- number: string (nullable = true)
 |-- raw_text: string (nullable = true)
 |-- timestamp_in_ms: string (nullable = true)
 |-- speaking_line: string (nullable = true)
 |-- character_id: string (nullable = true)
 |-- location_id: string (nullable = true)
 |-- raw_character_text: string (nullable = true)
 |-- raw_location_text: string (nullable = true)
 |-- spoken_words: string (nullable = true)
 |-- normalized_text: string (nullable = true)
 |-- word_count: string (nullable = true)

+----+----------+------+--------------------+---------------+-------------+------------+-----------+--------------------+--------------------+--------------------+--------------------+----------+
|  id|episode_id|number|            raw_text|timestamp_in_ms|speaking_line|character_id|location_id|  raw_character_text|   raw_location_text|        spoken_words|     normalized_text|word_count|
+----+----------+------+-------------

In [52]:
## print the first 50 locations that has the word "Springfield" in them , ignoring letters case.
df_locations = spark.read.format("com.databricks.spark.csv").options(header='true') \
.option("delimiter", ",").option("quote", '"') \
.option("escape", '"').option("multiLine", True) \
.load("simpsons/simpsons_locations.csv")
df_locations.createOrReplaceTempView("locations_table")

spark.sql("""
SELECT DISTINCT name
FROM locations_table
WHERE LOWER(name) LIKE '%springfield%'
LIMIT 50
""").show(50, False)

+---------------------------------------+
|name                                   |
+---------------------------------------+
|Springfield Police Station             |
|Springfield YMCA                       |
|SPRINGFIELD REPUBLICAN PARTY CASTLE    |
|Springfield                            |
|Springfield Pond                       |
|Springfield Mall                       |
|SPRINGFIELD ANIMAL REFUGE              |
|Springfield Community Theater          |
|SPRINGFIELD - THE PAST                 |
|PRIMORDIAL SPRINGFIELD                 |
|SPRINGFIELD HOSPITAL HALLWAY           |
|SPRINGFIELD ELEMENTARY SCHOOL BUS      |
|Springfield Elementary School Band Room|
|Springfield Opry House                 |
|Springfield Swap Meet                  |
|SPRINGFIELD PIER                       |
|Springfield Air and Space Museum       |
|DREAMLIKE SUBURBAN SPRINGFIELD STREET  |
|Springfield Trade Center               |
|SPRINGFIELD MONUMENT PARK              |
|Lake Springfield                 

In [53]:
## print 20 quotes that are located in any place that has Jerusalem in its name.
## Note that Jerusalem may appear in any case and in any part of the location name.
## Use JOIN for this query on another table
spark.sql("""
SELECT lines_t.spoken_words as quotes
FROM lines_table lines_t
JOIN locations_table loc_t
ON lines_t.location_id = loc_t.id
WHERE LOWER(loc_t.name) LIKE '%jerusalem%'
AND lines_t.speaking_line = 'true'
""").show(20, False)

+---------------------------------------------------------------------------------------------------------------------------------------------+
|quotes                                                                                                                                       |
+---------------------------------------------------------------------------------------------------------------------------------------------+
|I'm bored. Send in my jester.                                                                                                                |
|Hey, hey, King David! Ha, ha, how ya doin'? Now I'm not saying Jezebel's easy, but before she moved to Sodom, it was known for its pottery.  |
|What else you got?                                                                                                                           |
|Ahh... yeah... well, wait a minute, I got somethin' on the Canaanites. They are so stupid.                                             

In [54]:
## print first 20 most used locations with the count of lines spoken in them.
## use GROUP BY for that
## ## write your code here ##
spark.sql("""
SELECT raw_location_text AS locations, COUNT(*) as lines_count
FROM lines_table
WHERE speaking_line = 'true'
AND raw_location_text IS NOT NULL
GROUP BY raw_location_text
ORDER BY lines_count DESC
""").show(20, False)


+-------------------------------+-----------+
|locations                      |lines_count|
+-------------------------------+-----------+
|Simpson Home                   |30158      |
|Springfield Elementary School  |5970       |
|Moe's Tavern                   |3998       |
|Springfield Nuclear Power Plant|3031       |
|Kwik-E-Mart                    |1258       |
|Simpson Living Room            |1172       |
|First Church of Springfield    |1151       |
|Springfield Street             |1032       |
|Simpson Car                    |998        |
|Springfield                    |996        |
|Flanders Home                  |985        |
|Springfield Town Hall          |931        |
|Springfield Retirement Castle  |882        |
|Street                         |862        |
|Burns Manor                    |843        |
|Courtroom                      |713        |
|Springfield Mall               |691        |
|Simpson Kitchen                |687        |
|Bart's Treehouse               |6

In [55]:
## find the seasons in which the average imdb rating was the highest.
## Print the seasons number, the number of episodes in each one and the average rating
## in a descending order from highest average rating to lowest.
## ## write your code here ##
## print lines file schema
df_episodes = spark.read.format("com.databricks.spark.csv").options(header='true') \
.option("delimiter", ",").option("quote", '"') \
.option("escape", '"').option("multiLine", True) \
.load("simpsons/simpsons_episodes.csv")
df_episodes.createOrReplaceTempView("episodes_table")

spark.sql("""
SELECT season, COUNT(*) num_episodes, ROUND(AVG(TRY_CAST(imdb_rating AS FLOAT)), 1) AS avg_rating
FROM episodes_table
WHERE imdb_rating IS NOT NULL
GROUP BY season
ORDER BY avg_rating DESC
""").show(truncate=False)


+------+------------+----------+
|season|num_episodes|avg_rating|
+------+------------+----------+
|7     |25          |8.3       |
|5     |22          |8.3       |
|6     |25          |8.3       |
|4     |22          |8.3       |
|3     |24          |8.2       |
|8     |25          |8.2       |
|2     |22          |8.0       |
|9     |25          |7.8       |
|1     |13          |7.8       |
|10    |23          |7.6       |
|12    |21          |7.4       |
|11    |22          |7.3       |
|13    |22          |7.1       |
|14    |22          |7.1       |
|15    |22          |7.0       |
|16    |21          |7.0       |
|18    |22          |7.0       |
|17    |22          |6.9       |
|19    |20          |6.9       |
|20    |21          |6.9       |
+------+------------+----------+
only showing top 20 rows


In [56]:
# extra question about the data
# print first 20 characters who have spoken the most lines and the average word count
# in a descending order from most lines to fewest
df_characters = spark.read.format("com.databricks.spark.csv").options(header='true') \
.option("delimiter", ",").option("quote", '"') \
.option("escape", '"').option("multiLine", True) \
.load("simpsons/simpsons_characters.csv")
df_characters.createOrReplaceTempView("characters_table")

spark.sql("""
SELECT char_t.name AS character_name, COUNT(*) AS lines_count, ROUND(AVG(TRY_CAST(lines_t.word_count AS INT)), 1) AS avg_words
FROM characters_table char_t
JOIN lines_table lines_t
ON char_t.id = lines_t.character_id
WHERE lines_t.speaking_line = 'true'
GROUP BY char_t.name
ORDER BY lines_count DESC
""").show(20, truncate=False)


+-----------------------+-----------+---------+
|character_name         |lines_count|avg_words|
+-----------------------+-----------+---------+
|Homer Simpson          |28169      |24.2     |
|Marge Simpson          |13291      |95.6     |
|Bart Simpson           |13201      |16.6     |
|Lisa Simpson           |10895      |33.5     |
|C. Montgomery Burns    |3121       |11.7     |
|Moe Szyslak            |2810       |11.7     |
|Seymour Skinner        |2390       |11.8     |
|Ned Flanders           |2057       |11.1     |
|Grampa Simpson         |1875       |10.8     |
|Milhouse Van Houten    |1798       |8.3      |
|Chief Wiggum           |1796       |11.1     |
|Krusty the Clown       |1720       |12.1     |
|Nelson Muntz           |1145       |7.4      |
|Lenny Leonard          |1144       |8.6      |
|Apu Nahasapeemapetilon |988        |11.6     |
|Waylon Smithers        |965        |9.9      |
|Kent Brockman          |905        |18.0     |
|Carl Carlson           |852        |8.4