In [1]:
#Importing libraries
import random
import os
import pandas as pd

In [2]:
!ls youtube-data/youtube-csv-data/

CAvideos.csv FRvideos.csv INvideos.csv KRvideos.csv RUvideos.csv
DEvideos.csv GBvideos.csv JPvideos.csv MXvideos.csv USvideos.csv


We will create a handy function that will be used to get the country abbreviations from ```csv``` files:

In [3]:
def only_upper(s):
    """
        Obtaining country abbreviation from a filename.
    """
    upper_chars = ""
    for char in s:
        if char.isupper():
            upper_chars += char
    return upper_chars

In [4]:
#Getting the country abbreviations
COUNTRY_ABBREVIATIONS = [only_upper(country) for country in os.listdir("./youtube-data/youtube-csv-data/")]

In [5]:
COUNTRY_ABBREVIATIONS

['MX', 'IN', 'DE', 'JP', 'KR', 'CA', 'RU', 'FR', 'US', 'GB']

In [6]:
#Now we are going to manually write the names of them...
COUNTRY_NAMES = ['Mexico', 'India', 'Germany', 'Japan', 'South Korea', 'Canada', 'Russia', 'France', 'United States', 'Great Britain']

In [7]:
#... and define a dictionary that maps abbreviation with the country name
COUNTRIES_DICT = dict(zip(COUNTRY_ABBREVIATIONS, COUNTRY_NAMES))

Let's sample one of the ```csv``` files at random by creating a function that returns a name of a random ```csv``` file:

In [8]:
def random_csv():
    list_of_csv_files = os.listdir("./youtube-data/youtube-csv-data/")
    
    #We want to be sure that the csv files are 
    weights = [1/len(list_of_csv_files) for element in list_of_csv_files]
    random_csv_file_name = random.choices(population = list_of_csv_files, weights = weights)[0]
    return (random_csv_file_name, only_upper(random_csv_file_name))

In [9]:
RANDOM_COUNTRY = random_csv()

In [13]:
print(f"The first country we are working with is ---> {COUNTRIES_DICT[RANDOM_COUNTRY[1]].upper()} <---.")

The first country we are working with is ---> GERMANY <---.


Yes, when you are doing anything that deals with *randomness* inside the code it is a good practise to do ```random.seed()``` for reproducibility, but I've just played with the idea of randomness here to put my personal *touch* inside the code. ;)

In [14]:
#Some of the rows contain data which is not utf-8 (ex. South Korean video names), so we have to introducte the workaround via encoding parameter
first_country = pd.read_csv(f"./youtube-data/youtube-csv-data/{RANDOM_COUNTRY[0]}", encoding='latin-1')
first_country.head(2)

Unnamed: 0,video_id,trending_date,title,channel_title,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,LgVi6y5QIjM,17.14.11,Sing zu Ende! | Gesangseinlagen vom Feinsten |...,inscope21,24,2017-11-13T17:08:49.000Z,"inscope21|""sing zu ende""|""gesangseinlagen""|""ge...",252786,35885,230,1539,https://i.ytimg.com/vi/LgVi6y5QIjM/default.jpg,False,False,False,Heute gibt es mal wieder ein neues Format... w...
1,Bayt7uQith4,17.14.11,Kinder ferngesteuert im Kiosk! Erwachsene abzo...,LUKE! Die Woche und ich,23,2017-11-12T22:30:01.000Z,"Kinder|""ferngesteuert""|""Kinder ferngesteuert""|...",797196,53576,302,1278,https://i.ytimg.com/vi/Bayt7uQith4/default.jpg,False,False,False,Kinder ferngesteuert! Kinder lassen sich sooo ...


Now, I'm going to fix my previous logic for choosing columns as I've described in ```README.md``` and ```data-preparation.ipynb``` inside this repo. Apart from that, I'm going to skip the whole step-by-step explanation that I've already done in my ```data-preparation.ipynb```. The main focus will be to fix one of the final ```csv``` files, but the whole logic can be applied to the rest of the files. 

# FIX 1
```

If a video was trending for three days and had 1000 new views every day, and our dataset is:

01.01 1000 views

02.01 2000 views

03.01 3000 views

we cannot just add 1000 + 2000 + 3000 to find the total number of views.

```

We should obviosly do ```GROUP BY video_id``` and take the ```MAX(views)``` to fix this.  

In [16]:
#We will import pyspark and create a session
import pyspark
from pyspark.sql import SparkSession, types, functions as F
spark = SparkSession.builder.master("local[*]").appName("yt_trending_eda").getOrCreate()

In [17]:
first_country = spark.read.csv(f"./youtube-data/youtube-csv-data/{RANDOM_COUNTRY[0]}", header = True)

In [18]:
COLUMNS_USED = ['video_id', 'trending_date', 'channel_title', 'category_id', 'views', 'likes', 'dislikes', 'video_error_or_removed']

In [19]:
#Filtering only videos that are not removed...
first_country = first_country.select(COLUMNS_USED).filter("video_error_or_removed = False")

In [20]:
#...and dropping the uneccesary column video_error_or_removed
first_country = first_country.drop("video_error_or_removed")

In [21]:
first_country.show(5)

+-----------+-------------+--------------------+-----------+-------+------+--------+
|   video_id|trending_date|       channel_title|category_id|  views| likes|dislikes|
+-----------+-------------+--------------------+-----------+-------+------+--------+
|LgVi6y5QIjM|     17.14.11|           inscope21|         24| 252786| 35885|     230|
|Bayt7uQith4|     17.14.11|LUKE! Die Woche u...|         23| 797196| 53576|     302|
|1ZAPwfrtAFY|     17.14.11|     LastWeekTonight|         24|2418783| 97190|    6146|
|AHtypnRk7JE|     17.14.11|   100SekundenPhysik|         27| 380247| 31821|     458|
|ZJ9We4bjcg0|     17.14.11|                rezo|         24| 822213|100684|    2467|
+-----------+-------------+--------------------+-----------+-------+------+--------+
only showing top 5 rows



In [22]:
first_country.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- views: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- dislikes: string (nullable = true)



In [23]:
#We have to cast the values such that we can later use some of the SQL functionalities
first_country = first_country.withColumn("category_id",first_country.category_id.cast(types.IntegerType()))\
                             .withColumn("views",first_country.views.cast(types.IntegerType()))\
                             .withColumn("likes",first_country.likes.cast(types.IntegerType()))\
                             .withColumn("dislikes",first_country.dislikes.cast(types.IntegerType()))\
                             .withColumn("channel_title",first_country.channel_title.cast(types.StringType()))
first_country.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)



In [24]:
#For Spark to do FROM statement we have to tell it that the dataframe is a table, so we will create a temporary table
first_country.registerTempTable('first_country_table')

In [25]:
#Since Spark has a way of doing operations 'lazily' it wont execute the SQL until we do some action ('tigger the execution' aka Spark Job) like for example show()

spark.sql("""
SELECT *
FROM first_country_table
""").show(n=10)


+-----------+-------------+--------------------+-----------+-------+------+--------+
|   video_id|trending_date|       channel_title|category_id|  views| likes|dislikes|
+-----------+-------------+--------------------+-----------+-------+------+--------+
|LgVi6y5QIjM|     17.14.11|           inscope21|         24| 252786| 35885|     230|
|Bayt7uQith4|     17.14.11|LUKE! Die Woche u...|         23| 797196| 53576|     302|
|1ZAPwfrtAFY|     17.14.11|     LastWeekTonight|         24|2418783| 97190|    6146|
|AHtypnRk7JE|     17.14.11|   100SekundenPhysik|         27| 380247| 31821|     458|
|ZJ9We4bjcg0|     17.14.11|                rezo|         24| 822213|100684|    2467|
|xapGFgWqtg4|     17.14.11|     Die Allestester|         22|  32709|  3093|     296|
|EIM7RMe39JY|     17.14.11|          Bodyformus|         23| 308683| 35704|     578|
|PaWTaj6Iie0|     17.14.11|          Jay & Arya|         22| 181660| 17998|     169|
|GHct2dGNLks|     17.14.11|         TeddyComedy|         23| 3691

In [26]:
#Let's quickly find the video that was most days in trending... 
spark.sql("""
SELECT video_id, COUNT(1) AS n_times_trending
FROM first_country_table
GROUP BY video_id
ORDER BY n_times_trending DESC
""").show(n=10)

[Stage 3:====>                                                    (1 + 11) / 12]

+-----------+----------------+
|   video_id|n_times_trending|
+-----------+----------------+
|pk0iqFne5eU|               7|
|myXi1KMyClc|               6|
|zTcNN-zwCog|               5|
|3ScQYM1FHrU|               5|
|ZJDMWVZta3M|               5|
|mnXnLeo54FA|               5|
|AdQsDopZfS4|               5|
|6ZfuNTqbHE8|               5|
|q23qghoF6Nk|               5|
|a3YSeVV_HF4|               5|
+-----------+----------------+
only showing top 10 rows



                                                                                

In [27]:
#Depending on the .csv file you get at random, different video_id will be in the top
TOP_VIDEO = spark.sql("""
SELECT video_id, COUNT(1) AS n_times_trending
FROM first_country_table
GROUP BY video_id
ORDER BY n_times_trending DESC
""").first()[0]

                                                                                

In [28]:
#... and see the regression of views and likes for that particular one

spark.sql(f"""
SELECT video_id, trending_date, views, likes
FROM first_country_table
WHERE video_id = "{TOP_VIDEO}"
ORDER BY trending_date ASC
""").show()

+-----------+-------------+-------+-----+
|   video_id|trending_date|  views|likes|
+-----------+-------------+-------+-----+
|pk0iqFne5eU|     18.08.03|  46622| 1296|
|pk0iqFne5eU|     18.09.03| 288392| 4839|
|pk0iqFne5eU|     18.10.03| 817531| 9717|
|pk0iqFne5eU|     18.11.03|1304894|12763|
|pk0iqFne5eU|     18.12.03|1649350|14414|
|pk0iqFne5eU|     18.13.03|1806532|15180|
|pk0iqFne5eU|     18.14.03|1985594|15947|
+-----------+-------------+-------+-----+



Seems that suggested fix makes complete sense. There is an obvious increase in views and/or likes as time progresses. Now, we have to filter those videos that have ```MAX(views)``` but also keeping all of the columns. 

Simple ```GROUP BY``` wont work because in a ```SELECT``` statement you can use only one column for aggregation and the other one is aggregation function. We need all the columns!

The method I will present here is not fully optimized and using ```JOINS``` is expensive, but it will work for this problem. The idea is to create another table from the existing one which will contain unique ```video_id``` and it's corresponding column ```MAX(views)```. Then, we will have two tables which we will ```(SELF)JOIN``` together.

Bear in mind that when we do such a ```JOIN``` we will have "many-to-one" relationship (```n:1```, where ```n``` represents number of time the video was in trending).

If you look at the above example you will see that for each unique ```video_id``` there are ```n``` rows with different number of ```view```s. That is our left table, which we will join with the right table that contains two columns: unique ```video_id``` with it's corresponding ```MAX(views)```. 

Let's visualize it via one of the examples - video that has been the most times in trending!

In [29]:
#Let's first define the right table. I will be using Common Table Expressions (CTEs) after this explanation
first_country_max_views = first_country.groupBy("video_id").agg(F.max("views").alias("max_views"))
first_country_max_views.show(5)

+-----------+---------+
|   video_id|max_views|
+-----------+---------+
|_UEk3WRixnc|    31500|
|bAkEd8r7Nnw|  3300683|
|llyuA7q2BkY|     7182|
|CXZWqVA-pi8|   120790|
|4V38pzhWwCo|  2326516|
+-----------+---------+
only showing top 5 rows



In [30]:
#Let's register it as a table... 
first_country_max_views.registerTempTable('first_country_max_views_table')

In [31]:
#... and test on one of the examples presented above if everything is correct
spark.sql(f"""
SELECT *
FROM first_country_max_views_table
WHERE video_id = "{TOP_VIDEO}"
""").show()

+-----------+---------+
|   video_id|max_views|
+-----------+---------+
|pk0iqFne5eU|  1985594|
+-----------+---------+



In [32]:
#Here is how left table...
spark.sql(f"""
SELECT *
FROM first_country_table
WHERE video_id = "{TOP_VIDEO}"
""").show()

+-----------+-------------+-------------+-----------+-------+-----+--------+
|   video_id|trending_date|channel_title|category_id|  views|likes|dislikes|
+-----------+-------------+-------------+-----------+-------+-----+--------+
|pk0iqFne5eU|     18.08.03|  Wissenswert|         22|  46622| 1296|     184|
|pk0iqFne5eU|     18.09.03|  Wissenswert|         22| 288392| 4839|    2109|
|pk0iqFne5eU|     18.10.03|  Wissenswert|         22| 817531| 9717|    5054|
|pk0iqFne5eU|     18.11.03|  Wissenswert|         22|1304894|12763|    6751|
|pk0iqFne5eU|     18.12.03|  Wissenswert|         22|1649350|14414|    7688|
|pk0iqFne5eU|     18.13.03|  Wissenswert|         22|1806532|15180|    8127|
|pk0iqFne5eU|     18.14.03|  Wissenswert|         22|1985594|15947|    8652|
+-----------+-------------+-------------+-----------+-------+-----+--------+



In [33]:
#... and right table for this particular example look 
spark.sql(f"""
SELECT *
FROM first_country_max_views_table
WHERE video_id = "{TOP_VIDEO}"
""").show()

+-----------+---------+
|   video_id|max_views|
+-----------+---------+
|pk0iqFne5eU|  1985594|
+-----------+---------+



In [34]:
#Let's join those two to showcase what've previously described
spark.sql(f"""
SELECT t1.video_id AS ______VIDEO_ID_______, t1.trending_date,
       t1.category_id, t1.views AS _______LEFT_TABLE_VIEWS_______, t1.likes, t1.dislikes AS dislikes_0,
       t2.video_id AS 0_______VIDEO_ID_______, t2.max_views AS _______RIGHT_TABLE_VIEWS_______
FROM first_country_table AS t1 INNER JOIN first_country_max_views_table AS t2 ON t1.video_id = t2.video_id
WHERE t1.video_id = "{TOP_VIDEO}"
""").show()

+---------------------+-------------+-----------+------------------------------+-----+----------+-----------------------+-------------------------------+
|______VIDEO_ID_______|trending_date|category_id|_______LEFT_TABLE_VIEWS_______|likes|dislikes_0|0_______VIDEO_ID_______|_______RIGHT_TABLE_VIEWS_______|
+---------------------+-------------+-----------+------------------------------+-----+----------+-----------------------+-------------------------------+
|          pk0iqFne5eU|     18.08.03|         22|                         46622| 1296|       184|            pk0iqFne5eU|                        1985594|
|          pk0iqFne5eU|     18.09.03|         22|                        288392| 4839|      2109|            pk0iqFne5eU|                        1985594|
|          pk0iqFne5eU|     18.10.03|         22|                        817531| 9717|      5054|            pk0iqFne5eU|                        1985594|
|          pk0iqFne5eU|     18.11.03|         22|                       1304

I've aliased the columns and put the additional underscores so you can focus your attention to the thing I've explained: while joining you have n:1 type of join and you see that you have all the permutations for views just for one unique ```video_id```. For those of you who are still struggling with understanding the table joins I've put zeroes in names of two columns (dislikes_**0** and **0**\_\_\_\_\_\_\_VIDEO_ID\_______) that represent *fictional* border between two tables (left and right one).  

Well, the obvious trick is, while joining, to filter only columns where the number of views in the left table column is equal to the number of views in the right table. You could do the same by using ```likes``` column and still preserve the ```views``` since each day the number of views is increasing. 

In [35]:
#For the above example it would look like this
spark.sql(f"""
SELECT t1.video_id AS ______VIDEO_ID_______, t1.trending_date,
       t1.category_id, t1.views AS _______LEFT_TABLE_VIEWS_______, t1.likes, t1.dislikes AS dislikes_0,
       t2.video_id AS 0_______VIDEO_ID_______, t2.max_views AS _______RIGHT_TABLE_VIEWS_______
FROM first_country_table AS t1 INNER JOIN first_country_max_views_table AS t2 ON t1.video_id = t2.video_id
WHERE t1.video_id = "{TOP_VIDEO}" AND t1.views = t2.max_views
""").show()

+---------------------+-------------+-----------+------------------------------+-----+----------+-----------------------+-------------------------------+
|______VIDEO_ID_______|trending_date|category_id|_______LEFT_TABLE_VIEWS_______|likes|dislikes_0|0_______VIDEO_ID_______|_______RIGHT_TABLE_VIEWS_______|
+---------------------+-------------+-----------+------------------------------+-----+----------+-----------------------+-------------------------------+
|          pk0iqFne5eU|     18.14.03|         22|                       1985594|15947|      8652|            pk0iqFne5eU|                        1985594|
+---------------------+-------------+-----------+------------------------------+-----+----------+-----------------------+-------------------------------+



Notice that I've added ```AND t1.views = t2.max_views``` into ```WHERE``` clause!

Now, imagine that for each unique ```video_id``` in the entire dataset you will repeat the same process! That is what the next ```SQL``` query will do in a nutshell. Also, we will show only the columns from the left table, since the right table is used here as a *helping table* for filtering ```MAX(views)```.

In [36]:
first_country = spark.sql(f"""

WITH helping_table AS (
    SELECT video_id, MAX(views) AS max_views
    FROM first_country_table
    GROUP BY video_id)

SELECT t1.video_id AS video_id, t1.channel_title AS channel_title, t1.trending_date AS trending_date,
       t1.category_id AS category_id, t1.views AS views, t1.likes AS likes, t1.dislikes AS dislikes
FROM first_country_table AS t1 INNER JOIN helping_table AS t2 ON t1.video_id = t2.video_id
WHERE t1.views = t2.max_views
""")

In [37]:
first_country.show()

+-----------+----------------+-------------+-----------+--------+-------+--------+
|   video_id|   channel_title|trending_date|category_id|   views|  likes|dislikes|
+-----------+----------------+-------------+-----------+--------+-------+--------+
|EIM7RMe39JY|      Bodyformus|     17.14.11|         23|  308683|  35704|     578|
|aZYSFByDGkg|         WALULIS|     17.14.11|          1|   62418|   4749|      44|
|2hu_evXPpMM|    HerrNewstime|     17.14.11|         24|  228574|  11349|     990|
|2Zp-Qm3wJkA|  JP Performance|     17.14.11|          2|  465883|  19928|     216|
|3U51cVIqulM|     PlanetKanax|     17.14.11|         23|   99988|   6397|     298|
|OKYUtHvgMhc|          VOLKAN|     17.14.11|         24|   37877|   1839|     327|
|k_IrAnVSjYE|    Tanja Bremer|     17.14.11|         22|    9413|   1522|      39|
|KLxP8VxZjlk|     BJ Magazine|     17.14.11|         25|   91914|     17|      13|
|cJx5blgWjDw|           Jarow|     17.14.11|         24|  373833|  21320|    2901|
|PK8

Currently, I'm looking at a table of trending videos for Germany (DE). Can't miss the thing that one of the trending videos was from ```Zvezde Granda```. There is a fun saying that *there are now more people from Balkans in Germany than the Germans itself*.

Allow me this memorable ```SQL``` statement:

In [38]:
#I don't know if there is Zvezde Granda in other countries so bear in mind that this might return empty table!
first_country.registerTempTable('first_country_de_table')

spark.sql("""

SELECT *
FROM first_country_de_table
WHERE channel_title = 'Zvezde Granda'
""").show()

+-----------+-------------+-------------+-----------+------+-----+--------+
|   video_id|channel_title|trending_date|category_id| views|likes|dislikes|
+-----------+-------------+-------------+-----------+------+-----+--------+
|oWQuB2lVQLc|Zvezde Granda|     17.14.11|         24|496192| 1503|     443|
|4VqH3Ecx2R0|Zvezde Granda|     17.22.11|         24|744974| 2196|     522|
|4sbm8bB583w|Zvezde Granda|     17.28.11|         24|650645| 1912|     492|
|8euPVkj4NRo|Zvezde Granda|     17.05.12|         24|455935| 1349|     262|
|gP6uGOLeG0M|Zvezde Granda|     17.12.12|         24|603544| 1578|     426|
|SGtiZlq4n8k|Zvezde Granda|     17.19.12|         24|486761| 1499|     418|
|gn4BHh_I5Tg|Zvezde Granda|     17.26.12|         24|476073| 1633|     408|
|pcfyR8a96G8|Zvezde Granda|     18.02.01|         24|318258| 1121|     277|
|xcoDlq2AiOU|Zvezde Granda|     18.09.01|         24|413322| 1382|     344|
|gU492TPrm8g|Zvezde Granda|     18.16.01|         24|507564| 1385|     418|
|JpjfAYrZxXU

In [39]:
#Spark says "only showing top 20 rows":
spark.sql("""

SELECT *
FROM first_country_de_table
WHERE channel_title = 'Zvezde Granda'
""").count()

31

**31 times** music from famous ```Zvezde Granda``` (Serbia) were trending in Germany! Let's see if there were times where it was the most viewed video in trending!

In [40]:
spark.sql("""

WITH sinan_sakic AS (
    SELECT trending_date, MAX(views) AS views
    FROM first_country_de_table
    GROUP BY trending_date)

SELECT t1.video_id AS video_id, t1.channel_title AS channel_title, t1.trending_date AS trending_date, t1.views AS views, t1.likes AS likes, t1.dislikes AS dislikes
FROM first_country_de_table AS t1 INNER JOIN sinan_sakic AS t2 ON t1.trending_date = t2.trending_date
WHERE t1.views = t2.views AND t1.channel_title = 'Zvezde Granda'
""").show()

+--------+-------------+-------------+-----+-----+--------+
|video_id|channel_title|trending_date|views|likes|dislikes|
+--------+-------------+-------------+-----+-----+--------+
+--------+-------------+-------------+-----+-----+--------+



I think the ```SQL``` logic is broken.... ;)

Enough of the fun part! Let's drop the ```trending_date``` column, add the column ```country``` that will represent the country abbreviation and finally merge this table with the corresponding ```JSON``` file to get the ```category_name``` based on the ```category_id```.

In [41]:
first_country = first_country.drop('trending_date').select('*', F.lit(f'{RANDOM_COUNTRY[1]}').alias("country"))
first_country.show()

+-----------+----------------+-----------+--------+-------+--------+-------+
|   video_id|   channel_title|category_id|   views|  likes|dislikes|country|
+-----------+----------------+-----------+--------+-------+--------+-------+
|EIM7RMe39JY|      Bodyformus|         23|  308683|  35704|     578|     DE|
|aZYSFByDGkg|         WALULIS|          1|   62418|   4749|      44|     DE|
|2hu_evXPpMM|    HerrNewstime|         24|  228574|  11349|     990|     DE|
|2Zp-Qm3wJkA|  JP Performance|          2|  465883|  19928|     216|     DE|
|3U51cVIqulM|     PlanetKanax|         23|   99988|   6397|     298|     DE|
|OKYUtHvgMhc|          VOLKAN|         24|   37877|   1839|     327|     DE|
|k_IrAnVSjYE|    Tanja Bremer|         22|    9413|   1522|      39|     DE|
|KLxP8VxZjlk|     BJ Magazine|         25|   91914|     17|      13|     DE|
|cJx5blgWjDw|           Jarow|         24|  373833|  21320|    2901|     DE|
|PK8lHszeXNk|     Cool Mobile|         22|   27356|    704|      31|     DE|

In [97]:
#Finally, let's register it as a new table (overwrite it on the existing one!)
spark.catalog.listTables()

[Table(name='first_country_de_table', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='first_country_max_views_table', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='first_country_table', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [98]:
spark.catalog.dropTempView("first_country_table")
first_country.registerTempTable('first_country_table')

# Accesing JSON file

In [42]:
!ls youtube-data/youtube-json-data/

CA_category_id.json GB_category_id.json KR_category_id.json US_category_id.json
DE_category_id.json IN_category_id.json MX_category_id.json
FR_category_id.json JP_category_id.json RU_category_id.json


In [81]:
# Read: https://analyticshut.com/reading-json-data-in-spark/
# and https://medium.com/expedia-group-tech/working-with-json-in-apache-spark-1ecf553c2a8c
first_country_json = spark.read.option("multiline","true").json(f"./youtube-data/youtube-json-data/{RANDOM_COUNTRY[1]}_category_id.json")
first_country_json.show()

+--------------------+--------------------+--------------------+
|                etag|               items|                kind|
+--------------------+--------------------+--------------------+
|"ld9biNPKjAjgjV7E...|[{"ld9biNPKjAjgjV...|youtube#videoCate...|
+--------------------+--------------------+--------------------+



In [82]:
first_country_json.printSchema()

root
 |-- etag: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- etag: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- kind: string (nullable = true)
 |    |    |-- snippet: struct (nullable = true)
 |    |    |    |-- assignable: boolean (nullable = true)
 |    |    |    |-- channelId: string (nullable = true)
 |    |    |    |-- title: string (nullable = true)
 |-- kind: string (nullable = true)



In [83]:
#Snippet of a JSON file
{'kind': 'youtube#videoCategory',
 'etag': '"ld9biNPKjAjgjV7EZ4EKeEGrhao/Xy1mB4_yLrHy_BmKmPBggty2mZQ"',
 'id': '1',
 'snippet': {'channelId': 'UCBR8-60-B28hp2BmDPdntcQ',
  'title': 'Film & Animation',
  'assignable': True}}

{'kind': 'youtube#videoCategory',
 'etag': '"ld9biNPKjAjgjV7EZ4EKeEGrhao/Xy1mB4_yLrHy_BmKmPBggty2mZQ"',
 'id': '1',
 'snippet': {'channelId': 'UCBR8-60-B28hp2BmDPdntcQ',
  'title': 'Film & Animation',
  'assignable': True}}

In [84]:
first_country_json.select("items.id", "items.snippet.title").show()

+--------------------+--------------------+
|                  id|               title|
+--------------------+--------------------+
|[1, 2, 10, 15, 17...|[Film & Animation...|
+--------------------+--------------------+



Let's explore those two columns more clearly:

In [85]:
first_country_json.select(F.explode("items.id").alias("cat_id")).show()

+------+
|cat_id|
+------+
|     1|
|     2|
|    10|
|    15|
|    17|
|    18|
|    19|
|    20|
|    21|
|    22|
|    23|
|    24|
|    25|
|    26|
|    27|
|    28|
|    30|
|    31|
|    32|
|    33|
+------+
only showing top 20 rows



In [86]:
first_country_json.select(F.explode("items.snippet.title").alias("category")).show()

+--------------------+
|            category|
+--------------------+
|    Film & Animation|
|    Autos & Vehicles|
|               Music|
|      Pets & Animals|
|              Sports|
|        Short Movies|
|     Travel & Events|
|              Gaming|
|       Videoblogging|
|      People & Blogs|
|              Comedy|
|       Entertainment|
|     News & Politics|
|       Howto & Style|
|           Education|
|Science & Technology|
|              Movies|
|     Anime/Animation|
|    Action/Adventure|
|            Classics|
+--------------------+
only showing top 20 rows



In [87]:
first_country_json.select("items.id", "items.snippet.title").show()

+--------------------+--------------------+
|                  id|               title|
+--------------------+--------------------+
|[1, 2, 10, 15, 17...|[Film & Animation...|
+--------------------+--------------------+



In [88]:
#Here is a solution on how to 'explode' multiple columns and make a table out of it
#https://stackoverflow.com/questions/51082758/how-to-explode-multiple-columns-of-a-dataframe-in-pyspark
first_country_json.selectExpr('inline(arrays_zip(items.id,items.snippet.title))') \
                  .withColumnRenamed("0", "cat_id") \
                  .withColumnRenamed("1", "category") \
                  .show()

+------+--------------------+
|cat_id|            category|
+------+--------------------+
|     1|    Film & Animation|
|     2|    Autos & Vehicles|
|    10|               Music|
|    15|      Pets & Animals|
|    17|              Sports|
|    18|        Short Movies|
|    19|     Travel & Events|
|    20|              Gaming|
|    21|       Videoblogging|
|    22|      People & Blogs|
|    23|              Comedy|
|    24|       Entertainment|
|    25|     News & Politics|
|    26|       Howto & Style|
|    27|           Education|
|    28|Science & Technology|
|    30|              Movies|
|    31|     Anime/Animation|
|    32|    Action/Adventure|
|    33|            Classics|
+------+--------------------+
only showing top 20 rows



In [89]:
#Let's save it as a temp table
first_country_json = first_country_json.selectExpr('inline(arrays_zip(items.id,items.snippet.title))') \
                  .withColumnRenamed("0", "cat_id") \
                  .withColumnRenamed("1", "category")

In [99]:
first_country_json.registerTempTable('first_country_json_table')

In [101]:
#Let's clear up the unused Temporary Tables (Views)
spark.catalog.listTables()

[Table(name='first_country_de_table', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='first_country_json_table', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='first_country_max_views_table', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='first_country_table', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [104]:
spark.catalog.dropTempView("first_country_de_table")
spark.catalog.dropTempView("first_country_max_views_table")

In [105]:
spark.catalog.listTables()

[Table(name='first_country_json_table', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='first_country_table', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

# JOINING CSV AND JSON FILE

In [112]:
#As I've explained in the data-preparation.ipynb we will use LEFT JOIN in case that there might be possibility of API collecting a wrong id that doesn't exist
#in the accompanying JSON file

first_country_combined = spark.sql("""

SELECT *
FROM first_country_table AS t1 LEFT JOIN first_country_json_table AS t2 ON t1.category_id = t2.cat_id
""").drop("category_id", "cat_id")

first_country_combined.show()

+-----------+----------------+--------+-------+--------+-------+----------------+
|   video_id|   channel_title|   views|  likes|dislikes|country|        category|
+-----------+----------------+--------+-------+--------+-------+----------------+
|EIM7RMe39JY|      Bodyformus|  308683|  35704|     578|     DE|          Comedy|
|aZYSFByDGkg|         WALULIS|   62418|   4749|      44|     DE|Film & Animation|
|2hu_evXPpMM|    HerrNewstime|  228574|  11349|     990|     DE|   Entertainment|
|2Zp-Qm3wJkA|  JP Performance|  465883|  19928|     216|     DE|Autos & Vehicles|
|3U51cVIqulM|     PlanetKanax|   99988|   6397|     298|     DE|          Comedy|
|OKYUtHvgMhc|          VOLKAN|   37877|   1839|     327|     DE|   Entertainment|
|k_IrAnVSjYE|    Tanja Bremer|    9413|   1522|      39|     DE|  People & Blogs|
|KLxP8VxZjlk|     BJ Magazine|   91914|     17|      13|     DE| News & Politics|
|cJx5blgWjDw|           Jarow|  373833|  21320|    2901|     DE|   Entertainment|
|PK8lHszeXNk|   

In [114]:
RANDOM_COUNTRY[1].lower()

'de'

In [120]:
#Finally, we will export it:
first_country_combined.write.mode('overwrite').csv(f'./exported-data-spark/{RANDOM_COUNTRY[1].lower()}_videos.csv', header = True)