# Video Analytic Demo

## Initialize Spark Session

In [12]:
from pyspark.sql import SparkSession

# Spark session & context
spark = (SparkSession
         .builder
         .master("local")
         .appName("load-postgres")
         # Add postgres jar
         .config("spark.driver.extraClassPath", "/home/wesley/work/jars/postgresql-9.4.1207.jar")
         .getOrCreate())
sc = spark.sparkContext

### Read input

In [14]:
# Read input_1
df_video_views_csv = (
    spark.read
    .format("csv")
    .option("header", True)
    .load("/home/wesley/work/data/video_views.csv")
)

In [153]:
### Data Overview

In [152]:
df_video_views_csv.describe('user_id', 'video_id')

DataFrame[summary: string, user_id: string, video_id: string]

In [46]:
df_video_views_csv.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- video_id: string (nullable = true)
 |-- watch_time: string (nullable = true)



In [79]:
import pyspark.sql.functions as F
df_video_views_csv.select(F.countDistinct("video_id")).show()

+------------------------+
|count(DISTINCT video_id)|
+------------------------+
|                   14342|
+------------------------+



In [81]:
df_video_views_csv.select(F.countDistinct("user_id")).show()

+-----------------------+
|count(DISTINCT user_id)|
+-----------------------+
|                1950131|
+-----------------------+



In [58]:
# Read input_2
df_videos_json = (
    spark.read
    .format("json")
    .option("header", False)
    .load("/home/wesley/work/data/videos.json")
)

In [48]:
# df_videos_json.describe().show()
df_videos_json.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- duration: long (nullable = true)
 |-- genre: string (nullable = true)



In [44]:
df_videos_json.show()

+--------------------+--------+--------------------+
|                 _id|duration|               genre|
+--------------------+--------+--------------------+
|[57a8e043c2aeda56...|     239|                null|
|[57a8e043c2aeda56...|     208|                null|
|[57a8e070c2aeda56...|     430|                null|
|[58380e511e6fcae3...|     269|                null|
|[58a6e6cfa615ec0a...|     210|                null|
|[58b02188a4ec163b...|     187|                null|
|[5915908692c4dd0b...|     810|5d777bd7d1afd1063...|
|[591ee23992c4dd0b...|     811|                null|
|[597fea5092c4dd0b...|     166|5d777c03d1afd1063...|
|[5988151192c4dd0b...|     103|5d777c03d1afd1063...|
|[5993b9dc6d3a8dab...|      99|5d777c03d1afd1063...|
|[5995d3ab6d3a8dab...|     155|5d777c03d1afd1063...|
|[5996e9bc6d3a8dab...|     702|                null|
|[59a017296d3a8dab...|    1076|                null|
|[59a7daee6d3a8dab...|     122|5d777c03d1afd1063...|
|[59af7c056d3a8dab...|     106|5d777c03d1afd10

## Analysis

### Data enrichment

In [60]:
df_video_views_consumption_1 = df_video_views_csv
df_video_views_consumption_1 = df_video_views_csv.join(df_videos_json,
                                                    df_video_views_csv.video_id == df_videos_json._id['$oid'],
                                                    how='left')
df_video_views_consumption_1.show()


+-------------+--------------------+--------------------+----------+--------------------+--------+--------------------+
|    timestamp|             user_id|            video_id|watch_time|                 _id|duration|               genre|
+-------------+--------------------+--------------------+----------+--------------------+--------+--------------------+
|1803025809459|h7pMfBTwvVzh9g6H4...|5f59a816c26928003...|    655.36|[5f59a816c2692800...|    1596|{"$oid":"5e0074a5...|
|1803027779604|PyGvsGVYvy8sREFyw...|5f28cdae0500af003...|   1179.65|[5f28cdae0500af00...|    1426|5e9a78ac86fe16003...|
|1803027361912|GCUxXvicdcneGFDK4...|5f5b0e6fc26928003...|       0.0|[5f5b0e6fc2692800...|     713|{"$oid":"5ecb9893...|
|1803025770330|bFX5WqUkxpva4HNRo...|5f84521e24a01f003...|       0.0|[5f84521e24a01f00...|     649|{"$oid":"5d775f89...|
|1803027039604|Y4NY4sPVYNjrJCJdC...|5f5621eac26928003...|    655.36|[5f5621eac2692800...|     699|{"$oid":"5d775f89...|
|1803024679511|DvpPB5j6VhsoyokNZ...|5f7e

In [108]:
# Checking whether one user has watched a video multiple time
df_video_views_consumption_1.groupby('user_id', 'video_id').count().show()
# Notes: group by video when grouping video

+--------------------+--------------------+-----+
|             user_id|            video_id|count|
+--------------------+--------------------+-----+
|UJMTG89RzszT6jcpN...|5f3ba8054b64f5bc5...|    3|
|VzGRnfMgyWxGxLiGe...|5f5b0e6fc26928003...|    1|
|DKXh2yot2KPt8xn89...|5daf260b39ee65003...|    2|
|QWoNf3d9xxkKPhE3A...|5f7e973700ae34282...|    1|
|Q6xJwNefuJpvnDkag...|5f5b0e6fc26928003...|    1|
|QTzDjXdYqL9wu7R4h...|5f6865dfe4478183f...|    2|
|QUb3ua9am8pCZ2viJ...|5f74682c537ed5003...|    1|
|Ca86topzh8i5vEZaP...|5f5b0e6fc26928003...|    1|
|NidsQySFQmhd2TwJV...|5f68595fe4478183f...|    1|
|6yMccVaYWfPT2e6Rx...|5f7d4f0f00ae34282...|    4|
|ewMvGvQq3nsNA4hBv...|5f0576c5cc004a003...|    1|
|WWQxcCEH79tQveX6p...|5ec66dc67fe48f003...|    2|
|bsNCJtFMc4KxpRies...|5e708f380e59b3003...|    1|
|Dpy8kjcWkosqGQgpP...|5f5b0e6fc26928003...|    1|
|9JbeFGivEbyskR36Y...|5ef98b0bf6e78e159...|    1|
|2LGW6xu3szRxfgPBu...|5f451034c18e10003...|    1|
|cMadXpyPeRHWRzoQ6...|5f86cfb541504c003...|    1|


In [112]:
# Verify
df_video_views_consumption_1.sort('user_id', 'video_id','watch_time').show()

+-------------+--------------------+--------------------+----------+--------------------+--------+--------------------+
|    timestamp|             user_id|            video_id|watch_time|                 _id|duration|               genre|
+-------------+--------------------+--------------------+----------+--------------------+--------+--------------------+
|1802870943049|2226bMBtgokFtUYKr...|5f7eea6400ae34282...|       0.0|[5f7eea6400ae3428...|     539|{"$oid":"5e0074a5...|
|1803210722393|2227xrQ9UTVyRmDra...|5f5f6a7026032d003...|       0.0|[5f5f6a7026032d00...|    5317|{"$oid":"5e5779f3...|
|1803210749229|2227xrQ9UTVyRmDra...|5f633c54e4478183f...|   2228.22|[5f633c54e4478183...|    4423|{"$oid":"5e5779f3...|
|1803290754364|2227xrQ9UTVyRmDra...|5f633c54e4478183f...|   3014.66|[5f633c54e4478183...|    4423|{"$oid":"5e5779f3...|
|1803293811416|2227xrQ9UTVyRmDra...|5f633d49e4478183f...|   1179.65|[5f633d49e4478183...|    4567|{"$oid":"5e5779f3...|
|1803367940477|2227xrQ9UTVyRmDra...|5f63

### Create view for further usages

In [68]:
df_video_views_consumption_1.createOrReplaceTempView("video_views")

### 1. Calculate view percentage of every view

In [75]:
spark.sql("SELECT ROUND(SUM(watch_time) / SUM(duration) * 100, 2) AS view_percentage FROM video_views").show()

+---------------+
|view_percentage|
+---------------+
|          46.56|
+---------------+



### 2. Top 200 videos which satisfy the following conditions

In [77]:
# 2.1 Construct output
spark.sql("SELECT * FROM (\
                        SELECT video_id, COUNT(user_id) AS user_counts,ROUND(SUM(watch_time) / SUM(duration) * 100) AS average_view_percentage\
                        FROM video_views GROUP BY video_id) AS a\
                        WHERE a.user_counts > 100 ORDER BY a.average_view_percentage DESC LIMIT 200").show()

+--------------------+-----------+-----------------------+
|            video_id|user_counts|average_view_percentage|
+--------------------+-----------+-----------------------+
|5f59dae1c26928003...|      11236|                 3923.0|
|5f44e7d830d28c003...|      49995|                  830.0|
|5f48a40b42243d003...|       7362|                  621.0|
|5e86ade1d9056d004...|        278|                  449.0|
|5f96d887ba9c7e003...|       1834|                  258.0|
|5f9ba847f2a010003...|       2042|                  241.0|
|5fa376bbf2a010003...|        223|                  215.0|
|5fa923fc2b1bd2003...|       2264|                  172.0|
|5f8436d524a01f003...|       6047|                  135.0|
|5f8fc29241504c003...|       9055|                  135.0|
|5f746351537ed5003...|      92859|                  125.0|
|5f50b80e2ef15c003...|        235|                   96.0|
|5e8d375c9df89f003...|        199|                   94.0|
|5e4b9d57c14d65003...|        382|                   93.

In [96]:
# 2.2 Save output as file
top200df= spark.sql("SELECT * FROM (\
                                    SELECT video_id, COUNT(user_id) AS user_counts,ROUND(SUM(watch_time) / SUM(duration) * 100, 2) AS average_view_percentage\
                                    FROM video_views GROUP BY video_id) AS a\
                                    WHERE a.user_counts > 100 ORDER BY a.average_view_percentage DESC LIMIT 200")
top200df.write.csv('top_200_videos.csv')

### 3. Most recent watched videos (20 or less) of every user

In [149]:
recent_watched_video_df = spark.sql(" SELECT user_id, video_id, MAX(rank) AS watching_counts\
                                            FROM (\
                                                SELECT *, RANK() OVER(PARTITION BY user_id, video_id ORDER BY timestamp DESC) AS rank\
                                                FROM video_views) AS a\
                                            WHERE rank < 20\
                                            GROUP BY user_id, video_id").show()

+--------------------+--------------------+---------------+
|             user_id|            video_id|watching_counts|
+--------------------+--------------------+---------------+
|222DHnyewx9ot7FD5...|5f8687cf48437a003...|              2|
|227NevTwrsDj5Ha9a...|5f0c0c2dcc004a003...|              1|
|22C7kMueBz2i6uQpv...|5f7d870f00ae34282...|              3|
|22C7kMueBz2i6uQpv...|5f8687cf48437a003...|              1|
|22DTsZc5cpAToDikD...|5eccd30b0d921c003...|              1|
|22FLynEU9gcSoUizj...|5f86858548437a003...|              1|
|22HBuXYLLMxGjE6xg...|5eb3dc23f1ce58003...|              1|
|22HkjkFGqFQ5zUner...|5f5b0e6fc26928003...|              1|
|22RtFjzCkqGb7jRnj...|5f71d93c537ed5003...|              2|
|22SZULB5kLi4pSUxT...|5fabab982b1bd2003...|              1|
|22Ug5S2fuyYogUFyq...|5f5a4224c26928003...|              1|
|22XGHpyJ8hKDcavUL...|5e82fd1ad9a813003...|              2|
|22ZqJBSgYUZbzdfhi...|5e2006e3518bf0003...|              1|
|22ecHE8Cyz6Usfz4c...|5e75cf966aa2cb003.

### 4. Find most recent watched genres (5 or less) of every user

In [150]:
recent_watched_geners_df = spark.sql(" SELECT user_id, genre, MAX(rank) AS watching_counts\
                                            FROM (\
                                                SELECT *, RANK() OVER(PARTITION BY user_id, genre ORDER BY timestamp DESC) AS rank\
                                                FROM video_views) AS a\
                                            WHERE rank < 5\
                                            GROUP BY user_id, genre").show()

+--------------------+--------------------+---------------+
|             user_id|               genre|watching_counts|
+--------------------+--------------------+---------------+
|2256t5bSDMpgVSSDq...|{"$oid":"5e5779f3...|              4|
|225QtQSQEEkSGm5o2...|{"$oid":"5d43d9e4...|              1|
|22FLynEU9gcSoUizj...|{"$oid":"5e0074a5...|              1|
|22W3PknVf5PtFwggJ...|5e0074a524fa4c003...|              1|
|22bC5TQ4xMr8EdBrC...|{"$oid":"5e0074a5...|              4|
|22fE25AiFNsSHgHv7...|{"$oid":"5e5779f3...|              2|
|22wL3zcRQuSfhtjJs...|{"$oid":"5e5779f3...|              4|
|22zbBiPxgDhePxowF...|{"$oid":"5d775f89...|              1|
|23EWpzqKuzhjkhXrj...|{"$oid":"5ecb9893...|              1|
|23QWa7ag6EjB4FoE8...|5d775f89d1afd1063...|              1|
|24BXxX6VnpNUKY3ih...|{"$oid":"5e0074a5...|              4|
|24EPR5DahmM3d7QNc...|{"$oid":"5ecb9893...|              3|
|258qtGXnRMJm4iHM8...|{"$oid":"5e9a78ac...|              1|
|25EfH6YG4D6uyN84X...|                  