In [1]:
import os
import googleapiclient.discovery
import urllib.parse as p
import pandas as pd
from youtube_transcript_api import YouTubeTranscriptApi
import json
def obj_dict(obj):
    return obj.__dict__

import subprocess
import requests
import re
import youtube_dl


In [2]:
from dotenv import load_dotenv # add this line
load_dotenv()

True

In [3]:
import firebase_admin
from firebase_admin import credentials
from firebase_admin import firestore

# Use a service account.
cred = credentials.Certificate('../analytics-c4f16-firebase-adminsdk-plgdk-2a3d65b3a8.json')

app = firebase_admin.initialize_app(cred)

db = firestore.client(app)
batch = db.batch()

In [4]:
# Disable OAuthlib's HTTPS verification when running locally.
# *DO NOT* leave this option enabled in production.

api_service_name = "youtube"
api_version = "v3"
DEVELOPER_KEY = os.getenv('DEVELOPER_KEY')

os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
youtube = googleapiclient.discovery.build(
    api_service_name, api_version, developerKey = DEVELOPER_KEY)

In [5]:
def check_nested_dict_keys(dic, keys):
    d = dic
    for key in keys:
        if key in d.keys():
            d = d[key]
            continue
        else:
            return dic['snippet']['thumbnails']['default']['url'] 
        
    return d

In [6]:
def db_collection_to_list(docs):
    collection = {}
    for doc in docs:
        collection[doc.id]=doc.to_dict()
    return collection

In [7]:
def get_video_db_list_filter(name,channelId,order_by=None):

    if order_by:
        docs = db.collection(u'{}'.format(name)).order_by(order_by, direction=firestore.Query.DESCENDING).where(u"channelID",u"==",u"{}".format(channelId)).stream()
    else:
        docs = db.collection(u'{}'.format(name)).where(u"channelID",u"==",u"{}".format(channelId)).stream()

    collection = []
    for doc in docs:
        vid =doc.to_dict()
        if "#shorts" not in vid['title']:
            collection.append(vid)
    return collection

In [8]:
def get_channel_db():
    docs = db.collection(u'channels').stream()
    channelsCollection = {}
    for doc in docs:
        channelsCollection[doc.id]=doc.to_dict()
    return channelsCollection

def generate_channel_json(sourceValue, source="url"):
    
    if source=="id":
        channel_id = sourceValue
    elif source=="url":
        video_url = sourceValue
        channel_id = video_url_to_channel_id(video_url)
    else:
        raise Exception("{} not a valid source.".format(source))
            
    request = youtube.channels().list(
            part="snippet,contentDetails",
            id=channel_id
        )
    response = request.execute()
    
    channel_df = {}#pd.DataFrame(columns=['channelId', 'title', 'uploads','description','thumbnail'])
    channel_df['channelId'] = channel_id
    channel_df['title'] = response['items'][0]['snippet']['title']
    channel_df['uploads'] = response['items'][0]['contentDetails']['relatedPlaylists']['uploads']
    channel_df['description'] = response['items'][0]['snippet']['description']
    channel_df['thumbnail'] = check_nested_dict_keys(response['items'][0],["snippet","thumbnails","high","url"])

    #full_channel_df = pd.concat([full_channel_df, pd.DataFrame.from_dict(channel_df)])

    return channel_df

def video_url_to_channel_id(video_url):
    video_param_from_url = video_url.split("/watch?v=")[1].split("&")[0]
    request = youtube.videos().list(
            part="snippet,contentDetails",
            id=video_param_from_url
        )
    response = request.execute()
    channel_id = response['items'][0]['snippet']['channelId']
    return channel_id


def get_channel_doc(video_url, allChannels):
    channel_id = video_url_to_channel_id(video_url)
    print(channel_id)
    
    for record in allChannels.values():
        if record['channelId'] == channel_id:
            print("Channel {} already exists.".format(record['title']))
            return record

    channelToAdd = generate_channel_json(video_url)    


    add = input("Create channel for url ? (0 for no, any other key for yes)".format(channelToAdd['title']))

    if add == 0:
        print("No channel for url")
        return
    

    docRef = db.collection(u"channels").document(u"{}".format(channelToAdd['channelId'])).set(channelToAdd)
    print("Channel {} is created.".format(channelToAdd['title']))
    return channelToAdd


In [9]:
def get_uploads_id(youtube, channel_id):
    request = youtube.channels().list(
            part="snippet,contentDetails",
            id=channel_id
        )
    response = request.execute()
    return response['items'][0]['contentDetails']['relatedPlaylists']['uploads']

def get_uploaded_videos_response(youtube, channel_id,**args):
    
    playlist_id = get_uploads_id(youtube, channel_id)
    max_total=args.get("max_total") if args.get("max_total") else 1000
    oldest_date=args.get("oldest_date") if args.get("oldest_date") else False

    
    params = {
        "part":"snippet,contentDetails",
        "playlistId": playlist_id,
        "maxResults": 50
    }
    
    uploaded_videos_content_details_list = []

    while True:
        response = youtube.playlistItems().list(**params).execute()
        
        
        if oldest_date:
            oldestResponseDate = response.get('items')[-1]['snippet']['publishedAt'].split("T")[0]
            if oldestResponseDate <= oldest_date:
                add_response = []
                for r in response.get('items'):
                    date = r['snippet']['publishedAt'].split("T")[0]
                    if date <= oldest_date:
                        break
                    else:
                        add_response.append(r)
                videos_list = uploaded_videos_content_details_list + add_response
                print("New videos found: {}".format(len(videos_list)))
                print("Newest video: {}, {}".format(videos_list[0]['snippet']['title'],videos_list[0]['snippet']['publishedAt']))
                print("Oldest video: {}, {}".format(videos_list[-1]['snippet']['title'],videos_list[-1]['snippet']['publishedAt']))


      
        uploaded_videos_content_details_list = uploaded_videos_content_details_list + response.get('items')
        
        if (max_total and len(uploaded_videos_content_details_list) >= max_total):
            return uploaded_videos_content_details_list[:max_total]
     
        elif 'nextPageToken' in response.keys():
            params['pageToken'] = response['nextPageToken']
            continue
        else:
            break

    return uploaded_videos_content_details_list

def getYoutubeDuration(videoId):
    responseVideoDetails = youtube.videos().list( part="contentDetails",id=videoId).execute()
    print(videoId)
    durationResponse=responseVideoDetails['items'][0]['contentDetails']['duration']

    duration_string = durationResponse.replace('PT',"")
    number_values = re.findall('\d+',duration_string)
    symbols_available= ''.join([i for i in duration_string if not i.isdigit()])
    symbol_map = {}
    for symbol in 'HMS':
        index = symbols_available.find(symbol)
        if index > -1:
            symbol_map[symbol] = number_values[index]

    duration = 0


    for idx in symbol_map:
        if idx == "H":
            duration = int(symbol_map[idx])*60*60 + duration
        if idx == "M":
            duration = int(symbol_map[idx])*60 + duration
        if idx == "S":
            duration = int(symbol_map[idx]) + duration
            
    return duration
def video_response_list_to_list_of_dicts(response_list):
    
    list_of_dicts = []
    for x in response_list:
        if "#shorts" in x['snippet']['title']:
            continue
            
        video_df = {}
        video_df['id'] = x['id']
        video_df['videoId'] = x['contentDetails']['videoId'] 
        video_df['title'] = x['snippet']['title'] 
        video_df['description'] = x['snippet']['description'] 
        video_df['thumbnail'] = check_nested_dict_keys(x,["snippet","thumbnails","maxres","url"])      
        video_df['channelID'] = x['snippet']['channelId'] 
        video_df['videoId'] = x['contentDetails']['videoId'] 
        video_df['publishedAt'] =x['snippet']['publishedAt'].split("T")[0] 
        video_df['videoUrl'] = "https://www.youtube.com/watch?v={}".format(x['contentDetails']['videoId'] )
        video_df['duration'] = getYoutubeDuration(x['contentDetails']['videoId']  )
        
        video_df['audioUrl'] = ''
        video_df['assemblySentences'] ='' 
        video_df['assemblyText'] ='' 
        video_df['availability']=''
        video_df['assemblyId'] = ''
        video_df['ytaTranscript'] = ''
        video_df['mainTranscript'] = ''
        video_df['ytaAvailability'] = ''


        list_of_dicts.append(video_df)


    return list_of_dicts

In [10]:
def yta_transcript_to_dicts(video_id):
    response = YouTubeTranscriptApi.get_transcript(video_id)
    list_of_dicts = []
    
    for x in response:
        transcript_df={}
        transcript_df['text'] = x["text"]
        transcript_df['start'] = x["start"]
        transcript_df['videoId'] = video_id
        transcript_df['transcriptId'] = str(video_id)+"_"+str(x['start'])
  
        transcript_df['prevId'] = 0
        transcript_df['nextId'] = 0
        transcript_df['num_words'] = 0
        transcript_df['word_index'] = 0
        list_of_dicts.append(transcript_df)
        
    return list_of_dicts

In [11]:
def postTranscript(url):
    endpoint = "https://api.assemblyai.com/v2/transcript"
    json = {
        "audio_url": url,
        "auto_highlights": True,
        "iab_categories": True
    }
    headers = {
        "authorization": "d894d2a3c49040a49c6567373feb89e7",
    }
    response = requests.post(endpoint, json=json,headers=headers)
    return response.json()

In [12]:
def getCost(videoList):
    cost = 0
    for vid in videoList:
        cost = vid['duration']*0.00025 + cost
    return cost

In [13]:
def submitAssemblyTranscripts(video_list):
    ydl_opts = {'format': 'bestaudio'}


    ydl =youtube_dl.YoutubeDL(ydl_opts)
    pending_videos =[]
    for doc in video_list:
        videoUrl = "https://www.youtube.com/watch?v={}".format(doc['videoId'])
        info = ydl.extract_info(videoUrl, download=False)
        audioUrl = info['formats'][0]['url']
        doc['audioUrl'] = audioUrl
        
        try:
            response = getTranscript(doc['assemblyId'])
            if 'sentences' in response:
                print("Assembly Transcript already available for {} {}".format(doc['title'],doc['videoId']))
                continue
            
        except:
            doc_ref = db.collection(u'AssemblyTranscripts').document(u'{}'.format(doc['videoId']))

            doc = doc_ref.get()
            if doc.exists:
                print("Assembly Transcript already available for {} {}".format(doc['title'],doc['videoId']))
                continue
        requestForTrancript = postTranscript(audioUrl)
        pending_videos.append(doc)
        doc['assemblyId'] = requestForTrancript['id']
        doc['availability'] = 'pending'
        docRef = db.collection(u"videos").document(u"{}".format(doc['videoId'])).set(doc)
        db.collection(u"AssemblyTranscripts").document(u"{}".format(doc['videoId'])).set(doc) 
        print("submitted {} of time {}".format(doc['title'],doc['duration']))

    print("{} videos added.".format(len(pending_videos)))


In [14]:
def getTranscript(transcriptId):
    endpoint = "https://api.assemblyai.com/v2/transcript/{}/sentences".format(transcriptId)
    headers = {
        "authorization": "d894d2a3c49040a49c6567373feb89e7",
    }
    json = {
        "auto_highlights": True,
        "iab_categories": True
    }
    response = requests.get(endpoint, json=json,headers=headers)
    return response.json()


In [15]:
def pullAssemblyTranscripts(video_list):
    failed_videos = []
    good_videos = []
    for vid in video_list:
        response = getTranscript(vid['assemblyId'])
        if 'error' in response:
            print("ERROR: {} {}".format(vid['videoId'], vid['title']))
            failed_videos.append(vid)
            vid['availability'] = 'error'
            docRef = db.collection(u"videos").document(u"{}".format(vid['videoId'])).set(vid)
            continue
            
        if 'sentences' in response:
            sentences = response['sentences']
            cleanResponse = [record.pop('words') for record in sentences]
            sentencesList = [record['text'] for record in sentences]
            vid['assemblyText'] = " ".join(sentencesList)
            vid['assemblySentences'] = json.dumps(sentences, default=obj_dict)
            vid['availability'] = 'available.assembly'
            vid['mainTranscript'] = vid['assemblyText']

            docRef = db.collection(u"videos").document(u"{}".format(vid['videoId'])).set(vid)
            print("Transcript update for {}".format(vid['title']))
            
            good_videos.append(vid)
        else:print(response)
    return failed_videos, good_videos

In [51]:

def yta_transcript_to_dicts(video_id):
    response = YouTubeTranscriptApi.get_transcript(video_id)
    list_of_dicts = []

    for x in response:
        transcript_df={}
        transcript_df['text'] = x["text"]
        transcript_df['start'] = x["start"]
        transcript_df['videoId'] = video_id

        list_of_dicts.append(transcript_df)
        
    return list_of_dicts
def pullYtaTranscripts(video_list):
    failed_videos = []
    good_videos = []

    for vid in video_list:
        
        try:
            response = yta_transcript_to_dicts(vid['id'])
          
            vid['transcript'] = " ".join(response['text'])
            vid['mainTranscript'] =  vid['transcript'] if vid['assemblyText'] == '' else vid['assemblyText']

            docRef = db.collection(u"videos").document(u"{}".format(vid['videoId'])).set(vid)
            print("Transcript update for {}".format(vid['title']))
            
        except :

            print("ERROR: {} {}".format(vid['videoId'], vid['title']))
            failed_videos.append(vid)
            vid['ytaAvailability'] = 'error'
            docRef = db.collection(u"videos").document(u"{}".format(vid['videoId'])).set(vid)
            continue
            
    return failed_videos,good_videos



In [17]:
def getAllTranscripts():
    endpoint = "https://api.assemblyai.com/v2/transcript"
    headers = {
        "authorization": "d894d2a3c49040a49c6567373feb89e7",
    }
    response = requests.get(endpoint, headers=headers).json()
    transcripts = response['transcripts']
    eq1=False
    i=0
    while response['page_details']['prev_url']:
        endpoint = response['page_details']['prev_url']#response['page_details']['next_url']
        response = requests.get(endpoint, headers=headers).json()
        transcripts = transcripts + response['transcripts']
        i=i+1
    return transcripts

In [18]:
def pullChannel(url, channels_collection):
    
    #Create channel
    channel = get_channel_doc(url,channels_collection)
    
    #check for videos not yet uploaded
    new_video_responses = get_uploaded_videos_response(youtube, channel['channelId'],
                                               max_date=videos_collections[-1]['publishedAt'])
    list_of_new_video_responses_as_dicts = video_response_list_to_list_of_dicts(new_video_responses)
    
    #Upload new videos
    add = input("Input {} videos?".format(len(list_of_new_video_responses_as_dicts)))
    for doc in list_of_video_responses_as_dicts:
        docRef = db.collection(u"videos").document(u"{}".format(doc['videoId'])).set(doc)
        
    return channel

    #add videos to transcript execution

In [19]:
channels_collection = get_channel_db()
#docs = db.collection(u'{}'.format(name)).order_by(order_by, direction=firestore.Query.DESCENDING).where(u"channelID",u"==",u"{}".format(channelId)).stream()

In [20]:
channel = get_channel_doc("https://www.youtube.com/watch?v=dbOXYhjpXAc&t=937s&ab_channel=ColinandSamir", channels_collection)

UCamLstJyCa-t5gfZegxsFMw
Channel Colin and Samir already exists.


In [32]:
videos_collections=get_video_db_list_filter('videos',channel['channelId'],order_by='publishedAt')#load all videos

In [22]:
new_video_responses = get_uploaded_videos_response(youtube, channel['channelId'])
list_of_new_video_responses_as_dicts = video_response_list_to_list_of_dicts(new_video_responses)

1t5oYKEn-1E
gU24yrixXD8
IjoTYJNr8DA
xp2VGAjHZWY
3QZkjsMfj3I
gX0mrw1uZcY
NphQGsm4rvk
cbBxEmGOfk4
lBCOOTyU46M
OE4ti4alRN8
w4iUuroktxw
yNLqaQ6slkw
N5YW4JB07-8
gGBCbswZbnI
0bC1ah_x8zo
hYaGD0V2OkE
wkDlfvTed1c
7qoe2qhcZ-Y
88067BiKU4Y
jzMsnNxzejI
iq71Cb2jEIE
pvtMJFPyiLM
3vbZvRHpM8w
7hrSj5qkHv4
VbNIh88Nq5k
XuVR_elE1Pw
r8bzWKBvZsE
9CxaZWkwHzE
x5lBJE2Ok8E
z_czmz_bJqk
irIN7SHvLDc
t69cK3Ih_Og
BB2HTaXTy0k
HmrjOq8epsg
MLyEDp7e0Q8
9cn_r1z6zjo
o8UBXsiiS24
SpdWaOngRWM
jz_qFyTrS8w
Dh909TbYn7Y
G5c6qof96DM
UZSwDZ72Lp8
dbOXYhjpXAc
YVuIm8OLz-8
r-Y1LRtsFaU
_TAxmgPQtzc
vwtRRdmZSYw
xIC1nS4DU6M
JoI4BRPd8us
PiGCHXt5eBs
6OICqRSTRjY
71Xz6bYoRGc
7PIlp-FSN5I
vDGnnLLXGTo
c8VcUnz3nVc
TW7cH2wLv00
dkIoObZOMbo
BVEzAyZXctY
dwFBlFS4dus
p0Y82yGFqAw
nxyThD3-GTw
wUOdBXYIMM8
pkiW9ygtddA
O0C417mFrRQ
ZlR0Rsu_VeU
TAFYbWgX8K0
rcHNOkA9NNk
lQyAdgLwbLk
HGhT5GMhf8A
WVI2GUdJp2Y
9w0kTRvOQWA
j7eETkQcHLA
5EfQqwDmOnI
7eWpg3WyelA
lBGq8vWx7B0
lGmrpX7KkcU
pVhrApLIraI
f5WZgw0WGto
Fz27v1gnCFM
BSXMM9Y1blk
TBx1uYdq5nk
Xy0cKyWL2J8
6R4zxswb8gQ
knl2

In [23]:
existing_ids = [vid['videoId'] for vid in videos_collections]
additional_responses = [v for v in list_of_new_video_responses_as_dicts if v['videoId'] not in existing_ids]

In [24]:
len(additional_responses)

0

In [None]:
# for doc in additional_responses:
#     docRef = db.collection(u"videos").document(u"{}".format(doc['videoId'])).set(doc)
# videos_collections.append(doc)

In [35]:
for doc in videos_collections:
    if 'mainTranscript' not in doc:
        doc['mainTranscript']=''
        docRef = db.collection(u"videos").document(u"{}".format(doc['videoId'])).set(doc)

In [37]:
# test = [v for v in  videos_collections if v['videoId'] == 'gX0mrw1uZcY']
# videos_wo_transcript=test
# videos_collections=test
videos_wo_transcript = [v for v in videos_collections if (v['mainTranscript'] == '')]


In [38]:
len(videos_wo_transcript)

255

In [52]:
f,g = pullYtaTranscripts(videos_wo_transcript[:5]) 

In [53]:
len(f),len(g)

(5, 0)

In [None]:
#Add assembly

#if there is videos without assembly, select from them
videos_wo_assembly_transcript = [v for v in videos_collections if ((v['assemblyText'] == '') and (v['availability'] != 'pending')) ]
list_of_videos_less_30 = [v for v in videos_wo_assembly_transcript if v['duration'] <= 35*60]
list_of_videos_for_assembly = list_of_videos_less_30[::2][4]
list_of_videos_for_assembly_ids = [v['videoId'] for v in list_of_videos_for_assembly ]
remaining_videos = [v for v in videos_wo_transcript if v['videoId'] not in list_of_videos_for_assembly_ids]

In [None]:
#TODO: pull pending

In [None]:
round(getCost(list_of_videos_for_assembly),6), len(list_of_videos_for_assembly)

In [None]:
submitAssemblyTranscripts(test)

In [None]:
list_of_new_pending_assembly_videos = [v for v in test if (v['availability'] =='pending')]

In [None]:
len(list_of_new_pending_assembly_videos)

In [None]:
f,g = pullAssemblyTranscripts(list_of_new_pending_assembly_videos) 

In [None]:
len(f),len(g)

In [None]:
f,g = pullYtaTranscripts(f) 

In [None]:
#visualize in webapp
#Use cases
##find creators with certain categories
##Correlate topics with likes/comments/engagement
##See when

In [None]:
#nothing
#has yta
#has assembly
#has assembly pending