# PySpark

In [168]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pyspark").getOrCreate()
spark

## PySpark DF Immutability

In [169]:
# creating a sample dataframe
names = spark.createDataFrame([('a', 'b'), ('c', 'd')], ['first_name', 'last_name'])
names.show()

+----------+---------+
|first_name|last_name|
+----------+---------+
|         a|        b|
|         c|        d|
+----------+---------+



In [170]:
# showing rdd id
names.rdd.id()

407

### Adding Columns

In [171]:
# adding new column
from pyspark.sql.functions import *

# concat first and last name
# using select
names = names.select(names.first_name, names.last_name, concat_ws(' ', names.first_name, names.last_name).alias('full_name'))
# using withColumn
names = names.withColumn('full_name', concat_ws(' ', names.first_name, names.last_name))

In [172]:
# using lit() to add values to the column
names = names.withColumn('Age', lit(23))

In [173]:
names.show()

+----------+---------+---------+---+
|first_name|last_name|full_name|Age|
+----------+---------+---------+---+
|         a|        b|      a b| 23|
|         c|        d|      c d| 23|
+----------+---------+---------+---+



In [174]:
# rdd id
# manipulating the dataframe changes the rdd id - IMMUTABILITY!
names.rdd.id()

413

PySpark RDD/DataFrame collect() is an action operation that is used to retrieve all the elements of the dataset (from all nodes) to the driver node. We should use the collect() on smaller dataset usually after filter(), group() e.t.c. Retrieving larger datasets results in OutOfMemory error.

In [175]:
names_list = names.collect()

## Loading CSV

In [214]:
videos = spark.read.csv('Datasets/youtubevideos.csv', inferSchema = True, header = True)

In [215]:
videos.limit(2).toPandas()

Unnamed: 0,video_id,trending_date,title,channel_title,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,2kyS6SvSYSE,17.14.11,WE WANT TO TALK ABOUT OUR MARRIAGE,CaseyNeistat,22,2017-11-13T17:13:01.000Z,SHANtell martin,748374,57527,2966,15954,https://i.ytimg.com/vi/2kyS6SvSYSE/default.jpg,False,False,False,SHANTELL'S CHANNEL - https://www.youtube.com/s...
1,1ZAPwfrtAFY,17.14.11,The Trump Presidency: Last Week Tonight with J...,LastWeekTonight,24,2017-11-13T07:30:00.000Z,"""last week tonight trump presidency""|""last wee...",2418783,97185,6146,12703,https://i.ytimg.com/vi/1ZAPwfrtAFY/default.jpg,False,False,False,"One year after the presidential election, John..."


### Editing DF columns datatypes

In [216]:
videos.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- dislikes: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)



In [217]:
from pyspark.sql.types import *

In [218]:
df = videos.withColumn('views', videos['views'].cast(IntegerType())) \
            .withColumn('likes', videos['likes'].cast(IntegerType())) \
            .withColumn('dislikes', videos['dislikes'].cast(IntegerType())) \
            .withColumn('trending_date', to_date(videos['trending_date'], 'yy.dd.MM')) \
            .withColumn('publish_time', to_timestamp(videos['publish_time'], "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")) 

In [219]:
df.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: date (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)



In [220]:
df.limit(2).toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,video_id,trending_date,title,channel_title,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,2kyS6SvSYSE,2017-11-14,WE WANT TO TALK ABOUT OUR MARRIAGE,CaseyNeistat,22,2017-11-13 17:13:01,SHANtell martin,748374,57527,2966,15954,https://i.ytimg.com/vi/2kyS6SvSYSE/default.jpg,False,False,False,SHANTELL'S CHANNEL - https://www.youtube.com/s...
1,1ZAPwfrtAFY,2017-11-14,The Trump Presidency: Last Week Tonight with J...,LastWeekTonight,24,2017-11-13 07:30:00,"""last week tonight trump presidency""|""last wee...",2418783,97185,6146,12703,https://i.ytimg.com/vi/1ZAPwfrtAFY/default.jpg,False,False,False,"One year after the presidential election, John..."


## Renaming a column

In [221]:
# column 'publish_time' -> 'published_time'
df = df.withColumnRenamed('publish_time', 'published_time')

In [222]:
df.select('video_id', 'channel_title', 'published_time').show(3, False)

+-----------+---------------+-------------------+
|video_id   |channel_title  |published_time     |
+-----------+---------------+-------------------+
|2kyS6SvSYSE|CaseyNeistat   |2017-11-13 17:13:01|
|1ZAPwfrtAFY|LastWeekTonight|2017-11-13 07:30:00|
|5qpjK5DgCt4|Rudy Mancuso   |2017-11-12 19:05:24|
+-----------+---------------+-------------------+
only showing top 3 rows



## Translation operation

In [223]:
names.show()

+----------+---------+---------+---+
|first_name|last_name|full_name|Age|
+----------+---------+---------+---+
|         a|        b|      a b| 23|
|         c|        d|      c d| 23|
+----------+---------+---------+---+



In [224]:
names.select( '*', translate(col('full_name'), ' ', '_').alias('translated') ).show()

+----------+---------+---------+---+----------+
|first_name|last_name|full_name|Age|translated|
+----------+---------+---------+---+----------+
|         a|        b|      a b| 23|       a_b|
|         c|        d|      c d| 23|       c_d|
+----------+---------+---------+---+----------+



## Trim operation

In [225]:
# trims white spaces from both extremities
df.select(df.title, trim(df.title).alias('title_trimmed')).show(3, False) # ltrim/rtrim/trim

+--------------------------------------------------------------+--------------------------------------------------------------+
|title                                                         |title_trimmed                                                 |
+--------------------------------------------------------------+--------------------------------------------------------------+
|WE WANT TO TALK ABOUT OUR MARRIAGE                            |WE WANT TO TALK ABOUT OUR MARRIAGE                            |
|The Trump Presidency: Last Week Tonight with John Oliver (HBO)|The Trump Presidency: Last Week Tonight with John Oliver (HBO)|
|Racist Superman | Rudy Mancuso, King Bach & Lele Pons         |Racist Superman | Rudy Mancuso, King Bach & Lele Pons         |
+--------------------------------------------------------------+--------------------------------------------------------------+
only showing top 3 rows



## Lower operation

In [226]:
df.select(df.title, lower(df.title).alias('title_lowered')).show(3, False)

+--------------------------------------------------------------+--------------------------------------------------------------+
|title                                                         |title_lowered                                                 |
+--------------------------------------------------------------+--------------------------------------------------------------+
|WE WANT TO TALK ABOUT OUR MARRIAGE                            |we want to talk about our marriage                            |
|The Trump Presidency: Last Week Tonight with John Oliver (HBO)|the trump presidency: last week tonight with john oliver (hbo)|
|Racist Superman | Rudy Mancuso, King Bach & Lele Pons         |racist superman | rudy mancuso, king bach & lele pons         |
+--------------------------------------------------------------+--------------------------------------------------------------+
only showing top 3 rows



In [227]:
df.limit(3).toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,video_id,trending_date,title,channel_title,category_id,published_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,2kyS6SvSYSE,2017-11-14,WE WANT TO TALK ABOUT OUR MARRIAGE,CaseyNeistat,22,2017-11-13 17:13:01,SHANtell martin,748374,57527,2966,15954,https://i.ytimg.com/vi/2kyS6SvSYSE/default.jpg,False,False,False,SHANTELL'S CHANNEL - https://www.youtube.com/s...
1,1ZAPwfrtAFY,2017-11-14,The Trump Presidency: Last Week Tonight with J...,LastWeekTonight,24,2017-11-13 07:30:00,"""last week tonight trump presidency""|""last wee...",2418783,97185,6146,12703,https://i.ytimg.com/vi/1ZAPwfrtAFY/default.jpg,False,False,False,"One year after the presidential election, John..."
2,5qpjK5DgCt4,2017-11-14,"Racist Superman | Rudy Mancuso, King Bach & Le...",Rudy Mancuso,23,2017-11-12 19:05:24,"""racist superman""|""rudy""|""mancuso""|""king""|""bac...",3191434,146033,5339,8181,https://i.ytimg.com/vi/5qpjK5DgCt4/default.jpg,False,False,False,WATCH MY PREVIOUS VIDEO ▶ \n\nSUBSCRIBE ► http...


## CASE operation - When, Otherwise - with select/withColumn

In [228]:
df.select(df.channel_title, df.likes, df.dislikes, when(df.likes > df.dislikes, 'Good')
                                                    .when(df.likes < df.dislikes, 'Bad')
                                                    .otherwise('Undefined').alias('vid_rating')).show(3, False)

+---------------+------+--------+----------+
|channel_title  |likes |dislikes|vid_rating|
+---------------+------+--------+----------+
|CaseyNeistat   |57527 |2966    |Good      |
|LastWeekTonight|97185 |6146    |Good      |
|Rudy Mancuso   |146033|5339    |Good      |
+---------------+------+--------+----------+
only showing top 3 rows



In [229]:
df = df.withColumn('vid_rating', when(df.likes > df.dislikes, 'Good')
                                .when(df.likes < df.dislikes, 'Bad')
                                .otherwise('Undefined')).limit(3)
df.select(df.channel_title, df.likes, df.dislikes, df.vid_rating).show(3, False)

+---------------+------+--------+----------+
|channel_title  |likes |dislikes|vid_rating|
+---------------+------+--------+----------+
|CaseyNeistat   |57527 |2966    |Good      |
|LastWeekTonight|97185 |6146    |Good      |
|Rudy Mancuso   |146033|5339    |Good      |
+---------------+------+--------+----------+



In [233]:
# with SQl expression
df.select("channel_title", "likes", "dislikes", expr("CASE WHEN likes > dislikes THEN 'Good' WHEN likes < dislikes THEN 'Bad' ELSE 'Undefined' END AS vid_rating")).show(3)

+---------------+------+--------+----------+
|  channel_title| likes|dislikes|vid_rating|
+---------------+------+--------+----------+
|   CaseyNeistat| 57527|    2966|      Good|
|LastWeekTonight| 97185|    6146|      Good|
|   Rudy Mancuso|146033|    5339|      Good|
+---------------+------+--------+----------+



In [236]:
# using selectExpr
df.selectExpr("channel_title", "likes", "dislikes", "CASE WHEN likes > dislikes THEN 'Good' WHEN likes < dislikes THEN 'Bad' ELSE 'Undefined' END AS vid_rating").show(3)

+---------------+------+--------+----------+
|  channel_title| likes|dislikes|vid_rating|
+---------------+------+--------+----------+
|   CaseyNeistat| 57527|    2966|      Good|
|LastWeekTonight| 97185|    6146|      Good|
|   Rudy Mancuso|146033|    5339|      Good|
+---------------+------+--------+----------+



## Extracting info from dates

In [240]:
df.select('trending_date', year('trending_date'), month('trending_date')).show(3, False)

+-------------+-------------------+--------------------+
|trending_date|year(trending_date)|month(trending_date)|
+-------------+-------------------+--------------------+
|2017-11-14   |2017               |11                  |
|2017-11-14   |2017               |11                  |
|2017-11-14   |2017               |11                  |
+-------------+-------------------+--------------------+



## Difference between dates

In [247]:
df.select('trending_date', 'published_time', datediff(df.trending_date, df.published_time).alias('days_took_for_trending')).show(5, False)

+-------------+-------------------+----------------------+
|trending_date|published_time     |days_took_for_trending|
+-------------+-------------------+----------------------+
|2017-11-14   |2017-11-13 17:13:01|1                     |
|2017-11-14   |2017-11-13 07:30:00|1                     |
|2017-11-14   |2017-11-12 19:05:24|2                     |
+-------------+-------------------+----------------------+



## Splitting strings

In [250]:
array = df.select('title', split(df.title, ' ').alias('title_split'))
array.show(3, False)

+--------------------------------------------------------------+-------------------------------------------------------------------------+
|title                                                         |title_split                                                              |
+--------------------------------------------------------------+-------------------------------------------------------------------------+
|WE WANT TO TALK ABOUT OUR MARRIAGE                            |[WE, WANT, TO, TALK, ABOUT, OUR, MARRIAGE]                               |
|The Trump Presidency: Last Week Tonight with John Oliver (HBO)|[The, Trump, Presidency:, Last, Week, Tonight, with, John, Oliver, (HBO)]|
|Racist Superman | Rudy Mancuso, King Bach & Lele Pons         |[Racist, Superman, |, Rudy, Mancuso,, King, Bach, &, Lele, Pons]         |
+--------------------------------------------------------------+-------------------------------------------------------------------------+



## title_contains() operation

In [252]:
array.select('title', array_contains(array.title_split, 'WANT').alias('title_contains_WANT')).show(5, False)

+--------------------------------------------------------------+-------------------+
|title                                                         |title_contains_WANT|
+--------------------------------------------------------------+-------------------+
|WE WANT TO TALK ABOUT OUR MARRIAGE                            |true               |
|The Trump Presidency: Last Week Tonight with John Oliver (HBO)|false              |
|Racist Superman | Rudy Mancuso, King Bach & Lele Pons         |false              |
+--------------------------------------------------------------+-------------------+



## array_distinct() operation

In [256]:
array.select('title', array_distinct(array.title_split).alias('distinct_items in title_split')).show(3, False)

+--------------------------------------------------------------+-------------------------------------------------------------------------+
|title                                                         |distinct_items in title_split                                            |
+--------------------------------------------------------------+-------------------------------------------------------------------------+
|WE WANT TO TALK ABOUT OUR MARRIAGE                            |[WE, WANT, TO, TALK, ABOUT, OUR, MARRIAGE]                               |
|The Trump Presidency: Last Week Tonight with John Oliver (HBO)|[The, Trump, Presidency:, Last, Week, Tonight, with, John, Oliver, (HBO)]|
|Racist Superman | Rudy Mancuso, King Bach & Lele Pons         |[Racist, Superman, |, Rudy, Mancuso,, King, Bach, &, Lele, Pons]         |
+--------------------------------------------------------------+-------------------------------------------------------------------------+

