In [1]:
from googleapiclient.discovery import build
import json, time, sys

# IDs of videos whose comment collection has been attempted; the comment
# collection loop further down appends each video's ID before requesting
# its comments.
video_collected = []

def formatComment(comment):
    """Flatten a comment-thread resource into a single-level dict.

    Args:
        comment: A dict shaped like an item of a ``commentThreads().list``
            response: it must have ``id`` and a ``snippet`` holding
            ``videoId``, ``totalReplyCount`` and ``topLevelComment``.

    Returns:
        A dict with the keys ``id``, ``comment``, ``user``, ``userId``,
        ``likeCount``, ``publishedAt``, ``totalReplyCount``, ``videoId`` and
        ``parentId``. ``parentId`` is always ``""`` for a top-level comment.
        ``userId`` is ``""`` when the author channel id is not available.

    Raises:
        KeyError: If any mandatory field listed above is missing.
    """
    thread = comment['snippet']
    top_level = thread['topLevelComment']['snippet']

    # Only the author channel id is treated as optional; catch just the
    # lookup failures instead of every exception.
    try:
        user_id = top_level['authorChannelId']['value']
    except (KeyError, TypeError):
        user_id = ""

    return {
        'id': comment['id'],
        'comment': top_level['textOriginal'],
        'user': top_level['authorDisplayName'],
        'userId': user_id,
        'likeCount': top_level['likeCount'],
        'publishedAt': top_level['publishedAt'],
        'totalReplyCount': thread['totalReplyCount'],
        'videoId': thread['videoId'],
        'parentId': "",
    }

def formatReply(reply):
    """Flatten a reply (comment resource) into a single-level dict.

    Args:
        reply: A dict with ``id`` and a ``snippet`` holding ``textOriginal``,
            ``authorDisplayName``, ``likeCount``, ``publishedAt``,
            ``videoId`` and ``parentId``.

    Returns:
        A dict with the keys ``id``, ``comment``, ``user``, ``userId``,
        ``likeCount``, ``publishedAt``, ``replyCount``, ``videoId`` and
        ``parentId``. ``replyCount`` is always ``0``. ``userId`` is ``""``
        when the author channel id is not available.

    Raises:
        KeyError: If any mandatory field listed above is missing.
    """
    snippet = reply['snippet']

    # Only the author channel id is treated as optional; catch just the
    # lookup failures instead of every exception.
    try:
        user_id = snippet['authorChannelId']['value']
    except (KeyError, TypeError):
        user_id = ""

    return {
        'id': reply['id'],
        'comment': snippet['textOriginal'],
        'user': snippet['authorDisplayName'],
        'userId': user_id,
        'likeCount': snippet['likeCount'],
        'publishedAt': snippet['publishedAt'],
        'replyCount': 0,
        'videoId': snippet['videoId'],
        'parentId': snippet['parentId'],
    }

class Collector:
    """Collects video, channel and comment data through YouTube API clients.

    One client is built per API key found in ``config.json`` (key
    ``api_keys``). ``current_queries`` tracks the query cost spent so far and
    ``roll`` switches to the next client every ``QUERIES_PER_KEY`` queries.
    """

    # Query budget spent on one API key before moving on to the next one.
    QUERIES_PER_KEY = 9000
    # Seconds to wait before retrying a failed request.
    RETRY_DELAY = 60
    # Seconds to wait once every key's budget has been used up (24 hours).
    QUOTA_RESET_DELAY = 60 * 60 * 24
    # Paging for a category stops once this many videos were gathered.
    MAX_VIDEOS_PER_CATEGORY = 1000

    def __init__(self, collected_videos=None):
        """Build one API client per configured key.

        Args:
            collected_videos: Set of video IDs to skip in ``getVideos``. The
                set is used as-is (not copied) and is extended with every
                newly found ID. Defaults to a fresh empty set per instance.
        """
        with open('config.json') as config_file:
            keys = json.load(config_file)['api_keys']
        self.youtubes = [build('youtube', 'v3', developerKey=key) for key in keys]
        self.youtube = self.youtubes[0]
        self.current_queries = 0
        # A `set()` default in the signature would be shared by all instances.
        self.collected_videos = set() if collected_videos is None else collected_videos

    def roll(self):
        """Select the client matching the current query count.

        When the count has moved past the last key, sleep for
        ``QUOTA_RESET_DELAY`` seconds and start over with the first key.
        """
        try:
            self.youtube = self.youtubes[self.current_queries // self.QUERIES_PER_KEY]
        except IndexError:
            print('Quota exceeded,sleeping')
            time.sleep(self.QUOTA_RESET_DELAY)
            self.youtube = self.youtubes[0]
            self.current_queries = 0

    def _list(self, resource, cost, retry=True, **params):
        """Run ``self.youtube.<resource>().list(**params).execute()``.

        On success ``cost`` is added to ``current_queries`` and ``roll`` is
        called. With ``retry=True`` a failing request is repeated every
        ``RETRY_DELAY`` seconds until it succeeds; with ``retry=False`` the
        exception propagates to the caller.
        """
        while True:
            try:
                # The request is rebuilt on every attempt so that it always
                # uses the currently selected client.
                result = getattr(self.youtube, resource)().list(**params).execute()
            except Exception as exc:  # KeyboardInterrupt is deliberately not caught
                if not retry:
                    raise
                print(f'Request failed ({exc!r}), retrying in {self.RETRY_DELAY}s')
                time.sleep(self.RETRY_DELAY)
                continue
            self.current_queries += cost
            self.roll()
            return result

    @staticmethod
    def _format_search_item(item, category, method):
        """Flatten one search result item into the video metadata dict."""
        snippet = item['snippet']
        return {
            'videoId': item['id']['videoId'],
            'publishedAt': snippet['publishedAt'],
            'title': snippet['title'],
            'description': snippet['description'],
            'channelTitle': snippet['channelTitle'],
            'channelId': snippet['channelId'],
            'liveBroadCastContent': snippet['liveBroadcastContent'] != 'none',
            'category': category,
            'method': method,
        }

    def getVideos(self, categories, method='relevance'):
        """Search videos for each category and return their metadata.

        Args:
            categories: Iterable of video category IDs.
            method: Value passed as the ``order`` parameter of the search.

        Returns:
            A list of dicts (see ``_format_search_item``). Videos whose ID is
            already in ``self.collected_videos`` are skipped; every returned
            video's ID is added to that set.
        """
        videos_metadata = []
        for category in categories:
            videos = []
            page_token = None
            while True:
                params = dict(regionCode='us', part='snippet', maxResults=50,
                              type='video', order=method,
                              relevanceLanguage='en', videoCategoryId=category)
                if page_token:
                    params['pageToken'] = page_token
                # Each search request is accounted as 100 queries.
                result = self._list('search', cost=100, **params)

                for item in result.get('items', []):
                    video_id = item['id']['videoId']
                    if video_id in self.collected_videos:
                        continue
                    videos.append(self._format_search_item(item, category, method))
                    self.collected_videos.add(video_id)

                page_token = result.get('nextPageToken')
                if len(videos) >= self.MAX_VIDEOS_PER_CATEGORY or not page_token:
                    break
            videos_metadata.extend(videos)
        return videos_metadata

    def getVideoStatistics(self, videos):
        """Enrich each video dict in place with details and statistics.

        Adds ``tags``, ``language``, ``duration``, ``topics`` and every entry
        of the item's ``statistics`` dict. Missing tags/topics become ``[]``
        and a missing default language becomes ``''``.

        Args:
            videos: List of dicts that each contain ``videoId``.

        Returns:
            The same ``videos`` list.
        """
        parts = ['snippet', 'contentDetails', 'topicDetails', 'statistics']
        for video in videos:
            result = self._list('videos', cost=1, part=parts, id=video['videoId'])
            for item in result.get('items', []):
                snippet = item.get('snippet') or {}
                topic_details = item.get('topicDetails') or {}
                video['tags'] = snippet.get('tags', [])
                video['language'] = snippet.get('defaultLanguage', '')
                video['duration'] = item['contentDetails']['duration']
                video['topics'] = topic_details.get('topicCategories', [])
                video.update(item['statistics'])
        return videos

    def getChannelStatistics(self, channels):
        """Fetch title, description and statistics for each channel ID.

        Channels for which the response has no items are skipped. Request
        errors are not retried here; they propagate to the caller.

        Args:
            channels: Iterable of channel IDs.

        Returns:
            A list of dicts with the keys ``title``, ``id``, ``viewCount``,
            ``subscriberCount`` (``None`` if absent), ``description`` (``''``
            if absent), ``publishedAt`` (``None`` if absent) and
            ``videoCount``.
        """
        channels_metadata = []
        for channelId in channels:
            result = self._list('channels', cost=1, retry=False,
                                part=['id', 'statistics', 'snippet'], id=channelId)
            items = result.get('items')
            # Covers both a missing 'items' key and an empty list.
            if not items:
                continue
            item = items[0]
            snippet = item['snippet']
            statistics = item['statistics']
            channels_metadata.append({
                'title': snippet['title'],
                'id': item['id'],
                'viewCount': statistics['viewCount'],
                'subscriberCount': statistics.get('subscriberCount'),
                'description': snippet.get('description', ''),
                'publishedAt': snippet.get('publishedAt'),
                'videoCount': statistics['videoCount'],
            })
        return channels_metadata

    def getComments(self, videoId):
        """Collect all comment threads of a video, plus the replies included
        in each thread resource.

        The first request is not retried, so its errors reach the caller;
        requests for further pages are retried until they succeed.

        Args:
            videoId: ID of the video.

        Returns:
            A list of dicts produced by ``formatComment`` and ``formatReply``;
            each thread is followed by its replies.
        """
        comments = []
        params = dict(part=["id", "snippet", "replies"], videoId=videoId, maxResults=100)
        response = self._list('commentThreads', cost=1, retry=False, **params)
        while True:
            for item in response.get('items', []):
                thread = formatComment(item)
                comments.append(thread)
                if thread['totalReplyCount'] > 0:
                    replies = (item.get('replies') or {}).get('comments', [])
                    for reply in replies:
                        # Best effort: a malformed reply is skipped.
                        try:
                            comments.append(formatReply(reply))
                        except (KeyError, TypeError):
                            continue
            page_token = response.get('nextPageToken')
            if not page_token:
                break
            response = self._list('commentThreads', cost=1, pageToken=page_token, **params)
        return comments

In [None]:
# Search order used for this run; stored in each video's 'method' field.
method = 'viewCount'

# The collector is created first because the category fallback below needs
# an API client.
collector = Collector(collected_videos=set())

# Load the cached category list, or fetch and cache it when the cache file
# is missing or unreadable.
try:
    with open('Data/categories.json') as file:
        categories = json.load(file)
except (OSError, json.JSONDecodeError):
    response = collector.youtube.videoCategories().list(part='snippet', regionCode='US').execute()
    # Only the category items are cached: the code below iterates over
    # `categories` and reads each entry's 'id'.
    categories = response['items']
    with open('Data/categories.json', 'w') as file:
        json.dump(categories, file)


category_ids = [category['id'] for category in categories]
print(f'Collecting data for {len(category_ids)} categories using method {method}')

videos = collector.getVideos(category_ids, method=method)
print(f'Finished category queries {len(videos)} found')
videos = collector.getVideoStatistics(videos)
print('Finished collecting statistics')

# Append one JSON document per line (JSON Lines).
with open('Data/video_metadata.jsonl', 'a', encoding='utf-8') as file:
    for line in videos:
        json.dump(line, file, ensure_ascii=False)
        file.write('\n')

In [None]:
# Fetch the comments of every collected video and append them to
# Data/comments.jsonl, one JSON document per line.
for video in videos:
    video_collected.append(video['videoId'])
    try:
        comments = collector.getComments(video['videoId'])
    except Exception as e:
        if 'disabled comments' in str(e):
            print(f'Video {video["videoId"]} has disabled comments')
        else:
            print(f'Skipping video {video["videoId"]}: {e!r}')
        # Always skip the video on failure: falling through would either hit
        # an undefined `comments` or write the previous video's comments again.
        continue
    print(f'Collected {len(comments)} comments for {video["videoId"]}')
    with open('Data/comments.jsonl', 'a', encoding='utf-8') as file:
        for comment in comments:
            json.dump(comment, file, ensure_ascii=False)
            file.write('\n')
print('Done')

In [None]:
# Gather the distinct channel IDs of the collected videos, fetch their
# statistics and overwrite Data/channel_metadata.jsonl with one JSON
# document per line.
channels = {entry['channelId'] for entry in videos}
print(f'Collecting data for {len(channels)} channels')
channel_statistics = collector.getChannelStatistics(channels)
with open('Data/channel_metadata.jsonl', 'w', encoding='utf-8') as out:
    for record in channel_statistics:
        out.write(json.dumps(record, ensure_ascii=False))
        out.write('\n')