# Gemini ASR+MT

In [1]:
from google.colab import drive
# Mount Google Drive at /content/drive so later cells can read and write files
# there; force_remount=True asks Colab to remount even if it is already mounted.
drive.mount('/content/drive', force_remount=True)


Mounted at /content/drive


# Gemini ASR

In [None]:
# ========================
# INSTALL & IMPORTS
# ========================
!pip install -q google-generativeai pydub tqdm librosa

import os
import io
from google.colab import drive, userdata
import google.generativeai as genai
from pydub import AudioSegment
from tqdm import tqdm

# ========================
# SETUP
# ========================

# Mount Google Drive (the input audio and the transcripts both live there)
drive.mount('/content/drive', force_remount=True)

# Load the Gemini API key from Colab secrets and stop early, with a readable
# message, when no value comes back.
api_key = userdata.get("GOOGLE_API_KEY")
if not api_key:
    raise ValueError("‚ùå No GOOGLE_API_KEY found in Colab secrets! Add it under 'More ‚Üí Secrets'.")

# Register the key with the SDK before any model object is created.
genai.configure(api_key=api_key)

# Model handle used by transcribe_audio_file() below
model = genai.GenerativeModel("models/gemini-2.5-pro")

# Input/output folders in Google Drive:
#   input_dir  - folder that is scanned for .wav files
#   output_dir - <base_dir>/Test_28_srtimprv/Srt1, created here if missing
base_dir = "/content/drive/MyDrive/Test_28_Adnew_wav/"
input_dir = "/content/drive/MyDrive/Test_28_Adnew_wav/Tamil/"
output_dir = os.path.join(base_dir, "Test_28_srtimprv","Srt1")
os.makedirs(output_dir, exist_ok=True)

# ========================
# HELPER FUNCTIONS
# ========================

def transcribe_audio_file(file_path):
    """Ask the Gemini model for an SRT-style transcript of one WAV file.

    The file is loaded with pydub, re-exported into an in-memory WAV buffer and
    sent in a single request (no chunking) together with the subtitle
    instructions.

    Args:
        file_path: Path of the WAV file to transcribe.

    Returns:
        The response text with surrounding whitespace stripped, or an empty
        string when the request or reading the response text raises.
    """
    instructions = """
            Your are a Subtitle Generator:
            Transcribe this audio exactly as spoken in Tamil (no extra comments, no filler words) in the .srt format(Subtitle Format):

            1
            00:00:15,362 --> 00:00:21,789
            ‡§Ö‡§¨ ‡§π‡§Æ ‡§ú‡§æ‡§®‡•á‡§Ç‡§ó‡•á ‡§ï‡•à‡§Ç‡§°‡§≤‡•ç‡§∏ ‡§Æ‡•á‡§Ç ‡§ï‡•ç‡§Ø‡§æ ‡§ï‡•ç‡§Ø‡§æ ‡§ö‡•Ä‡§ú‡§º‡•ã‡§Ç ‡§ï‡•Ä ‡§ú‡§º‡§∞‡•Ç‡§∞‡§§ ‡§™‡§°‡§º‡§§‡•Ä ‡§π‡•à ‡§î‡§∞ ‡§â‡§®‡§ï‡•ã ‡§π‡§Æ ‡§ï‡§π‡§æ‡§Å ‡§∏‡•á ‡§ñ‡§º‡§∞‡•Ä‡§¶ ‡§∏‡§ï‡§§‡•á ‡§π‡•à‡§Ç

            2
            00:00:21,922 --> 00:00:27,422
            ‡§§‡•ã ‡§∏‡§¨‡§∏‡•á ‡§™‡§π‡§≤‡•á ‡§ï‡•à‡§Ç‡§°‡§≤ ‡§¨‡§®‡§æ‡§®‡•á ‡§ï‡•á ‡§≤‡§ø‡§è ‡§π‡§Æ‡•á‡§Ç ‡§°‡§¨‡§≤ ‡§¨‡•â‡§Ø‡§≤‡§∞ ‡§ï‡•Ä ‡§ú‡§º‡§∞‡•Ç‡§∞‡§§ ‡§™‡§°‡§º‡§§‡•Ä ‡§π‡•à ‡§Ø‡•á

            3
            00:00:27,617 --> 00:00:29,853
            ‡§á‡§∏ ‡§§‡§∞‡§π ‡§ï‡§æ ‡§Ø‡•á ‡§á‡§Ç‡§°‡§ï‡•ç‡§∂‡§® ‡§π‡•à

            and so on...

            The transcription should strictly follow the format above, where:
            - **Timestamps** are in the format of HH:MM:SS,SSS --> HH:MM:SS,SSS (with millisecond precision)(Hours:Minutes:Seconds,milliseconds).
            - Each entry should have a **sequential index** starting from 1 (e.g., 1, 2, 3, ...).
            - Even if Hours are not, Keep the Hours format in timestamp like this: 00:00:29,854 --> 00:00:34,500 not like this 00:29,854 --> 00:34,500 or 29,854 --> 34,500.
            - The spoken text should be captured **exactly as it is spoken**, without adding or removing words(but remove filler words).
            - If there is **silence** or a pause, mark the duration with a timestamp like this:
              ```
              4
              00:00:29,854 --> 00:00:34,500
              [Silence]
              ```
            - Include **Speaker labels** (e.g., Speaker 1, Speaker 2) where relevant if multiple speakers are detected.

            Please ensure the output strictly follows the SRT format. Thank you!
            """

    # Round-trip through pydub so the request carries a complete WAV byte string.
    clip = AudioSegment.from_wav(file_path)
    wav_stream = io.BytesIO()
    clip.export(wav_stream, format="wav")
    request_parts = [
        {"mime_type": "audio/wav", "data": wav_stream.getvalue()},
        instructions,
    ]

    # Best effort: a failed request is reported and yields an empty transcript.
    try:
        reply = model.generate_content(request_parts)
        return reply.text.strip()
    except Exception as err:
        print("‚ùå Error:", err)
        return ""

# ========================
# MAIN PROCESS
# ========================

# Transcribe every WAV file found directly in input_dir and store each
# transcript as a UTF-8 text file of the same base name in output_dir.
for filename in os.listdir(input_dir):
    # The extension test is case-insensitive, so "clip.WAV" is accepted too.
    if not filename.lower().endswith(".wav"):
        continue

    file_path = os.path.join(input_dir, filename)
    print(f"\nüéß Transcribing full audio: {filename}")

    # Get full transcription (an empty string when the request failed)
    text = transcribe_audio_file(file_path)

    # Derive the output name from the stem. str.replace(".wav", ".txt") left
    # upper-case extensions untouched (the transcript then landed in a
    # ".WAV"-named file) and also rewrote ".wav" occurring mid-name.
    stem, _extension = os.path.splitext(filename)
    txt_output = os.path.join(output_dir, stem + ".txt")
    with open(txt_output, "w", encoding="utf-8") as f:
        f.write(text)

    print(f"‚úÖ Done: {filename}")
    print(f"üìÑ TXT saved to: {txt_output}")


# Text-to-SRT format rectifier

In [None]:
import os
import re

def normalize_timestamp(ts: str) -> str:
    """Normalize a loosely formatted timestamp to ``HH:MM:SS,mmm``.

    Accepted shapes are ``SS``, ``MM:SS`` and ``HH:MM:SS`` (when more than
    three colon-separated fields are present only the last three are used),
    each optionally followed by ``,`` or ``.`` and a fractional part. The
    fraction is read as decimal digits of a second: it is truncated to three
    digits and right-padded with zeros, so ``"29.5"`` means 500 ms.

    Seconds or minutes of 60 and above are carried into the next field so the
    result is always a valid SRT time (``"90,500"`` -> ``"00:01:30,500"``).

    Args:
        ts: Raw timestamp text.

    Returns:
        The normalized timestamp, or ``"00:00:00,000"`` when a field is not an
        integer or is negative.
    """
    fallback = "00:00:00,000"

    ts = ts.strip().replace('.', ',')
    time_part, separator, fraction = ts.partition(',')
    if separator:
        ms = re.sub(r'\D', '', fraction)[:3].ljust(3, '0')
    else:
        ms = '000'

    # Keep at most the last three fields and left-fill the missing ones with 0.
    fields = time_part.split(':')[-3:]
    fields = ['0'] * (3 - len(fields)) + fields

    try:
        hours, minutes, seconds = (int(field) for field in fields)
    except ValueError:
        return fallback
    if min(hours, minutes, seconds) < 0:
        return fallback

    # Carry overflowing seconds/minutes upward instead of emitting e.g. ":90".
    total_seconds = hours * 3600 + minutes * 60 + seconds
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{ms}"


def fix_srt_file(input_path, output_path):
    """Rewrite the timing lines of one transcript in canonical SRT form.

    Every line that contains a timestamp range is replaced by
    ``HH:MM:SS,mmm --> HH:MM:SS,mmm``; all other lines are copied unchanged.

    A range is recognised in two situations:

    * both timestamps contain a colon (``MM:SS`` or ``HH:MM:SS``, optional
      fraction) and are separated by any run of ``-``, en/em dashes and ``>``;
    * both timestamps contain a colon or a fraction (so seconds-only values
      such as ``29,854`` qualify) and the separator includes ``>``.

    Bare integers (``123 - 456``) and thousands-style numbers joined by a
    plain dash (``1,000 - 2,000``) are therefore left alone instead of being
    rewritten as timings.

    Args:
        input_path: Transcript to read (decoded as UTF-8; undecodable bytes
            are dropped, as before).
        output_path: Destination for the corrected file (written as UTF-8).
    """
    with_colon = r'\d+(?::\d{1,2}){1,2}(?:[.,]\d+)?'
    structured = r'(?:' + with_colon + r'|\d+[.,]\d+)'
    ts_pattern = re.compile(
        # Never start in the middle of a longer number or timestamp.
        r'(?<![\d:.,])(?:'
        r'(' + with_colon + r')\s*[-\u2013\u2014>]+\s*(' + with_colon + r')'
        r'|'
        r'(' + structured + r')\s*[-\u2013\u2014]*>+\s*(' + structured + r')'
        r')'
    )

    with open(input_path, 'r', encoding='utf-8', errors='ignore') as f:
        lines = f.readlines()

    new_lines = []
    for line in lines:
        match = ts_pattern.search(line)
        if match is None:
            new_lines.append(line)
            continue
        # Exactly one alternative matched, so one pair of groups is populated.
        start = normalize_timestamp(match.group(1) or match.group(3))
        end = normalize_timestamp(match.group(2) or match.group(4))
        new_lines.append(f"{start} --> {end}\n")

    with open(output_path, 'w', encoding='utf-8') as f:
        f.writelines(new_lines)


def process_folder(input_folder, output_folder):
    """Fix every transcript below ``input_folder`` and mirror it to ``output_folder``.

    The input tree is walked recursively. Each file whose name ends in
    ``.srt`` or ``.txt`` (case-insensitive) is passed to ``fix_srt_file`` and
    written to the same relative location under ``output_folder`` with a
    ``.srt`` extension. Missing output directories are created on the way.

    Args:
        input_folder: Root folder to scan.
        output_folder: Root folder that receives the fixed files.
    """
    os.makedirs(output_folder, exist_ok=True)
    wanted_suffixes = ('.srt', '.txt')

    for current_dir, _subdirs, names in os.walk(input_folder):
        for name in names:
            if not name.lower().endswith(wanted_suffixes):
                continue

            source = os.path.join(current_dir, name)
            relative = os.path.relpath(source, input_folder)
            stem, _extension = os.path.splitext(relative)
            target = os.path.join(output_folder, stem + '.srt')

            # Recreate the sub-folder layout of the input tree.
            os.makedirs(os.path.dirname(target), exist_ok=True)
            print(f"Fixing: {relative}")
            fix_srt_file(source, target)

    print("\n All files processed and saved in:", output_folder)



# Entry point of the rectifier cell: fix every .srt/.txt transcript found under
# input_folder and write the corrected .srt files to output_folder.
if __name__ == "__main__":
    # NOTE(review): these paths are not the output_dir the ASR cell writes to,
    # and they spell the Drive root "My Drive" rather than "MyDrive" - confirm
    # this is the folder that actually holds the transcripts.
    input_folder = "/content/drive/My Drive/Test_28_Adnew_wav/Test_28_Gemini25pro_asr/Tamil/Srt"
    output_folder = "/content/drive/My Drive/Test_28_Adnew_wav/Test_28_Gemini25pro_asr/Tamil/Fixed_srt"

    process_folder(input_folder, output_folder)


# Gemini MT (still testing for improvement)

In [None]:
from google import genai
from google.colab import drive, userdata
import os
import re
import time

# === Mount Google Drive and create the Gemini client ===
drive.mount('/content/drive')
# The key is read from Colab secrets, exported to the process environment and
# then passed explicitly to the client.
os.environ["GOOGLE_API_KEY"] = userdata.get('GOOGLE_API_KEY')
client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"])

# === Paths ===
base_dir = "/content/drive/My Drive/OpenAI_API_pipeline"
asr_dir = os.path.join(base_dir, "asr")  # folder scanned for input .srt files
mt_dir = os.path.join(base_dir, "mt","gemini_2.5_pro")   # folder receiving the translated .srt/.txt files
os.makedirs(mt_dir, exist_ok=True)

# Language name inserted into the translation prompt and into output file names
target_language = "Telugu"
print("üü¢ Ready ‚Äî Processing all .srt files...")

# === SRT parsing pattern ===
# Group 1: cue index. Group 2: the "start --> end" timing line (digits, ':' and
# ',' only). Group 3: the cue text, matched lazily up to the next line that
# consists only of digits, or to the end of the input. The pattern is applied
# with re.DOTALL further down, so group 3 may span several lines.
pattern = r"(\d+)\s+([\d:,]+ --> [\d:,]+)\s+(.+?)(?=\n\d+\n|$)"

def translate_batch(lines):
    """Translate a batch of subtitle texts into ``target_language`` with Gemini.

    Each subtitle is flattened to a single line before it is sent, because the
    prompt asks for exactly one output line per subtitle and the reply is
    split on line breaks. A reply is accepted only when its number of
    non-empty lines equals ``len(lines)``; otherwise the request is retried.

    Args:
        lines: Subtitle texts, one item per cue (items may contain newlines).

    Returns:
        A list with exactly ``len(lines)`` strings. When every attempt fails
        or returns the wrong number of lines, the list holds empty strings so
        that cues never become misaligned.
    """
    if not lines:
        return []

    # Collapse internal line breaks so one input line always equals one subtitle.
    joined_text = "\n".join(" ".join(text.split()) for text in lines)
    prompt = f"""
You are a professional subtitle translator for Indic languages.

Translate the following subtitle dialogue into {target_language}.
Preserve meaning. Keep subtitles short and natural.
Do NOT translate numbers or timestamps.
Return one line per subtitle, in order.

Text:
{joined_text}
"""
    max_attempts = 3
    for attempt in range(1, max_attempts + 1):
        try:
            response = client.models.generate_content(
                model="gemini-2.5-pro",  # or gemini-2.0-pro if you have access
                contents=prompt
            )
            # response.text may be None when the reply carries no text part.
            result_text = (response.text or "").strip()
        except Exception as e:
            print("Retrying batch due to error:", e)
        else:
            translated = [row.strip() for row in result_text.splitlines() if row.strip()]
            if len(translated) == len(lines):
                return translated
            print(
                f"Retrying batch: expected {len(lines)} lines, "
                f"got {len(translated)}"
            )
        # Back off a little longer each time; no pause after the last attempt.
        if attempt < max_attempts:
            time.sleep(3 * attempt)

    print(f"Giving up on batch of {len(lines)} subtitles; leaving them empty.")
    return [""] * len(lines)


# === Loop over all SRT files ===
# For each .srt in asr_dir: parse the cues, translate the texts in batches and
# write both a translated .srt and a text-only .txt into mt_dir.
for f_name in os.listdir(asr_dir):
    if not f_name.lower().endswith(".srt"):
        continue

    input_file = os.path.join(asr_dir, f_name)
    print(f"\nüé¨ Processing: {f_name}")

    with open(input_file, 'r', encoding='utf-8') as f:
        content = f.read()

    # Each entry is an (index, timing line, text) tuple.
    entries = re.findall(pattern, content, flags=re.DOTALL)
    print(f"   ‚Üí {len(entries)} subtitles detected")

    translated_entries = []
    translated_text_only = []

    batch_size = 15
    for i in range(0, len(entries), batch_size):
        batch = entries[i:i+batch_size]
        orig_texts = [t[2].strip() for t in batch]

        translated_batch = translate_batch(orig_texts)

        # zip() stops at the shorter sequence, so a reply with too few lines
        # used to drop cues from the output without any warning. Pad or trim
        # the reply so that every cue keeps its index and timing.
        if len(translated_batch) != len(batch):
            print(
                f"   Warning: expected {len(batch)} translated lines, "
                f"got {len(translated_batch)}; padding/trimming to fit"
            )
            translated_batch = (list(translated_batch) + [""] * len(batch))[:len(batch)]

        for (num, ts, _), trans in zip(batch, translated_batch):
            translated_entries.append(f"{num}\n{ts}\n{trans}\n")
            translated_text_only.append(trans)

        print(f"   ‚úÖ Translated segments {i+1}‚Äì{min(i+batch_size,len(entries))}")

    # Save outputs
    base = os.path.splitext(f_name)[0]
    srt_out = os.path.join(mt_dir, f"{base}_{target_language}.srt")
    txt_out = os.path.join(mt_dir, f"{base}_{target_language}.txt")

    # Entries already end in a newline, so joining with "\n" leaves the blank
    # separator line between cues.
    with open(srt_out, "w", encoding='utf-8') as f:
        f.write("\n".join(translated_entries))

    with open(txt_out, "w", encoding='utf-8') as f:
        f.write("\n".join(translated_text_only))

    print(f"   üìÅ Saved ‚Üí {srt_out}")
    print(f"   üìÑ Saved ‚Üí {txt_out}")

print("\n‚úÖ All files translated successfully!")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
üü¢ Ready ‚Äî Processing all .srt files...

üé¨ Processing: Chapter 6A - Sucessful Entreuprenuer Journey.srt
   ‚Üí 139 subtitles detected
   ‚úÖ Translated segments 1‚Äì15
   ‚úÖ Translated segments 16‚Äì30
   ‚úÖ Translated segments 31‚Äì45
   ‚úÖ Translated segments 46‚Äì60
Retrying batch due to error: 503 UNAVAILABLE. {'error': {'code': 503, 'message': 'The model is overloaded. Please try again later.', 'status': 'UNAVAILABLE'}}
Retrying batch due to error: 503 UNAVAILABLE. {'error': {'code': 503, 'message': 'The model is overloaded. Please try again later.', 'status': 'UNAVAILABLE'}}
Retrying batch due to error: 503 UNAVAILABLE. {'error': {'code': 503, 'message': 'The model is overloaded. Please try again later.', 'status': 'UNAVAILABLE'}}
   ‚úÖ Translated segments 61‚Äì75
   ‚úÖ Translated segments 76‚Äì90
   ‚úÖ Translated segments 91‚Äì105
   ‚úÖ T

# Gemini TTS

In [None]:
!pip install -U -q "google-genai>=1.16.1"
# !pip install pysrt

from google.colab import drive, userdata
import io
import json
import re
import wave
import os
import base64
import struct
import shutil
import pysrt, time

from IPython.display import Audio, display, HTML, Markdown
from google import genai
from google.genai import types
from google.genai.types import GenerateContentConfig, Tool

# -------------------------------
# Mount Google Drive
# -------------------------------
# Read the Gemini API key from Colab secrets, then (re)mount Google Drive.
GOOGLE_API_KEY = userdata.get('GOOGLE_API_KEY')
drive.mount('/content/drive', force_remount=True)

# Client shared by every TTS request in this cell
client = genai.Client(api_key=GOOGLE_API_KEY)


# -------------------------------
# Helper: parse .srt into segments
# -------------------------------
def parse_srt(path):
    """Load an .srt file and return its cues as ``(start, end, text)`` tuples.

    Args:
        path: Location of the .srt file, opened with ``pysrt.open``.

    Returns:
        A list with one tuple per cue: start and end expressed in seconds
        (float) and the cue text with line breaks replaced by spaces and
        surrounding whitespace removed.
    """
    def _as_seconds(stamp):
        # Same arithmetic for start and end: h, m, s and ms folded into seconds.
        return stamp.hours*3600 + stamp.minutes*60 + stamp.seconds + stamp.milliseconds/1000

    return [
        (_as_seconds(cue.start), _as_seconds(cue.end), cue.text.replace("\n", " ").strip())
        for cue in pysrt.open(path)
    ]


# -------------------------------
# Helper: write .wav file
# -------------------------------
def wave_file(filename, pcm, channels=1, rate=24000, sample_width=2):
    """Write raw PCM bytes to ``filename`` as a WAV file.

    The parameters are echoed to stdout before the file is written.

    Args:
        filename: Destination path of the WAV file.
        pcm: Raw audio frames as bytes.
        channels: Number of channels stored in the header.
        rate: Sample rate in Hz stored in the header.
        sample_width: Bytes per sample stored in the header.
    """
    print("\nWriting audio file with parameters:")
    print(f"Channels: {channels}")
    print(f"Sample rate: {rate}")
    print(f"Sample width: {sample_width}")
    print(f"Data length: {len(pcm)} bytes")

    writer = wave.open(filename, "wb")
    try:
        writer.setnchannels(channels)
        writer.setsampwidth(sample_width)
        writer.setframerate(rate)
        writer.writeframes(pcm)
    finally:
        # Closing finalises the header, exactly as leaving a with-block would.
        writer.close()


# -------------------------------
# NEW Helper: Safe TTS with retry
# -------------------------------
def _first_inline_audio(response):
    """Return the first non-empty audio payload found in ``response``, else None."""
    candidates = getattr(response, "candidates", None) or []
    content = getattr(candidates[0], "content", None) if candidates else None

    # Audio is not guaranteed to sit in the first part, so scan all of them.
    for part in getattr(content, "parts", None) or []:
        data = getattr(getattr(part, "inline_data", None), "data", None)
        if data:
            return data

    # Fallback locations kept from the previous implementation.
    data = getattr(getattr(content, "inline_data", None), "data", None)
    if data:
        return data
    return getattr(getattr(response, "audio", None), "data", None)


def get_tts_audio(client, prompt, voice, retries=5, delay=5):
    """Request speech audio for ``prompt`` from Gemini TTS, retrying on failure.

    Args:
        client: Client object exposing ``models.generate_content``.
        prompt: Text (including any style instruction) to be spoken.
        voice: Name of the prebuilt voice to use.
        retries: Maximum number of attempts.
        delay: Seconds to wait between two attempts.

    Returns:
        The audio payload of the first successful attempt, or ``None`` when
        every attempt raised or returned no audio.
    """
    for attempt in range(retries):
        try:
            response = client.models.generate_content(
                model="gemini-2.5-pro-preview-tts",
                contents=prompt,
                config=types.GenerateContentConfig(
                    response_modalities=["audio"],
                    speech_config=types.SpeechConfig(
                        voice_config=types.VoiceConfig(
                            prebuilt_voice_config=types.PrebuiltVoiceConfig(
                                voice_name=voice
                            )
                        )
                    ),
                ),
            )

            data = _first_inline_audio(response)
            if data:
                return data  # success
            print(f"‚ö†Ô∏è No audio returned on attempt {attempt+1}. Retrying...")
        except Exception as e:
            print(f"‚ö†Ô∏è TTS error on attempt {attempt+1}: {e}")

        # Wait only when another attempt follows; the old code also slept
        # after the final failure, delaying every failed segment for nothing.
        if attempt + 1 < retries:
            time.sleep(delay)
    return None  # all retries failed


# -------------------------------
# Input + setup
# -------------------------------
srt_file_path = '/content/drive/MyDrive/aa/test2_Tamil.srt'  # subtitle file to synthesise; replace with your path
VOICE = 'Kore'  # prebuilt voice name passed to get_tts_audio()

# Cues as (start_seconds, end_seconds, text) tuples
segments = parse_srt(srt_file_path)
print(f"Found {len(segments)} subtitle segments.")

# Per-segment WAV files go to <srt base name>_segments on Drive
base_name = os.path.splitext(os.path.basename(srt_file_path))[0]
output_dir = f'/content/drive/MyDrive/aa/{base_name}_segments'
os.makedirs(output_dir, exist_ok=True)

# Segments that yield no audio are appended to this file by the main loop
failed_log = os.path.join(output_dir, "failed_segments.txt")

# -------------------------------
# Main processing loop
# -------------------------------
# Synthesise one WAV file per subtitle cue. Files are named after the 1-based
# cue index (001.wav, 002.wav, ...), which the merge step relies on.
for idx, (start, end, text) in enumerate(segments, 1):
    # Cues shorter than five characters are not sent to TTS.
    if len(text.strip()) < 5:
        print(f"‚ö†Ô∏è Skipping too-short segment {idx}: '{text}'")
        continue

    PROMPT = f"Speak in Indian female Tamil with an educational tone: {text}"
    print(f"\nProcessing segment {idx} ({start:.2f}s ‚Üí {end:.2f}s): {text[:60]}...")

    data = get_tts_audio(client, PROMPT, VOICE)
    if not data:
        print(f"‚ùå Skipping segment {idx} ‚Äî no audio after retries.")
        # The log holds Tamil text, so do not depend on the platform default encoding.
        with open(failed_log, "a", encoding="utf-8") as log:
            log.write(f"{idx}: {text}\n")
        continue

    # Save audio locally first
    rate = 24000
    file_name = f"{idx:03d}.wav"
    # Resolve the path once so the copy below reads the file from wherever it
    # was really written; the old code wrote relative to the current directory
    # but copied from a hard-coded "/content/".
    local_path = os.path.abspath(file_name)
    print(f"\nSaving sample rate: {rate}")
    wave_file(local_path, data, rate=rate)

    # Copy to Drive
    destination_path = os.path.join(output_dir, file_name)
    shutil.copy(local_path, destination_path)
    display(Audio(destination_path))

print(f"\n‚úÖ All segments saved in: {output_dir}")
print(f"üìÑ Failed segments (if any) logged to: {failed_log}")


# Audio merge

In [None]:
import subprocess

def merge_segments_ffmpeg_timed(segments, segments_dir, output_path, sample_rate=24000):
    """Mix per-segment WAV files into one track, each placed at its SRT start time.

    Segment ``i`` (1-based position in ``segments``) is expected at
    ``<segments_dir>/<i:03d>.wav``. Missing files are skipped. Every available
    file becomes one FFmpeg input that is delayed by its start time with
    ``adelay``; all delayed streams are then combined with ``amix``.

    Args:
        segments: Iterable of ``(start_seconds, end_seconds, text)`` tuples.
        segments_dir: Folder holding the numbered segment WAV files.
        output_path: Destination of the merged WAV file.
        sample_rate: Sample rate of the output file in Hz.

    Raises:
        ValueError: If none of the expected segment files exists.
        subprocess.CalledProcessError: If FFmpeg exits with a non-zero status.
    """
    print("\nüéØ Performing precise timeline merge using FFmpeg...")

    inputs = []
    filter_parts = []
    mix_labels = []

    for i, (start, _end, _text) in enumerate(segments, 1):
        seg_path = os.path.join(segments_dir, f"{i:03d}.wav")
        if not os.path.exists(seg_path):
            print(f"‚ö†Ô∏è Skipping missing segment {i:03d}")
            continue

        # FFmpeg numbers its inputs by position on the command line, not by
        # SRT index. Using the SRT index here addressed the wrong (or a
        # non-existent) input as soon as one segment file was missing.
        input_index = len(mix_labels)
        label = f"[a{input_index}]"

        delay_ms = int(start * 1000)  # convert to milliseconds
        inputs += ["-i", seg_path]
        # Apply delay via adelay filter
        filter_parts.append(f"[{input_index}:a]adelay={delay_ms}|{delay_ms}{label}")
        mix_labels.append(label)

    if not mix_labels:
        raise ValueError(
            f"No segment audio files found in {segments_dir!r}; nothing to merge."
        )

    # Combine all delayed audio tracks, using exactly the labels created above
    filter_complex = (
        "; ".join(filter_parts)
        + f"; {' '.join(mix_labels)}amix=inputs={len(mix_labels)}:normalize=0[aout]"
    )

    cmd = [
        "ffmpeg", "-y",
        *inputs,
        "-filter_complex", filter_complex,
        "-map", "[aout]",
        "-ar", str(sample_rate),
        "-ac", "1",
        "-c:a", "pcm_s16le",
        output_path
    ]

    print(f"\nRunning FFmpeg command:\n{' '.join(cmd)}\n")
    subprocess.run(cmd, check=True)
    print(f"‚úÖ Final aligned audio saved at: {output_path}")

# Merge the per-segment WAV files from output_dir into one time-aligned track.
# Relies on segments, output_dir and base_name defined by the TTS cell.
final_output = f"/content/drive/MyDrive/aa/{base_name}_merged_timed_Tamil.wav"
merge_segments_ffmpeg_timed(segments, output_dir, final_output)

