In [None]:
!pip install cloud-sql-python-connector

In [None]:
!pip install pymysql

In [None]:
'''
  Set the Google Cloud Project
'''
project_id="<Enter your project ID here>"
!gcloud config set project {project_id}

In [None]:
'''
  Aunthenticate to Colab Environment
'''
from google.colab import auth
auth.authenticate_user()

In [None]:
!pip install google-api-python-client

In [None]:
!gcloud services enable sqladmin.googleapis.com

This notebook contains code cells for the following:

*   API Client that consumes the youtube data v3 API to fetch channel, playlist, comment and video information.
*   Code for writing channel, playlists, video and comments information.
*   Methods to retrieve 10 different statistics from MySQL db.
*   Stream Lit application presents a webpage to view channel information, ware house channels and view statistics






      
      
      

In [None]:
''' API Client Methods '''

from enum import IntEnum
from typing import ItemsView

# Connect to YT API

import googleapiclient.discovery
api_service_name = "youtube"
api_version = "v3"
api_key = '<Enter your API key here>'

youtube = googleapiclient.discovery.build(api_service_name, api_version, developerKey=api_key)

# API request for Channel

def get_channel_data(channel_id):
  # Prepare the API request
  request = youtube.channels().list(
      part="snippet,contentDetails,statistics,status",
      id=channel_id)
  # Execute the API request
  response = request.execute()
  item = response["items"][0]

  # Extract from the response, the channel attributes that we are interested in
  data={
     "id": channel_id,
     "name":item["snippet"]["title"],
     "type":item["kind"],
     "views":item["statistics"]["viewCount"],
     "description":item["snippet"]["description"],
     "status":item["status"]["privacyStatus"],
     "playlist_id":item["contentDetails"]["relatedPlaylists"]["uploads"]
  }
  return data

# API request for Playlists

def get_playlist_data(channel_id):
  #Prepare the API request
  request = youtube.playlists().list(
      part="snippet",
      channelId=channel_id
      )

  response = request.execute()
  items = response["items"]

  playlists = dict()
  for item in items:
    data = {
      "id":item["id"],
      "channel_id":item["snippet"]["channelId"],
      "name":item["snippet"]["title"]
            }
    playlists[data["id"]]=data

  return playlists


# API request for Comment

def get_commentthread_data(channel_id):
  #Prepare the API request
  request = youtube.commentThreads().list(
      part="snippet",
      allThreadsRelatedToChannelId=channel_id
      )

  response = request.execute()
  items = response["items"]

  commentthread = dict()
  for item in items:
    data=dict()
    data = {
      "id":item["id"],
      "video_id":item["snippet"]["videoId"],
      "text":item["snippet"]["topLevelComment"]["snippet"]["textDisplay"],
      "author":item["snippet"]["topLevelComment"]["snippet"]["authorDisplayName"],
      "publishdate":item["snippet"]["topLevelComment"]["snippet"]["publishedAt"]
            }
    commentthread[data["id"]]=data
  return commentthread


# API request for Videos

def get_video_data(video_ids, channel):
  idstr=str()
  for video_id in video_ids:
    idstr+=video_id+','
  #Prepare the API request
  if len(idstr)>0:
    request = youtube.videos().list(
    part="snippet,statistics,contentDetails",
      id=idstr
      )
    response = request.execute()
    items = response["items"]
    videos = dict()
    for item in items:
      data = {
      "id":item["id"],
      "name":item["snippet"]["title"],
      "playlist_id":channel["playlist_id"],
      "channel_id":channel["id"],
      "description":item["snippet"]["description"],
      "publishdate":item["snippet"]["publishedAt"],
      "view_count":item["statistics"]["viewCount"],
      "like_count":item["statistics"]["likeCount"],
      "dislike_count":0,
      "favorite_count":item["statistics"]["favoriteCount"],
      "comment_count":item["statistics"]["commentCount"],
      "duration":item["contentDetails"]["duration"],
      "caption_status":item["contentDetails"]["caption"]
            }
      data["thumbnails"]=dict()
      for key in item["snippet"]["thumbnails"].keys():
        data["thumbnails"][key]=item["snippet"]["thumbnails"][key]["url"]
      videos[data["id"]] = data
  else:
    print(f"\nThere are no Videos/Video ID for this channel {channel_id}\n")
  return videos


In [None]:
# Connect with MySQL db
from google.cloud.sql.connector import Connector
import pymysql
import traceback
from sqlalchemy import create_engine

'''
  Connector for connecting to the Youtube channel information
'''

c=Connector()
conn_name='<enter connection name here>'
user='<userid>'
password='<password>'
db='<db>'

# Initializing the Connector
def connection_helper():
  try:
    # dB Connection
    connection = c.connect(conn_name, "pymysql", user=user, password=password, db=db)
    print(f'''Successfully connected to database''')
    return connection
  except:
    print(f'''DB Connection Failed''')

# Get connection from pool
def get_connection():
  # Create a DB connection and connection
  pool=create_engine("mysql+pymysql://", creator=connection_helper)
  return pool.connect()

In [None]:
from operator import index
# Code for storing and retrieving objects from MySQL DB
import pandas as pd
import sqlalchemy

faqs = {
      'Q1':'What are the names of all the videos and their corresponding channels?',
      'Q2':'Which channels have the most number of videos, and how many videos do they have?',
      'Q3':'What are the top 10 most viewed videos and their respective channels?',
      'Q4':'How many comments were made on each video, and what are their corresponding video names',
      'Q5':'Which videos have the highest number of likes, and what are their corresponding channel names?',
      'Q6':'What is the total number of likes and dislikes for each video, and what are their corresponding video names?',
      'Q7':'What is the total number of views for each channel, and what are their corresponding channel names?',
      'Q8':'What are the names of all the channels that have published videos in the year 2022',
      'Q9':'What is the average duration of all videos in each channel, and what are their corresponding channel names?',
      'Q10':'Which videos have the highest number of comments, and what are their corresponding channel names?'
    }


connection = get_connection()

'''
Convert Duration to Seconds
#formats are 'P2DT3H20M30S', 'PT3H20M30S','PT20M30S','PT30S'
'''
def convert_duration(duration):
  day=hour=mins=sec=0
  if duration != None:
    if 'D' in duration:
      day=duration.split('D',1)[0]
      if day is not None:
        day=day.split('P',1)[1]
    if 'H' in duration:
      hour=duration.split('H',1)[0]
      if hour is not None:
        hour= hour.split('T',1)[1]
    if 'M' in duration:
      mins=duration.split('M',1)[0]
      if mins is not None:
        if 'H' in mins:
          mins= mins.split('H')[1]
        else:
          if 'PT' in mins:
            mins=mins.split('PT')[1]
    if 'S' in duration:
      sec=duration.split('S',1)[0]
      if 'M' in sec:
        sec= sec.split('M')[1]
      else:
        if 'PT' in sec:
          sec=sec.split('PT')[1]
    #print(f'''d:{day} h:{hour} m:{mins} s:{sec}''')
    return (int(day)*86400 + int(hour)*3600 + int(mins)*60 + int(sec))

'''
Insert a Channel into dB
'''

def insert_channel(channel,docommit=True):
  sql = '''INSERT INTO channel (id,name,type,views,description,status,playlist_id) VALUES (:id,:name,:type,:views,:description,:status,:playlist_id);'''
  statement = sqlalchemy.text(sql)
  connection.execute(statement,{'id': channel['id'], 'name': channel['name'], 'type': channel['type'], 'views': channel['views'], 'description': channel['description'], 'status': channel['status'],'playlist_id':channel['playlist_id']})
  if docommit == True:
    connection.commit()

'''
Get a channel with given ID
'''
def get_channel(channel_id):
  sql = '''SELECT id,name,type,views,description,status,playlist_id FROM channel WHERE id=%(id)s;'''
  df=pd.read_sql_query(sql, connection, params={'id':channel_id})
  return df

'''
Get all channels
'''
def get_all_channels():
  sql = '''SELECT id,name FROM channel;'''
  df=pd.read_sql_query(sql, connection)
  return df

'''
Get all FAQs
'''
def get_all_faqs():
  return pd.DataFrame.from_dict(faqs, orient='index').reset_index().rename(columns={'index':'id',0:'question'})

'''
Insert Playlists into dB
'''
def insert_playlists(channel_id,playlists,docommit=True):
  sql = '''INSERT INTO playlist (id,channel_id,name) VALUES (:id,:channel_id,:name);'''
  statement = sqlalchemy.text(sql)
  for key in playlists.keys():
    playlist=playlists[key]
    connection.execute(statement,{'id':key, 'name': playlist['name'], 'channel_id': channel_id})
  if docommit == True:
    connection.commit()

'''
Get Playlists for a given Channel ID
'''
def get_playlists_by_channel(channel_id):
  sql = '''SELECT id,channel_id,name FROM playlist WHERE channel_id=%(channel_id)s;'''
  return pd.read_sql_query(sql, connection, params={'channel_id':channel_id})
'''
Insert Comments into dB
'''
def insert_comments(channel_id, comments,docommit=True):
  sql = '''INSERT INTO comment (id,video_id,author,published_date,comment_text) VALUES (:id,:video_id,:author,STR_TO_DATE(:published_date,"%Y-%m-%dT%H:%i:%sZ"),:comment_text);'''
  statement = sqlalchemy.text(sql)
  for key in comments.keys():
    comment=comments[key]
    connection.execute(statement,{'id': comment['id'],'video_id': comment['video_id'],'author': comment['author'],'published_date': comment['publishdate'],'comment_text':comment['text']})
  if docommit == True:
    connection.commit()
'''
Get Comments for a given Channel ID
'''
def get_comments_by_channel(channel_id):
  sql = '''SELECT id,video_id,comment_text,author,published_date FROM comment WHERE video_id IN (SELECT id FROM video WHERE channel_id=%(channel_id)s);'''
  return pd.read_sql_query(sql,connection, params={'channel_id':channel_id})

'''
Insert Videos for a given Channel ID
'''
def insert_videos(channel_id, videos,docommit=True):
  sql = '''INSERT INTO video (id,playlist_id,channel_id,name,description,published_date,view_count,like_count,dislike_count,favorite_count,comment_count,duration,caption_status) VALUES (:id,:playlist_id,:channel_id,:name,:description,STR_TO_DATE(:published_date,"%Y-%m-%dT%H:%i:%sZ"),:view_count,:like_count,:dislike_count,:favorite_count,:comment_count,:duration,:caption_status);'''
  statement = sqlalchemy.text(sql)
  for key in videos.keys():
    video=videos[key]
    connection.execute(statement,{'id':video['id'],'playlist_id':video['playlist_id'],'channel_id':video['channel_id'],'name':video['name'],'description':video['description'],'published_date':video['publishdate'],'view_count':video['view_count'],'like_count':video['like_count'],'dislike_count':video['dislike_count'],'favorite_count':video['favorite_count'],'comment_count':video['comment_count'],'duration':convert_duration(video['duration']),'caption_status':video['caption_status']})
  if docommit == True:
    connection.commit()
'''
Get Videos for a given Channel ID
'''
def get_videos_by_channel(channel_id):
  sql = '''SELECT id,playlist_id,channel_id,name,description,published_date,view_count,like_count,dislike_count,favorite_count,comment_count,duration,caption_status FROM video WHERE video.playlist_id IN (SELECT playlist_id FROM channel WHERE channel_id=%(channel_id)s);'''
  return pd.read_sql_query(sql,connection, params={'channel_id': channel_id},index_col="id")

'''
   Commit all DATA at once
'''
def insert_all_info(channel, playlists, comments, videos):
  insert_channel(channel,False)
  insert_playlists(channel['id'],playlists,False)
  insert_videos(channel['id'], videos,False)
  insert_comments(channel['id'], comments,False)
  connection.commit()

'''
Get FAQs
'''
def get_answer(faq_id):
  sql = ''
  if faq_id == "Q1":
    sql = '''SELECT v.id video_id,v.name video_name,c.id channel_id,c.name channel_name FROM video v JOIN channel c ON (v.channel_id=c.id);'''
  elif faq_id == "Q2":
    sql = '''SELECT c.id channel_id,c.name channel_name,count(*) video_count FROM channel c JOIN video v ON (c.id=v.channel_id) GROUP BY channel_id,channel_name ORDER BY video_count DESC;'''
  elif faq_id == "Q3":
    sql = '''SELECT v.id video_id,v.name video_name,v.view_count view_count,c.id channel_id,c.name channel_name FROM video v JOIN channel c ON (v.channel_id=c.id) ORDER BY view_count DESC LIMIT 10;'''
  elif faq_id == "Q4":
    sql = '''SELECT v.id video_id, v.name video_name, comment_count FROM video v ORDER BY comment_count DESC;'''
  elif faq_id == "Q5":
    sql = '''SELECT v.id video_id,v.name video_name, v.like_count video_like_count, c.id channel_id,c.name channel_name FROM video v JOIN channel c ON (v.channel_id=c.id) ORDER BY video_like_count DESC LIMIT 1;'''
  elif faq_id == "Q6":
    sql = '''SELECT id video_id, name video_name, like_count video_like_count FROM video;'''
  elif faq_id =="Q7":
    sql = '''SELECT id,name,views FROM channel ORDER BY views DESC;'''
  elif faq_id == "Q8":
    sql = '''SELECT c.id channel_id,c.name channel_name, v.published_date published_yr FROM channel c JOIN video v ON (c.id=v.channel_id) WHERE YEAR(v.published_date)=2022;'''
  elif faq_id == "Q9":
    sql = '''SELECT c.id channel_id,c.name channel_name, AVG(v.duration) average_duration FROM channel c JOIN video v ON (c.id=v.channel_id) GROUP BY channel_id,channel_name ORDER BY average_duration DESC;'''
  elif faq_id == "Q10":
    sql = '''SELECT v.id video_id, v.name video_name, c.id channel_id, c.name channel_name, comment_count comment_count_top10 FROM video v JOIN channel c ON (c.id=v.channel_id) ORDER BY comment_count DESC LIMIT 10;'''
  #Test
  return pd.read_sql_query(sql,connection)

In [None]:
!pip install -q streamlit
!pip install -q streamlit_option_menu

In [None]:
%%writefile yt_webapp.py

import streamlit as st
from streamlit_option_menu import option_menu
import pandas as pd
import numpy as np

from enum import IntEnum
from typing import ItemsView
from pathlib import Path
import sys
sys.path.append(str(Path('myutils.py').resolve()))

from myutils import get_channel_data
from myutils import get_playlist_data
from myutils import get_commentthread_data
from myutils import get_video_data

# SQL Methods import
from myutils import insert_channel
from myutils import insert_playlists
from myutils import insert_comments
from myutils import insert_videos
from myutils import insert_all_info

from myutils import get_answer
from myutils import get_all_channels
from myutils import get_all_faqs
from myutils import get_channel
from myutils import get_playlists_by_channel
from myutils import get_comments_by_channel
from myutils import get_videos_by_channel
import googleapiclient.discovery


#Must be first streamlit command
st.set_page_config(layout='wide')

# Initialize Session State

if 'channel_input' not in st.session_state:
  st.session_state.channel_input=None

if "chosen_channel" not in st.session_state:
  st.session_state.chosen_channel = None

if "chosen_faq" not in st.session_state:
  st.session_state.chosen_faq = None

if "channel_names" not in st.session_state:
  st.session_state.channel_names = None
st.session_state.channel_names = get_all_channels()

if "faqs" not in st.session_state:
  st.session_state.faqs = None
st.session_state.faqs = get_all_faqs()

if "channel_info_button" not in st.session_state:
  st.session_state.channel_info_button = None

if "channel_save_button" not in st.session_state:
  st.session_state.channel_save_button = None

if "channel" not in st.session_state:
  st.session_state.channel=None

if "channel_playlists" not in st.session_state:
  st.session_state.channel_playlists=None

if "channel_comments" not in st.session_state:
  st.session_state.channel_comments=None

if "channel_videos" not in st.session_state:
  st.session_state.channel_videos=None


def fetch_channel_details():
  channel_id = st.session_state.channel_input
  channel = get_channel_data(channel_id)
  data_area.dataframe(channel, use_container_width=True)
  st.session_state.channel = channel

  playlists = get_playlist_data(channel_id)
  df_playlists = pd.DataFrame(playlists).T
  data_area.dataframe(df_playlists, use_container_width=True)
  st.session_state.channel_playlists = playlists

  comments = get_commentthread_data(channel_id)
  df_comments = pd.DataFrame(comments).T
  data_area.dataframe(df_comments, use_container_width=True)
  st.session_state.channel_comments = comments

  vids=set()
  for key in comments.keys():
    vids.add(comments[key]['video_id'])

  videos = get_video_data(vids, channel)
  df_videos = pd.DataFrame(videos).T
  data_area.dataframe(df_videos, use_container_width=True)
  st.session_state.channel_videos = videos

def fetch_channel_summary():
  df_all_names = st.session_state.channel_names
  channel_id = df_all_names[df_all_names.name == st.session_state.chosen_channel].iloc[0]['id']
  channel_out = get_channel(channel_id)
  data_area.dataframe(channel_out, use_container_width=True)

def fetch_faq():
  q = st.session_state.chosen_faq
  faqs = st.session_state.faqs
  qid = faqs[faqs.question == q].iloc[0]['id']
  data_area.dataframe(get_answer(qid), use_container_width=True)

def save_channel_info():
  channel = st.session_state.channel
  playlists = st.session_state.channel_playlists
  comments = st.session_state.channel_comments
  videos = st.session_state.channel_videos
  insert_all_info(channel,playlists,comments,videos)
  data_area.text(f'''Channel Info Saved''')

# Main Program
menu,mview = st.columns([15,85])
with st.sidebar:
  my_opt=option_menu(menu_title='Menu', options=['Stored Channels', 'Review and Save', 'Query Analyse'])

mview.header('YouTube Data Harvesting & Warehousing')
if my_opt == 'Stored Channels':
  try:
    st.session_state.chosen_channel = mview.selectbox("Choose a Channel:", key='chosen_channel', options=st.session_state.channel_names.name, on_change=fetch_channel_summary)
  except:
    print('')
elif my_opt == 'Review and Save':
  col1,col2,col3 = mview.columns(3)
  try:
    st.session_state.channel_input = col1.text_input("Enter any Channel ID:", key = 'channel_input')
  except:
    print('')
  try:
    col2.write('')
    col2.write('')
    st.session_state.channel_info_button = col2.button("View Channel Info", key = 'channel_info_button', on_click = fetch_channel_details)
  except:
    print('')
  try:
    col3.write('')
    col3.write('')
    st.session_state.channel_save_button = col3.button("Save Channel Info", key = 'channel_save_button', on_click = save_channel_info)
  except:
    print('')
elif my_opt == 'Query Analyse':
  try:
    st.session_state.chosen_faq = mview.selectbox("Choose a FAQ:", key = 'chosen_faq', options = st.session_state.faqs.question, on_change = fetch_faq)
  except:
    print('')
data_area = mview.container()

In [None]:
'''
Finding Local IP
'''
! wget -q -O - ipv4.icanhazip.com

In [None]:
'''
Hosting the Streamlit Web App
'''
!streamlit run /content/yt_webapp.py &>/content/logs.txt &
!npx localtunnel --port 8501
