In [1]:
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.errors import HttpError
import urllib.parse as p
import re
import os
import pickle
from google.auth.exceptions import RefreshError
import logging
import pandas as pd 
import warnings 

# OAuth scopes handed to the authorization flow in youtube_authenticate().
SCOPES = ["https://www.googleapis.com/auth/youtube.force-ssl"]

# Records of level WARNING and above are written to logging.txt.
logging.basicConfig(filename='logging.txt', level=logging.WARNING)
logger = logging.getLogger(__name__)

In [2]:
def youtube_authenticate():
    """Build an authenticated YouTube Data API v3 client.

    Cached credentials are read from ``token.pickle`` when that file exists.
    If they carry a refresh token they are refreshed; otherwise (or when the
    refresh is rejected) the interactive OAuth flow is run using
    ``code_secret_client.json``. The resulting credentials are written back
    to ``token.pickle`` for the next run.

    Returns:
        The service object returned by ``googleapiclient.discovery.build``.
    """
    # NOTE(review): this tells oauthlib to accept non-HTTPS OAuth traffic for
    # the whole process. Presumably it was added for the local redirect of
    # the installed-app flow -- confirm it is really needed.
    os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
    api_service_name = "youtube"
    api_version = "v3"

    # OAuth client secrets file downloaded from a Google developer project
    # that has the YouTube Data API enabled.
    client_secrets_file = r'code_secret_client.json'  # change if you run it in your own environment
    token_file = "token.pickle"

    creds = None
    # token.pickle stores the user's access and refresh tokens; it is created
    # automatically the first time the authorization flow completes.
    if os.path.exists(token_file):
        # NOTE(review): pickle.load can execute arbitrary code. Only keep a
        # token.pickle that this notebook produced itself.
        with open(token_file, "rb") as token:
            creds = pickle.load(token)

    needs_login = True
    if creds and creds.refresh_token:
        try:
            creds.refresh(Request())
            needs_login = False
        except RefreshError:
            logger.error("Credentials could not be refreshed, possibly the authorization was revoked by the user.")
            # The cached token is useless now. Drop it and fall through to a
            # fresh login instead of returning None, which would leave the
            # caller with an unusable client.
            os.unlink(token_file)

    if needs_login:
        flow = InstalledAppFlow.from_client_secrets_file(
            client_secrets_file, SCOPES)
        creds = flow.run_local_server(port=0)

    # Save the credentials for the next run.
    with open(token_file, 'wb') as token:
        pickle.dump(creds, token)

    return build(api_service_name, api_version, credentials=creds)

# Authenticate against the YouTube API once and keep the client in the
# module-level name `youtube`, which get_channel_videos reads as a global.
youtube = youtube_authenticate()

In [3]:
def get_video_id_by_url(url):
    """Return the video ID contained in a YouTube video `url`.

    Supported forms:
      * any URL carrying the ID in its ``v`` query parameter
        (``https://www.youtube.com/watch?v=<id>``);
      * short links (``https://youtu.be/<id>``);
      * ``youtube.com/embed/<id>``, ``/shorts/<id>``, ``/live/<id>`` and
        ``/v/<id>`` paths.

    Args:
        url: The video URL as a string.

    Returns:
        The video ID string.

    Raises:
        ValueError: If no video ID can be found in `url`.
    """
    parsed_url = p.urlparse(url)

    # Regular watch URLs: the ID is the first value of the `v` parameter.
    # parse_qs drops blank values, so `?v=` alone counts as missing.
    video_id = p.parse_qs(parsed_url.query).get("v")
    if video_id:
        return video_id[0]

    host = (parsed_url.hostname or "").lower()
    segments = [segment for segment in parsed_url.path.split("/") if segment]

    # Short links put the ID directly in the path.
    if host == "youtu.be" and segments:
        return segments[0]

    # Path-based forms on youtube.com: /<kind>/<id>.
    on_youtube = host == "youtube.com" or host.endswith(".youtube.com")
    if on_youtube and len(segments) >= 2 and segments[0] in ("embed", "shorts", "live", "v"):
        return segments[1]

    raise ValueError(f"Wasn't able to parse video URL: {url}")

In [4]:
def get_video_details(youtube, **kwargs):
    """Run a ``videos().list`` request and return its executed response.

    The request always asks for the ``snippet``, ``contentDetails`` and
    ``statistics`` parts; every keyword argument (e.g. ``id=...``) is
    forwarded unchanged to ``list``.
    """
    requested_parts = "snippet,contentDetails,statistics"
    request = youtube.videos().list(part=requested_parts, **kwargs)
    response = request.execute()
    return response

In [5]:
def _format_iso8601_duration(duration):
    """Convert an ISO-8601 duration like 'PT5H50M15S' into '5:50:15'.

    Units above the largest one present are omitted ('PT5M15S' -> '5:15',
    'PT15S' -> '15'); units missing below it are filled with 0 so the result
    stays unambiguous ('PT1H5S' -> '1:0:5', 'PT5M' -> '5:0'). A non-zero day
    component is folded into the hours. Values are not zero-padded.
    Unparsable input yields an empty string.
    """
    match = re.fullmatch(
        r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?", duration or "")
    if match is None:
        return ""

    days, hours, minutes, seconds = (
        int(value) if value is not None else None for value in match.groups())
    if days:
        hours = days * 24 + (hours or 0)

    parts = [hours, minutes, seconds]
    # Drop leading units that are absent; anything absent after the first
    # present unit is a real zero and must be kept as a placeholder.
    while parts and parts[0] is None:
        parts.pop(0)
    return ":".join(str(part or 0) for part in parts)


def save_video_infos(video_response, df):
    """Append the first video of `video_response` to `df` as one row.

    Args:
        video_response: Response dict of a ``videos().list`` call holding
            ``snippet``, ``statistics`` and ``contentDetails`` for the video.
        df: DataFrame collecting previously saved videos (may be empty or
            None).

    Returns:
        A new DataFrame made of `df` plus one row with the columns id,
        Title, Description, Channel Title, Publish time, Tags, Category,
        Duration, Number of comments, Number of likes, Number of views,
        Number of favorites, Video quality and Licensed. Statistics missing
        from the response are stored as -1, missing tags as an empty list.

    Raises:
        IndexError: If the response contains no items.
        KeyError: If a required snippet/contentDetails field is missing.
    """
    items = video_response.get("items") or []
    if not items:
        raise IndexError("Video response contains no items")
    video = items[0]

    snippet = video["snippet"]
    statistics = video["statistics"]
    content_details = video["contentDetails"]

    row = {
        "id": video["id"],
        "Title": snippet["title"],
        "Description": snippet["description"],
        "Channel Title": snippet["channelTitle"],
        "Publish time": snippet["publishedAt"],
        "Tags": snippet.get("tags", []),
        "Category": snippet["categoryId"],
        "Duration": _format_iso8601_duration(content_details["duration"]),
        # Individual counters can be absent from the response; -1 marks
        # "not available" for all of them.
        "Number of comments": statistics.get("commentCount", -1),
        "Number of likes": statistics.get("likeCount", -1),
        "Number of views": statistics.get("viewCount", -1),
        "Number of favorites": statistics.get("favoriteCount", -1),
        "Video quality": content_details["definition"],
        "Licensed": content_details["licensedContent"],
    }

    # DataFrame.append was removed in pandas 2.0; build a one-row frame and
    # concatenate instead. Wrapping the dict in a list keeps the Tags list
    # as a single cell value.
    row_df = pd.DataFrame([row])
    if df is None or len(df.columns) == 0:
        return row_df
    return pd.concat([df, row_df], ignore_index=True)

In [6]:
def get_channel_videos(path, channel):
    """Fetch the details of every video listed in one channel CSV file.

    The CSV at ``path``/``channel`` must have a ``videos`` column of URLs.
    Duplicate rows and rows with missing values are dropped, and only URLs
    containing ``https://www.youtube.com/watch?v`` are kept. Details are
    requested through the module-level ``youtube`` client.

    Args:
        path: Directory containing the channel CSV files.
        channel: File name of the channel CSV inside `path`.

    Returns:
        A ``(df, error)`` tuple: `df` holds one row per video that could be
        fetched, `error` is the last exception caught (None if none was).
        An ``HttpError`` stops the loop early, so `df` may then be partial.
    """
    # os.path.join works whether or not `path` ends with a separator.
    channel_csv = (
        pd.read_csv(os.path.join(path, channel))
        .drop_duplicates()
        .dropna()
    )
    videos = channel_csv['videos'].astype(str)  # every URL is handled as a str
    # Keep only actual video links (literal match, not a regex).
    videos = videos[videos.str.contains("https://www.youtube.com/watch?v", regex=False)]
    video_ids = videos.apply(get_video_id_by_url)

    error = None
    df = pd.DataFrame()
    for video_id in video_ids:
        try:
            response = get_video_details(youtube, id=video_id)
            df = save_video_infos(response, df)
        except Exception as inst:
            print("Error in video id ", video_id)
            # Keep the reason in the log file; the print alone loses it.
            logger.warning("Failed to fetch details for video %s: %r", video_id, inst)
            error = inst
            # An API-level failure aborts the whole channel; other errors
            # only skip the current video.
            if isinstance(error, HttpError):
                break

    return df, error

In [None]:
import time

# Directory holding one CSV of video URLs per channel.
path = 'channelVideos/'  # change if you run it in your own environment
all_videos = os.listdir(path)

for channel in all_videos:
    df, error = get_channel_videos(path, channel)
    if isinstance(error, HttpError):
        # Any HttpError is treated as an exhausted API quota: wait a day,
        # then scrape this channel again from the start.
        warnings.warn('Scrapping was interrupted due to the quota being exceeded. Please wait till tomorrow!!')
        day = 60*60*24
        time.sleep(day)
        # Retry only after an interruption. Re-fetching unconditionally
        # would scrape every channel twice and double the quota usage.
        df, error = get_channel_videos(path, channel)

    # NOTE(review): assumes the file name contains an underscore; the output
    # name is built from the part after the first one -- confirm against the
    # actual files in `path`.
    channel_videos = 'channel' + channel.split("_")[1] + 'videos.csv'
    df.to_csv(channel_videos, index=False)
    print('scrapped ', channel)
    

Error in video id  LBraVzyYDKg


  
