In [1]:
!pip install ffmpeg-python --quiet
!pip install pyannote.audio --quiet
!pip install torchvision==0.23.0 --quiet

In [73]:
import os
import torch
from pyannote.audio import Pipeline

# Run on the GPU when torch can see one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Path of the cleaned audio file that the diarization pipeline is run on.
CLEAN_AUDIO_PATH = "audios/noise_code_switch_cleaned.wav"

# Make sure the output folders exist (no error if they are already there).
os.makedirs("diarization_output", exist_ok=True)
os.makedirs("segments", exist_ok=True)

print("Using cleaned audio:", CLEAN_AUDIO_PATH)

# Credentials are read from the environment so that real tokens never have to
# be typed into (and saved with) the notebook. Both fall back to an empty
# string when the variable is not set.
HF_TOKEN = os.environ.get("HF_TOKEN", "")
PYANNOTE_API_KEY = os.environ.get("PYANNOTE_API_KEY", "")

Using device: cuda
Using cleaned audio: audios/noise_code_switch_cleaned.wav


In [3]:
# Pyannote Precision-2 Model
# Loads the "pyannote/speaker-diarization-precision-2" pipeline, authenticating
# with PYANNOTE_API_KEY.
# NOTE(review): a later cell rebinds `pipeline` to the community-1 model, so on
# a top-to-bottom run only that later assignment is used downstream. Run just
# one of the two loader cells.

pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-precision-2",
    token=PYANNOTE_API_KEY
)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.yaml:   0%|          | 0.00/134 [00:00<?, ?B/s]

In [40]:
# Pyannote Community-1 Model
# Loads the "pyannote/speaker-diarization-community-1" pipeline, authenticating
# with HF_TOKEN.
# NOTE(review): this rebinds `pipeline`, replacing whatever an earlier cell
# loaded; the cells below use this assignment on a top-to-bottom run.

pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-community-1",
    token=HF_TOKEN
)

config.yaml:   0%|          | 0.00/444 [00:00<?, ?B/s]

segmentation/pytorch_model.bin:   0%|          | 0.00/5.91M [00:00<?, ?B/s]

plda/xvec_transform.npz:   0%|          | 0.00/134k [00:00<?, ?B/s]

plda/plda.npz:   0%|          | 0.00/134k [00:00<?, ?B/s]

embedding/pytorch_model.bin:   0%|          | 0.00/26.6M [00:00<?, ?B/s]

In [74]:
# Move the loaded pipeline to the device selected in the setup cell.
pipeline.to(device)
# Run the pipeline on the cleaned audio file.
# NOTE(review): num_speakers=2 presumably fixes the number of speakers the
# pipeline looks for — confirm against the pyannote.audio documentation.
output = pipeline(CLEAN_AUDIO_PATH, num_speakers=2)
print("Diarization complete.")

Diarization complete.


In [75]:
# Flatten the diarization result into plain dicts, one per speaker turn, with
# the turn boundaries converted to built-in floats.
segments = [
    {"speaker": label, "start": float(span.start), "end": float(span.end)}
    for span, label in output.speaker_diarization
]

print("Raw segments:", len(segments))
segments[0:]

Raw segments: 35


[{'speaker': 'SPEAKER_01', 'start': 0.50346875, 'end': 1.3303437500000002},
 {'speaker': 'SPEAKER_00',
  'start': 1.3303437500000002,
  'end': 1.3472187500000001},
 {'speaker': 'SPEAKER_01', 'start': 1.3472187500000001, 'end': 1.36409375},
 {'speaker': 'SPEAKER_00', 'start': 1.36409375, 'end': 1.4653437500000002},
 {'speaker': 'SPEAKER_01', 'start': 1.4653437500000002, 'end': 1.56659375},
 {'speaker': 'SPEAKER_00', 'start': 1.56659375, 'end': 1.6172187500000001},
 {'speaker': 'SPEAKER_01', 'start': 1.6172187500000001, 'end': 3.03471875},
 {'speaker': 'SPEAKER_00', 'start': 3.03471875, 'end': 3.4397187500000004},
 {'speaker': 'SPEAKER_00', 'start': 3.7434687500000003, 'end': 7.43909375},
 {'speaker': 'SPEAKER_00', 'start': 8.67096875, 'end': 10.25721875},
 {'speaker': 'SPEAKER_00',
  'start': 10.425968750000003,
  'end': 14.745968750000003},
 {'speaker': 'SPEAKER_00', 'start': 15.758468750000002, 'end': 16.90596875},
 {'speaker': 'SPEAKER_00', 'start': 17.20971875, 'end': 17.98596875},


In [76]:
def clean_diarization_segments(segments, gap_threshold=0.5, min_duration=0.4):
    """Merge, filter and relabel raw diarization segments.

    The steps are applied in this order:

    1. A segment is fused into the previously kept one when both have the
       same speaker and the gap between them is at most ``gap_threshold``.
    2. Fused segments shorter than ``min_duration`` are dropped.
    3. If the first surviving segment is not labelled ``"SPEAKER_00"``, that
       label and ``"SPEAKER_00"`` are swapped everywhere, so the first
       speaker heard is always ``"SPEAKER_00"``. Any other labels are left
       untouched, so distinct speakers are never collapsed into one.

    The input is never modified: each segment is copied before it is edited,
    so calling the function repeatedly on the same list gives the same result.

    Args:
        segments: Iterable of dicts with ``"speaker"``, ``"start"`` and
            ``"end"`` keys. Merging only compares a segment with the one
            kept just before it, so items are expected in playback order.
        gap_threshold: Largest gap (same unit as ``start``/``end``) that is
            still bridged between two same-speaker segments.
        min_duration: Shortest segment length that is kept after merging.

    Returns:
        A new list of new dicts with the same three keys.
    """
    merged = []

    for seg in segments:
        previous = merged[-1] if merged else None
        if (
            previous is not None
            and previous["speaker"] == seg["speaker"]
            and seg["start"] - previous["end"] <= gap_threshold
        ):
            # Same speaker resumes after a short pause: extend the kept turn.
            previous["end"] = seg["end"]
        else:
            # Copy so that later edits never leak into the caller's dicts.
            merged.append(dict(seg))

    cleaned = [s for s in merged if s["end"] - s["start"] >= min_duration]

    # Force the first speaker to be SPEAKER_00 with a true two-way swap.
    if cleaned:
        first = cleaned[0]["speaker"]
        if first != "SPEAKER_00":
            swap = {first: "SPEAKER_00", "SPEAKER_00": first}
            for s in cleaned:
                s["speaker"] = swap.get(s["speaker"], s["speaker"])

    return cleaned


# Clean the raw segments using the function's default thresholds
# (gap_threshold=0.5, min_duration=0.4).
cleaned_segments = clean_diarization_segments(segments)

print("Cleaned segments:", len(cleaned_segments))
# Bare last expression: the notebook renders the whole cleaned list.
cleaned_segments[0:]

Cleaned segments: 16


[{'speaker': 'SPEAKER_00', 'start': 0.50346875, 'end': 1.3303437500000002},
 {'speaker': 'SPEAKER_00', 'start': 1.6172187500000001, 'end': 3.03471875},
 {'speaker': 'SPEAKER_01', 'start': 3.03471875, 'end': 7.43909375},
 {'speaker': 'SPEAKER_01', 'start': 8.67096875, 'end': 14.745968750000003},
 {'speaker': 'SPEAKER_01', 'start': 15.758468750000002, 'end': 17.98596875},
 {'speaker': 'SPEAKER_01', 'start': 18.57659375, 'end': 23.04846875},
 {'speaker': 'SPEAKER_00', 'start': 25.191593750000003, 'end': 29.96721875},
 {'speaker': 'SPEAKER_00', 'start': 30.65909375, 'end': 31.18221875},
 {'speaker': 'SPEAKER_01', 'start': 31.18221875, 'end': 36.61596875},
 {'speaker': 'SPEAKER_00', 'start': 38.48909375, 'end': 42.572843750000004},
 {'speaker': 'SPEAKER_00', 'start': 42.92721875, 'end': 44.024093750000006},
 {'speaker': 'SPEAKER_01',
  'start': 44.024093750000006,
  'end': 44.564093750000005},
 {'speaker': 'SPEAKER_01', 'start': 45.23909375, 'end': 48.63096875},
 {'speaker': 'SPEAKER_01', '

In [77]:
import json

# Write the cleaned segments to disk as JSON indented by four spaces.
json_path = "diarization_output/noise_code_switch.json"

serialized = json.dumps(cleaned_segments, indent=4)
with open(json_path, "w") as handle:
    handle.write(serialized)

print("Saved diarization JSON →", json_path)

Saved diarization JSON → diarization_output/community_model/noise_code_switch.json


In [78]:
import ffmpeg

# Cut one WAV clip per cleaned segment out of the cleaned audio file.
# ffmpeg does not create missing folders, and the setup cell only creates
# "segments", so the sub-folder the clips go into is created here first.
segment_dir = "segments/noise_code_switch_segments"
os.makedirs(segment_dir, exist_ok=True)

for i, seg in enumerate(cleaned_segments):
    start = seg["start"]
    end = seg["end"]
    speaker = seg["speaker"]

    # File name carries the segment index and the speaker label.
    out_path = f"{segment_dir}/seg_{i}_{speaker}.wav"

    (
        ffmpeg
        .input(CLEAN_AUDIO_PATH, ss=start, to=end)  # read only [start, end]
        .output(out_path, ac=1, ar=16000)           # 1 channel, 16000 Hz
        .overwrite_output()                         # replace clips from earlier runs
        .run(quiet=True)
    )

print("Saved all segments → /segments")

Saved all segments → /segments
