In [None]:
# System dependency: the ffmpeg command-line binary, which the conversion step
# of this notebook invokes through subprocess.
!apt-get -y install ffmpeg

# Python dependencies, installed quietly (-q).
# NOTE(review): versions are not pinned, and ffmpeg-python, numpy and pandas are
# not imported by any cell visible in this notebook - confirm they are needed.
!pip install -q \
  openai \
  pydub \
  ffmpeg-python \
  numpy \
  pandas


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
ffmpeg is already the newest version (7:4.4.2-0ubuntu0.22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 41 not upgraded.


In [40]:
import os
import json
import subprocess
from pathlib import Path
from datetime import timedelta

from google.colab import drive, userdata
from pydub import AudioSegment
from openai import OpenAI

# Load API key from Colab secret named: openai-transcribe-key
# The key is exported as an environment variable before the client is built;
# presumably OpenAI() reads OPENAI_API_KEY from the environment - confirm.
os.environ["OPENAI_API_KEY"] = userdata.get("openai-transcribe-key")

# Shared client used by the transcription, segmentation and speaker-label calls.
client = OpenAI()

# Google Drive must be mounted before any of the /content/drive paths below exist.
drive.mount('/content/drive')

# --------- INPUT: folder with .MTS files ----------
AUDIO_DIR = Path(
    "/content/drive/Shareddrives/Analytics Team/Clients Deliverables/IPTK/IPTK VIDEO/SABAH/22-11-2025 10 AM BUMI 61+"
)

# --------- OUTPUT ROOT ----------
# Converted audio, plain-text transcripts and subtitles each get their own folder.
PROJECT_ROOT = Path("/content/drive/MyDrive/whisdiar_gpt4o_smart_ts")
OUT_WAV_DIR  = PROJECT_ROOT / "wav_16k"
OUT_TXT_DIR  = PROJECT_ROOT / "txt"
OUT_SRT_DIR  = PROJECT_ROOT / "srt"

# Create the output tree up front; exist_ok makes re-running this cell harmless.
for d in [PROJECT_ROOT, OUT_WAV_DIR, OUT_TXT_DIR, OUT_SRT_DIR]:
    d.mkdir(parents=True, exist_ok=True)

# --------- GLOBAL CONFIG ----------
SAMPLE_RATE          = 16000          # passed to ffmpeg as the output sample rate (-ar)
AUDIO_BITRATE        = "24k"          # compressed speech
TRANSCRIBE_MODEL     = "gpt-4o-transcribe"   # model used for speech-to-text
SEGMENT_MODEL        = "gpt-4o"              # model used for segmentation and speaker labels
ENABLE_SPEAKER_GUESS = False          # set True to ask GPT to guess speakers

# Last expression of the cell: displays the resolved paths as the cell output.
AUDIO_DIR, OUT_WAV_DIR, OUT_TXT_DIR, OUT_SRT_DIR


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


(PosixPath('/content/drive/Shareddrives/Analytics Team/Clients Deliverables/IPTK/IPTK VIDEO/SABAH/22-11-2025 10 AM BUMI 61+'),
 PosixPath('/content/drive/MyDrive/whisdiar_gpt4o_smart_ts/wav_16k'),
 PosixPath('/content/drive/MyDrive/whisdiar_gpt4o_smart_ts/txt'),
 PosixPath('/content/drive/MyDrive/whisdiar_gpt4o_smart_ts/srt'))

In [None]:
def mts_to_wav_16k_mono(src_path: Path, dst_path: Path) -> Path:
    """
    Convert .MTS (or any media ffmpeg can read) to a mono WAV at SAMPLE_RATE.

    Args:
        src_path: Input media file.
        dst_path: Output WAV path. Its parent directory is created if missing
            and an existing file is overwritten (ffmpeg ``-y``).

    Returns:
        dst_path, once ffmpeg has exited successfully.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
            The tail of ffmpeg's captured stderr is printed first, because the
            exception message alone only carries the exit code.
    """
    dst_path.parent.mkdir(parents=True, exist_ok=True)

    cmd = [
        "ffmpeg",
        "-y",                        # overwrite dst_path without prompting
        "-i", str(src_path),
        "-ac", "1",                  # mono
        "-ar", str(SAMPLE_RATE),     # output sample rate
        # NOTE(review): a .wav target is normally written as uncompressed PCM,
        # for which an audio bitrate likely has no effect - confirm or drop.
        "-b:a", AUDIO_BITRATE,
        "-vn",                       # ignore the video stream
        str(dst_path),
    ]
    try:
        subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as err:
        # stderr was captured, so surface it here; otherwise the reason for the
        # failure never reaches the notebook output.
        detail = (err.stderr or b"").decode("utf-8", errors="replace").strip()
        if detail:
            print(f"ffmpeg failed for {src_path}:\n{detail[-2000:]}")
        raise
    return dst_path


def normalize_wav(path: Path, target_dBFS: float = -21.0) -> Path:
    """
    Apply a single gain so the file's overall level matches target_dBFS.

    The file at ``path`` is overwritten with the normalized audio in WAV format.
    The default is slightly louder (-21 dBFS) to help Tamil + soft speech.

    Args:
        path: Audio file to normalize in place.
        target_dBFS: Desired level in dBFS.

    Returns:
        The same ``path``.
    """
    audio = AudioSegment.from_file(path)
    current_dBFS = audio.dBFS

    # Empty or digitally silent audio reports -inf dBFS, which would turn the
    # required gain into +inf. There is nothing to amplify, so leave the file
    # exactly as it is.
    if current_dBFS == float("-inf"):
        return path

    normalized = audio.apply_gain(target_dBFS - current_dBFS)
    normalized.export(path, format="wav")
    return path


def get_audio_duration_seconds(path: Path) -> float:
    """
    Return the length of the audio file at ``path`` in seconds.

    The file is loaded with AudioSegment, whose ``len()`` is taken as a
    millisecond count and converted to seconds.
    """
    duration_ms = len(AudioSegment.from_file(path))
    return duration_ms / 1000.0


In [None]:
def transcribe_with_gpt4o_transcribe(wav_path: Path, language: str = None, prompt: str = None) -> str:
    """
    Transcribe a whole audio file in a single request to TRANSCRIBE_MODEL.

    Args:
        wav_path: Audio file to upload.
        language: Language hint forwarded to the API; None leaves it unset
            (auto-detect, needed for Tamil mix).
        prompt: Optional text forwarded to the API to guide the model.

    Returns:
        The full transcript as one string, without timestamps.
    """
    print("Sending audio to GPT-4o Transcribe...")
    with open(wav_path, "rb") as audio_file:
        request = dict(
            model=TRANSCRIBE_MODEL,
            file=audio_file,
            response_format="text",   # ask for the transcript as plain text
            language=language,
            prompt=prompt,
        )
        result = client.audio.transcriptions.create(**request)
    print("Transcription finished.")

    # With response_format="text" the SDK may hand back a bare string;
    # otherwise read the transcript from the response object's .text.
    return result if isinstance(result, str) else result.text


In [None]:
def gpt_smart_segments(full_text: str, total_duration_sec: float, approx_segments: int = 80):
    """
    Use SEGMENT_MODEL to split a transcript into segments with approximate timestamps.

    Only the transcript text and the total duration are sent to the model, so
    the timestamps are the model's estimates, not measurements from the audio.

    Args:
        full_text: Full transcript. Blank input returns [] without an API call.
        total_duration_sec: Length of the audio; no segment extends past it.
        approx_segments: Sent to the model as the desired number of segments.

    Returns:
        A list of {"start": float, "end": float, "text": str} dicts in order,
        with start < end, no overlaps, and all times within
        [0, total_duration_sec]. Segments that would start at or after the end
        of the audio are dropped.

    Raises:
        Whatever the API client raises, and json.JSONDecodeError / TypeError if
        the reply is not parseable JSON.
    """
    full_text = full_text.strip()
    if not full_text:
        return []

    system_msg = (
        "You are a precise transcript segmenter. "
        "You receive a full transcript and total audio duration in seconds. "
        "You must split the transcript into ordered segments with approximate start/end timestamps. "
        "Timestamps must be in seconds (float), non-negative, strictly increasing, "
        "and end must never exceed total_duration_sec. "
        "Keep the words in original order. "
        "Do not invent words. "
        "Return ONLY valid JSON with structure: "
        "{\"segments\": [{\"start\": float, \"end\": float, \"text\": string}, ...]}."
    )

    user_payload = {
        "total_duration_sec": total_duration_sec,
        "desired_segments": approx_segments,
        "transcript": full_text,
    }

    resp = client.chat.completions.create(
        model=SEGMENT_MODEL,
        response_format={"type": "json_object"},
        temperature=0,
        messages=[
            {"role": "system", "content": system_msg},
            {
                "role": "user",
                "content": json.dumps(user_payload, ensure_ascii=False),
            },
        ],
    )

    content = resp.choices[0].message.content
    data = json.loads(content)
    segments = data.get("segments", []) if isinstance(data, dict) else []
    if not isinstance(segments, list):
        return []

    return _clean_segment_times(segments, total_duration_sec)


def _clean_segment_times(segments, total_duration_sec: float):
    """Drop malformed segments and force the rest onto a valid, non-overlapping timeline."""
    cleaned = []
    last_end = 0.0
    for seg in segments:
        if not isinstance(seg, dict):
            continue
        try:
            start = float(seg.get("start", 0.0))
            end = float(seg.get("end", 0.0))
        except (TypeError, ValueError):
            continue
        if start != start or end != end:
            # NaN never compares equal to itself; such a timestamp is unusable.
            continue

        # `or ""` keeps a JSON null from turning into the literal text "None".
        text = str(seg.get("text") or "").strip()
        if not text:
            continue

        # Settle the start first (non-negative, not before the previous end),
        # so that the end is repaired against the start actually used.
        start = max(start, 0.0, last_end)
        if start >= total_duration_sec:
            # No room left on the timeline for this or any later segment.
            break
        if end <= start:
            end = start + 0.5
        end = min(end, total_duration_sec)

        cleaned.append({"start": start, "end": end, "text": text})
        last_end = end
        if last_end >= total_duration_sec:
            break

    return cleaned


In [None]:
def gpt_add_speaker_labels(segments, max_speakers: int = 4):
    """
    Optional pass: ask SEGMENT_MODEL to assign speaker labels (S1, S2, ...) to each segment.

    The model only sees the segment text, so labels are guesses. Start, end and
    text are always taken from the input; only the label comes from the reply.
    Labels are matched to segments by position.

    Args:
        segments: List of {"start", "end", "text"} dicts.
        max_speakers: Sent to the model as the maximum number of speakers; it
            is not enforced on the reply.

    Returns:
        A new list with one entry per input segment, each carrying a "speaker"
        key when the model supplied a non-empty label for that position. The
        input list is returned unchanged when it is empty or the reply holds
        no segments.

    Raises:
        Whatever the API client raises, and json.JSONDecodeError / TypeError if
        the reply is not parseable JSON.
    """
    if not segments:
        return segments

    system_msg = (
        "You are a diarization assistant working ONLY from text. "
        "Assign speaker labels S1, S2, ..., up to a maximum number of speakers. "
        "Use consistent labels for the same voice across segments. "
        "If unsure, keep the same speaker as previous segment. "
        "Do not change start/end/text. "
        "Return ONLY JSON: {\"segments\": [{\"start\": float, \"end\": float, \"text\": str, \"speaker\": str}, ...]}."
    )

    payload = {
        "max_speakers": max_speakers,
        "segments": segments,
    }

    resp = client.chat.completions.create(
        model=SEGMENT_MODEL,
        response_format={"type": "json_object"},
        temperature=0,
        messages=[
            {"role": "system", "content": system_msg},
            {"role": "user", "content": json.dumps(payload, ensure_ascii=False)},
        ],
    )

    content = resp.choices[0].message.content
    data = json.loads(content)
    out_segments = data.get("segments", []) if isinstance(data, dict) else []

    # Fallback if GPT returns rubbish
    if not isinstance(out_segments, list) or not out_segments:
        return segments

    cleaned = []
    # Iterate over the input, not zip(): a short reply must never drop
    # transcript segments, it only leaves the tail unlabelled.
    for idx, base in enumerate(segments):
        new = out_segments[idx] if idx < len(out_segments) else None
        raw_label = new.get("speaker") if isinstance(new, dict) else None
        # A JSON null must not become the literal label "None".
        speaker = str(raw_label).strip() if raw_label is not None else ""

        seg = {
            "start": base["start"],
            "end": base["end"],
            "text": base["text"],
        }
        if speaker:
            seg["speaker"] = speaker
        cleaned.append(seg)

    return cleaned


In [None]:
def format_timestamp(sec: float) -> str:
    """
    Format a time offset in seconds as an SRT timestamp ``HH:MM:SS,mmm``.

    The value is rounded to the nearest millisecond. Truncating instead loses a
    millisecond whenever the float product lands just below an integer (for
    example 1.001 * 1000). Negative offsets are clamped to zero because SRT has
    no representation for them.
    """
    total_ms = max(0, round(sec * 1000))
    # Derive every field from the single rounded millisecond count so carries
    # (e.g. 59.9996 s -> 00:01:00,000) propagate correctly.
    hours, remainder = divmod(total_ms, 3_600_000)
    minutes, remainder = divmod(remainder, 60_000)
    seconds, milliseconds = divmod(remainder, 1000)
    return f"{hours:02}:{minutes:02}:{seconds:02},{milliseconds:03}"


def export_srt(segments, out_path: Path) -> Path:
    """
    Write segments to ``out_path`` as a UTF-8 SRT subtitle file.

    Args:
        segments: list of dicts with keys: start, end, text, optional speaker.
            Cues are numbered from 1 in list order; when a speaker is present
            the cue text is prefixed with ``"<speaker>: "``.
        out_path: Destination file; overwritten if it exists.

    Returns:
        out_path.
    """
    lines = []
    for idx, seg in enumerate(segments, start=1):
        start_ts = format_timestamp(seg["start"])
        end_ts   = format_timestamp(seg["end"])
        speaker  = seg.get("speaker")

        # A blank line terminates an SRT cue, so blank lines inside the text
        # would cut the cue short and corrupt the entries that follow.
        text = "\n".join(
            line for line in seg["text"].strip().splitlines() if line.strip()
        )

        if speaker:
            text = f"{speaker}: {text}"

        lines.append(str(idx))
        lines.append(f"{start_ts} --> {end_ts}")
        lines.append(text)
        lines.append("")  # cue separator

    out_path.write_text("\n".join(lines), encoding="utf-8")
    return out_path


def export_txt(segments, out_path: Path) -> Path:
    """
    Write the transcript to ``out_path`` as UTF-8 text, one segment per line.

    A segment with a non-empty "speaker" is written as ``"<speaker>: <text>"``;
    otherwise only its stripped text is written. Returns ``out_path``.
    """
    def _render(seg) -> str:
        body = seg["text"].strip()
        who = seg.get("speaker")
        return f"{who}: {body}" if who else body

    transcript = "\n".join(_render(seg) for seg in segments)
    out_path.write_text(transcript, encoding="utf-8")
    return out_path


In [None]:
def process_single_mts(mts_path: Path, language: str = None, prompt: str = None):
    """
    .MTS → 16k WAV → normalize → GPT-4o Transcribe → GPT-4o smart timestamps
    → optional speaker labels → SRT + TXT.

    Args:
        mts_path: Source recording. Output files are named after its stem.
        language: Language hint forwarded to the transcription step.
        prompt: Optional prompt forwarded to the transcription step.

    Returns:
        (srt_path, txt_path) on success. (None, None) when audio preparation,
        transcription or segmentation fails; the error is printed, not raised.
        A speaker-labelling failure is printed and ignored, and the unlabelled
        segments are exported.
    """
    print(f"Input: {mts_path}")

    # 1-3) Convert, normalize and measure the audio.
    # Handled like the later stages so one unreadable recording is reported
    # and skipped instead of raising out of a batch run.
    wav_path = OUT_WAV_DIR / f"{mts_path.stem}_16k_mono.wav"
    try:
        wav_path = mts_to_wav_16k_mono(mts_path, wav_path)
        print(f"WAV generated: {wav_path}")

        normalize_wav(wav_path)
        print("Audio normalized.")

        duration_sec = get_audio_duration_seconds(wav_path)
        print(f"Duration: {duration_sec:.1f} seconds")
    except Exception as e:
        print("Audio preparation error:", repr(e))
        return None, None

    # 4) Transcription (text only)
    try:
        full_text = transcribe_with_gpt4o_transcribe(wav_path, language=language, prompt=prompt)
    except Exception as e:
        print("Transcription error:", repr(e))
        return None, None

    # 5) Smart segmentation
    try:
        segments = gpt_smart_segments(full_text, total_duration_sec=duration_sec)
    except Exception as e:
        print("Segmentation error:", repr(e))
        return None, None

    print(f"Smart segments created: {len(segments)}")

    # 6) Optional speaker guessing (best effort: on failure keep the unlabelled segments)
    if ENABLE_SPEAKER_GUESS:
        try:
            segments = gpt_add_speaker_labels(segments)
            print("Speaker labels added.")
        except Exception as e:
            print("Speaker-labelling error (ignored):", repr(e))

    # 7) Export
    srt_path = OUT_SRT_DIR / f"{mts_path.stem}.srt"
    txt_path = OUT_TXT_DIR / f"{mts_path.stem}.txt"

    export_srt(segments, srt_path)
    export_txt(segments, txt_path)

    print("Completed.")
    print("SRT:", srt_path)
    print("TXT:", txt_path)

    return srt_path, txt_path


In [41]:
# Collect every recording in AUDIO_DIR whose extension is .mts in any letter case.
# A single scan means a file can never be listed twice (which two separate
# "*.MTS" / "*.mts" globs could do on a case-insensitive filesystem), and
# sorting makes the processing order the same on every run.
mts_files = sorted(p for p in AUDIO_DIR.glob("*") if p.suffix.lower() == ".mts")
print("Files detected:", len(mts_files))
mts_files


Files detected: 11


[PosixPath('/content/drive/Shareddrives/Analytics Team/Clients Deliverables/IPTK/IPTK VIDEO/SABAH/22-11-2025 10 AM BUMI 61+/00062.MTS'),
 PosixPath('/content/drive/Shareddrives/Analytics Team/Clients Deliverables/IPTK/IPTK VIDEO/SABAH/22-11-2025 10 AM BUMI 61+/00063.MTS'),
 PosixPath('/content/drive/Shareddrives/Analytics Team/Clients Deliverables/IPTK/IPTK VIDEO/SABAH/22-11-2025 10 AM BUMI 61+/00064.MTS'),
 PosixPath('/content/drive/Shareddrives/Analytics Team/Clients Deliverables/IPTK/IPTK VIDEO/SABAH/22-11-2025 10 AM BUMI 61+/00065.MTS'),
 PosixPath('/content/drive/Shareddrives/Analytics Team/Clients Deliverables/IPTK/IPTK VIDEO/SABAH/22-11-2025 10 AM BUMI 61+/00066.MTS'),
 PosixPath('/content/drive/Shareddrives/Analytics Team/Clients Deliverables/IPTK/IPTK VIDEO/SABAH/22-11-2025 10 AM BUMI 61+/00067.MTS'),
 PosixPath('/content/drive/Shareddrives/Analytics Team/Clients Deliverables/IPTK/IPTK VIDEO/SABAH/22-11-2025 10 AM BUMI 61+/00068.MTS'),
 PosixPath('/content/drive/Shareddrives/A

In [42]:
import concurrent.futures

def worker(path: Path):
    """
    Thread-pool task: print a banner for ``path`` and run the full pipeline on it.

    Returns whatever process_single_mts returns for this file; the language is
    left as None.
    """
    print("=" * 80, f"Processing: {path.name}", sep="\n")
    outcome = process_single_mts(path, language=None)
    return outcome

# Number of recordings processed concurrently.
NUM_WORKERS = 3

if mts_files:
    with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_WORKERS) as ex:
        # ex.map yields results in the same order as mts_files.
        batch_results = list(ex.map(worker, mts_files))

    # process_single_mts reports a failure by returning (None, None). The log
    # lines of the worker threads interleave, so list the failed files
    # explicitly instead of discarding the results.
    failed_files = [
        src.name
        for src, (srt_out, _txt_out) in zip(mts_files, batch_results)
        if srt_out is None
    ]
    print("=" * 80)
    print(f"Finished: {len(batch_results) - len(failed_files)} of {len(batch_results)} files succeeded.")
    if failed_files:
        print("Failed:", ", ".join(failed_files))
else:
    print("No MTS files to process.")


Processing: 00062.MTS
Input: /content/drive/Shareddrives/Analytics Team/Clients Deliverables/IPTK/IPTK VIDEO/SABAH/22-11-2025 10 AM BUMI 61+/00062.MTS
Processing: 00063.MTS
Input: /content/drive/Shareddrives/Analytics Team/Clients Deliverables/IPTK/IPTK VIDEO/SABAH/22-11-2025 10 AM BUMI 61+/00063.MTS
Processing: 00064.MTS
Input: /content/drive/Shareddrives/Analytics Team/Clients Deliverables/IPTK/IPTK VIDEO/SABAH/22-11-2025 10 AM BUMI 61+/00064.MTS
WAV generated: /content/drive/MyDrive/whisdiar_gpt4o_smart_ts/wav_16k/00062_16k_mono.wav
Audio normalized.
Duration: 1.9 seconds
Sending audio to GPT-4o Transcribe...
WAV generated: /content/drive/MyDrive/whisdiar_gpt4o_smart_ts/wav_16k/00063_16k_mono.wav
Audio normalized.
Transcription finished.
Duration: 1.9 seconds
Sending audio to GPT-4o Transcribe...
Smart segments created: 1
Completed.
SRT: /content/drive/MyDrive/whisdiar_gpt4o_smart_ts/srt/00062.srt
TXT: /content/drive/MyDrive/whisdiar_gpt4o_smart_ts/txt/00062.txt
Processing: 00065.MT

In [None]:
# ==========================================
# CORRECTION WORKFLOW
# ==========================================
# Recordings to run through the pipeline again, given as file names inside
# AUDIO_DIR (not full paths).
FILES_TO_FIX = [
    # "00065.MTS",
    # "00072.MTS",
]

# Optional overrides, forwarded unchanged to process_single_mts
FORCE_LANGUAGE = None  # e.g. "ms" or "en"
FORCE_PROMPT   = None  # e.g. "This is a meeting in Malay and English."

if not FILES_TO_FIX:
    print("No files listed for correction.")
else:
    print(f"Starting correction for {len(FILES_TO_FIX)} files...")
    for fname in FILES_TO_FIX:
        # Resolve the name inside AUDIO_DIR; only the first match is used.
        found = list(AUDIO_DIR.glob(fname))
        if found:
            path = found[0]
            print(f"Re-processing: {path.name}")
            process_single_mts(path, language=FORCE_LANGUAGE, prompt=FORCE_PROMPT)
        else:
            print(f"File not found: {fname}")
