## 1. PySpark environment setup

In [1]:
import warnings 
warnings.filterwarnings("ignore")

In [2]:
import findspark
findspark.init()

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## 2. Data source and Spark data abstraction (DataFrame) setup

In [3]:
CA_DF = spark.read \
                 .option("inferSchema", "true") \
                 .option("header", "true") \
                 .csv("CAvideos2.csv")

                                                                                

## 3. Data set metadata analysis
### A. Display schema and size of the DataFrame

In [4]:
from IPython.display import display, Markdown

CA_DF.printSchema()
display(Markdown("This DataFrame has **%d rows**." % CA_DF.count()))

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- dislikes: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)



This DataFrame has **45560 rows**.

### B. Define a Schema

After looking at the data source schema, we are going to create a new schema to reflect more accurately some columns, and make data analysis smoother. Additionally, we plan to combine USA, MEXICO, and CANADA data into one DF for North America data

In [6]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, BooleanType, DateType, TimestampType

NASchema = StructType(\
     [StructField("video_id",StringType(),True),\
     StructField("trending_date",StringType(),True),\
     StructField("title",StringType(),True),\
     StructField("channel_title",StringType(),True),\
     StructField("category_id",IntegerType(),True),\
     StructField("publish_time",TimestampType(),True),\
     StructField("tags",StringType(),True),\
     StructField("views",IntegerType(),True),\
     StructField("likes",IntegerType(),True),\
     StructField("dislikes",IntegerType(),True),\
     StructField("comment_count",IntegerType(),True),\
     StructField("thumbnail_link",StringType(),True),\
     StructField("comments_disabled",BooleanType(),True),\
     StructField("ratings_disabled",BooleanType(),True),\
     StructField("video_error_or_removed",BooleanType(),True),\
     StructField("description",StringType(),True)])

**STILL NEED TO CONVERT STRINGS TO DATE TYPE FORMAT**

In [7]:
NA_DF = spark.read.schema(NASchema).option("header", "true").option("sep", ",").csv("NA_youtube_data/*.csv")
NA_DF.first()


Row(video_id='n1WpP7iowLc', trending_date='17.14.11', title='Eminem - Walk On Water (Audio) ft. Beyoncé', channel_title='EminemVEVO', category_id=10, publish_time=datetime.datetime(2017, 11, 10, 18, 0, 3), tags='"Eminem"|"Walk"|"On"|"Water"|"Aftermath/Shady/Interscope"|"Rap"', views=17158579, likes=787425, dislikes=43420, comment_count=125882, thumbnail_link='https://i.ytimg.com/vi/n1WpP7iowLc/default.jpg', comments_disabled=False, ratings_disabled=False, video_error_or_removed=False, description="Eminem's new track Walk on Water ft. Beyoncé is available everywhere: http://shady.sr/WOWEminem \\nPlaylist Best of Eminem: https://goo.gl/AquNpo\\nSubscribe for more: https://goo.gl/DxCrDV\\n\\nFor more visit: \\nhttp://eminem.com\\nhttp://facebook.com/eminem\\nhttp://twitter.com/eminem\\nhttp://instagram.com/eminem\\nhttp://eminem.tumblr.com\\nhttp://shadyrecords.com\\nhttp://facebook.com/shadyrecords\\nhttp://twitter.com/shadyrecords\\nhttp://instagram.com/shadyrecords\\nhttp://trustshady.

In [8]:
NA_DF.select("trending_date","publish_time").limit(2).show(truncate = False)

+-------------+-------------------+
|trending_date|publish_time       |
+-------------+-------------------+
|17.14.11     |2017-11-10 18:00:03|
|17.14.11     |2017-11-13 18:00:00|
+-------------+-------------------+



In [23]:
from pyspark.sql.functions import to_date, to_timestamp, col, concat, lit

NA_DF.select("trending_date","publish_time")\
        .limit(1)\
        .withColumn("dt_trending_date", to_date(concat(lit("20"), col("trending_date")),"y.d.M"))\
        .withColumn("dt_publish_time", to_timestamp(col("publish_time")))\
        .show(truncate=False)

+-------------+-------------------+----------------+-------------------+
|trending_date|publish_time       |dt_trending_date|dt_publish_time    |
+-------------+-------------------+----------------+-------------------+
|17.14.11     |2017-11-10 18:00:03|2017-11-14      |2017-11-10 18:00:03|
+-------------+-------------------+----------------+-------------------+



In [31]:
from pyspark.sql.functions import to_date, to_timestamp, col, concat, lit

dataframe = NA_DF.select("trending_date","publish_time")\
        .withColumn("dt_trending_date", to_date(concat(lit("20"), col("trending_date")),"y.d.M"))\
        .withColumn("dt_publish_time", to_timestamp(col("publish_time")))\
        .show(truncate=False)

+-------------+-------------------+----------------+-------------------+
|trending_date|publish_time       |dt_trending_date|dt_publish_time    |
+-------------+-------------------+----------------+-------------------+
|17.14.11     |2017-11-10 18:00:03|2017-11-14      |2017-11-10 18:00:03|
|17.14.11     |2017-11-13 18:00:00|2017-11-14      |2017-11-13 18:00:00|
|17.14.11     |2017-11-12 20:05:24|2017-11-14      |2017-11-12 20:05:24|
|17.14.11     |2017-11-12 19:01:41|2017-11-14      |2017-11-12 19:01:41|
|17.14.11     |2017-11-09 12:04:14|2017-11-14      |2017-11-09 12:04:14|
|17.14.11     |2017-11-13 08:37:51|2017-11-14      |2017-11-13 08:37:51|
|17.14.11     |2017-11-13 00:52:13|2017-11-14      |2017-11-13 00:52:13|
|17.14.11     |2017-11-13 18:13:01|2017-11-14      |2017-11-13 18:13:01|
|17.14.11     |2017-11-12 21:19:24|2017-11-14      |2017-11-12 21:19:24|
|17.14.11     |2017-11-10 15:10:46|2017-11-14      |2017-11-10 15:10:46|
|17.14.11     |2017-11-10 20:00:02|2017-11-14      

### Unfortunately, the output of the above query is a NoneType, and we cannot use it for any aggregations, or append to our dataframe

In [37]:
dataframe.schema

AttributeError: 'NoneType' object has no attribute 'schema'

In [11]:
from IPython.display import display, Markdown

display(Markdown("This DataFrame has **%d rows**." % NA_DF.count()))

                                                                                

This DataFrame has **137516 rows**.

In [13]:
NA_DF.columns

['video_id',
 'trending_date',
 'title',
 'channel_title',
 'category_id',
 'publish_time',
 'tags',
 'views',
 'likes',
 'dislikes',
 'comment_count',
 'thumbnail_link',
 'comments_disabled',
 'ratings_disabled',
 'video_error_or_removed',
 'description']

In [15]:
from IPython.display import display, Markdown

NA_DF.printSchema()
display(Markdown("This DataFrame has **%d rows**." % NA_DF.count()))

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- publish_time: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: boolean (nullable = true)
 |-- ratings_disabled: boolean (nullable = true)
 |-- video_error_or_removed: boolean (nullable = true)
 |-- description: string (nullable = true)



This DataFrame has **137516 rows**.

In [16]:
AS_DF = spark.read.schema(NASchema).option("header", "true").option("sep", ",").csv("ASIA_youtube_data/*.csv")
AS_DF.first()

Row(video_id='kzwfHumJyYc', trending_date='17.14.11', title='Sharry Mann: Cute Munda ( Song Teaser) | Parmish Verma | Releasing on 17 November', channel_title='Lokdhun Punjabi', category_id=1, publish_time=datetime.datetime(2017, 11, 12, 13, 20, 39), tags='"sharry mann|""sharry mann new song""|""sharry mann cute munda""|""sharry mann latest song""|""sharry mann punjabi song 2017""|""parmish verma""|""parmish verma new song""|""parmish verma sharry mann""|""parmish verma sharry mann new song""|""parmish verma cute munda""|""new punjabi song 2017""|""punjabi song 2017""|""parmish verma new song 2017""|""parmish verma latest song 2017""|""punjabi songs 2017"""', views=1096327, likes=33966, dislikes=798, comment_count=882, thumbnail_link='https://i.ytimg.com/vi/kzwfHumJyYc/default.jpg', comments_disabled=False, ratings_disabled=False, video_error_or_removed=False, description='Presenting Sharry Mann latest Punjabi Song  Cute Munda Teaser . The music of new punjabi song is given by Gift Rul

In [17]:
EU_DF = spark.read.schema(NASchema).option("header", "true").option("sep", ",").csv("EU/*.csv")
EU_DF.first()

Row(video_id='Ro6eob0LrCY', trending_date='17.14.11', title='Malika LePen : Femme de Gauche - Trailer', channel_title='Le Raptor Dissident', category_id=24, publish_time=datetime.datetime(2017, 11, 13, 18, 32, 55), tags='"Raptor""|""Dissident""|""Expliquez""|""moi""|""cette""|""merde"', views=212702, likes=29282, dislikes=1108, comment_count=3817, thumbnail_link='https://i.ytimg.com/vi/Ro6eob0LrCY/default.jpg', comments_disabled=False, ratings_disabled=False, video_error_or_removed=False, description="Dimanche.\\n18h30.\\nSoyez présents pour la vidéo la plus réalistiquement haineuse du YouTube Game.\\n\\nStrike sous 24h garanti.\\n\\nMontage de NastyTanooki : https://www.youtube.com/channel/UCJvYnKvJZSIz-2deI7Xppww\\n\\n------------------------------------------------------------------------------------------------\\n\\nRAPTORCOACHING PRO : raptorcoachingpro@gmail.com\\n\\nSi tu pratiques la musculation et que tu es intéressé(e) par un Programme/Diète Personnalisés + Suivi, envoie nous

In [18]:
NA_NUM_DF= NA_DF[['views', 'likes', 'dislikes','comment_count']]
NA_NUM_DF.summary().show()

[Stage 19:>                                                         (0 + 4) / 4]

+-------+-----------------+------------------+------------------+------------------+
|summary|            views|             likes|          dislikes|     comment_count|
+-------+-----------------+------------------+------------------+------------------+
|  count|           122281|            122281|            122281|            122281|
|   mean|1287309.052223976| 43350.59626597755|2161.7377924616253|   5189.3286773906|
| stddev|4879842.267669742|161800.71685038836| 21075.12560555939|26380.843898437804|
|    min|              157|                 0|                 0|                 0|
|    25%|            67952|              1144|                59|               190|
|    50%|           283305|              7123|               257|               926|
|    75%|           928038|             28177|               965|              3237|
|    max|        225211923|           5613827|           1674420|           1361580|
+-------+-----------------+------------------+------------------+

                                                                                

In [19]:
NA_PD = NA_DF.toPandas()

                                                                                

# North America Statistics

In [68]:
NA_DF.describe('likes', 'views', 'dislikes', 'comment_count').show()

+-------+------------------+-----------------+------------------+------------------+
|summary|             likes|            views|          dislikes|     comment_count|
+-------+------------------+-----------------+------------------+------------------+
|  count|            122281|           122281|            122281|            122281|
|   mean| 43350.59626597755|1287309.052223976|2161.7377924616253|   5189.3286773906|
| stddev|161800.71685038836|4879842.267669742| 21075.12560555939|26380.843898437804|
|    min|                 0|              157|                 0|                 0|
|    max|           5613827|        225211923|           1674420|           1361580|
+-------+------------------+-----------------+------------------+------------------+



                                                                                

# EUROPE statistics

In [69]:
EU_DF.describe('likes', 'views', 'dislikes', 'comment_count').show()

+-------+-----------------+------------------+-----------------+------------------+
|summary|            likes|             views|         dislikes|     comment_count|
+-------+-----------------+------------------+-----------------+------------------+
|  count|            79640|             79640|            79640|             79640|
|   mean| 74624.6488448016|  3103592.36084882|4136.601104972376| 7332.636300853842|
| stddev|259175.3514812264|1.36222260989259E7|36697.65666761797|37241.603765286694|
|    min|                0|               223|                0|                 0|
|    max|          5613827|         424538912|          1944971|           1626501|
+-------+-----------------+------------------+-----------------+------------------+



# ASIA statistics

In [72]:
AS_stats = AS_DF.describe('likes', 'views', 'dislikes', 'comment_count')
AS_stats.show()

+-------+------------------+------------------+------------------+------------------+
|summary|             likes|             views|          dislikes|     comment_count|
+-------+------------------+------------------+------------------+------------------+
|  count|             57873|             57873|             57873|             57873|
|   mean|20337.620323812487|  777363.601627702|1204.7476543465864|2151.9194961380954|
| stddev|  93095.2819554454|2699519.1951408307|13026.639994488636|14912.022999546438|
|    min|                 0|               798|                 0|                 0|
|    max|           4470923|         125432237|           1545017|            905925|
+-------+------------------+------------------+------------------+------------------+



In [58]:
from IPython.display import display, Markdown
from pyspark.sql.functions import when, count, col, countDistinct, desc, first, lit


print ("NORTH AMERICA Summary of columns views, likes, dislikes, comment_count")
NA_DF.select('views', 'likes', 'dislikes','comment_count').summary().show()

print ("EUROPE Summary of columns views, likes, dislikes, comment_count")
EU_DF.select('views', 'likes', 'dislikes','comment_count').summary().show()

print ("ASIA Summary of columns views, likes, dislikes, comment_count")
AS_DF.select('views', 'likes', 'dislikes','comment_count').summary().show()


NA_CAT_DF = NA_DF.groupBy("category_id").agg(count(lit(1)).alias("Total"))

EU_CAT_DF = EU_DF.groupBy("category_id").agg(count(lit(1)).alias("Total"))

AS_CAT_DF = AS_DF.groupBy("category_id").agg(count(lit(1)).alias("Total"))


print ("Most VIDEOS PER Category in NA:")
NA_CAT_DF.orderBy(col("Total").desc()).show()

print("Most VIDEOS PER Category  in EU:")
EU_CAT_DF.orderBy(col("Total").desc()).show()

print("Most VIDEOS PER Category  in ASIA:")
AS_CAT_DF.orderBy(col("Total").desc()).show()


NORTH AMERICA Summary of columns views, likes, dislikes, comment_count


                                                                                

+-------+-----------------+------------------+------------------+------------------+
|summary|            views|             likes|          dislikes|     comment_count|
+-------+-----------------+------------------+------------------+------------------+
|  count|           122281|            122281|            122281|            122281|
|   mean|1287309.052223976| 43350.59626597755|2161.7377924616253|   5189.3286773906|
| stddev|4879842.267669742|161800.71685038836| 21075.12560555939|26380.843898437804|
|    min|              157|                 0|                 0|                 0|
|    25%|            67952|              1144|                59|               190|
|    50%|           283305|              7123|               257|               926|
|    75%|           928038|             28177|               965|              3237|
|    max|        225211923|           5613827|           1674420|           1361580|
+-------+-----------------+------------------+------------------+

In [59]:
from IPython.display import display, Markdown
from pyspark.sql.functions import when, count, col, countDistinct, desc, first, lit


print ("NORTH AMERICA Summary of columns views, likes, dislikes, comment_count")
NA_DF.select('views', 'likes', 'dislikes','comment_count').summary().show()

print ("EUROPE Summary of columns views, likes, dislikes, comment_count")
EU_DF.select('views', 'likes', 'dislikes','comment_count').summary().show()

print ("ASIA Summary of columns views, likes, dislikes, comment_count")
AS_DF.select('views', 'likes', 'dislikes','comment_count').summary().show()

print("Most views per Category in NA:")
NA_CATviews_DF = NA_DF.groupBy("category_id")
NA_VIEWS = NA_CATviews_DF.agg({'views': 'sum'})
NA_VIEWS.orderBy(col('sum(views)').desc()).show()

print("Most views per Category in EU:")
EU_CATviews_DF = EU_DF.groupBy("category_id")
EU_VIEWS = EU_CATviews_DF.agg({'views': 'sum'})
EU_VIEWS.orderBy(col('sum(views)').desc()).show()

print("Most views per Category in ASIA:")
AS_CATviews_DF = AS_DF.groupBy("category_id")
AS_VIEWS = AS_CATviews_DF.agg({'views': 'sum'})
AS_VIEWS.orderBy(col('sum(views)').desc()).show()










NORTH AMERICA Summary of columns views, likes, dislikes, comment_count


                                                                                

+-------+-----------------+------------------+------------------+------------------+
|summary|            views|             likes|          dislikes|     comment_count|
+-------+-----------------+------------------+------------------+------------------+
|  count|           122281|            122281|            122281|            122281|
|   mean|1287309.052223976| 43350.59626597755|2161.7377924616253|   5189.3286773906|
| stddev|4879842.267669742|161800.71685038836| 21075.12560555939|26380.843898437804|
|    min|              157|                 0|                 0|                 0|
|    25%|            67952|              1144|                59|               190|
|    50%|           283305|              7123|               257|               926|
|    75%|           928038|             28177|               965|              3237|
|    max|        225211923|           5613827|           1674420|           1361580|
+-------+-----------------+------------------+------------------+

                                                                                

+-----------+-----------+
|category_id| sum(views)|
+-----------+-----------+
|         10|57472455667|
|         24|38303178040|
|          1|10685137184|
|         23| 9620512481|
|         22| 9374921560|
|         17| 8791446839|
|         26| 6114979104|
|         28| 5115710691|
|         20| 3804712783|
|         25| 3579824789|
|         27| 1755608642|
|         15| 1045873184|
|          2|  769947652|
|         19|  502717086|
|         29|  336176021|
|         43|  123116002|
|         30|   17120490|
|       null|       null|
+-----------+-----------+

Most views per Category in EU:
+-----------+------------+
|category_id|  sum(views)|
+-----------+------------+
|         10|176187312291|
|         24| 33565783270|
|          1|  9281224644|
|         22|  7035131362|
|         23|  5858346792|
|         17|  5338547233|
|         20|  2193779708|
|         28|  2103380550|
|         25|  1904176938|
|         26|  1658499640|
|         15|   584659814|
|         27|   56

In [60]:
from IPython.display import display, Markdown
from pyspark.sql.functions import when, count, col, countDistinct, desc, first, lit

print("Most comments per Category in NA:")
NA_CAT_COM = NA_DF.groupBy("category_id")
NA_COMMENT = NA_CAT_COM.agg({'comment_count': 'sum'})
NA_COMMENT.orderBy(col('sum(comment_count)').desc()).show()

print("Most comments per Category in EU:")
EU_CAT_COM = EU_DF.groupBy("category_id")
EU_COMMENT = EU_CAT_COM.agg({'comment_count': 'sum'})
EU_COMMENT.orderBy(col('sum(comment_count)').desc()).show()

print("Most comments per Category in ASIA:")
AS_CAT_COM = AS_DF.groupBy("category_id")
AS_COMMENT = AS_CAT_COM.agg({'comment_count': 'sum'})
AS_COMMENT.orderBy(col('sum(comment_count)').desc()).show()

Most comments per Category in NA:


                                                                                

+-----------+------------------+
|category_id|sum(comment_count)|
+-----------+------------------+
|         10|         199476213|
|         24|         158568420|
|         23|          49022945|
|         22|          48901087|
|         26|          35174605|
|          1|          30021806|
|         20|          27292905|
|         17|          22059593|
|         28|          20080654|
|         25|          17627896|
|         29|           8992924|
|         27|           8706312|
|         15|           4650304|
|          2|           2168744|
|         19|           1625222|
|         43|            172691|
|         30|             13979|
|       null|              null|
+-----------+------------------+

Most comments per Category in EU:
+-----------+------------------+
|category_id|sum(comment_count)|
+-----------+------------------+
|         10|         318459429|
|         24|         132627364|
|         23|          25855630|
|         22|          22817967|
|       

In [61]:
from IPython.display import display, Markdown
from pyspark.sql.functions import when, count, col, countDistinct, desc, first, lit


print("Most dislikes per Category in NA:")
NA_CAT_DIS = NA_DF.groupBy("category_id")
NA_DISLIKE = NA_CAT_DIS.agg({'dislikes': 'sum'})
NA_DISLIKE.orderBy(col('sum(dislikes)').desc()).show()

print("Most dislikes per Category in EU:")
EU_CAT_DIS = EU_DF.groupBy("category_id")
EU_DISLIKE = EU_CAT_DIS.agg({'dislikes': 'sum'})
EU_DISLIKE.orderBy(col('sum(dislikes)').desc()).show()

print("Most dislikes per Category in ASIA:")
AS_CAT_DIS = AS_DF.groupBy("category_id")
AS_DISLIKE = AS_CAT_DIS.agg({'dislikes': 'sum'})
AS_DISLIKE.orderBy(col('sum(dislikes)').desc()).show()

Most dislikes per Category in NA:


                                                                                

+-----------+-------------+
|category_id|sum(dislikes)|
+-----------+-------------+
|         10|     81234533|
|         24|     79918306|
|         22|     22888255|
|         23|     15003594|
|         20|     12569587|
|          1|      9703060|
|         17|      9699371|
|         25|      8401469|
|         26|      7996781|
|         28|      6615417|
|         29|      6141769|
|         27|      2212828|
|         15|       776095|
|          2|       524538|
|         19|       496541|
|         43|       145035|
|         30|        12280|
|       null|         null|
+-----------+-------------+

Most dislikes per Category in EU:
+-----------+-------------+
|category_id|sum(dislikes)|
+-----------+-------------+
|         10|    169142554|
|         24|     98457729|
|         22|     13763281|
|         17|     12060511|
|         23|      8021809|
|          1|      7285678|
|         29|      6667967|
|         25|      4556217|
|         20|      3401916|
|         26|