In [115]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("MyApp").getOrCreate()

# Set the desired configuration property
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

24/03/23 13:22:30 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [1]:
youtube_videos_df = spark.read.json("gs://youtube-data-analysis/cleansed_raw_data_statistics/")

                                                                                

In [2]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

def filename_new(s):
    return (s.rsplit('/', 1)[-1][:2]).strip()
udf_filename = F.udf(lambda z: filename_new(z))
youtube_videos_df = youtube_videos_df.withColumn("Region",udf_filename(F.input_file_name()))

In [6]:
youtube_videos_df.select("id","snippet.title").distinct().show(truncate = False)

[Stage 6:>                                                          (0 + 2) / 2]

+---+--------------------+
|id |title               |
+---+--------------------+
|28 |Science & Technology|
|24 |Entertainment       |
|19 |Travel & Events     |
|1  |Film & Animation    |
|40 |Sci-Fi/Fantasy      |
|18 |Short Movies        |
|20 |Gaming              |
|35 |Documentary         |
|41 |Thriller            |
|25 |News & Politics     |
|33 |Classics            |
|38 |Foreign             |
|10 |Music               |
|36 |Drama               |
|21 |Videoblogging       |
|44 |Trailers            |
|31 |Anime/Animation     |
|39 |Horror              |
|27 |Education           |
|23 |Comedy              |
+---+--------------------+
only showing top 20 rows



                                                                                

In [9]:
youtube_videos_df.groupBy("Region").agg(F.count("etag").alias("cnt")).show()



+------+---+
|Region|cnt|
+------+---+
|    CA| 31|
|    GB| 31|
|    DE| 31|
|    US| 32|
|    FR| 31|
|    MX| 31|
|    RU| 31|
|    KR| 31|
|    IN| 31|
|    JP| 31|
+------+---+



                                                                                

In [78]:
youtube_videos_df.printSchema()

root
 |-- etag: string (nullable = true)
 |-- id: string (nullable = true)
 |-- kind: string (nullable = true)
 |-- snippet: struct (nullable = true)
 |    |-- assignable: boolean (nullable = true)
 |    |-- channelId: string (nullable = true)
 |    |-- title: string (nullable = true)
 |-- Region: string (nullable = true)



In [79]:
youtube_videos_df = youtube_videos_df.select("id","snippet.assignable","snippet.channelId","snippet.title","Region")

In [80]:
youtube_videos_df.show()

[Stage 38:>                                                         (0 + 1) / 1]

+---+----------+--------------------+--------------------+------+
| id|assignable|           channelId|               title|Region|
+---+----------+--------------------+--------------------+------+
|  1|      true|UCBR8-60-B28hp2Bm...|    Film & Animation|    US|
|  2|      true|UCBR8-60-B28hp2Bm...|    Autos & Vehicles|    US|
| 10|      true|UCBR8-60-B28hp2Bm...|               Music|    US|
| 15|      true|UCBR8-60-B28hp2Bm...|      Pets & Animals|    US|
| 17|      true|UCBR8-60-B28hp2Bm...|              Sports|    US|
| 18|     false|UCBR8-60-B28hp2Bm...|        Short Movies|    US|
| 19|      true|UCBR8-60-B28hp2Bm...|     Travel & Events|    US|
| 20|      true|UCBR8-60-B28hp2Bm...|              Gaming|    US|
| 21|     false|UCBR8-60-B28hp2Bm...|       Videoblogging|    US|
| 22|      true|UCBR8-60-B28hp2Bm...|      People & Blogs|    US|
| 23|      true|UCBR8-60-B28hp2Bm...|              Comedy|    US|
| 24|      true|UCBR8-60-B28hp2Bm...|       Entertainment|    US|
| 25|     

                                                                                

In [29]:
video_info_df = spark.read.csv("gs://youtube-data-analysis/Video_info",header = True)

In [30]:
video_info_df.printSchema

<bound method DataFrame.printSchema of DataFrame[video_id: string, trending_date: string, title: string, channel_title: string, category_id: string, publish_time: string, tags: string, views: string, likes: string, dislikes: string, comment_count: string, thumbnail_link: string, comments_disabled: string, ratings_disabled: string, video_error_or_removed: string, description: string]>

In [60]:
import pyspark.sql.functions as F
video_info_df = video_info_df.select("video_id",F.to_date(video_info_df['trending_date'],'yy.dd.mm').alias("trending_date"),
"title", "channel_title", "category_id", F.to_timestamp("publish_time").alias("publish_time"),"tags", 
"views", "likes", "dislikes", "comment_count", "thumbnail_link",
"comments_disabled", "ratings_disabled", "video_error_or_removed", "description")

In [62]:
video_info_df.printSchema

<bound method DataFrame.printSchema of DataFrame[video_id: string, trending_date: date, title: string, channel_title: string, category_id: string, publish_time: timestamp, tags: string, views: string, likes: string, dislikes: string, comment_count: string, thumbnail_link: string, comments_disabled: string, ratings_disabled: string, video_error_or_removed: string, description: string]>

In [65]:
video_info_df = video_info_df.selectExpr("video_id", "trending_date", "title", "channel_title","category_id", "publish_time", "tags",
                         "cast(views as int) views", "cast(likes as int) likes", "cast(dislikes as int) dislikes", 
                         "cast(comment_count as int) comment_count", "thumbnail_link", "cast(comments_disabled as boolean) comments_disabled", 
                         "cast(ratings_disabled as boolean) ratings_disabled",
                         "cast(video_error_or_removed as boolean) as video_error_or_removed", "description")

In [70]:
video_info_df = video_info_df.withColumn("Region",udf_filename(F.input_file_name()))

In [72]:
video_info_df.select("Region").distinct().show()



+------+
|Region|
+------+
|    RU|
|    CA|
|    DE|
|    US|
|    IN|
|    GB|
|    FR|
|    MX|
|    KR|
|    JP|
+------+



                                                                                

In [90]:
joined_df = video_info_df.join(youtube_videos_df,(video_info_df['category_id'] == youtube_videos_df['id']) &
                   (video_info_df['Region'] == youtube_videos_df['Region']),'left').select("id","assignable","channelId",video_info_df["title"],video_info_df["Region"],
"trending_date", "channel_title", "category_id", "publish_time", "tags", 
"views", "likes", "dislikes", "comment_count", "thumbnail_link", "comments_disabled",
"ratings_disabled", "video_error_or_removed", "description")                                                                                          
                                                                                           
                                                                                           
                                                                                           
                                                                                           
                                                                                           

In [109]:
joined_df.printSchema

<bound method DataFrame.printSchema of DataFrame[id: string, assignable: boolean, channelId: string, title: string, Region: string, trending_date: date, channel_title: string, category_id: string, publish_time: timestamp, tags: string, views: int, likes: int, dislikes: int, comment_count: int, thumbnail_link: string, comments_disabled: boolean, ratings_disabled: boolean, video_error_or_removed: boolean, description: string]>

In [117]:
joined_df.write.partitionBy("trending_date").format('bigquery').\
option('table', 'youtube_data_analysis.youtube_videos') \
.option("temporaryGcsBucket", "dataproc-staging-us-central1-771678782511-36ot36l2")\
.option("mode", "dropmalformed").mode("append").save()

                                                                                

In [10]:
youtube_videos_df.select(F.col("id").alias("category_id"),F.col("snippet.title").alias("category")).distinct().write.format('bigquery').\
option('table', 'youtube_data_analysis.video_category_info') \
.option("temporaryGcsBucket", "dataproc-staging-us-central1-771678782511-36ot36l2")\
.option("mode", "dropmalformed").mode("append").save()

                                                                                