In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col

#### 1. Create Spark Session

In [7]:
spark = SparkSession.builder \
    .appName("TV Series Data Processing") \
    .getOrCreate()

#### 2. Read JSON dataset

In [8]:
df = spark.read.option("multiline", "true").json("dataset/tv_series.json" )

print("=== Schema ===")
df.printSchema()

[Stage 0:>                                                          (0 + 1) / 1]

=== Schema ===
root
 |-- __v: long (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- created_by: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _id: struct (nullable = true)
 |    |    |    |-- $oid: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- first_air_date: string (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _id: struct (nullable = true)
 |    |    |    |-- $oid: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- id: long (nullable = true)
 |-- in_production: boolean (nullable = true)
 |-- last_air_date: string (nullable = true)
 |-- name: string (nullable = true)
 |-- number_of_episodes: long (nullable = true)
 |-- number_of_seasons: long (nullable = true)
 |-- origin_country: array (n

                                                                                

#### 3. Retrieve all created_by names where status = 'Canceled'

In [9]:
canceled_created_by = (
    df.filter(col("status") == "Canceled")
      .select(explode(col("created_by")).alias("creator"))
      .select(col("creator.name").alias("created_by_name"))
      .distinct()
)

print("=== Created by (Canceled series) ===")
canceled_created_by.show(truncate=False)

=== Created by (Canceled series) ===
+---------------------+
|created_by_name      |
+---------------------+
|John Wells           |
|Brian Michael Bendis |
|Gabe Sachs           |
|Sean Catherine Derek |
|Matt Tarses          |
|Oleksandr Rodnyanskyy|
|Adam Sztykiel        |
|Will Gluck           |
|Michael Koman        |
|Bob Stephenson       |
|Shelley Eriksen      |
|Julius Sharpe        |
|Craig Pearce         |
|Delia Fiallo         |
|Steven Gould         |
|Carlos del Hoyo      |
|Dan Cross            |
|Charlie Visnic       |
|Arnold Pinnock       |
|Josué Carrión        |
+---------------------+
only showing top 20 rows


                                                                                

#### 4. Retrieve all origin_country with popularity > 5.0

In [10]:
popular_countries = (
    df.filter(col("popularity") > 5.0)
      .select(explode(col("origin_country")).alias("origin_country"))
      .distinct()
)

print("=== Origin countries with popularity > 5 ===")
popular_countries.show()

=== Origin countries with popularity > 5 ===
+--------------+
|origin_country|
+--------------+
|            LT|
|            MM|
|            FI|
|            UA|
|            RO|
|            NL|
|            LA|
|            PL|
|            MK|
|            MX|
|            XG|
|            CN|
|            AT|
|            RU|
|            IQ|
|            BU|
|            HR|
|            CZ|
|            PT|
|            HK|
+--------------+
only showing top 20 rows


#### 5. Retrieve all series names with number_of_episodes < 100

In [11]:
short_series = (
    df.filter(col("number_of_episodes") < 100)
      .select("name", "number_of_episodes")
)

print("=== Series with less than 100 episodes ===")
short_series.show(truncate=False)

=== Series with less than 100 episodes ===
+---------------------------------+------------------+
|name                             |number_of_episodes|
+---------------------------------+------------------+
|Clerks                           |6                 |
|Shuriken School                  |26                |
|W*A*L*T*E*R                      |0                 |
|Star Wars: Droids                |13                |
|Bratz                            |0                 |
|French Fields                    |18                |
|Dolly                            |22                |
|Absolutely                       |28                |
|Houston Knights                  |31                |
|North of 60                      |95                |
|Angela's Eyes                    |13                |
|The Surgeon                      |8                 |
|The Job                          |19                |
|Pavarchin                        |51                |
|Ever Decreasing Circl

#### 6. Basic aggregation example Count series per status

In [12]:
series_by_status = (
    df.groupBy("status")
      .count()
      .orderBy(col("count").desc())
)

print("=== Number of series per status ===")
series_by_status.show()

=== Number of series per status ===
+----------------+-----+
|          status|count|
+----------------+-----+
|           Ended|88150|
|Returning Series|58274|
|        Canceled| 4037|
|   In Production| 1832|
|         Planned|  459|
|           Pilot|  218|
+----------------+-----+



#### 7. Stop Spark

In [13]:
spark.stop()