# YouTube Trending Video Dataset
In this project, we use the spark python library as a Data Ingestion to read and improve the efficiency of data to create a pipeline of YouTube trending videos in ten countries.


## Import Libraries¶

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField,StructType,StringType,IntegerType,FloatType,BooleanType,DateType,TimestampType,LongType,ArrayType
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('youtube') \
    .getOrCreate()

## Data Ingestion

In [2]:
# read all files (both csv and json)
csv_file = []
json_file = []

for fname in os.listdir('archive'):
    if fname.split('.')[1] == 'csv':
        csv_file.append(fname)
    elif (fname.split('.')[1] == 'json'):
        json_file.append(fname)
        
print('csv Files Name: ', csv_file)   
print('json Files Name: ', json_file)

csv Files Name:  ['CAvideos.csv', 'DEvideos.csv', 'FRvideos.csv', 'GBvideos.csv', 'INvideos.csv', 'JPvideos.csv', 'KRvideos.csv', 'MXvideos.csv', 'RUvideos.csv', 'USvideos.csv']
json Files Name:  ['CA_category_id.json', 'DE_category_id.json', 'FR_category_id.json', 'GB_category_id.json', 'IN_category_id.json', 'JP_category_id.json', 'KR_category_id.json', 'MX_category_id.json', 'RU_category_id.json', 'US_category_id.json']


In [3]:
# change colunms datatype to be same type in all files

data_schema = [StructField('video_id', StringType(), True), StructField('trending_date', StringType(), True),
               StructField('title', StringType(), True), StructField('channel_title', StringType(), True),
               StructField('category_id', IntegerType(), True), StructField('publish_time', TimestampType(), True),
               StructField('tags', StringType(), True), StructField('views', LongType(), True),
               StructField('likes', IntegerType(), True), StructField('dislikes', IntegerType(), True),
               StructField('comment_count', IntegerType(), True),StructField('thumbnail_link', StringType(), True),
               StructField('comments_disabled', BooleanType(), True),StructField('ratings_disabled', BooleanType(), True),
               StructField('video_error_or_removed', BooleanType(), True),StructField('description', StringType(), True)]
final = StructType(fields=data_schema)


In [4]:
#read csv files 

count = 0
for file in csv_file:
    if count == 0 :
        csv_df = spark.read.csv(f'archive\\{file}',schema=final, header=True)
        count+=1
        print(file)
        continue
    else:
        print(file)
        csv_df = csv_df.union(spark.read.csv(f'archive\\{file}',schema=final))
        print('df new count :' , csv_df.count())
        
print('Total Rows count: ' ,  csv_df.count())
print('Schema'.center(70,'-'))
print(csv_df.printSchema())
#df.distinct()

CAvideos.csv
DEvideos.csv
df new count : 92518
FRvideos.csv
df new count : 138657
GBvideos.csv
df new count : 181953
INvideos.csv
df new count : 220482
JPvideos.csv
df new count : 241928
KRvideos.csv
df new count : 278659
MXvideos.csv
df new count : 322479
RUvideos.csv
df new count : 368740
USvideos.csv
df new count : 416878
Total Rows count:  416878
--------------------------------Schema--------------------------------
root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- publish_time: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: long (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: boolean (nullable = true)
 |-- ratings_disabled

In [None]:
# save data as a backup csv file 
df1 = csv_df.toPandas()
df1.to_csv('all_data.csv')

In [5]:
# write pyspark dataframe to mysql database
csv_df.write.format("jdbc").option("url", "jdbc:mysql://localhost:3306/youtube") \
	.option("driver", "com.mysql.jdbc.Driver").option("dbtable", "csvtable") \
	.option("user", "root").option("password", "Mostafa11!").save()

In [5]:
# now take a look at data
df = spark.read.csv('archive\\CAvideos.csv',inferSchema=True, header=True)
df_pd = df.toPandas()
df_pd.head()
#df_pd['channelId'].unique()

Unnamed: 0,video_id,trending_date,title,channel_title,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,n1WpP7iowLc,17.14.11,Eminem - Walk On Water (Audio) ft. Beyoncé,EminemVEVO,10,2017-11-10T17:00:03.000Z,"""Eminem""|""Walk""|""On""|""Water""|""Aftermath/Shady/...",17158579,787425,43420,125882,https://i.ytimg.com/vi/n1WpP7iowLc/default.jpg,False,False,False,Eminem's new track Walk on Water ft. Beyoncé i...
1,0dBIkQ4Mz1M,17.14.11,PLUSH - Bad Unboxing Fan Mail,iDubbbzTV,23,2017-11-13T17:00:00.000Z,"""plush""|""bad unboxing""|""unboxing""|""fan mail""|""...",1014651,127794,1688,13030,https://i.ytimg.com/vi/0dBIkQ4Mz1M/default.jpg,False,False,False,STill got a lot of packages. Probably will las...
2,5qpjK5DgCt4,17.14.11,"Racist Superman | Rudy Mancuso, King Bach & Le...",Rudy Mancuso,23,2017-11-12T19:05:24.000Z,"""racist superman""|""rudy""|""mancuso""|""king""|""bac...",3191434,146035,5339,8181,https://i.ytimg.com/vi/5qpjK5DgCt4/default.jpg,False,False,False,WATCH MY PREVIOUS VIDEO ▶ \n\nSUBSCRIBE ► http...
3,d380meD0W0M,17.14.11,I Dare You: GOING BALD!?,nigahiga,24,2017-11-12T18:01:41.000Z,"""ryan""|""higa""|""higatv""|""nigahiga""|""i dare you""...",2095828,132239,1989,17518,https://i.ytimg.com/vi/d380meD0W0M/default.jpg,False,False,False,I know it's been a while since we did this sho...
4,2Vv-BfVoq4g,17.14.11,Ed Sheeran - Perfect (Official Music Video),Ed Sheeran,10,2017-11-09T11:04:14.000Z,"""edsheeran""|""ed sheeran""|""acoustic""|""live""|""co...",33523622,1634130,21082,85067,https://i.ytimg.com/vi/2Vv-BfVoq4g/default.jpg,False,False,False,🎧: https://ad.gt/yt-perfect\n💰: https://atlant...


In [None]:
#check json files
count = 0
for file in json_file:
    if count == 0 :
        f = open(f'archive\\{file}')
        jfile = json.load(f)
        length = len(jfile['items'])
        jitem = jfile['items']
        count+=1
        print(file)
        title = dict(zip([int(jitem[i]['id']) for i in range(length)],[jitem[i]['snippet']['title'] for i in range(length)]))
        print(title)
        continue
    else:
        f = open(f'archive\\{file}')
        jfile = json.load(f)
        length = len(jfile['items'])
        jitem = jfile['items']
        title = dict(zip([int(jitem[i]['id']) for i in range(length)],[jitem[i]['snippet']['title'] for i in range(length)]))
        #assignable = dict(zip([int(jitem[i]['id']) for i in range(length)],[jitem[i]['snippet']['assignable'] for i in range(length)]))
        print(file)
        print(title)

f.close()
print(title)
print(assignable)

In [8]:
#read json file
import json

# read important field in json (id, title, assignable)
# note that category id is identical for all json file
f = open('archive/US_category_id.json')
jfile = json.load(f)
length = len(jfile['items'])
jitem = jfile['items']
 
title = dict(zip([int(jitem[i]['id']) for i in range(length)],[jitem[i]['snippet']['title'] for i in range(length)]))
assignable = dict(zip([int(jitem[i]['id']) for i in range(length)],[jitem[i]['snippet']['assignable'] for i in range(length)]))
f.close()
print(title)
print(assignable)

{1: 'Film & Animation', 2: 'Autos & Vehicles', 10: 'Music', 15: 'Pets & Animals', 17: 'Sports', 18: 'Short Movies', 19: 'Travel & Events', 20: 'Gaming', 21: 'Videoblogging', 22: 'People & Blogs', 23: 'Comedy', 24: 'Entertainment', 25: 'News & Politics', 26: 'Howto & Style', 27: 'Education', 28: 'Science & Technology', 29: 'Nonprofits & Activism', 30: 'Movies', 31: 'Anime/Animation', 32: 'Action/Adventure', 33: 'Classics', 34: 'Comedy', 35: 'Documentary', 36: 'Drama', 37: 'Family', 38: 'Foreign', 39: 'Horror', 40: 'Sci-Fi/Fantasy', 41: 'Thriller', 42: 'Shorts', 43: 'Shows', 44: 'Trailers'}
{1: True, 2: True, 10: True, 15: True, 17: True, 18: False, 19: True, 20: True, 21: False, 22: True, 23: True, 24: True, 25: True, 26: True, 27: True, 28: True, 29: True, 30: False, 31: False, 32: False, 33: False, 34: False, 35: False, 36: False, 37: False, 38: False, 39: False, 40: False, 41: False, 42: False, 43: False, 44: False}


In [9]:
# convert each category id in csv_df into name 
titleList = []
assignList = []
column_data = csv_df.select('category_id')
category_id = column_data.collect()
i = 0
for cat_id in category_id:

    if cat_id[i] == None or cat_id[i] == 7:
        titleList.append(None)
        assignList.append(None)
    elif cat_id[i] not in title:
        titleList.append(None)
        assignList.append(None)
    else:    
        titleList.append(title[(cat_id[i])])
        assignList.append(assignable[cat_id[i]])

print('title length: ', len(titleList)) 
print(titleList[:10])
print('assignable length: ', len(assignList))            
print(assignList[:10])

title length:  416878
['Music', 'Comedy', 'Comedy', 'Entertainment', 'Music', 'News & Politics', 'Comedy', 'People & Blogs', 'Entertainment', 'People & Blogs']
assignable length:  416878
[True, True, True, True, True, True, True, True, True, True]


In [10]:
#add this two list as new dataframe repreasent json files
import numpy as np

# Convert the lists to a NumPy array
data = np.array([titleList, assignList]).T
schema = StructType([
	StructField("vedio_title", StringType(), True),
	StructField("is_assignable", StringType(), True)
])

# Create the PySpark DataFrame
jsondf = spark.createDataFrame(data.tolist(), schema=schema)
# Show the DataFrame
jsondf.show()

+----------------+-------------+
|     vedio_title|is_assignable|
+----------------+-------------+
|           Music|         true|
|          Comedy|         true|
|          Comedy|         true|
|   Entertainment|         true|
|           Music|         true|
| News & Politics|         true|
|          Comedy|         true|
|  People & Blogs|         true|
|   Entertainment|         true|
|  People & Blogs|         true|
|           Music|         true|
|   Howto & Style|         true|
|Film & Animation|         true|
|           Music|         true|
|   Entertainment|         true|
|  People & Blogs|         true|
|          Comedy|         true|
|   Entertainment|         true|
|Film & Animation|         true|
|Film & Animation|         true|
+----------------+-------------+
only showing top 20 rows



In [19]:
# save data as a backup csv file 
jsondf.write.format("csv").save("json_data")

In [7]:
# write pyspark dataframe to mysql database
jsondf.select("vedio_title","is_assignable").write.format("jdbc").option("url", "jdbc:mysql://localhost:3306/youtube") \
      .option("driver", "com.mysql.jdbc.Driver").option("dbtable", "jsontable") \
      .option("user", "root").option("password", "Mostafa11!").save()


