In [None]:
!pip install moviepy
!pip install google-generativeai
!pip install elevenlabs
!pip install python-dotenv

In [None]:
import os
import uuid
import time
import base64
import requests
from io import BytesIO
from IPython.display import Audio, display
import wave
from elevenlabs.client import ElevenLabs
from dotenv import load_dotenv

In [None]:
# Load key/value pairs from a local .env file (if present) into the process environment.
load_dotenv()

# Secrets and model selection come from the environment so they never live in the notebook.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")          # credential for the Gemini REST calls
ELEVENLABS_API_KEY = os.getenv("ELEVENLABS_API_KEY")  # credential for the ElevenLabs client
VOICE_ID = os.getenv("VOICE_ID")                      # ElevenLabs voice used for text-to-speech
GEMINI_LLM_MODEL = os.getenv("GEMINI_LLM_MODEL")      # Gemini model name used for captioning

# os.getenv returns None for unset variables; surface that here instead of letting it
# show up later as an opaque HTTP error. This is a warning, not an exception, because
# not every cell needs every setting.
_missing_settings = [
    name
    for name in ("GEMINI_API_KEY", "ELEVENLABS_API_KEY", "VOICE_ID", "GEMINI_LLM_MODEL")
    if not os.getenv(name)
]
if _missing_settings:
    print("Warning: missing environment variable(s): " + ", ".join(_missing_settings))

In [None]:
# Shared ElevenLabs client, authenticated with the key read from the environment.
elevenlabs_client = ElevenLabs(api_key=ELEVENLABS_API_KEY)


def get_audio_from_elevenlabs(text_to_speak):
    """
    Converts text to speech with ElevenLabs and saves the result as an MP3 file.

    The file is written to the current working directory. Its name is derived
    from the spoken text, with characters that are not valid in file names
    replaced by underscores and the stem capped in length, so arbitrary
    captions (containing '/', ':', '?' ... or very long sentences) can be saved.
    Texts that share the same leading characters map to the same file name.

    Args:
        text_to_speak (str): The text to be converted to speech.

    Returns:
        str: The path of the MP3 file that was written.
    """
    audio = elevenlabs_client.text_to_speech.convert(
        text=text_to_speak,
        voice_id=VOICE_ID,
        model_id="eleven_flash_v2",
        output_format="mp3_44100_128",
    )

    # Path separators, reserved punctuation and control characters would either
    # point outside the working directory or make open() fail.
    forbidden_chars = '<>:"/\\|?*'
    safe_stem = "".join(
        "_" if (ch in forbidden_chars or ord(ch) < 32) else ch
        for ch in str(text_to_speak)
    )
    # Keep the name comfortably below common file-name length limits.
    safe_stem = safe_stem[:150].strip()
    if not safe_stem.strip(". "):
        # Nothing usable is left (empty or dots only): fall back to a unique name.
        safe_stem = uuid.uuid4().hex

    save_file_path = f"{safe_stem}.mp3"
    with open(save_file_path, "wb") as f:
        # The audio is consumed chunk by chunk; empty chunks are skipped.
        for chunk in audio:
            if chunk:
                f.write(chunk)
    return save_file_path

In [None]:
def get_caption_from_gemini(image_data, timeout=30):
    """
    Sends a base64 encoded image to the Gemini API to get a text caption.

    The API key is sent in the `x-goog-api-key` header rather than in the URL,
    so it cannot end up in the error text returned by this function (HTTP
    errors raised by `requests` include the request URL in their message).

    Args:
        image_data (str): Base64 encoded string of the image frame (JPEG).
        timeout (float): Seconds to wait for the HTTP request before giving up.

    Returns:
        str: The generated caption, or a message starting with
        "Error getting caption:" if the request fails or the response does
        not contain a text candidate.
    """
    url = f"https://generativelanguage.googleapis.com/v1beta/models/{GEMINI_LLM_MODEL}:generateContent"
    headers = {
        'Content-Type': 'application/json',
        'x-goog-api-key': GEMINI_API_KEY,
    }

    payload = {
        "contents": [
            {
                "role": "user",
                "parts": [
                    { "text": "Describe the ongoing esports game action in this image in one brief sentence. Focus on key events like kills, objectives, or team fights." },
                    { "inlineData": { "mimeType": "image/jpeg", "data": image_data } }
                ]
            }
        ]
    }

    try:
        # Without a timeout a stalled connection would block the notebook indefinitely.
        response = requests.post(url, json=payload, headers=headers, timeout=timeout)
        response.raise_for_status() # Raise an error for bad status codes

        result = response.json()
        return result['candidates'][0]['content']['parts'][0]['text']
    except requests.exceptions.RequestException as e:
        return f"Error getting caption: {e}"
    except (KeyError, IndexError, TypeError, ValueError) as e:
        # A successful HTTP response may still lack the expected candidate/text fields.
        return f"Error getting caption: unexpected response format ({e!r})"

def get_audio_from_gemini(text_to_speak, voice_name="Rasalgethi", timeout=60):
    """
    Sends text to the Gemini TTS API to get base64 encoded audio data.

    The API key is sent in the `x-goog-api-key` header rather than in the URL,
    so it is not printed when an HTTP error (whose message includes the
    request URL) is reported.

    Args:
        text_to_speak (str): The text to be converted to speech.
        voice_name (str): Name of the prebuilt Gemini voice to use.
        timeout (float): Seconds to wait for the HTTP request before giving up.

    Returns:
        tuple: A tuple containing (base64_audio_data, mime_type) or (None, None) on failure.
    """
    if not text_to_speak:
        print("Error getting audio: no text to speak")
        return None, None

    url = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-preview-tts:generateContent"
    headers = {
        'Content-Type': 'application/json',
        'x-goog-api-key': GEMINI_API_KEY,
    }

    payload = {
        "contents": [
            { "parts": [ { "text": text_to_speak } ] }
        ],
        "generationConfig": {
            "responseModalities": ["AUDIO"],
            "speechConfig": {
                "voiceConfig": {
                    "prebuiltVoiceConfig": { "voiceName": voice_name }
                }
            }
        }
    }

    try:
        # Without a timeout a stalled connection would block the notebook indefinitely.
        response = requests.post(url, json=payload, headers=headers, timeout=timeout)
        response.raise_for_status()

        result = response.json()
        inline_data = result['candidates'][0]['content']['parts'][0]['inlineData']
        return inline_data['data'], inline_data['mimeType']
    except requests.exceptions.RequestException as e:
        print(f"Error getting audio: {e}")
        return None, None
    except (KeyError, IndexError, TypeError, ValueError) as e:
        # A successful HTTP response may still lack the expected audio fields.
        print(f"Error getting audio: unexpected response format ({e!r})")
        return None, None

def pcm_to_wav(pcm_data, sample_rate, num_channels=1, samp_width=2):
    """
    Wraps raw PCM samples in a WAV container, entirely in memory.

    Args:
        pcm_data (bytes): The raw PCM audio bytes.
        sample_rate (int): Frames per second of the audio (e.g., 24000).
        num_channels (int): Channel count (1 = mono, the default).
        samp_width (int): Bytes per sample (2 = 16-bit, the default).

    Returns:
        bytes: A complete WAV file (header followed by the PCM payload).
    """
    wav_buffer = BytesIO()
    # (nchannels, sampwidth, framerate, nframes, comptype, compname);
    # nframes=0 lets the writer derive the frame count from the data written.
    wav_params = (num_channels, samp_width, sample_rate, 0, 'NONE', 'not compressed')
    with wave.open(wav_buffer, 'wb') as wav_writer:
        wav_writer.setparams(wav_params)
        wav_writer.writeframes(pcm_data)
    # Read the buffer only after the writer is closed, so the header sizes are final.
    return wav_buffer.getvalue()

def _sample_rate_from_mime_type(mime_type):
    """Returns the integer `rate` parameter of a mime type such as "audio/L16;codec=pcm;rate=24000"."""
    # Look the parameter up by name so the result does not depend on parameter order.
    for param in mime_type.split(';')[1:]:
        key, _, value = param.partition('=')
        if key.strip().lower() == 'rate':
            return int(value.strip())
    raise ValueError(f"no sample rate found in mime type {mime_type!r}")


def save_and_play_audio(base64_audio, output_filename, mime_type):
    """
    Saves the base64 audio data to a file and plays it in the notebook.

    The decoded bytes are treated as raw PCM, wrapped in a WAV container using
    the sample rate taken from the mime type's `rate` parameter, played inline
    and then written to `output_filename`. Any failure is reported with a
    printed message instead of an exception.

    Args:
        base64_audio (str): Base64 encoded audio data.
        output_filename (str): The name of the file to save.
        mime_type (str): The mime type of the audio data; must carry a
            `rate=<int>` parameter.
    """
    try:
        audio_bytes = base64.b64decode(base64_audio)
        sample_rate = _sample_rate_from_mime_type(mime_type)

        # Convert the raw audio to a WAV file format
        wav_bytes = pcm_to_wav(audio_bytes, sample_rate)

        # Play the WAV data
        display(Audio(data=wav_bytes, autoplay=True))

        with open(output_filename, 'wb') as f:
            f.write(wav_bytes)
        print(f"Audio saved to {output_filename}")

    except Exception as e:
        print(f"Error saving or playing audio: {e}")



# --- Main pipeline function ---

def process_video_and_caption(video_path, interval_seconds=2):
    """
    Main function to process the video, generate captions, and create speech.

    Every `interval_seconds` seconds of video, one frame is extracted, encoded
    as JPEG, captioned by Gemini, and the caption is converted to speech,
    played and saved. Errors on a single frame are printed and the next frame
    is processed. The video clip is always closed, even if processing is
    interrupted.

    Args:
        video_path (str): The path to the video file.
        interval_seconds (int): The interval in seconds to capture frames.

    Returns:
        None. Progress and errors are reported via print().
    """
    if not os.path.exists(video_path):
        print(f"Error: Video file not found at {video_path}")
        return

    # Imported here so the function works on a fresh kernel: the notebook's
    # import cell does not bring in VideoFileClip or PIL.
    from PIL import Image
    try:
        from moviepy import VideoFileClip          # moviepy >= 2.0
    except ImportError:
        from moviepy.editor import VideoFileClip   # moviepy 1.x

    video_clip = VideoFileClip(video_path)
    try:
        duration = video_clip.duration

        print(f"Processing video: {video_path}")
        print(f"Total duration: {duration:.2f} seconds")

        for t in range(0, int(duration), interval_seconds):
            print(f"\n--- Processing frame at {t} seconds ---")

            try:
                # Extract the frame and encode it as an in-memory JPEG
                frame = video_clip.get_frame(t)
                image_buffer = BytesIO()
                Image.fromarray(frame).save(image_buffer, format='JPEG')

                # Encode the image bytes to base64
                base64_image = base64.b64encode(image_buffer.getvalue()).decode('utf-8')

                # Get caption from Gemini
                caption = get_caption_from_gemini(base64_image)
                print(f"Generated Caption: {caption}")

                # get_caption_from_gemini reports failures as text; do not
                # turn an error message into speech.
                if caption.startswith("Error getting caption:"):
                    continue

                # Get audio from Gemini TTS
                audio_data, mime_type = get_audio_from_gemini(caption)
                if audio_data and mime_type:
                    # NOTE(review): save_and_play_audio writes WAV-wrapped bytes, so a
                    # `.wav` extension would describe the file better; the `.l16`
                    # name is kept so existing output file names do not change.
                    save_and_play_audio(audio_data, f"caption_at_{t}s.l16", mime_type)

            except Exception as e:
                print(f"An error occurred at time {t}s: {e}")
    finally:
        # Release the underlying reader even on KeyboardInterrupt or an
        # invalid interval that makes range() raise.
        video_clip.close()

    print("\n--- Video processing complete. ---")



In [None]:
# Location of the clip to narrate; adjust to wherever your video lives.
VIDEO_FILE_NAME = "/content/your_video.mp4"

# Caption and voice one frame every 30 seconds of video.
process_video_and_caption(video_path=VIDEO_FILE_NAME, interval_seconds=30)

Processing video: /content/your_video.mp4
Total duration: 74.38 seconds

--- Processing frame at 0 seconds ---
Generated Caption: A fierce team fight is erupting around the Baron pit, with multiple players engaging and using abilities.


Audio saved to caption_at_0s.l16

--- Processing frame at 30 seconds ---
Generated Caption: A chaotic team fight erupts near a dragon pit as players clash with a mix of abilities and attacks.


Audio saved to caption_at_30s.l16

--- Processing frame at 60 seconds ---
Generated Caption: A team fight is occurring in the jungle with multiple champions engaged.


Audio saved to caption_at_60s.l16

--- Video processing complete. ---
