# LAB07: Answering Business Questions

In [None]:
# Install PySpark into the environment of the running notebook kernel.
%pip install pyspark

In [None]:
import pyspark
from pyspark.sql import SparkSession
# Start (or reuse) the Spark session that the rest of the notebook queries through.
spark = (
    SparkSession.builder
    .appName('Answering Business Question')
    .getOrCreate()
)

### Read youtube dataset

In [None]:
# Dataset sources:
#   1K sample:    https://www.dropbox.com/s/wk8v3ly5mclsj5d/stg_youtube_1K.csv
#   full dataset: https://www.dropbox.com/s/i2rjdae18k8fd0j/stg_youtube.csv

In [None]:
# Create the download directory; -p makes this a no-op when it already exists.
!mkdir -p dataset

In [None]:
# Fetch the full YouTube staging file into dataset/.
# NOTE(review): Dropbox share links can return an HTML preview page instead of
# the raw file unless the URL carries ?dl=1 — confirm with the `head` check below.
!wget -P dataset https://www.dropbox.com/s/i2rjdae18k8fd0j/stg_youtube.csv 

In [None]:
# List the directory to confirm the file arrived and to see its size.
! ls -lah dataset

In [None]:
# Peek at the first lines of the downloaded file before parsing it.
! head dataset/stg_youtube.csv

In [None]:
from pyspark.sql.types import *

file_path = "dataset/stg_youtube.csv"

# Explicit schema for the staging file.
# NOTE(review): trending_date is declared as an integer — confirm against the
# `head` output that the file stores it that way; values that do not parse as
# the declared type are usually loaded as null rather than rejected.
youtube_schema = StructType([
    StructField("video_id", StringType(), True),
    StructField("trending_date", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("channel_title", StringType(), True),
    StructField("category_id", IntegerType(), True),
    StructField("publish_time", StringType(), True),
    StructField("tags", StringType(), True),
    StructField("views", IntegerType(), True),
    StructField("likes", IntegerType(), True),
    StructField("dislikes", IntegerType(), True),
    StructField("comment_count", IntegerType(), True),
])

# Tab-separated input read with the schema above. The inferSchema option is
# deliberately not set: Spark ignores it when an explicit schema is supplied.
df_stg_youtube = (
    spark.read
    .option("delimiter", "\t")
    .schema(youtube_schema)
    .csv(file_path)
)

In [None]:
# Report how many rows were loaded, then show a three-row untruncated sample.
staging_rows = df_stg_youtube.count()
print(f"Total Records: {staging_rows}")
df_stg_youtube.show(3, truncate=False)

### Read video_category

In [None]:
# Fetch the category lookup file into dataset/.
# NOTE(review): same Dropbox share-link caveat as any other download of this
# kind — check with `head` that the file is data and not an HTML page.
!wget -P dataset https://www.dropbox.com/s/b7wut6mj9wpfyi1/video_category.dat

In [None]:
# List the directory to confirm the file arrived.
! ls -lah dataset

In [None]:
# Peek at the first lines of the category file.
! head dataset/video_category.dat

In [None]:
# Count the lines in the file, to compare with the record count Spark reports.
! wc -l dataset/video_category.dat

In [None]:
file_path = "dataset/video_category.dat"

# Two-column lookup: numeric category id -> category name.
youtube_vd_cat_schema = StructType([
    StructField("category_id", IntegerType(), True),
    StructField("category_name", StringType(), True),
])

# Pipe-separated input read with the schema above. The inferSchema option is
# deliberately not set: Spark ignores it when an explicit schema is supplied.
youtube_video_cat = (
    spark.read
    .option("delimiter", "|")
    .schema(youtube_vd_cat_schema)
    .csv(file_path)
)

print(f"Total Records: {youtube_video_cat.count()}")
youtube_video_cat.show(5, truncate=True)

### JOIN stg_youtube and video_category

In [None]:
# Print the column names and types of the staging DataFrame.
df_stg_youtube.printSchema()

In [None]:
# Attach category_name to each staging row. This is an inner join, so rows whose
# category_id has no match in youtube_video_cat are dropped.
youtube = df_stg_youtube.join(youtube_video_cat, 'category_id', 'inner')

In [None]:
# Row count before the join.
df_stg_youtube.count()

In [None]:
# Row count after the inner join; a difference from the count above means some
# category_id values were unmatched (or matched more than once) in the lookup.
youtube.count()

In [None]:
# Show the first five joined rows.
youtube.show(5)

In [None]:
# Report the size of the joined DataFrame: rows first, then columns.
total_records = youtube.count()
column_count = len(youtube.columns)
print(f"Total Records: {total_records}")
print(f"Number of Columns: {column_count}")

In [None]:
# Project three columns and show the first five rows.
preview_columns = ["trending_date", "video_id", "channel_title"]
youtube.select(*preview_columns).show(5)

In [None]:
# Register the joined DataFrame as the temp view "youtube" so spark.sql can query it.
youtube.createOrReplaceTempView("youtube")

### AGGREGATE

In [None]:
# Preview of the per-video roll-up (one row per video_id), capped at five rows.
preview_sql = (
    "select video_id, max(title), max(category_name), count(*) num_days, "
    "max(views) views, max(likes) likes, max(dislikes) dislikes, "
    "max(comment_count) num_comment from youtube group by video_id limit 5"
)
spark.sql(preview_sql).show()

In [None]:
# One row per video_id: number of rows in the youtube view (num_days) plus the
# maximum title, category and engagement counters seen for that video.
video_sql = (
    "select video_id, max(title) title, max(category_name) category, "
    "count(*) num_days, max(views) views, max(likes) likes, "
    "max(dislikes) dislikes, max(comment_count) num_comment "
    "from youtube group by video_id "
)
video = spark.sql(video_sql)

In [None]:
# Register the per-video roll-up as the temp view "video" for the queries below.
video.createOrReplaceTempView("video")

### Top 10 Most Viewed Video

In [None]:
# Ten videos with the highest view count. `views` in the video view is already
# the per-video maximum, so a plain sort is enough.
spark.sql("\
select title, category, views from video order by views desc limit 10 \
").show(truncate = False)

### Videos with the most days trending

In [None]:
# Ten videos with the largest num_days, i.e. the most rows per video_id in the
# youtube view.
spark.sql("\
select title, category, num_days from video order by num_days desc limit 10 \
").show(truncate = False)

### Top 10 most commented video

In [None]:
# SQL version: ten videos with the highest comment count.
top_commented_sql = (
    "select title, category, num_comment from video "
    "order by num_comment desc limit 10 "
)
spark.sql(top_commented_sql).show(truncate=False)

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# DataFrame-API version of the "most commented" query: sort descending by
# comment count, keep three columns, print the first ten rows.
most_commented = (
    video
    .orderBy(F.col('num_comment').desc())
    .select("title", "category", "num_comment")
)
most_commented.show(10, truncate=False)

In [None]:
# Same top-10 as a pandas DataFrame. The limit is applied in Spark *before*
# toPandas(), so only ten rows are collected to the driver instead of the
# whole table being collected and then trimmed with head(10).
(
    video
    .orderBy('num_comment', ascending=False)
    .select(col("title"), col("category"), col("num_comment"))
    .limit(10)
    .toPandas()
)

### Top 5 most viewed categories

In [None]:
# SQL version: total views, average views and video count per category,
# keeping the five categories with the highest total.
category_views_sql = (
    "select category, sum(views) total_view, avg(views) avg_view, "
    "count(1) num_video from video "
    "group by category "
    "order by total_view desc limit 5 "
)
spark.sql(category_views_sql).show(truncate=False)

In [None]:
# DataFrame-API version of the per-category view totals. limit(5) and
# truncate=False mirror the SQL query for the same question; without them
# this printed up to 20 truncated rows instead of the top five.
(
    video
    .groupBy("category")
    .agg(
        F.sum(col('views')).alias('total_views'),
        F.avg(col('views')).alias('avg_views'),
        F.count(F.lit(1)).alias('num_video'),
    )
    .orderBy('total_views', ascending=False)
    .limit(5)
    .show(truncate=False)
)

### Data Visualization in Seaborn

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Count videos per category in Spark and collect the small result to pandas
# for plotting.
result_pd_df = spark.sql(" \
select category, count(*) num_video from video \
group by category \
order by num_video desc \
").toPandas()

# Horizontal bar chart: one bar per category, length = number of videos.
sns.set_style("whitegrid")
ax = sns.barplot(
    y='category',
    x='num_video',
    data=result_pd_df,
    orient='h',
)
plt.xlabel("Number of Videos")
plt.ylabel("Categories")
# Title typo fixed ("Catogories" -> "Categories").
plt.title("Categories of trend videos")

In [None]:
# Likes, dislikes and like/dislike ratio per category, best ratio first.
# nullif() turns a zero dislike total into NULL so the division yields NULL
# instead of raising a divide-by-zero error when ANSI SQL mode is enabled.
df2 = spark.sql("SELECT category, sum(likes) as total_likes, \
                             sum(dislikes) as total_dislikes, \
                             (sum(likes)/nullif(sum(dislikes), 0)) as ratio \
                      FROM video \
                      GROUP BY category \
                      ORDER BY ratio DESC" \
                    )

# The result has one row per category, so collecting it to pandas is cheap.
result_pd_df = df2.toPandas()
result_pd_df.head(20)

In [None]:
# Horizontal bar chart of the like/dislike ratio per category, drawn from
# result_pd_df.
sns.set_style("whitegrid")
ratio_bar_args = dict(data=result_pd_df, x='ratio', y='category', orient='h')
ax = sns.barplot(**ratio_bar_args)
plt.xlabel("Ratio")
plt.ylabel("Categories")
plt.title("Ratio like vs dislike video")