<a href="https://colab.research.google.com/github/yugendrasai-ui/ai-video-pipeline/blob/main/AI_Video_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [56]:
# Install all required libraries
!pip install google-generativeai edge-tts moviepy requests opencv-python-headless pydub openai-whisper ffmpeg-python nest_asyncio




In [57]:
# Import Colab secrets
from google.colab import userdata
import google.generativeai as genai

# Load Gemini API Key
GEMINI_API_KEY = userdata.get("GEMINI_API_KEY")

# Load Pexels API Key
PEXELS_KEY = userdata.get("PEXELS_KEY")

# Load Pixabay API Key
PIXABAY_API_KEY = userdata.get("PIXABAY_API_KEY")


# Validate Keys
if not GEMINI_API_KEY:
    raise ValueError("Gemini Key Missing")

if not PEXELS_KEY:
    raise ValueError("Pexels Key Missing")

if not PIXABAY_API_KEY:
    raise ValueError("Pixabay Key Missing")


print("‚úÖ All API Keys Loaded")


# Configure Gemini
genai.configure(api_key=GEMINI_API_KEY)


‚úÖ All API Keys Loaded


In [58]:
# Load Gemini Model
model = genai.GenerativeModel("models/gemini-2.5-flash")

print("‚úÖ Gemini Ready")


‚úÖ Gemini Ready


In [59]:
def generate_script(topic):
    """
    Generate YouTube voice script using Gemini
    """

    prompt = f"""
    Write a natural YouTube voice script about "{topic}"

    Rules:
    - Simple English
    - Friendly tone
    - No headings
    - No bullet points
    - About 1 minute
    """

    response = model.generate_content(prompt)

    return response.text.strip()



In [60]:
import re

def clean_script(text):
    """
    Remove extra symbols and spaces from script
    """

    text = re.sub(r"[#*_~]", "", text)
    text = re.sub(r"[!?:;]+", ".", text)
    text = re.sub(r"\s+", " ", text)

    return text.strip()


In [61]:
import edge_tts
import asyncio
import nest_asyncio

nest_asyncio.apply()


async def text_to_voice(text):
    """
    Convert script to voice using Edge TTS
    """

    voice = edge_tts.Communicate(
        text,
        voice="en-IN-NeerjaNeural"
    )

    await voice.save("voice.mp3")

    print("‚úÖ Voice Created")


In [62]:
import requests

def download_image(url, filename):
    """
    Download image from URL
    """

    try:
        data = requests.get(url, timeout=15).content

        with open(filename, "wb") as f:
            f.write(data)

        return True

    except:
        return False


In [63]:
from urllib.parse import quote
import time


def fetch_from_pexels(topic, count=10, retries=3):
    """
    Try Pexels API (3 retries)
    """

    print("üîç Trying Pexels...")

    query = quote(topic)

    url = f"https://api.pexels.com/v1/search?query={query}&per_page={count}"

    headers = {
        "Authorization": PEXELS_KEY
    }

    for attempt in range(1, retries+1):

        try:

            print(f"‚û°Ô∏è Pexels Attempt {attempt}")

            res = requests.get(url, headers=headers, timeout=15)

            if res.status_code != 200:
                raise Exception("Status " + str(res.status_code))

            data = res.json()

            photos = data.get("photos", [])

            if not photos:
                raise Exception("No Images")

            images = []

            for i, photo in enumerate(photos):
                img_url = photo["src"]["large"]
                name = f"img_{i}.jpg"

                if download_image(img_url, name):
                    images.append(name)

            print("‚úÖ Pexels Success")

            return images


        except Exception as e:

            print("‚ö†Ô∏è Pexels Error:", e)

            if attempt < retries:
                time.sleep(2)


    print("‚ùå Pexels Failed")

    return []


In [64]:
def fetch_from_pixabay(topic, count=10):
    """
    Fallback to Pixabay
    """

    print("üîÑ Switching to Pixabay...")

    query = quote(topic)

    url = f"https://pixabay.com/api/?key={PIXABAY_API_KEY}&q={query}&image_type=photo&per_page={count}"

    try:

        res = requests.get(url, timeout=15)

        if res.status_code != 200:
            raise Exception("Status " + str(res.status_code))

        data = res.json()

        hits = data.get("hits", [])

        if not hits:
            raise Exception("No Images")

        images = []

        for i, hit in enumerate(hits):
            img_url = hit["largeImageURL"]
            name = f"img_{i}.jpg"

            if download_image(img_url, name):
                images.append(name)

        print("‚úÖ Pixabay Success")

        return images


    except Exception as e:

        print("‚ùå Pixabay Error:", e)

        return []


In [78]:
def get_images(topic, count=10):
    """
    Try Pexels first, if fails use Pixabay
    """

    images = fetch_from_pexels(topic, count)

    if not images:
        images = fetch_from_pixabay(topic, count)

    if not images:
        raise Exception("‚ùå No Images Found")

    print("üì∏ Total Images:", len(images))

    return images


In [66]:
from moviepy.editor import *
import cv2


def make_video(images, audio_file):
    """
    Create video using MoviePy
    """

    audio = AudioFileClip(audio_file)

    duration = audio.duration

    img_time = duration / len(images)

    clips = []

    for img in images:

        frame = cv2.imread(img)

        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        frame = cv2.resize(frame, (1280, 720))

        clip = ImageClip(frame).set_duration(img_time)

        clips.append(clip)


    video = concatenate_videoclips(clips)

    final = video.set_audio(audio)

    final.write_videofile(
        "final_video.mp4",
        fps=24,
        codec="libx264",
        audio_codec="aac"
    )

    return "final_video.mp4"


In [67]:
import whisper


def format_time(sec):

    ms = int((sec - int(sec)) * 1000)
    s = int(sec) % 60
    m = (int(sec) // 60) % 60
    h = int(sec) // 3600

    return f"{h:02}:{m:02}:{s:02},{ms:03}"


def generate_subtitles(audio_file, out_file="subtitles.srt"):

    print("üéß Loading Whisper...")

    model = whisper.load_model("base")

    print("üìù Transcribing...")

    result = model.transcribe(audio_file)

    with open(out_file, "w", encoding="utf-8") as f:

        for i, seg in enumerate(result["segments"], 1):

            start = format_time(seg["start"])
            end = format_time(seg["end"])

            text = seg["text"].strip()

            f.write(f"{i}\n")
            f.write(f"{start} --> {end}\n")
            f.write(f"{text}\n\n")

    print("‚úÖ Subtitles Created")

    return out_file


In [68]:
def add_subtitles():

    !ffmpeg -y -i final_video.mp4 -vf subtitles=subtitles.srt final_video_with_subs.mp4

    print("‚úÖ Subtitles Added")


In [69]:
# -------------------------
# MAIN EXECUTION
# -------------------------

topic = "Artificial Intelligence in 2026"


print("üü¢ Generating Script...")
raw = generate_script(topic)


print("üü¢ Cleaning Script...")
script = clean_script(raw)


print("üü¢ Creating Voice...")
await text_to_voice(script)


print("üü¢ Downloading Images...")
images = get_images(topic)


print("üü¢ Creating Video...")
make_video(images, "voice.mp3")


print("üü¢ Creating Subtitles...")
generate_subtitles("voice.mp3")


print("üü¢ Adding Subtitles...")
add_subtitles()


print("\nüéâ DONE: final_video_with_subs.mp4 Ready")


üü¢ Generating Script...
üü¢ Cleaning Script...
üü¢ Creating Voice...
‚úÖ Voice Created
üü¢ Downloading Images...
üîç Trying Pexels...
‚û°Ô∏è Pexels Attempt 1
‚úÖ Pexels Success
üì∏ Total Images: 10
üü¢ Creating Video...
Moviepy - Building video final_video.mp4.
MoviePy - Writing audio in final_videoTEMP_MPY_wvf_snd.mp4




MoviePy - Done.
Moviepy - Writing video final_video.mp4





Moviepy - Done !
Moviepy - video ready final_video.mp4
üü¢ Creating Subtitles...
üéß Loading Whisper...


100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 139M/139M [00:01<00:00, 122MiB/s]



üìù Transcribing...
‚úÖ Subtitles Created
üü¢ Adding Subtitles...
ffmpeg version 4.4.2-0ubuntu0.22.04.1 Copyright (c) 2000-2021 the FFmpeg developers
  built with gcc 11 (Ubuntu 11.2.0-19ubuntu1)
  configuration: --prefix=/usr --extra-version=0ubuntu0.22.04.1 --toolchain=hardened --libdir=/usr/lib/x86_64-linux-gnu --incdir=/usr/include/x86_64-linux-gnu --arch=amd64 --enable-gpl --disable-stripping --enable-gnutls --enable-ladspa --enable-libaom --enable-libass --enable-libbluray --enable-libbs2b --enable-libcaca --enable-libcdio --enable-libcodec2 --enable-libdav1d --enable-libflite --enable-libfontconfig --enable-libfreetype --enable-libfribidi --enable-libgme --enable-libgsm --enable-libjack --enable-libmp3lame --enable-libmysofa --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-libpulse --enable-librabbitmq --enable-librubberband --enable-libshine --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libsrt --enable-libssh --enable-libtheora --enable-libt

In [70]:
from IPython.display import Video, display

display(Video("final_video_with_subs.mp4", embed=True))