In [1]:
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from datetime import datetime, date
import pandas as pd
import os
from dataclasses import dataclass, fields
import psycopg2

In [2]:
# data classes
@dataclass
class Channel:
    """Descriptive attributes of a YouTube channel.

    ``init_table`` derives the ``channel`` table from this class: every field
    becomes a column and the annotation's class name selects the column type,
    so the annotations must stay plain classes (``str``, ``int``, ``date``).
    Instances are built in ``yt_etl`` from the dict returned by
    ``transform_channel``.
    """
    # Channel identifier passed to transform_channel by its caller
    channelId: str = None
    title: str = None
    # transform_channel stores this as a TARGET_DATE_FORMAT string, not a date object
    publishedAt: date = None
    country: str = None
    # Trailing whitespace is stripped by transform_channel
    description: str = None
@dataclass
class Video:
    """Descriptive attributes of a single video.

    ``init_table`` derives the ``video`` table from this class: every field
    becomes a column and the annotation's class name selects the column type,
    so the annotations must stay plain classes. Instances are built in
    ``yt_etl`` from the dicts returned by ``transform_video``.
    """
    # Taken from the video resource's 'id' by transform_video
    video_id: str = None
    channelId: str = None
    # Converted to int by transform_video
    categoryId: int = None
    title: str = None
    # transform_video stores this as a TARGET_DATE_FORMAT string, not a date object
    publishedAt: date =  None
    description: str = None
@dataclass
class ChannelMetrics:
    """Channel statistics captured on one extraction date.

    ``init_table`` derives the ``channelmetrics`` table from this class: every
    field becomes a column and the annotation's class name selects the column
    type. Instances are built in ``yt_etl`` from the dict returned by
    ``transform_channel_metrics``.
    """
    channelId: str = None
    # Set to the module-level TODAY string by transform_channel_metrics
    extract_date: date = None
    # Counters are converted to int by transform_channel_metrics
    subscriberCount: int = None
    viewCount: int = None
    videoCount: int = None

@dataclass
class VideoMetrics:
    """Video statistics captured on one extraction date.

    ``init_table`` derives the ``videometrics`` table from this class: every
    field becomes a column and the annotation's class name selects the column
    type. Instances are built in ``yt_etl`` from the dicts returned by
    ``transform_video_metrics``.
    """
    video_id: str = None
    # Set to the module-level TODAY string by transform_video_metrics
    extract_date: date = None
    # Counters are converted to int by transform_video_metrics
    viewCount:  int = None
    likeCount: int = None
    commentCount:  int = None


In [3]:
# Configuration: everything a reader might tune lives in this cell

# Name of the environment variable expected to hold the YouTube Data API key
API_KEY = 'YT_API_KEY'
# Date formats: timestamps as parsed from API responses, and the day-level format written to the tables
SOURCE_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
TARGET_DATE_FORMAT = '%Y-%m-%d'
DATE_COLUMN = 'publishedAt'
# Channel names used as search queries, and the earliest publish date of videos to extract
CHANNELS = ['lekkostronniczy', 'Marvecc']
START_DATE = '2023-04-10'

# Fields extracted for channels: keys read from 'snippet' and from 'statistics'
CHANNELS_SNIPPED = ['title',  'publishedAt', 'country', 'description']
CHANNELS_METRICS = ['viewCount', 'subscriberCount', 'videoCount']

# Fields extracted for videos: keys read from 'snippet' and from 'statistics'
VIDEOS_SNIPPED =['title', 'categoryId', 'publishedAt','description']
VIDOS_METRICS = ['viewCount', 'likeCount', 'commentCount']

# Dataclasses for which main() creates a table via init_table
TABLES = [Channel, Video, ChannelMetrics, VideoMetrics]
# PostgreSQL connection settings; the password is read from the environment, never hard-coded
HOST = 'localhost'
USER = 'postgres'
DBNAME='test'
PORT = 5432
PASSWORD = os.environ['POSTGRESQL_PASSWORD']

In [4]:
# Extraction date stamped on every metrics row, formatted with TARGET_DATE_FORMAT
TODAY = datetime.today().strftime(TARGET_DATE_FORMAT)
# Build the YouTube Data API v3 client; the key is read from the environment
# variable named by API_KEY so the name is configured in exactly one place
youtube = build('youtube', 'v3', developerKey=os.environ[API_KEY])

In [5]:
# table initialization in database
def init_table(data_class):
    """Create the database table backing ``data_class`` if it does not exist.

    The table is named after the lower-cased class name and gets one column
    per dataclass field. The column type is the upper-cased class name of the
    field's annotation, with ``str`` mapped to ``VARCHAR``.

    Args:
        data_class: A dataclass type whose field annotations are plain
            classes (e.g. ``str``, ``int``, ``date``).

    Returns:
        True once the statement has been committed.

    Raises:
        psycopg2.Error: If connecting or executing the statement fails. The
            connection is closed in that case as well.
    """
    table_name = data_class.__name__.lower()

    def column_type(field):
        # 'STR' is not a PostgreSQL type; every other annotation name is used as-is.
        # NOTE(review): 'INT' is a 32-bit column in PostgreSQL; counters above
        # ~2.1 billion would not fit — consider BIGINT for the metrics tables.
        type_name = field.type.__name__.upper()
        return "VARCHAR" if type_name == 'STR' else type_name

    table_fields = ", ".join(
        f'{field.name} {column_type(field)}' for field in fields(data_class)
    )

    query = f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
        {table_fields}
        );
        """

    connection = psycopg2.connect(
        host=HOST,
        dbname=DBNAME,
        user=USER,
        password=PASSWORD,
        port=PORT,
    )
    try:
        # `with connection` commits on success and rolls back on error;
        # the cursor context closes the cursor. Neither closes the connection.
        with connection, connection.cursor() as cur:
            cur.execute(query)
    finally:
        # Always release the connection, even when the statement fails
        connection.close()

    return True

# extract functions
def get_api_request(request):
    """Execute a prepared API request and hand back its result.

    Args:
        request: Any object exposing ``execute()``.

    Returns:
        Whatever ``request.execute()` returns, or — when an ``HttpError`` is
        raised — a formatted error string (callers must be prepared for a
        ``str`` instead of the response).
    """
    try:
        return request.execute()
    except HttpError as http_error:
        message_template = 'Error response status code : {0}, reason : {1}'
        return message_template.format(http_error.status_code, http_error.error_details)
    
def get_channelId(channel_name):
    """Resolve a channel name to its channel id via the search endpoint.

    The first search hit of type ``channel`` for ``channel_name`` is used.

    Args:
        channel_name: Free-text query identifying the channel.

    Returns:
        The ``channelId`` of the first search result.

    Raises:
        RuntimeError: If the API call failed (``get_api_request`` returned its
            error string instead of a response).
        LookupError: If the search returned no channel.
    """
    request = youtube.search().list(
        part='id',
        type='channel',
        q=channel_name
    )
    response = get_api_request(request)

    # get_api_request reports HTTP failures as a string rather than raising;
    # indexing that string would surface as an unrelated TypeError.
    if not isinstance(response, dict):
        raise RuntimeError(f"Channel search for {channel_name!r} failed: {response}")

    items = response.get('items') or []
    if not items:
        raise LookupError(f"No YouTube channel found for {channel_name!r}")

    return items[0]['id']['channelId']

def extract_channel_data(channelId):
    """Request the ``snippet`` and ``statistics`` parts for one channel.

    Args:
        channelId: Identifier of the channel to look up.

    Returns:
        The value produced by ``get_api_request`` for the channels request.
    """
    channels_resource = youtube.channels()
    lookup = channels_resource.list(id=channelId, part="snippet,statistics")
    return get_api_request(lookup)

def extract_video_data(channelId, start_date):
    """Collect the video resources a channel published after ``start_date``.

    The search endpoint is paged through (newest first) to obtain video ids;
    for every page the ``snippet`` and ``statistics`` parts of those videos
    are fetched in a single ``videos().list`` call.

    Args:
        channelId: Identifier of the channel whose videos are extracted.
        start_date: Lower bound for the publish date, as a string in
            ``TARGET_DATE_FORMAT``.

    Returns:
        A list with the ``items`` of all video responses; empty when the
        channel has no matching videos.

    Raises:
        ValueError: If ``start_date`` does not match ``TARGET_DATE_FORMAT``.
        RuntimeError: If a search or videos call failed (``get_api_request``
            returned its error string instead of a response).
    """
    max_results = 50
    # Loop-invariant: computed once instead of on every page
    published_after = datetime.strptime(start_date, TARGET_DATE_FORMAT).isoformat() + 'Z'

    videos_results = []
    next_page_token = None

    while True:
        search_request = youtube.search().list(
            channelId=channelId,
            part='id',
            order='date',
            type='video',
            maxResults=max_results,
            publishedAfter=published_after,
            pageToken=next_page_token
        )
        search_response = get_api_request(search_request)
        if not isinstance(search_response, dict):
            raise RuntimeError(f"Video search for channel {channelId!r} failed: {search_response}")

        video_ids = [item['id']['videoId'] for item in search_response.get('items', [])]

        # An empty page must not trigger a videos request with an empty id list
        if video_ids:
            videos_request = youtube.videos().list(
                id=",".join(video_ids),
                part='snippet,statistics')
            videos_response = get_api_request(videos_request)
            if not isinstance(videos_response, dict):
                raise RuntimeError(f"Video lookup for channel {channelId!r} failed: {videos_response}")
            videos_results.extend(videos_response.get('items', []))

        next_page_token = search_response.get('nextPageToken')
        if not next_page_token:
            break

    return videos_results
# transform functions
def transform_channel(channel_data, channelId):
    """Build the flat record for the ``Channel`` dataclass from a channels response.

    Copies the fields named in ``CHANNELS_SNIPPED`` from the first item's
    ``snippet``, reformats ``publishedAt`` to ``TARGET_DATE_FORMAT``, strips
    trailing whitespace from ``description`` and adds ``channelId``.

    Args:
        channel_data: Response of the channels request; ``items[0]['snippet']``
            is read.
        channelId: Identifier stored in the record.

    Returns:
        A dict keyed by the ``CHANNELS_SNIPPED`` names plus ``channelId``.
        Fields missing from the snippet are ``None``.

    Raises:
        ValueError: If ``publishedAt`` matches neither accepted timestamp format.
    """
    snippet = channel_data['items'][0]['snippet']

    # Optional snippet fields (e.g. 'country') are absent when the channel has
    # not set them, so use .get instead of failing with KeyError.
    channel_info = {field: snippet.get(field) for field in CHANNELS_SNIPPED}

    published_raw = channel_info['publishedAt']
    if published_raw is not None:
        try:
            date_obj = datetime.strptime(published_raw, SOURCE_DATE_FORMAT)
        except ValueError:
            # NOTE(review): channel timestamps appear to sometimes carry
            # fractional seconds — confirm against real responses.
            date_obj = datetime.strptime(published_raw, '%Y-%m-%dT%H:%M:%S.%fZ')
        channel_info['publishedAt'] = date_obj.strftime(TARGET_DATE_FORMAT)

    if channel_info['description'] is not None:
        channel_info['description'] = channel_info['description'].rstrip()

    channel_info['channelId'] = channelId

    return channel_info

def transform_channel_metrics(channel_data, channelId):
    """Build the flat record for the ``ChannelMetrics`` dataclass.

    Converts the counters named in ``CHANNELS_METRICS`` from the first item's
    ``statistics`` to ``int`` and adds ``channelId`` and ``extract_date``.

    Args:
        channel_data: Response of the channels request; ``items[0]['statistics']``
            is read.
        channelId: Identifier stored in the record.

    Returns:
        A dict keyed by the ``CHANNELS_METRICS`` names plus ``channelId`` and
        ``extract_date`` (the module-level ``TODAY``). A counter missing from
        the response is stored as ``None``.
    """
    statistics = channel_data['items'][0]['statistics']

    channel_stats = {}
    for field in CHANNELS_METRICS:
        # A counter can be missing (e.g. a hidden subscriber count); store
        # None rather than raising KeyError.
        raw_value = statistics.get(field)
        channel_stats[field] = int(raw_value) if raw_value is not None else None
    channel_stats['channelId'] = channelId
    channel_stats['extract_date'] = TODAY

    return channel_stats

def transform_video(video, channelId):
    """Flatten one video resource into the record for the ``Video`` dataclass.

    Args:
        video: A video resource; its ``id`` and ``snippet`` are read.
        channelId: Identifier of the owning channel, stored in the record.

    Returns:
        A dict with the ``VIDEOS_SNIPPED`` fields (``publishedAt`` reformatted
        to ``TARGET_DATE_FORMAT``, ``categoryId`` as ``int``) plus ``video_id``
        and ``channelId``.
    """
    identifier = video['id']
    record = {name: video['snippet'][name] for name in VIDEOS_SNIPPED}

    published = datetime.strptime(record['publishedAt'], SOURCE_DATE_FORMAT)
    record.update(
        publishedAt=published.strftime(TARGET_DATE_FORMAT),
        categoryId=int(record['categoryId']),
        video_id=identifier,
        channelId=channelId,
    )
    return record

def transform_video_metrics(video):
    """Build the flat record for the ``VideoMetrics`` dataclass.

    Converts the counters named in ``VIDOS_METRICS`` from the video's
    ``statistics`` to ``int`` and adds ``video_id`` and ``extract_date``.

    Args:
        video: A video resource; its ``id`` and ``statistics`` are read.

    Returns:
        A dict keyed by the ``VIDOS_METRICS`` names plus ``video_id`` and
        ``extract_date`` (the module-level ``TODAY``). A counter missing from
        the response is stored as ``None``.
    """
    statistics = video['statistics']

    video_metrics_record = {}
    for field in VIDOS_METRICS:
        # likeCount / commentCount are omitted when likes are hidden or
        # comments are disabled; store None rather than raising KeyError.
        raw_value = statistics.get(field)
        video_metrics_record[field] = int(raw_value) if raw_value is not None else None
    video_metrics_record['video_id'] = video['id']
    video_metrics_record['extract_date'] = TODAY

    return video_metrics_record
    
def tansform_video_data(videos_data, channelId):
    """Transform every extracted video into a descriptive and a metrics record.

    Args:
        videos_data: Iterable of video resources.
        channelId: Identifier of the owning channel.

    Returns:
        A tuple ``(videos, videos_metrics)`` of two lists in input order, built
        with ``transform_video`` and ``transform_video_metrics`` respectively.
    """
    # Both transforms run per video, in the same order as a plain loop would
    record_pairs = [
        (transform_video(item, channelId), transform_video_metrics(item))
        for item in videos_data
    ]
    videos = [pair[0] for pair in record_pairs]
    videos_metrics = [pair[1] for pair in record_pairs]
    return videos, videos_metrics

# load functions
def drop_duplicated_rows():
    """Placeholder: not implemented yet; does nothing and returns ``None``."""
    return None


def load_to_db_table(data_obj):
    """Insert one dataclass instance as a row into its table.

    The target table is the lower-cased class name of ``data_obj``; the
    instance attributes provide the column names and the values, which are
    passed to the driver as query parameters.

    Args:
        data_obj: Object whose ``vars()`` map column names to values.

    Returns:
        True once the insert has been committed.

    Raises:
        psycopg2.Error: If connecting or inserting fails. The transaction is
            rolled back and the connection closed in that case.
    """
    table_name = type(data_obj).__name__.lower()
    columns, values = zip(*vars(data_obj).items())
    columns_str = ', '.join(columns)
    placeholders_str = ', '.join(['%s'] * len(columns))

    query = f"INSERT INTO {table_name} ({columns_str}) VALUES ({placeholders_str});"

    connection = psycopg2.connect(
        host=HOST,
        dbname=DBNAME,
        user=USER,
        password=PASSWORD,
        port=PORT,
    )
    try:
        # `with connection` commits on success and rolls back on error;
        # the cursor context closes the cursor. Neither closes the connection.
        with connection, connection.cursor() as cur:
            cur.execute(query, values)
    finally:
        # Always release the connection, even when the insert fails
        connection.close()

    return True

def load_data_list_to_db_table(data_list):
    """Insert every object of ``data_list`` via ``load_to_db_table``.

    Args:
        data_list: Iterable of objects accepted by ``load_to_db_table``.

    Returns:
        Always 0.
    """
    for record in data_list:
        load_to_db_table(record)

    return 0

# ETL process function 
def yt_etl(channel_name):
    """Run extract, transform and load for a single channel.

    Args:
        channel_name: Search query used to resolve the channel.

    Returns:
        Always 0.
    """
    # --- extract ---
    channelId = get_channelId(channel_name)
    raw_channel = extract_channel_data(channelId)
    raw_videos = extract_video_data(channelId, START_DATE)

    # --- transform: plain dicts first, then dataclass instances ---
    channel_fields = transform_channel(raw_channel, channelId)
    channel_metrics_fields = transform_channel_metrics(raw_channel, channelId)
    video_dicts, video_metrics_dicts = tansform_video_data(raw_videos, channelId)

    channel_row = Channel(**channel_fields)
    channel_metrics_row = ChannelMetrics(**channel_metrics_fields)
    video_rows = [Video(**entry) for entry in video_dicts]
    video_metrics_rows = [VideoMetrics(**entry) for entry in video_metrics_dicts]

    # --- load: single rows first, then the per-video lists ---
    load_to_db_table(channel_row)
    load_to_db_table(channel_metrics_row)
    load_data_list_to_db_table(video_rows)
    load_data_list_to_db_table(video_metrics_rows)

    return 0

# main functions
def main():
    """Create a table for every class in ``TABLES``, then run ``yt_etl`` for every name in ``CHANNELS``."""
    # Schema first: every dataclass gets its table before any row is loaded
    for data_class in TABLES:
        init_table(data_class)

    # Then one full ETL pass per configured channel
    for name in CHANNELS:
        yt_etl(name)

    
# Execute the process: running this cell creates the tables and loads all configured channels
main()
