# Kafka Project - Data Pre-Processing and Transformation with Pyspark

In [1]:
import json
import requests

response = requests.get("https://api.tvmaze.com/shows")
print(response.status_code)

# print(response.json())

200


## Pre-processing in Spark

In [1]:
# create spark session and build spark application
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('tv_shows_api_app').getOrCreate()

22/11/18 16:57:22 WARN Utils: Your hostname, pallavi-xps resolves to a loopback address: 127.0.1.1; using 192.168.1.85 instead (on interface wlp2s0)
22/11/18 16:57:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/18 16:57:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# read the api's json file into spark dataframe and view the schema

shows_df = spark.read\
    .format('json')\
    .option('inferSchema', 'true')\
    .option('multiLine', 'true')\
    .load('data/tvshows_data.json')

shows_df.printSchema()

root
 |-- _links: struct (nullable = true)
 |    |-- nextepisode: struct (nullable = true)
 |    |    |-- href: string (nullable = true)
 |    |-- previousepisode: struct (nullable = true)
 |    |    |-- href: string (nullable = true)
 |    |-- self: struct (nullable = true)
 |    |    |-- href: string (nullable = true)
 |-- averageRuntime: long (nullable = true)
 |-- dvdCountry: struct (nullable = true)
 |    |-- code: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- timezone: string (nullable = true)
 |-- ended: string (nullable = true)
 |-- externals: struct (nullable = true)
 |    |-- imdb: string (nullable = true)
 |    |-- thetvdb: long (nullable = true)
 |    |-- tvrage: long (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: long (nullable = true)
 |-- image: struct (nullable = true)
 |    |-- medium: string (nullable = true)
 |    |-- original: string (nullable = true)
 |-- language: string

### The data is nested with 3 levels of nesting. So, the nesting must be removed. Arrays will be de-nested by using explode() and struct data types will be cleaned by using getItem() functions. Columns will be type-casted to suitable data types and renamed where needed.

In [4]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, ArrayType, DateType, TimestampType
from pyspark.sql.functions import col,struct,when

In [5]:
# Struct Type columns are schedule, rating, network, webChannel, dvdCountry, externals, image, and links
# columns webChannel and dvdCountry contain all "null" data, So these columns shall be dropped

shows_df = shows_df.drop("webChannel").drop("dvdCountry")

# dropping column 'images'
shows_df = shows_df.drop("image")

# runtime and averageRuntime is similar for most of the rows, so averageRuntime shall be dropped
shows_df = shows_df.drop("averageRuntime")

In [6]:
# column: 'externals' (struct) --> new columns: 'imdb', 'tvrage', 'thetvdb'

shows_df = shows_df\
            .withColumn('imdb_id', F.col('externals').getItem('imdb'))\
            .withColumn('tvrage_id', F.col('externals').getItem('tvrage').cast('int'))\
            .withColumn('thetvdb_id', F.col('externals').getItem('thetvdb').cast('int'))

In [7]:
# column: 'network' (struct) --> new columns: 'id', 'name', 'country', 'officialSite'

shows_df = shows_df\
    .withColumn('network_id', F.col('network').getItem('id').cast('int'))\
    .withColumn('network_name', F.col('network').getItem('name'))\
    .withColumn('network_country', F.col('network').getItem('country'))\
    .withColumn('network_official_site', F.col('network').getItem('officialSite'))

In [8]:
# newly made column 'network_country' is also StructType
# network_country --> 'network_country_name', 'network_country_code', 'network_country_timezone'

shows_df = shows_df\
    .withColumn('network_country_name', F.col('network_country').getItem('name'))\
    .withColumn('network_country_code', F.col('network_country').getItem('code'))\
    .withColumn('network_country_timezone', F.col('network_country').getItem('timezone'))

In [9]:
# column 'schedule' (struct) --> 'airing_days', 'airing_time'

shows_df = shows_df\
    .withColumn('airing_time', F.col('schedule').getItem('time'))\
    .withColumn('airing_days', F.col('schedule').getItem('days'))

In [10]:
# column 'airing_days' (array) --> explode() --> strings

shows_df = shows_df.withColumn('airing_days', F.explode('airing_days'))

In [11]:
# column 'rating' (struct) --> 'average_rating' (renamed as 'rating')

shows_df = shows_df\
    .withColumn('rating', F.col('rating').getItem('average').cast('float'))

In [12]:
# column '_links' (struct) --> 'next_episode_link', 'prev_episode_link', 'current_episode_link'

shows_df = shows_df\
    .withColumn('current_episode_link', F.col('_links').getItem('self'))\
    .withColumn('next_episode_link', F.col('_links').getItem('nextepisode'))\
    .withColumn('prev_episode_link', F.col('_links').getItem('previousepisode'))

In [13]:
# Each column is of struct type, containing only one column 'href'

shows_df = shows_df\
    .withColumn('current_episode_link', F.col('current_episode_link').getItem('href'))\
    .withColumn('next_episode_link', F.col('next_episode_link').getItem('href'))\
    .withColumn('prev_episode_link', F.col('prev_episode_link').getItem('href'))

In [14]:
# column 'genres' (array) --> explode() --> strings

shows_df = shows_df.withColumn('genre', F.explode('genres'))

In [15]:
# type-casting and renaming columns:

shows_df = shows_df.withColumn('premiere_date', col('premiered').cast(DateType()))\
    .withColumn('end_date', col('ended').cast(DateType()))\
    .withColumn('airing_time', col('airing_time').cast(TimestampType()))\
    .withColumn('id', col('id').cast(IntegerType()))\
    .withColumn('runtime', col('runtime').cast(IntegerType()))\
    .withColumn('weight', col('weight').cast(IntegerType()))

In [16]:
# keeping only cleaned columns in the dataframe

shows_df = shows_df\
    .select('id','url', 'name', 'type', 'language', 'genre', 'status', 'runtime', 'premiere_date',\
            'end_date', 'officialSite', 'airing_days', 'airing_time', 'rating', 'weight', 'network_id',\
            'network_name', 'network_country_name', 'network_country_code','network_country_timezone',\
            'network_official_site', 'imdb_id', 'tvrage_id', 'thetvdb_id', 'updated', 'summary',\
            'current_episode_link', 'next_episode_link', 'prev_episode_link'
            )

In [17]:
shows_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- url: string (nullable = true)
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)
 |-- language: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- status: string (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- premiere_date: date (nullable = true)
 |-- end_date: date (nullable = true)
 |-- officialSite: string (nullable = true)
 |-- airing_days: string (nullable = true)
 |-- airing_time: timestamp (nullable = true)
 |-- rating: float (nullable = true)
 |-- weight: integer (nullable = true)
 |-- network_id: integer (nullable = true)
 |-- network_name: string (nullable = true)
 |-- network_country_name: string (nullable = true)
 |-- network_country_code: string (nullable = true)
 |-- network_country_timezone: string (nullable = true)
 |-- network_official_site: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- tvrage_id: integer (nullable = true)
 |-- thetvdb_id: integer (n

In [18]:
shows_df.show(3)

22/11/18 16:58:52 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+---+--------------------+--------------+--------+--------+---------------+------+-------+-------------+----------+--------------------+-----------+-------------------+------+------+----------+------------+--------------------+--------------------+------------------------+---------------------+---------+---------+----------+----------+--------------------+--------------------+-----------------+--------------------+
| id|                 url|          name|    type|language|          genre|status|runtime|premiere_date|  end_date|        officialSite|airing_days|        airing_time|rating|weight|network_id|network_name|network_country_name|network_country_code|network_country_timezone|network_official_site|  imdb_id|tvrage_id|thetvdb_id|   updated|             summary|current_episode_link|next_episode_link|   p

In [19]:
shows_df.count()

582

# Data Transformation in Spark

### Tasks:
1. Count the number of TV shows of each genre and sort from highest to lowest.
2. List the tv shows by type (Scripted/Reality/Animation, etc.) and status (running or ended), then give the number of shows in each list
3. Find the average, maximum, and minimum weight of shows grouped by network name
4. Highest rated TV show(s) of each country along with rating

## 1. Count the number of TV shows of each genre and sort from highest to lowest.

In [20]:
# groupby, aggregation, and sorting

from pyspark.sql.functions import desc

genre_shows_count = shows_df\
        .groupBy('genre')\
        .count()\
        .orderBy(desc('count'))

genre_shows_count.show()

+---------------+-----+
|          genre|count|
+---------------+-----+
|          Drama|  151|
|         Comedy|   73|
|          Crime|   54|
|         Action|   53|
|       Thriller|   38|
|Science-Fiction|   38|
|        Romance|   32|
|      Adventure|   24|
|         Horror|   21|
|         Family|   19|
|   Supernatural|   18|
|        Mystery|   13|
|        Fantasy|   12|
|        Medical|    6|
|          Legal|    6|
|          Music|    5|
|          Anime|    4|
|            War|    4|
|        History|    4|
|        Western|    3|
+---------------+-----+
only showing top 20 rows



## 2. List the tv shows by type (Scripted/Reality/Animation, etc.) and status (running or ended), then give the number of shows in each list

In [21]:
# window function, UDF, ordering

from pyspark.sql.functions import collect_set, col
from pyspark.sql import Window as W  

# using window function
window_spec_2 = W.partitionBy('type', 'status')

# making list of tv shows by type and status
shows_list = shows_df\
                    .withColumn('TV Shows List', collect_set('name').over(window_spec_2))\
                    .select(col('type').alias('Type'), col('status').alias('Status'), col('TV Shows List'))\
                    .distinct()


# creating udf to count number of elements in a list
list_len_udf = F.udf(lambda x: len(x), IntegerType())

# adding number of shows column
shows_list_count = shows_list\
                .withColumn('Number of Shows', list_len_udf(col('TV Shows List')))\
                .orderBy(desc(col('Number of Shows')))


shows_list_count.show()


[Stage 11:>                                                         (0 + 1) / 1]

+-----------+-------+--------------------+---------------+
|       Type| Status|       TV Shows List|Number of Shows|
+-----------+-------+--------------------+---------------+
|   Scripted|  Ended|[Parks and Recrea...|            191|
|   Scripted|Running|[Fargo, True Dete...|             16|
|  Animation|  Ended|[Star Wars: Rebel...|              7|
|    Reality|Running|[The Amazing Race...|              6|
|  Animation|Running|[South Park, Amer...|              6|
|  Talk Show|  Ended|[The Daily Show w...|              3|
|Documentary|  Ended|       [Long Shadow]|              1|
+-----------+-------+--------------------+---------------+




                                                                                

## 3. Find the average, maximum, and minimum weight of shows grouped by network name

In [22]:
# summary statistics

from pyspark.sql.functions import avg, min, max

show_weight_by_network = shows_df\
    .groupBy(col("network_name").alias("Network Name")).agg(avg("weight").alias("Average Weight"),\
                                                            max("weight").alias("Maximum Weight"),\
                                                            min("weight").alias("Minimum Weight"))\
    .orderBy(desc("Average Weight"))\
    .dropna().distinct()

show_weight_by_network.show()

+--------------+-----------------+--------------+--------------+
|  Network Name|   Average Weight|Maximum Weight|Minimum Weight|
+--------------+-----------------+--------------+--------------+
|           NBC|89.80952380952381|           100|            49|
|           RTL|             88.0|            88|            88|
|      Showtime|91.73529411764706|            98|            48|
|           CMT|             89.0|            89|            89|
|           ITV|             69.0|            69|            69|
|   BBC America|             93.0|            93|            93|
|       Cinemax|             95.6|            96|            95|
|El Rey Network|             98.0|            98|            98|
|Comedy Central|86.11111111111111|            99|            78|
|   WGN America|             93.0|            93|            93|
|     Disney XD|             98.0|            98|            98|
|      Lifetime|            83.75|            91|            65|
|          Syfy|93.333333

## 4. Highest rated TV show(s) of each country along with rating.

In [23]:
# window function (denserank), groupBy, aggregation, collect_set

window_spec_5 = W.partitionBy('network_country_name').orderBy(desc('rating'))

ranked_df = shows_df.withColumn('rank', F.dense_rank().over(window_spec_5)).filter('rank == 1')

max_rating_df = ranked_df.select(col('network_country_name').alias('Country'),\
                                 col('name'),\
                                 col('rating').alias('Rating')).dropna().distinct()


from pyspark.sql.functions import collect_set

highest_rated_shows = max_rating_df\
    .groupBy(col('Country'), col('Rating'))\
    .agg(collect_set(col('name')).alias('Highest Rated Show(s)'))

highest_rated_shows.show()

+--------------+------+---------------------+
|       Country|Rating|Highest Rated Show(s)|
+--------------+------+---------------------+
|        Canada|   8.6| [Vikings, Orphan ...|
|        France|   7.9|     [Crossing Lines]|
|       Germany|   7.2| [Transporter: The...|
|         Japan|   8.8|         [Death Note]|
|United Kingdom|   8.5|             [Utopia]|
| United States|   9.2|       [Breaking Bad]|
+--------------+------+---------------------+



## Write each transformed dataframe to json file

In [24]:
genre_shows_count.toPandas().to_json('output_json/q1_genre_shows_count.json', orient='records')
shows_list_count.toPandas().to_json('output_json/q2_shows_list_count.json', orient='records')
show_weight_by_network.toPandas().to_json('output_json/q3_shows_weight.json', orient='records')
highest_rated_shows.toPandas().to_json('output_json/q4_highest_rated_shows.json', orient='records')

### Viewing the schema of each dataframe to create schema in schemas.py file

In [25]:
genre_shows_count.printSchema()

root
 |-- genre: string (nullable = true)
 |-- count: long (nullable = false)



In [26]:
shows_list_count.printSchema()

root
 |-- Type: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- TV Shows List: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- Number of Shows: integer (nullable = true)



In [27]:
show_weight_by_network.printSchema()

root
 |-- Network Name: string (nullable = true)
 |-- Average Weight: double (nullable = true)
 |-- Maximum Weight: integer (nullable = true)
 |-- Minimum Weight: integer (nullable = true)



In [28]:
highest_rated_shows.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Rating: float (nullable = true)
 |-- Highest Rated Show(s): array (nullable = false)
 |    |-- element: string (containsNull = false)

