In [2]:
# Mount Google Drive at /content/drive so files stored there can be copied by later cells.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [11]:
#Download caption file here https://huggingface.co/datasets/OpenGVLab/InternVid/tree/main "caption.jsonl"    #Copy your caption file from GDRIVE to "/content/YouTubeClipDownloader/caption.jsonl" this assumes file is in root of GDRIVE
!mkdir /content/YouTubeClipDownloader/
!cp /content/drive/MyDrive/caption.jsonl /content/YouTubeClipDownloader/caption.jsonl

mkdir: cannot create directory ‘/content/YouTubeClipDownloader/’: File exists


In [None]:
# Mount Gdrive
from google.colab import drive
drive.mount('/content/drive')

# Download caption file from Gdrive to "/content/YouTubeClipDownloader/caption.jsonl"
# Replace the file path "/content/drive/MyDrive/caption.jsonl" with the actual path to your caption file on Gdrive
!mkdir /content/YouTubeClipDownloader/
!cp /content/drive/MyDrive/caption.jsonl /content/YouTubeClipDownloader/caption.jsonl

!pip install yt-dlp
import json
import os
import subprocess
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

import cv2
import yt_dlp

class YouTubeClipDownloader:
    """Download captioned YouTube clips listed in a JSONL file and log per-frame prompts.

    Every non-blank line of ``database_path`` must be a JSON object providing the
    keys read by ``download_clip``: ``YoutubeID``, ``Start_timestamp``,
    ``End_timestamp`` and ``Caption``.
    """

    # Characters replaced by "_" in generated file names: path separators (they
    # would otherwise create unintended sub-directories) and control characters.
    _UNSAFE_FILENAME_CHARS = str.maketrans(
        {ch: "_" for ch in ["/", "\\", chr(127)] + [chr(code) for code in range(32)]}
    )

    def __init__(self, database_path, download_dir, json_output_path,
                 max_threads=4, flush_every=5):
        """Store the run configuration.

        Args:
            database_path: Path of the input JSONL caption file.
            download_dir: Directory that receives the downloaded ``.mp4`` files.
            json_output_path: File that data entries are appended to.
            max_threads: Number of worker threads used by ``download_clips``.
            flush_every: Number of collected entries after which they are
                written to ``json_output_path``.
        """
        self.database_path = database_path
        self.download_dir = download_dir
        self.json_output_path = json_output_path
        self.max_threads = max_threads
        self.flush_every = flush_every
        self.successful_downloads = 0
        # download_clip runs on several worker threads; "+=" on an attribute is
        # not atomic, so the counter is updated under this lock.
        self._counter_lock = threading.Lock()

    def get_frame_count(self, video_path):
        """Return the frame count OpenCV reports for ``video_path`` as an int."""
        cap = cv2.VideoCapture(video_path)
        try:
            return int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        finally:
            # Release the capture handle even if querying the property fails.
            cap.release()

    def shorten_caption(self, caption, max_length=50):
        """Return ``caption`` cut to at most ``max_length`` characters.

        Captions longer than ``max_length`` end in ``"..."``. When ``max_length``
        is smaller than the ellipsis itself, the caption is simply cut so the
        result never exceeds ``max_length``.
        """
        if len(caption) > max_length:
            if max_length < 3:
                return caption[:max(max_length, 0)]
            caption = caption[:max_length - 3] + "..."
        return caption

    @staticmethod
    def _safe_filename(stem):
        """Replace path separators and control characters in ``stem`` with ``_``."""
        return stem.translate(YouTubeClipDownloader._UNSAFE_FILENAME_CHARS)

    def download_clip(self, clip_info):
        """Download one clip with the ``yt-dlp`` command and describe its frames.

        Args:
            clip_info: Mapping with the keys ``YoutubeID``, ``Start_timestamp``,
                ``End_timestamp`` and ``Caption``.

        Returns:
            A dict with ``video_path``, ``num_frames`` and ``data`` (one
            ``{"frame_index", "prompt"}`` item per frame, indices starting at 1),
            or ``None`` when the target file already exists or the download fails.

        Raises:
            KeyError: If ``clip_info`` lacks one of the required keys.
        """
        youtube_id = clip_info["YoutubeID"]
        start_time = clip_info["Start_timestamp"]
        end_time = clip_info["End_timestamp"]
        caption = clip_info["Caption"]  # The full caption is kept for the prompts
        youtube_link = f"https://www.youtube.com/watch?v={youtube_id}"

        # File name pattern: 'YOUTUBEID_CAPTION.mp4', with the caption limited to
        # 50 characters and made safe for use as a single path component.
        truncated_caption = self.shorten_caption(caption, max_length=50)
        output_filename = self._safe_filename(f"{youtube_id}_{truncated_caption}") + ".mp4"
        output_path = os.path.join(self.download_dir, output_filename)

        # Skip clips that were already downloaded
        if os.path.exists(output_path):
            print(f"Skipping {youtube_id}. File already exists.")
            return None

        print(f"Downloading clip: {youtube_id}")
        try:
            # NOTE(review): the clip boundaries are handed to the post-processing
            # step via --postprocessor-args; yt-dlp also documents a
            # --download-sections option for fetching only a time range.
            # Consider switching after confirming against the yt-dlp docs.
            command = [
                "yt-dlp",
                "--verbose",
                "--no-progress",
                "--format", "bestvideo[ext=mp4]+bestaudio[ext=m4a]/mp4",
                # --output is an output *template*: a literal "%" must be written
                # as "%%", otherwise a caption such as "50% off" is misread as a
                # template field.
                "--output", output_path.replace("%", "%%"),
                "--postprocessor-args", f"-ss {start_time} -to {end_time}",
                youtube_link
            ]
            subprocess.check_call(command)
        except subprocess.CalledProcessError as e:
            print(f"Error occurred while downloading {youtube_id}: {str(e)}")
            return None
        except Exception as ex:
            print(f"Unknown error occurred while downloading {youtube_id}: {str(ex)}")
            return None

        # Count the frames of the downloaded clip
        num_frames = self.get_frame_count(output_path)

        # Every frame (1-based index) gets the full, untruncated caption as prompt
        data_entry = {
            "video_path": output_path,
            "num_frames": num_frames,
            "data": [
                {"frame_index": frame_index, "prompt": caption}
                for frame_index in range(1, num_frames + 1)
            ],
        }

        with self._counter_lock:
            self.successful_downloads += 1

        return data_entry

    def download_clips(self):
        """Download every clip listed in ``database_path`` using a thread pool.

        Blank and unparsable lines are reported and skipped. A clip whose
        worker raises is reported and skipped instead of aborting the whole
        run. Collected entries are appended to ``json_output_path`` in batches
        of ``flush_every``; whatever remains is written at the end.
        """
        os.makedirs(self.download_dir, exist_ok=True)

        pending = []  # entries collected since the last write

        with open(self.database_path, "r", encoding="utf-8") as file:
            with ThreadPoolExecutor(self.max_threads) as executor:
                futures = []
                for line_number, line in enumerate(file, start=1):
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        clip_info = json.loads(line)
                    except json.JSONDecodeError as err:
                        print(f"Skipping line {line_number}: invalid JSON ({err})")
                        continue
                    futures.append(executor.submit(self.download_clip, clip_info))

                for future in as_completed(futures):
                    try:
                        data_entry = future.result()
                    except Exception as err:
                        # result() re-raises whatever the worker raised (for
                        # example KeyError for a missing field); one bad clip
                        # must not discard the rest of the run.
                        print(f"Clip failed and was skipped: {err!r}")
                        continue

                    if data_entry:
                        pending.append(data_entry)

                    # Flush on the number of entries actually buffered here, not
                    # on a counter that other threads modify concurrently.
                    if len(pending) >= self.flush_every:
                        self.write_to_output_json(pending)
                        pending = []

        # Write the remaining data to the output JSON file
        if pending:
            self.write_to_output_json(pending)

    def write_to_output_json(self, data):
        """Append each entry of ``data`` to ``json_output_path``.

        Entries are pretty-printed (indent 4) and followed by a newline, so the
        file is a sequence of JSON objects rather than a single JSON document.
        """
        with open(self.json_output_path, "a") as json_file:
            for entry in data:
                json.dump(entry, json_file, indent=4)
                json_file.write("\n")  # Add a newline after each entry

if __name__ == "__main__":
    # Colab locations: where results are logged, where clips are stored, and
    # where the caption list copied from Drive lives.
    json_output_path = "/content/output.json"
    download_dir = "/content/videos"
    database_path = "/content/YouTubeClipDownloader/caption.jsonl"

    # Build the downloader from the paths above and process the whole caption file.
    downloader = YouTubeClipDownloader(
        database_path=database_path,
        download_dir=download_dir,
        json_output_path=json_output_path,
    )
    downloader.download_clips()

