In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, TimestampType, ArrayType

In [2]:
# Build (or reuse) the Hive-enabled Spark session used throughout the notebook.
# NOTE(review): the spark-sas7bdat package is requested here, but no SAS read is
# visible in this notebook -- confirm it is still needed.
spark = (
    SparkSession.builder
    .config("spark.jars.repositories", "https://repos.spark-packages.org/")
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)

In [3]:
# Glob patterns for the raw inputs, relative to the notebook's working directory:
# every CSV under video/ and every JSON file under category/.
video = './youtube_data/video/*.csv'
category = './youtube_data/category/*.json'

In [4]:
# Explicit schema for the category JSON files, assembled from the inside out.

# Innermost object: the "snippet" attached to every category item.
_snippet_struct = StructType([
    StructField('assignable', BooleanType(), True),
    StructField('channelId', StringType(), True),
    StructField('title', StringType(), True),
])

# One element of the top-level "items" array.
_category_item_struct = StructType([
    StructField('etag', StringType(), True),
    StructField('id', StringType(), True),
    StructField('kind', StringType(), True),
    StructField('snippet', _snippet_struct),
])

# Top-level document: etag, the array of items, and kind.
schema_category = StructType([
    StructField('etag', StringType(), True),
    StructField('items', ArrayType(_category_item_struct)),
    StructField('kind', StringType(), True),
])

In [5]:
# Read the category JSON files with the explicit schema; the multiline option
# lets a single JSON document span several lines of a file.
df_category = (
    spark.read
    .option('multiline', 'true')
    .schema(schema_category)
    .json(category)
)

In [6]:
# Print the column tree of the loaded category frame.
df_category.printSchema()

root
 |-- etag: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- etag: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- kind: string (nullable = true)
 |    |    |-- snippet: struct (nullable = true)
 |    |    |    |-- assignable: boolean (nullable = true)
 |    |    |    |-- channelId: string (nullable = true)
 |    |    |    |-- title: string (nullable = true)
 |-- kind: string (nullable = true)



In [7]:
# Preview the raw category rows (etag, items, kind).
df_category.show()

+--------------------+--------------------+--------------------+
|                etag|               items|                kind|
+--------------------+--------------------+--------------------+
|"m2yskBQFythfE4ir...|[["m2yskBQFythfE4...|youtube#videoCate...|
|"XI7nbFXulYBIpL0a...|[["XI7nbFXulYBIpL...|youtube#videoCate...|
|"XI7nbFXulYBIpL0a...|[["XI7nbFXulYBIpL...|youtube#videoCate...|
|"XI7nbFXulYBIpL0a...|[["XI7nbFXulYBIpL...|youtube#videoCate...|
|"XI7nbFXulYBIpL0a...|[["XI7nbFXulYBIpL...|youtube#videoCate...|
|"m2yskBQFythfE4ir...|[["m2yskBQFythfE4...|youtube#videoCate...|
|"XI7nbFXulYBIpL0a...|[["XI7nbFXulYBIpL...|youtube#videoCate...|
|"ld9biNPKjAjgjV7E...|[["ld9biNPKjAjgjV...|youtube#videoCate...|
|"ld9biNPKjAjgjV7E...|[["ld9biNPKjAjgjV...|youtube#videoCate...|
|"ld9biNPKjAjgjV7E...|[["ld9biNPKjAjgjV...|youtube#videoCate...|
+--------------------+--------------------+--------------------+



In [6]:
# Turn each element of the `items` array into its own row, then promote the
# fields of that struct to top-level columns.
explode_category = (
    df_category
    .select(F.explode(F.col('items')).alias('category'))
    .select('category.*')
)

In [9]:
# Preview the exploded category items (one row per array element).
explode_category.show()

+--------------------+---+--------------------+--------------------+
|                etag| id|                kind|             snippet|
+--------------------+---+--------------------+--------------------+
|"m2yskBQFythfE4ir...|  1|youtube#videoCate...|[true, UCBR8-60-B...|
|"m2yskBQFythfE4ir...|  2|youtube#videoCate...|[true, UCBR8-60-B...|
|"m2yskBQFythfE4ir...| 10|youtube#videoCate...|[true, UCBR8-60-B...|
|"m2yskBQFythfE4ir...| 15|youtube#videoCate...|[true, UCBR8-60-B...|
|"m2yskBQFythfE4ir...| 17|youtube#videoCate...|[true, UCBR8-60-B...|
|"m2yskBQFythfE4ir...| 18|youtube#videoCate...|[false, UCBR8-60-...|
|"m2yskBQFythfE4ir...| 19|youtube#videoCate...|[true, UCBR8-60-B...|
|"m2yskBQFythfE4ir...| 20|youtube#videoCate...|[true, UCBR8-60-B...|
|"m2yskBQFythfE4ir...| 21|youtube#videoCate...|[false, UCBR8-60-...|
|"m2yskBQFythfE4ir...| 22|youtube#videoCate...|[true, UCBR8-60-B...|
|"m2yskBQFythfE4ir...| 23|youtube#videoCate...|[true, UCBR8-60-B...|
|"m2yskBQFythfE4ir...| 24|youtube#

In [7]:
# Flatten the snippet struct into plain columns and keep a single row per
# (id, title) pair.
dim_category = (
    explode_category
    .select('etag', 'id', 'snippet.assignable', 'snippet.channelId', 'snippet.title')
    .dropDuplicates(['id', 'title'])
)

In [8]:
# Cast the string id to an integer, keep only id/title, and order by id ascending.
dim_category = (
    dim_category
    .withColumn('id', F.col('id').cast(IntegerType()))
    .select('id', 'title')
    .orderBy('id')
)

In [18]:
# Collect dim_category (id, title) into a pandas DataFrame.
cate1 = dim_category.toPandas()

In [16]:
# Rebinds cate1 from a pandas DataFrame to a plain list of [id, title] rows.
# NOTE(review): after this cell cate1 no longer supports column access such as
# cate1['id']; re-running this cell on the list also fails (lists have no .values).
cate1 = cate1.values.tolist()

In [20]:
# Inspect the pandas dtype of the id column. Read it from dim_category rather
# than from cate1: in notebook order cate1 has already been rebound to a plain
# list by the preceding cell, and a list cannot be indexed by column name.
dim_category.toPandas()['id'].dtypes

dtype('int32')

In [10]:
# NOTE(review): `cate` is not assigned anywhere in the visible notebook, so this
# cell depends on leftover kernel state -- presumably the pandas category frame;
# confirm and rename or remove.
cate

Unnamed: 0,id,title
0,1,Film & Animation
1,2,Autos & Vehicles
2,10,Music
3,15,Pets & Animals
4,17,Sports
5,18,Short Movies
6,19,Travel & Events
7,20,Gaming
8,21,Videoblogging
9,22,People & Blogs


In [5]:
# Column name -> Spark type for the trending-video CSV files, in file order.
_video_columns = [
    ('video_id', StringType()),
    ('trending_date', StringType()),
    ('title', StringType()),
    ('channel_title', StringType()),
    ('category_id', IntegerType()),
    ('publish_time', TimestampType()),
    ('tags', StringType()),
    ('views', IntegerType()),
    ('likes', IntegerType()),
    ('dislikes', IntegerType()),
    ('comment_count', IntegerType()),
    ('thumbnail_link', StringType()),
    ('comments_disabled', BooleanType()),
    ('ratings_disabled', BooleanType()),
    ('video_error_or_removed', BooleanType()),
    ('description', StringType()),
]

# Every column is declared nullable.
schema_video = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in _video_columns
])

In [6]:
# Load every video CSV with the explicit schema. multiLine allows quoted fields
# to contain line breaks.
df_video = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("multiLine", True)
    .option("delimiter", ",")
    .schema(schema_video)
    .load(video)
)

In [17]:
# Row count of the raw video frame, before de-duplication.
df_video.count()

375941

In [None]:
# List video_ids that appear more than once, most frequent first.
(
    df_video
    .groupBy('video_id')
    .count()
    .filter(F.col('count') > 1)
    .orderBy(F.col('count').desc())
    .show()
)

In [6]:
# Keep a single row per video_id.
# NOTE(review): nothing here controls which of the duplicate rows survives, so
# the retained trending_date/views/likes for a video are whichever row Spark
# keeps -- confirm that discarding the other trending days is intended.
df_video = df_video.dropDuplicates(['video_id'])

In [20]:
# Row count after keeping one row per video_id.
df_video.count()

184287

In [21]:
# Print the column tree of the de-duplicated video frame.
df_video.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- publish_time: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: boolean (nullable = true)
 |-- ratings_disabled: boolean (nullable = true)
 |-- video_error_or_removed: boolean (nullable = true)
 |-- description: string (nullable = true)



In [7]:
# Remove the thumbnail_link column from the video frame.
df_video = df_video.drop('thumbnail_link')

In [23]:
# Print the schema again to confirm thumbnail_link is gone.
df_video.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- publish_time: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- comments_disabled: boolean (nullable = true)
 |-- ratings_disabled: boolean (nullable = true)
 |-- video_error_or_removed: boolean (nullable = true)
 |-- description: string (nullable = true)



In [8]:
# Parse the raw trending_date strings with the 'yy.dd.MM' pattern and overwrite
# the column with the resulting dates. withColumn decides the column name, so no
# alias is needed on the expression.
parsed_trending_date = F.to_date(F.col('trending_date'), 'yy.dd.MM')
df_video = df_video.withColumn('trending_date', parsed_trending_date)

In [9]:
def _split_tags(raw):
    """Strip double quotes from a raw tags string and split it on '|'.

    Returns None for a null input instead of raising. Spark does not
    short-circuit Python UDFs inside conditional expressions, so the function
    can be called for rows that a surrounding F.when(...) would reject,
    including rows where the tags value is null.
    """
    if raw is None:
        return None
    return raw.replace('"', '').split('|')


# No return type is passed, so Spark uses its default string type: the split
# result ends up stored as a string column, not as an array column.
tags_to_list = F.udf(_split_tags)

In [10]:
# Rows whose tags value is the literal '[none]' get the string '[]'; every other
# row gets its tags run through tags_to_list.
has_tags = F.col('tags') != '[none]'
tags_expr = F.when(has_tags, tags_to_list(F.col('tags'))).otherwise('[]')
video_table = df_video.withColumn('tags', tags_expr)

In [11]:
# Preview video_table with long cell values truncated.
video_table.show(truncate=True)

+-----------+-------------+----------------------------+------------------+-----------+-------------------+------------------------------+-------+------+--------+-------------+-----------------+----------------+----------------------+----------------------------+
|   video_id|trending_date|                       title|     channel_title|category_id|       publish_time|                          tags|  views| likes|dislikes|comment_count|comments_disabled|ratings_disabled|video_error_or_removed|                 description|
+-----------+-------------+----------------------------+------------------+-----------+-------------------+------------------------------+-------+------+--------+-------------+-----------------+----------------+----------------------+----------------------------+
|-H90GPnH1q8|   2017-12-11|        The Walking Dead ...|         AresPromo|         24|2017-12-11 03:40:15|          [The Walking Dead...|  90508|     0|       0|         1136|            false|            tr

In [12]:
# Fact table: the video key, its dates and category, plus the numeric measures.
fact_columns = [
    'video_id',
    'trending_date',
    'category_id',
    'publish_time',
    'views',
    'likes',
    'dislikes',
    'comment_count',
]
fact_youtube_trending = video_table.select(*fact_columns)

In [13]:
# Print the column tree of the fact table.
fact_youtube_trending.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: date (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- publish_time: timestamp (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- comment_count: integer (nullable = true)



In [14]:
# Preview the fact table rows.
fact_youtube_trending.show()

+-----------+-------------+-----------+-------------------+-------+------+--------+-------------+
|   video_id|trending_date|category_id|       publish_time|  views| likes|dislikes|comment_count|
+-----------+-------------+-----------+-------------------+-------+------+--------+-------------+
|-H90GPnH1q8|   2017-12-11|         24|2017-12-11 03:40:15|  90508|     0|       0|         1136|
|-mCPxSHIrPc|   2017-12-12|         24|2017-12-11 14:30:58|  90289|  6540|     442|          581|
|02QdxSLdVQc|   2018-01-24|         22|2018-01-23 22:10:38| 222149| 16337|     240|         2840|
|08-n4j46okM|   2018-02-08|         22|2018-02-07 16:42:46|  63894|  8030|      64|          215|
|08URtcZ8em0|   2018-05-14|         24|2018-05-13 14:56:59|  28982|  2962|      45|          461|
|0BbDgMYIiEU|   2017-12-26|         24|2017-12-25 15:48:41| 162866|  9501|     899|          438|
|0Bl7xVD7Xtg|   2018-01-24|         24|2018-01-24 01:00:02|   8276|   698|      25|          159|
|0C6b6U9fz68|   2018

In [101]:
# Repeats the fact table preview from the previous cell.
# NOTE(review): duplicate cell -- candidate for removal.
fact_youtube_trending.show()

+-----------+-------------+-----------+-------------------+-------+------+--------+-------------+
|   video_id|trending_date|category_id|       publish_time|  views| likes|dislikes|comment_count|
+-----------+-------------+-----------+-------------------+-------+------+--------+-------------+
|-H90GPnH1q8|   2017-12-11|         24|2017-12-11 03:40:15|  90508|     0|       0|         1136|
|-mCPxSHIrPc|   2017-12-12|         24|2017-12-11 14:30:58|  90289|  6540|     442|          581|
|02QdxSLdVQc|   2018-01-24|         22|2018-01-23 22:10:38| 222149| 16337|     240|         2840|
|08-n4j46okM|   2018-02-08|         22|2018-02-07 16:42:46|  63894|  8030|      64|          215|
|08URtcZ8em0|   2018-05-14|         24|2018-05-13 14:56:59|  28982|  2962|      45|          461|
|0BbDgMYIiEU|   2017-12-26|         24|2017-12-25 15:48:41| 162866|  9501|     899|          438|
|0Bl7xVD7Xtg|   2018-01-24|         24|2018-01-24 01:00:02|   8276|   698|      25|          159|
|0C6b6U9fz68|   2018

In [15]:
# Video dimension: descriptive attributes and status flags keyed by video_id.
dim_video_columns = [
    'video_id',
    'title',
    'channel_title',
    'tags',
    'comments_disabled',
    'ratings_disabled',
    'video_error_or_removed',
]
dim_video = video_table.select(*dim_video_columns)

In [16]:
# Print the column tree of the video dimension.
dim_video.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- comments_disabled: boolean (nullable = true)
 |-- ratings_disabled: boolean (nullable = true)
 |-- video_error_or_removed: boolean (nullable = true)
 |-- description: string (nullable = true)



In [20]:
# Collect the video dimension through pandas and flatten it into a list of row
# lists (one inner list per video, values in column order).
vid = dim_video.toPandas().values.tolist()

In [6]:
import psycopg2
import pandas as pd

In [26]:
def quality_check(cur, table='dim_category'):
    """Report whether ``table`` contains at least one row.

    Args:
        cur: An open database cursor exposing ``execute`` and ``fetchone``.
        table: Name of the table to count, optionally schema-qualified
            (``schema.table``). Defaults to ``dim_category``, so existing
            ``quality_check(cur)`` calls behave as before.

    Returns:
        str: The failure message when the count is zero (or no row comes back),
        otherwise the success message.

    Raises:
        ValueError: If ``table`` is not a plain, optionally dotted, identifier.
    """
    # Table names cannot be passed as bound query parameters, so the name is
    # validated before being placed into the statement text.
    if not all(part.isidentifier() for part in table.split('.')):
        raise ValueError('Invalid table name: {!r}'.format(table))

    cur.execute('SELECT COUNT(*) FROM {}'.format(table))
    response = cur.fetchone()

    # Guard against a missing row as well as an empty table.
    if response is None or response[0] <= 0:
        return 'Quality check failed !!!!'
    return 'There have data. Pipeline success write data'

In [27]:
# Run the row-count check against the warehouse.
# NOTE(review): `cur` is only created by the psycopg2 connection cell further
# down, so on a fresh kernel this cell must be run after that one.
quality_check(cur)

'Quality check failed !!!!'

In [7]:
import configparser

# Parse dwh.cfg from the working directory. The context manager closes the file
# handle as soon as parsing is finished.
config = configparser.ConfigParser()
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

# Values from the [AWS] section.
KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

# Values from the [DWH] section.
DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

# Summary table of the loaded settings, shown as the cell output. The password
# is replaced by a mask so the secret is never written into the saved notebook.
pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, "********", DWH_PORT, DWH_IAM_ROLE_NAME]
             })

Unnamed: 0,Param,Value
0,DWH_CLUSTER_TYPE,multi-node
1,DWH_NUM_NODES,4
2,DWH_NODE_TYPE,dc2.large
3,DWH_CLUSTER_IDENTIFIER,dwhcluster
4,DWH_DB,dwh_youtube
5,DWH_DB_USER,thuydd7
6,DWH_DB_PASSWORD,********
7,DWH_PORT,5439
8,DWH_IAM_ROLE_NAME,thuyrole


In [8]:
# Build the connection string from the [CLUSTER] section of the config, then
# open the connection and a cursor on it.
# NOTE(review): the values are substituted positionally, so this presumably
# relies on the section listing host, dbname, user, password, port in exactly
# that order -- confirm against dwh.cfg.
cluster_values = config['CLUSTER'].values()
dsn = "host={} dbname={} user={} password={} port={}".format(*cluster_values)
conn = psycopg2.connect(dsn)
cur = conn.cursor()

In [12]:
# Count the rows in dim_category and keep the single result row in `response`
# (the same query quality_check issues).
cur.execute('SELECT COUNT(*) FROM dim_category')
response = cur.fetchone()

In [22]:
import sql_queries