In [21]:
import tensorflow as tf
import os
from defination import fvecs_write, fvecs_read
from googleapiclient.discovery import build
from urllib.parse import urlparse, parse_qs
import json
import numpy as np
import requests
import re
import concurrent.futures
from googleapiclient.errors import HttpError

In [22]:

def get_element_by_id(id, timeout=3):
    """
    Access a webpage and extract information using an element ID.
    """
    try:
      url = f'http://data.yt8m.org/2/j/i/{id[:2]}/{id}.js'
      # set the user agent to avoid 403 forbidden
      headers = {
          'User-Agent': 'Mozilla/5.0'
      }
      # set max 
      response = requests.get(url, headers=headers, timeout=timeout)
      # response = requests.get(url)
      if response.status_code == 200:
          pattern = re.compile(r'i\(".*?","(.*?)"\);')
          matches = pattern.findall(response.text)
          if matches:
              return matches[0]
          else:
            #   print("No matches found.")
              return None
      else:
        #   print(f"Failed to retrieve the webpage. Status code: {response.status_code}")
          return None
      
    except requests.Timeout:
      print(f"Timeout occurred for {url}")
      return None
    except requests.RequestException as e:
      print(f"Error fetching {url}: {e}")
      return None
    
class YouTubeAPIKeyManager:
    def __init__(self, api_keys):
        self.api_keys = api_keys
        self.current_key_index = 0

    def get_current_key(self):
        return self.api_keys[self.current_key_index]

    def get_cur_key_idx(self):
        return self.api_keys[self.current_key_index], self.current_key_index

    def switch_to_next_key(self, idx):
        self.current_key_index = idx + 1
        if self.current_key_index >= len(self.api_keys):
            return False
        return True

    def out_of_key(self):
        return self.current_key_index >= len(self.api_keys)


def parse_tfrecord(example_proto):
    # Define the feature description dictionary
    feature_description = {
        'id': tf.io.FixedLenFeature([], tf.string),
        'labels': tf.io.VarLenFeature(tf.int64),
        'mean_rgb': tf.io.FixedLenFeature([1024], tf.float32),   # for video-level
        'mean_audio': tf.io.FixedLenFeature([128], tf.float32)   # for video-level
    }
    return tf.io.parse_single_example(example_proto, feature_description)

def get_file_list(dir, prefix):
    files = []
    for file in os.listdir(dir):
        if file.endswith(".tfrecord") and file.startswith(prefix):
            files.append(dir + file)
    return files


def get_comment_count(api_key_manager, id, video_id, index, file):
    """
    Retrieves the comment count for a YouTube video.
    """
    if api_key_manager.out_of_key():
        record = records(valid=False, valid_key=False)
        return record
    if not video_id:
        print("Invalid YouTube URL.")
        return records(valid=False, valid_key=True)
    
    api_key, key_idx = api_key_manager.get_cur_key_idx()
    youtube = build('youtube', 'v3', developerKey=api_key)
    
    try:
        request = youtube.videos().list(
            part='statistics,snippet',
            id=video_id
        )
        response = request.execute()
        if 'items' in response and len(response['items']) > 0:
            statistics = response['items'][0]['statistics']
            comment_count = statistics.get('commentCount', 0)
            subscriber_count = statistics.get('subscriberCount', 0)
            view_count = statistics.get('viewCount', 0)
            like_count = statistics.get('likeCount', 0)

            snippet = response['items'][0]['snippet']
            publish_date = snippet.get('publishedAt', None)
            record = records(id=id, real_id=video_id, idx=index, comment=int(comment_count), subscriber=int(subscriber_count), view=int(view_count), like=int(like_count), date=publish_date, file=file)
            return record
        else:
            # print("Video not found or is private.")
            record = records(valid=False, valid_key=True)
            return record
        
        # record = records(video_id, 1, 1, 222, 22, "101010", file) 
        # return record
    except HttpError as e:
        if e.resp.status == 403:
            error_message = e.content.decode('utf-8')
            if 'quota' in error_message:
                print(f"API key {api_key} has run out of quota.")
            else:
                print(f"API key {api_key} is invalid or access is forbidden.")
            print("switching to next key")
            api_key_manager.switch_to_next_key(key_idx)
            return get_comment_count(api_key_manager, id, video_id, index, file) # keep retrying with the next key
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        exit()


class records:
    def __init__(self, id='x', real_id='x', idx=0, comment=0, subscriber=0, view=0, like=0, date=0, file='x', valid=True, valid_key=True):
        self.id = id
        self.real_id = real_id
        self.idx = idx
        self.comment = comment
        self.subscriber = subscriber
        self.view = view
        self.like = like
        self.date = date
        self.file = file
        self.valid = valid
        self.valid_key = valid_key

def get_attr_parallel(api_key_manager, idx, data, file):
    if api_key_manager.out_of_key():
        record = records(False, True)
        return record

    id = data['id'].numpy().decode('utf-8')
    # id to url
    real_id = get_element_by_id(id)
    if real_id is None:
        record = records(False, True)
        return record
    #   print("video_id: ", id, " real_id: ", real_id)
    # valid = True
    # infos = [0,0,0,0,True]
    record = get_comment_count(api_key_manager, id, real_id, idx, file)
    return record

In [23]:

# data_size = 1000000
# query_size = 10000
# train_size = 10000

data_size = 1000000
# query_size = 10
# train_size = 10
total_size = data_size

output_rgb_file = "/mnt/data/mocheng/dataset/youtube/data_rgb.fvecs"
output_audio_file = "/mnt/data/mocheng/dataset/youtube/data_audio.fvecs"
output_comments_file = "/mnt/data/mocheng/dataset/youtube/data_comments.json"
output_subscribers_file = "/mnt/data/mocheng/dataset/youtube/data_subscribers.json"
output_views_file = "/mnt/data/mocheng/dataset/youtube/data_views.json"
output_likes_file = "/mnt/data/mocheng/dataset/youtube/data_likes.json"
output_publish_dates_file = "/mnt/data/mocheng/dataset/youtube/data_publish_dates.json"
output_info_file = "/mnt/data/mocheng/dataset/youtube/info.json"
progress_file = "/mnt/data/mocheng/dataset/youtube/progress.json"
# youtube key
key1 = 'AIzaSyC7zEJzQyhg5BmKpIMQzo_VNq-f0nQ7CP4'
key2 = 'AIzaSyBCJKs2cfDAvRjY4vJ1Rk2eM6KWzHO-shU'
key3 = 'AIzaSyBxNBPCjWyg8nnQbXpOiVUJcRbs4lc5BpY'
key4 = 'AIzaSyCXhimXqB0c4hCc2yCoiZOEM5skpilba2Q'
key5 = 'AIzaSyAc3H2lHMd9SjZk4SUaJmeuLhg0sw7ZIUA'
key6 = 'AIzaSyBjd-UyU9vON04jNcPZ9W3MFyRKmNKecL0'
key7 = 'AIzaSyCCrwuDuw_XrHiZ6ZJBfDt9zRn7FpFNAYM'
key8 = 'AIzaSyBMCFg1M7tgiNdg7_T_EseEsb587TX6FZ8'
key9 = 'AIzaSyBnL9WEcknR5-WVnFEZO1_h5wQqg2KyWoE'

api_keys = [key2, key3, key4]  # Replace with your actual API keys
api_key_manager = YouTubeAPIKeyManager(api_keys)
# Load a dataset from TFRecord files
dir = "/mnt/data/mocheng/dataset/youtube-8m/data/yt8m/video/"
# file = "/mnt/data/mocheng/dataset/youtube-8m/data/yt8m/video/train0702.tfrecord"

# load files
files = get_file_list(dir, "train")

In [24]:

# wrap mean_rgb into numpy array
mean_rgb = []
mean_audio = []
comment_counts = []                          
subscriber_counts = []
view_counts = []
like_counts = []
publish_dates = []
loaded_infos = []

show_cnt = 0
show = 1

read_file = []
file_cnts = []

In [25]:

# load file if progress file exists
if os.path.isfile(progress_file):
    print("load progress file, and temporary data")
    with open(progress_file, 'r') as f:
        progress = json.load(f)
        read_file = progress["read_file"]
        file_cnts = progress["file_cnts"]
    # load data
    mean_rgb = fvecs_read(output_rgb_file).tolist()
    mean_audio = fvecs_read(output_audio_file).tolist()
    with open(output_comments_file, 'r') as f:
        comment_counts = json.load(f)
    with open(output_subscribers_file, 'r') as f:
        subscriber_counts = json.load(f)
    with open(output_views_file, 'r') as f:
        view_counts = json.load(f)
    with open(output_likes_file, 'r') as f:
        like_counts = json.load(f)
    with open(output_publish_dates_file, 'r') as f:
        publish_dates = json.load(f)
    with open(output_info_file, 'r') as f:
        loaded_infos = json.load(f)

    print("load data from progress file, size=", len(mean_rgb))

load progress file, and temporary data
load data from progress file, size= 108404


In [34]:
print(len(loaded_infos))
print(len(mean_rgb))
print(loaded_infos[100000])
print(comment_counts[100000])
print(subscriber_counts[100000])
print(view_counts[100000])
print(like_counts[100000])
print(publish_dates[100000])

for  i in range(100000):
    if subscriber_counts[i] > 0:
        print("subscriber: ", i, subscriber_counts[i])

108404
108404
['EDk7', 'cwbYiGed3OU', 674, '/mnt/data/mocheng/dataset/youtube-8m/data/yt8m/video/train0679.tfrecord']
239
0
247039
8952
2013-10-17T14:15:16Z


In [None]:

# iterate in reverse order
for idx, file in enumerate(files):
    if file in read_file:
        print("skip file ", file)
        continue
    
    raw_dataset = tf.data.TFRecordDataset(file)
    parsed_dataset = raw_dataset.map(parse_tfrecord)
    
    dataset = list(parsed_dataset)
    if len(mean_rgb) == total_size:
        break
    
    timeout = 5

    results = [None] * len(dataset)
    with concurrent.futures.ThreadPoolExecutor() as executor:
      attrs = {executor.submit(get_attr_parallel, api_key_manager, idx2, data, file): idx2 for idx2, data in enumerate(dataset)}
      for future in concurrent.futures.as_completed(attrs):
        try:
            index = attrs[future]
            result = future.result()
            if(result.valid and result.valid_key):
                # print(f"Got data suc for index {index}")
                results[index] = result
            if not result.valid_key:
                print(f"Key ran out. Terminating other threads due to failure at index {index}.")
                results[index] = None
                break
            
        except Exception as e:
            print(f"Error processing at index {index}: {e}")
            exit()
            # results[index] = result
    
    valid_result = []
    for idx3, result in enumerate(results):
        if result is None:
            continue
        if result.valid and result.valid_key:
            valid_result.append(result)
    print("got data size:", len(valid_result), " valid percentage: ", len(valid_result)/len(dataset))

    if(total_size <= len(mean_rgb)+len(valid_result)):
        valid_result = valid_result[: total_size - len(mean_rgb)]
    
    for result in valid_result:
        idx = result.idx
        rgb = dataset[idx]['mean_rgb'].numpy()
        audio = dataset[idx]['mean_audio'].numpy()
        mean_rgb.append(rgb)
        mean_audio.append(audio)
        loaded_infos.append([result.id, result.real_id, result.idx, result.file])
        comment_counts.append(result.comment)
        subscriber_counts.append(result.subscriber)
        view_counts.append(result.view)
        like_counts.append(result.like)
        publish_dates.append(result.date)


    
    if not api_key_manager.out_of_key():
        read_file.append(file)
        file_cnts.append(len(valid_result))

    # save data and current progress when each file done, to ensure security
    print("save data and current progress")
    # save data
    fvecs_write(output_rgb_file, np.array(mean_rgb))
    fvecs_write(output_audio_file, np.array(mean_audio))
    with open(output_comments_file, 'w') as f:
        json.dump(comment_counts, f, indent=1)
    with open(output_subscribers_file, 'w') as f:
        json.dump(subscriber_counts, f, indent=1)
    with open(output_views_file, 'w') as f:
        json.dump(view_counts, f, indent=1)
    with open(output_likes_file, 'w') as f:
        json.dump(like_counts, f, indent=1)
    with open(output_publish_dates_file, 'w') as f:
        json.dump(publish_dates, f, indent=1)
    # save progress
    with open(progress_file, 'w') as f:
        f.write(json.dumps({"read_file": read_file, "file_cnts": file_cnts}, indent=1)) # file cnts is useless, if a file did not finish, re load it
    with open(output_info_file, 'w') as f:
        json.dump(loaded_infos, f, indent=1)
    print("process done for file ", file, " got size ", len(mean_rgb))
            
    if(len(mean_rgb) >= total_size):
        print("task done for ", total_size, " items")
        exit()
    if(api_key_manager.out_of_key()):
        print("key out of usage, ", len(mean_rgb), " stored")
        exit()




skip file  /mnt/data/mocheng/dataset/youtube-8m/data/yt8m/video/train0099.tfrecord
skip file  /mnt/data/mocheng/dataset/youtube-8m/data/yt8m/video/train3815.tfrecord
skip file  /mnt/data/mocheng/dataset/youtube-8m/data/yt8m/video/train1288.tfrecord
skip file  /mnt/data/mocheng/dataset/youtube-8m/data/yt8m/video/train2505.tfrecord
skip file  /mnt/data/mocheng/dataset/youtube-8m/data/yt8m/video/train1821.tfrecord
skip file  /mnt/data/mocheng/dataset/youtube-8m/data/yt8m/video/train0591.tfrecord
skip file  /mnt/data/mocheng/dataset/youtube-8m/data/yt8m/video/train2440.tfrecord
skip file  /mnt/data/mocheng/dataset/youtube-8m/data/yt8m/video/train2417.tfrecord
skip file  /mnt/data/mocheng/dataset/youtube-8m/data/yt8m/video/train3293.tfrecord
skip file  /mnt/data/mocheng/dataset/youtube-8m/data/yt8m/video/train3483.tfrecord
skip file  /mnt/data/mocheng/dataset/youtube-8m/data/yt8m/video/train2615.tfrecord
skip file  /mnt/data/mocheng/dataset/youtube-8m/data/yt8m/video/train2302.tfrecord
skip

I0000 00:00:1731233571.115531 2511889 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 7778 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 3090, pci bus id: 0000:b1:00.0, compute capability: 8.6
I0000 00:00:1731233571.115924 2511889 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:1 with 7769 MB memory:  -> device: 1, name: NVIDIA GeForce RTX 3090, pci bus id: 0000:ca:00.0, compute capability: 8.6
2024-11-10 18:12:51.300635: I tensorflow/core/kernels/data/tf_record_dataset_op.cc:370] TFRecordDataset `buffer_size` is unspecified, default to 262144
2024-11-10 18:12:53.455304: I tensorflow/core/framework/local_rendezvous.cc:405] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


Timeout occurred for http://data.yt8m.org/2/j/i/1U/1UBy.js
Timeout occurred for http://data.yt8m.org/2/j/i/0d/0dBy.js
An unexpected error occurred: timed out
Error processing at index 9: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 59: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 72: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 112: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 138: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 146: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 143: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 170: 'NoneType' object

2024-11-10 18:15:46.592760: I tensorflow/core/framework/local_rendezvous.cc:405] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


Timeout occurred for http://data.yt8m.org/2/j/i/wN/wNyc.js
Timeout occurred for http://data.yt8m.org/2/j/i/UV/UVyc.js
Timeout occurred for http://data.yt8m.org/2/j/i/y7/y7yc.js
An unexpected error occurred: timed out
Error processing at index 64: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 107: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 114: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 134: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 170: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 224: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 228: 'NoneType' object has no attribute 'valid'
An unexpected error occurr

2024-11-10 18:21:01.449789: I tensorflow/core/framework/local_rendezvous.cc:405] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


An unexpected error occurred: timed out
Error processing at index 14: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 4: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 9: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 18: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 41: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 27: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 34: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 64: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 136: 'NoneType' object has no at

2024-11-10 18:37:50.981825: I tensorflow/core/framework/local_rendezvous.cc:405] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


Timeout occurred for http://data.yt8m.org/2/j/i/8u/8uBC.js
An unexpected error occurred: timed out
Error processing at index 12: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 29: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 33: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 2: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 53: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 51: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 71: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 101: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed o

2024-11-10 19:05:46.681867: I tensorflow/core/framework/local_rendezvous.cc:405] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


An unexpected error occurred: timed out
Error processing at index 16: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 20: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 23: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 51: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 45: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 38: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 76: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 102: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 105: 'NoneType' object has no

2024-11-10 20:01:10.735142: I tensorflow/core/framework/local_rendezvous.cc:405] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


An unexpected error occurred: timed out
Error processing at index 28: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 32: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 3: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 39: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 52: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 127: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 130: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 161: 'NoneType' object has no attribute 'valid'
An unexpected error occurred: timed out
Error processing at index 165: 'NoneType' object has n

2024-11-10 20:11:28.885356: I tensorflow/core/framework/local_rendezvous.cc:405] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


got data size: 1021  valid percentage:  1.0
save data and current progress
process done for file  /mnt/data/mocheng/dataset/youtube-8m/data/yt8m/video/train2230.tfrecord  got size  70042
key out of usage,  70042  stored
got data size: 1000  valid percentage:  1.0
save data and current progress
process done for file  /mnt/data/mocheng/dataset/youtube-8m/data/yt8m/video/train0441.tfrecord  got size  71042
key out of usage,  71042  stored
got data size: 998  valid percentage:  1.0
save data and current progress
process done for file  /mnt/data/mocheng/dataset/youtube-8m/data/yt8m/video/train3018.tfrecord  got size  72040
key out of usage,  72040  stored
got data size: 985  valid percentage:  1.0
save data and current progress
process done for file  /mnt/data/mocheng/dataset/youtube-8m/data/yt8m/video/train3210.tfrecord  got size  73025
key out of usage,  73025  stored
got data size: 983  valid percentage:  1.0
save data and current progress
process done for file  /mnt/data/mocheng/dataset

KeyboardInterrupt: 

: 

In [None]:
print(len(mean_rgb))

65
