In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('DataManipulation').getOrCreate()
spark

In [2]:
videos = (spark.read
          .format('csv')
          .option('inferSchema', 'true')
          .option('header', 'true')
          .load('./data/youtubevideos.csv'))

In [3]:
videos.limit(2).toPandas()

Unnamed: 0,video_id,trending_date,title,channel_title,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,2kyS6SvSYSE,17.14.11,WE WANT TO TALK ABOUT OUR MARRIAGE,CaseyNeistat,22,2017-11-13T17:13:01.000Z,SHANtell martin,748374,57527,2966,15954,https://i.ytimg.com/vi/2kyS6SvSYSE/default.jpg,False,False,False,SHANTELL'S CHANNEL - https://www.youtube.com/s...
1,1ZAPwfrtAFY,17.14.11,The Trump Presidency: Last Week Tonight with J...,LastWeekTonight,24,2017-11-13T07:30:00.000Z,"""last week tonight trump presidency""|""last wee...",2418783,97185,6146,12703,https://i.ytimg.com/vi/1ZAPwfrtAFY/default.jpg,False,False,False,"One year after the presidential election, John..."


In [4]:
videos.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- dislikes: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)



In [5]:
from pyspark.sql.types import (DataType, NullType, StringType, BinaryType,
                               BooleanType, DateType, TimestampType, DecimalType,
                               DoubleType, FloatType, ByteType, IntegerType, LongType,
                               ShortType, ArrayType, MapType, StructField, StructType)

from pyspark.sql.functions import to_date, to_timestamp, regexp_replace

In [6]:
videos.columns

['video_id',
 'trending_date',
 'title',
 'channel_title',
 'category_id',
 'publish_time',
 'tags',
 'views',
 'likes',
 'dislikes',
 'comment_count',
 'thumbnail_link',
 'comments_disabled',
 'ratings_disabled',
 'video_error_or_removed',
 'description']

In [7]:
df = (videos.withColumn('views', videos['views'].cast(IntegerType()))
            .withColumn('likes', videos['likes'].cast(IntegerType()))
            .withColumn('dislikes', videos['dislikes'].cast(IntegerType()))
            .withColumn('trending_date', to_date(videos['trending_date'], 'yy.dd.mm'))
     )

In [8]:
videos.select('publish_time').show(5, False)

+------------------------+
|publish_time            |
+------------------------+
|2017-11-13T17:13:01.000Z|
|2017-11-13T07:30:00.000Z|
|2017-11-12T19:05:24.000Z|
|2017-11-13T11:00:04.000Z|
|2017-11-12T18:01:41.000Z|
+------------------------+
only showing top 5 rows



In [9]:
df = df.withColumn('publish_time_2', regexp_replace(df['publish_time'], 'T', ' '))

In [10]:
df = df.withColumn('publish_time_2', regexp_replace(df['publish_time_2'], 'Z', ''))

In [13]:
df.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: date (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)
 |-- publish_time_2: string (nullable = true)



In [15]:
df = df.withColumn('publish_time_3', to_timestamp(df['publish_time_2'], 'yyyy-MM-dd HH:mm:ss.SSS'))

In [16]:
df.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: date (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)
 |-- publish_time_2: string (nullable = true)
 |-- publish_time_3: timestamp (nullable = true)



In [18]:
df.select('publish_time', 'publish_time_2', 'publish_time_3').show(5, False)

+------------------------+-----------------------+-------------------+
|publish_time            |publish_time_2         |publish_time_3     |
+------------------------+-----------------------+-------------------+
|2017-11-13T17:13:01.000Z|2017-11-13 17:13:01.000|2017-11-13 17:13:01|
|2017-11-13T07:30:00.000Z|2017-11-13 07:30:00.000|2017-11-13 07:30:00|
|2017-11-12T19:05:24.000Z|2017-11-12 19:05:24.000|2017-11-12 19:05:24|
|2017-11-13T11:00:04.000Z|2017-11-13 11:00:04.000|2017-11-13 11:00:04|
|2017-11-12T18:01:41.000Z|2017-11-12 18:01:41.000|2017-11-12 18:01:41|
+------------------------+-----------------------+-------------------+
only showing top 5 rows

