## Speech to Text with Speaker Diarization using Pyannote and Whisper Transcription

### Introduction:
Speech to Text (STT) with Speaker Diarization involves converting spoken language into written text while distinguishing different speakers. Pyannote and Whisper are powerful tools that make this process easier by offering pre-trained models for diarization and transcription, respectively.

## Step 1. Download YouTube Video
The audio track of the YouTube video is downloaded with yt_dlp, which relies on FFmpeg to convert it to WAV.

### Prerequisites
FFmpeg (installation on Windows):
1) Download FFmpeg from the official website: https://ffmpeg.org/download.html <br>
2) Extract the downloaded ZIP file to a location on your computer. <br>
3) Add the path to the FFmpeg bin directory to your system's PATH, as described in the remaining steps. <br>
4) Right-click on This PC or My Computer. <br>
5) Select Properties. <br>
6) Click on Advanced system settings. <br>
7) Click on the Environment Variables button. <br>
8) Under System variables, find the "Path" variable, click Edit, and add the path to the FFmpeg bin directory (e.g., C:\path\to\ffmpeg\bin). <br>
9) Click OK to close the dialogs. <br>
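
Before running the download cell, it can help to confirm that FFmpeg is actually reachable through PATH. The following is a minimal sketch using only the standard library; the helper name `ffmpeg_on_path` is an illustrative assumption and not part of yt_dlp or FFmpeg.

```python
import shutil


def ffmpeg_on_path(executable="ffmpeg"):
    """Return the resolved path of the executable, or None if the PATH lookup fails."""
    return shutil.which(executable)


location = ffmpeg_on_path()
if location is None:
    print("FFmpeg was not found on PATH; re-check the environment variable steps.")
else:
    print(f"FFmpeg found at: {location}")
```

A new terminal or a restarted notebook kernel is usually needed before a changed PATH becomes visible to Python.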

In [19]:
youtube_url = 'https://www.youtube.com/watch?v=-0IxWrSBsco'
output_file = 'temp/myvideo'

In [9]:
from yt_dlp import YoutubeDL

ydl_opts = {
    'format': 'bestaudio/best',       # prefer the best audio-only stream
    'outtmpl': output_file,           # output path without extension
    'postprocessors': [{
        'key': 'FFmpegExtractAudio',  # extract the audio with FFmpeg
        'preferredcodec': 'wav',      # final file is output_file + '.wav'
    }],
}

In [10]:
with YoutubeDL(ydl_opts) as ydl:
    ydl.download([youtube_url])
    info_dict = ydl.extract_info(youtube_url, download=False)
    video_id = info_dict.get("id", None)
    video_title = info_dict.get('title', None)
    print(video_title)
    print(video_id)

[youtube] Extracting URL: https://www.youtube.com/watch?v=-0IxWrSBsco
[youtube] -0IxWrSBsco: Downloading webpage
[youtube] -0IxWrSBsco: Downloading ios player API JSON
[youtube] -0IxWrSBsco: Downloading android player API JSON
[youtube] -0IxWrSBsco: Downloading m3u8 information
[info] -0IxWrSBsco: Downloading 1 format(s): 251
[download] Destination: temp\myvideo
[download] 100% of   12.65MiB in 00:00:00 at 14.38MiB/s    
[ExtractAudio] Destination: temp\myvideo.wav
Deleting original file temp\myvideo (pass -k to keep)
[youtube] Extracting URL: https://www.youtube.com/watch?v=-0IxWrSBsco
[youtube] -0IxWrSBsco: Downloading webpage
[youtube] -0IxWrSBsco: Downloading ios player API JSON
[youtube] -0IxWrSBsco: Downloading android player API JSON
[youtube] -0IxWrSBsco: Downloading m3u8 information
Top Private Job vs Top Government Job
-0IxWrSBsco
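
The cell above contacts YouTube twice, once in `download` and once more in `extract_info`, which is why the extraction log appears twice. As a sketch, `extract_info` can also perform the download when called with `download=True`, so the metadata comes from the same request. The helper names `build_audio_opts` and `download_audio` are assumptions made for this illustration.

```python
def build_audio_opts(output_file, codec="wav"):
    """Build yt_dlp options that keep only the audio track, converted by FFmpeg."""
    return {
        "format": "bestaudio/best",
        "outtmpl": output_file,  # path without extension
        "postprocessors": [{
            "key": "FFmpegExtractAudio",
            "preferredcodec": codec,
        }],
    }


def download_audio(url, output_file):
    """Download the audio once and return (video_id, video_title)."""
    from yt_dlp import YoutubeDL  # imported lazily; requires yt_dlp and FFmpeg

    with YoutubeDL(build_audio_opts(output_file)) as ydl:
        info = ydl.extract_info(url, download=True)
    return info.get("id"), info.get("title")


# Example call (needs network access):
# video_id, video_title = download_audio(youtube_url, output_file)
```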


## Step 2. Perform Speaker Diarization with Pyannote
Now, let's perform speaker diarization using Pyannote:

In [13]:
from pyannote.audio import Pipeline
import torch
import re
from pydub import AudioSegment
import whisper
import json

In [14]:
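def millisec(timeStr):
    """Convert an 'HH:MM:SS.mmm' timestamp string to integer milliseconds."""
    spl = timeStr.split(":")
    # round() avoids losing a millisecond to floating-point truncation
    s = round((int(spl[0]) * 60 * 60 + int(spl[1]) * 60 + float(spl[2])) * 1000)
    return s


def timeStr(t):
    """Format a time in seconds as 'HH:MM:SS.mmm'."""
    # Width 6 with 3 decimals gives 'SS.mmm'; '06.2f' would pad seconds to 'SSS.ss'.
    return '{0:02d}:{1:02d}:{2:06.3f}'.format(round(t // 3600),
                                              round(t % 3600 // 60),
                                              t % 60)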
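
As a quick check of the two conversions, the sketch below restates the parsing logic and pairs it with an inverse built on `divmod`, working on integer milliseconds throughout. The names `to_millis` and `to_timestamp` are hypothetical stand-ins so that the snippet runs on its own; they are not the notebook's helpers.

```python
def to_millis(timestamp):
    """Parse 'HH:MM:SS.mmm' into integer milliseconds."""
    hours, minutes, seconds = timestamp.split(":")
    total_seconds = int(hours) * 3600 + int(minutes) * 60 + float(seconds)
    return round(total_seconds * 1000)


def to_timestamp(millis):
    """Format integer milliseconds as 'HH:MM:SS.mmm'."""
    seconds, ms = divmod(millis, 1000)
    minutes, seconds = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{ms:03d}"


print(to_millis("00:01:02.500"))                 # 62500
print(to_timestamp(62500))                       # 00:01:02.500
print(to_timestamp(to_millis("01:00:00.008")))   # 01:00:00.008
```

Keeping times as integers until the final formatting step sidesteps rounding surprises when a timestamp is parsed and printed again.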

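#### Important

To load the pyannote speaker diarization pipeline, accept the user conditions on both hf.co/pyannote/speaker-diarization and hf.co/pyannote/segmentation. Then paste your access token or log in using notebook_login below.
Gated models are approved per repository, so the versioned pipeline loaded below (pyannote/speaker-diarization-3.0) may require accepting the conditions on its own model page as well.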

In [15]:
from huggingface_hub import notebook_login
notebook_login()

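
Outside a notebook, or when the interactive widget is inconvenient, the token can be supplied programmatically instead. The sketch below assumes the token is stored in an environment variable named `HF_TOKEN`; `read_hf_token` and `login_with_token` are illustrative helper names.

```python
import os


def read_hf_token(env_var="HF_TOKEN"):
    """Return the access token from the environment, or None if unset or blank."""
    token = os.environ.get(env_var, "").strip()
    return token or None


def login_with_token(env_var="HF_TOKEN"):
    """Log in to the Hugging Face Hub with a token read from the environment."""
    from huggingface_hub import login  # imported lazily; requires huggingface_hub

    token = read_hf_token(env_var)
    if token is None:
        raise RuntimeError(f"Set the {env_var} environment variable first.")
    login(token=token)
```

Reading the token from the environment keeps it out of the notebook itself, which matters if the notebook is shared.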

In [20]:
pipeline = Pipeline.from_pretrained('pyannote/speaker-diarization-3.0')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
pipeline.to(device)
diarization = pipeline(output_file + '.wav')
# for turn, _, speaker in diarization.itertracks(yield_label=True):
#     print(f"start={turn.start:.1f}s stop={turn.end:.1f}s speaker_{speaker}")

# Save the diarization as text (one segment per line) and read it back.
with open('temp\\diarization.txt', 'w') as text_file:
    text_file.write(str(diarization))
with open('temp\\diarization.txt') as text_file:
    dzs = text_file.read().splitlines()

# Group consecutive segments that belong to the same speaker.
groups = []
g = []
lastend = 0
for d in dzs:
    if g and (g[0].split()[-1] != d.split()[-1]):  # speaker changed
        groups.append(g)
        g = []
    g.append(d)
    end = re.findall(r'[0-9]+:[0-9]+:[0-9]+\.[0-9]+', string=d)[1]
    end = millisec(end)
    if lastend > end:  # segment engulfed by a previous segment
        groups.append(g)
        g = []
    else:
        lastend = end
if g:
    groups.append(g)

start =  0.0s stop =  21.0s speaker = SPEAKER_04
start =  3.3s stop =  3.6s speaker = SPEAKER_00
start =  7.0s stop =  7.3s speaker = SPEAKER_00
start =  7.3s stop =  7.4s speaker = SPEAKER_01
start =  7.4s stop =  7.4s speaker = SPEAKER_00
start =  10.1s stop =  10.4s speaker = SPEAKER_01
start =  10.7s stop =  11.0s speaker = SPEAKER_00
start =  21.0s stop =  27.3s speaker = SPEAKER_01
start =  28.8s stop =  67.8s speaker = SPEAKER_01
start =  63.5s stop =  64.0s speaker = SPEAKER_00
start =  68.9s stop =  93.9s speaker = SPEAKER_03
start =  94.3s stop =  94.6s speaker = SPEAKER_02
start =  94.6s stop =  94.6s speaker = SPEAKER_00
start =  94.6s stop =  94.7s speaker = SPEAKER_02
start =  94.7s stop =  94.8s speaker = SPEAKER_00
start =  94.9s stop =  95.5s speaker = SPEAKER_00
start =  97.6s stop =  100.2s speaker = SPEAKER_02
start =  101.6s stop =  116.0s speaker = SPEAKER_02
start =  116.2s stop =  121.0s speaker = SPEAKER_02
start =  121.0s stop =  122.8s speaker = SPEAKER_04
...
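
The grouping rule can be tried without running the pipeline. The sketch below applies the same logic to a few hand-written lines. The sample lines imitate the text form parsed above (two `HH:MM:SS.mmm` timestamps followed by a track letter and a speaker label) and are invented for illustration, as are the names `to_ms` and `group_by_speaker`.

```python
import re

TIMESTAMP = re.compile(r'[0-9]+:[0-9]+:[0-9]+\.[0-9]+')


def to_ms(timestamp):
    """Parse 'HH:MM:SS.mmm' into integer milliseconds."""
    h, m, s = timestamp.split(':')
    return round((int(h) * 3600 + int(m) * 60 + float(s)) * 1000)


def group_by_speaker(lines):
    """Collect consecutive lines with the same speaker label into groups."""
    groups, current, last_end = [], [], 0
    for line in lines:
        if current and current[0].split()[-1] != line.split()[-1]:
            groups.append(current)  # speaker changed: close the open group
            current = []
        current.append(line)
        end = to_ms(TIMESTAMP.findall(line)[1])
        if last_end > end:  # segment lies inside an earlier one
            groups.append(current)
            current = []
        else:
            last_end = end
    if current:
        groups.append(current)
    return groups


# Invented sample lines, not output from the video used in this notebook.
sample = [
    '[ 00:00:00.000 -->  00:00:05.000] A SPEAKER_00',
    '[ 00:00:05.200 -->  00:00:09.000] B SPEAKER_00',
    '[ 00:00:09.500 -->  00:00:12.000] C SPEAKER_01',
    '[ 00:00:12.100 -->  00:00:15.000] D SPEAKER_00',
]
sample_groups = group_by_speaker(sample)
print([len(g) for g in sample_groups])  # [2, 1, 1]
```

The first two lines share a speaker and merge into one group; each later speaker change starts a new group.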

Segment the audio into groups

In [22]:
audio = AudioSegment.from_wav(output_file + '.wav')
# Export one WAV file per speaker group, named after the group index.
for gidx, g in enumerate(groups):
    start = re.findall(r'[0-9]+:[0-9]+:[0-9]+\.[0-9]+', string=g[0])[0]
    end = re.findall(r'[0-9]+:[0-9]+:[0-9]+\.[0-9]+', string=g[-1])[1]
    start = millisec(start)  # pydub slices audio in milliseconds
    end = millisec(end)
    audio[start:end].export('temp\\' + str(gidx) + '.wav', format='wav')
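
The cells in this notebook build file names with a hard-coded Windows separator (`'temp\\'`). A minimal, portable sketch using `pathlib` is shown below; `WORK_DIR` and `chunk_path` are hypothetical names introduced for illustration.

```python
from pathlib import Path

WORK_DIR = Path("temp")  # assumed working directory, matching the notebook


def chunk_path(index, suffix=".wav", work_dir=WORK_DIR):
    """Return the path of the file for one speaker group, e.g. temp/0.wav."""
    return work_dir / f"{index}{suffix}"


print(chunk_path(0).name)           # 0.wav
print(chunk_path(3, ".json").name)  # 3.json
```

If a library expects a plain string, `str(chunk_path(gidx))` yields the path with the separator of the current operating system.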

Generate Transcripts using Whisper

In [24]:
model = whisper.load_model('tiny', device=torch.device('cuda' if torch.cuda.is_available() else 'cpu'))
for i in range(len(groups)):
    audio_file = 'temp\\' + str(i) + '.wav'
    result = model.transcribe(audio=audio_file, language='en', word_timestamps=True)
    with open('temp\\' + str(i) + '.json', 'w') as outfile:
        json.dump(result, outfile, indent=4)
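
Each JSON file written above holds Whisper's result for one chunk, so its timestamps are relative to the start of that chunk. The sketch below shows how word times can be mapped back to positions in the full recording. The `result` dictionary is a hand-made stand-in that only mimics the `segments`, `words`, `start`, `end`, and `word` fields used later in this notebook; its values are invented for illustration, and `absolute_words` is a hypothetical helper.

```python
def absolute_words(result, shift_ms):
    """Yield (word, start_seconds) with times shifted to the full recording."""
    for segment in result["segments"]:
        for w in segment.get("words", []):
            yield w["word"], (shift_ms + w["start"] * 1000.0) / 1000.0


# Invented sample mimicking the shape of a Whisper result.
result = {
    "segments": [{
        "start": 0.0, "end": 1.0, "text": " Hello world",
        "words": [
            {"word": " Hello", "start": 0.0, "end": 0.5},
            {"word": " world", "start": 0.5, "end": 1.0},
        ],
    }]
}

# A chunk that begins 21 seconds into the recording.
for word, start in absolute_words(result, shift_ms=21000):
    print(f"{start:.1f}s{word}")
# 21.0s Hello
# 21.5s world
```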

Create HTML file

Generate random colors for each speaker in the HTML

In [28]:
import matplotlib.colors as mcolors
import random
def random_color_generator():
    color = random.choice(list(mcolors.CSS4_COLORS.keys()))
    return color

In [79]:
import re
def get_speakers_list(speakers):
    speaker_list = []
    for s in speakers:
        m = re.search('[0-9]{3}] . (.+?)$', s[0])
        if m and m.group(1) not in speaker_list:
            speaker_list.append(m.group(1))
    return speaker_list

In [88]:
speakers = {}
for s in get_speakers_list(groups):
    speakers[s] = (s, random_color_generator(), random_color_generator())
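
Because each color is drawn independently, two speakers can receive the same color, or a speaker label can end up the same color as its box. One possible alternative, sketched here under the assumption that a small fixed palette is acceptable, draws colors without replacement. The palette values and the helper `assign_colors` are illustrative choices, not part of the notebook.

```python
import random

# Assumed palettes of CSS color names: light boxes, dark labels for contrast.
BOX_COLORS = ["lightyellow", "lavender", "honeydew", "mistyrose", "aliceblue", "beige"]
LABEL_COLORS = ["darkred", "darkblue", "darkgreen", "indigo", "saddlebrown", "teal"]


def assign_colors(speaker_names, seed=0):
    """Map each speaker to (name, box_color, label_color) with no repeated colors."""
    if len(speaker_names) > len(BOX_COLORS):
        raise ValueError("More speakers than palette entries; extend the palettes.")
    rng = random.Random(seed)  # seeded so the colors are reproducible
    boxes = rng.sample(BOX_COLORS, len(speaker_names))
    labels = rng.sample(LABEL_COLORS, len(speaker_names))
    return {s: (s, b, l) for s, b, l in zip(speaker_names, boxes, labels)}


example_speakers = assign_colors(["SPEAKER_00", "SPEAKER_01", "SPEAKER_02"])
```

The returned dictionary has the same shape as `speakers` above, so it could be used in its place.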

In [92]:
def_boxclr = 'white'
def_spkrclr = 'orange'
preS = '<!DOCTYPE html>\n<html lang="en">\n\n<head>\n\t<meta charset="UTF-8">\n\t<meta name="viewport" content="width=device-width, initial-scale=1.0">\n\t<meta http-equiv="X-UA-Compatible" content="ie=edge">\n\t<title>' + \
       video_title + \
       '</title>\n\t<style>\n\t\tbody {\n\t\t\tfont-family: sans-serif;\n\t\t\tfont-size: 14px;\n\t\t\tcolor: #111;\n\t\t\tpadding: 0 0 1em 0;\n\t\t\tbackground-color: #efe7dd;\n\t\t}\n\n\t\ttable {\n\t\t\tborder-spacing: 10px;\n\t\t}\n\n\t\tth {\n\t\t\ttext-align: left;\n\t\t}\n\n\t\t.lt {\n\t\t\tcolor: inherit;\n\t\t\ttext-decoration: inherit;\n\t\t}\n\n\t\t.l {\n\t\t\tcolor: #050;\n\t\t}\n\n\t\t.s {\n\t\t\tdisplay: inline-block;\n\t\t}\n\n\t\t.c {\n\t\t\tdisplay: inline-block;\n\t\t}\n\n\t\t.e {\n\t\t\t/*background-color: white; Changing background color */\n\t\t\tborder-radius: 10px;\n\t\t\t/* Making border radius */\n\t\t\twidth: 50%;\n\t\t\t/* Making auto-sizable width */\n\t\t\tpadding: 0 0 0 0;\n\t\t\t/* Making space around letters */\n\t\t\tfont-size: 14px;\n\t\t\t/* Changing font size */\n\t\t\tmargin-bottom: 0;\n\t\t}\n\n\t\t.t {\n\t\t\tdisplay: inline-block;\n\t\t}\n\n\t\t#player-div {\n\t\t\tposition: sticky;\n\t\t\ttop: 20px;\n\t\t\tfloat: right;\n\t\t\twidth: 40%\n\t\t}\n\n\t\t#player {\n\t\t\taspect-ratio: 16 / 9;\n\t\t\twidth: 100%;\n\t\t\theight: auto;\n\n\t\t}\n\n\t\ta {\n\t\t\tdisplay: inline;\n\t\t}\n\t</style>\n\t<script>\n\t\tvar tag = document.createElement(\'script\');\n\t\ttag.src = "https://www.youtube.com/iframe_api";\n\t\tvar firstScriptTag = document.getElementsByTagName(\'script\')[0];\n\t\tfirstScriptTag.parentNode.insertBefore(tag, firstScriptTag);\n\t\tvar player;\n\t\tfunction onYouTubeIframeAPIReady() {\n\t\t\tplayer = new YT.Player(\'player\', {\n\t\t\t\t//height: \'210\',\n\t\t\t\t//width: \'340\',\n\t\t\t\tvideoId: \'' + \
       video_id + \
       '\',\n\t\t\t});\n\n\n\n\t\t\t// This is the source "window" that will emit the events.\n\t\t\tvar iframeWindow = player.getIframe().contentWindow;\n\t\t\tvar lastword = null;\n\n\t\t\t// So we can compare against new updates.\n\t\t\tvar lastTimeUpdate = "-1";\n\n\t\t\t// Listen to events triggered by postMessage,\n\t\t\t// this is how different windows in a browser\n\t\t\t// (such as a popup or iFrame) can communicate.\n\t\t\t// See: https://developer.mozilla.org/en-US/docs/Web/API/Window/postMessage\n\t\t\twindow.addEventListener("message", function (event) {\n\t\t\t\t// Check that the event was sent from the YouTube IFrame.\n\t\t\t\tif (event.source === iframeWindow) {\n\t\t\t\t\tvar data = JSON.parse(event.data);\n\n\t\t\t\t\t// The "infoDelivery" event is used by YT to transmit any\n\t\t\t\t\t// kind of information change in the player,\n\t\t\t\t\t// such as the current time or a playback quality change.\n\t\t\t\t\tif (\n\t\t\t\t\t\tdata.event === "infoDelivery" &&\n\t\t\t\t\t\tdata.info &&\n\t\t\t\t\t\tdata.info.currentTime\n\t\t\t\t\t) {\n\t\t\t\t\t\t// currentTime is emitted very frequently (milliseconds),\n\t\t\t\t\t\t// but we only care about whole second changes.\n\t\t\t\t\t\tvar ts = (data.info.currentTime).toFixed(1).toString();\n\t\t\t\t\t\tts = (Math.round((data.info.currentTime) * 5) / 5).toFixed(1);\n\t\t\t\t\t\tts = ts.toString();\n\t\t\t\t\t\tconsole.log(ts)\n\t\t\t\t\t\tif (ts !== lastTimeUpdate) {\n\t\t\t\t\t\t\tlastTimeUpdate = ts;\n\n\t\t\t\t\t\t\t// It\'s now up to you to format the time.\n\t\t\t\t\t\t\t//document.getElementById("time2").innerHTML = time;\n\t\t\t\t\t\t\tword = document.getElementById(ts)\n\t\t\t\t\t\t\tif (word) {\n\t\t\t\t\t\t\t\tif (lastword) {\n\t\t\t\t\t\t\t\t\tlastword.style.fontWeight = \'normal\';\n\t\t\t\t\t\t\t\t}\n\t\t\t\t\t\t\t\tlastword = word;\n\t\t\t\t\t\t\t\t//word.style.textDecoration = \'underline\';\n\t\t\t\t\t\t\t\tword.style.fontWeight = \'bold\';\n\n\t\t\t\t\t\t\t\tlet toggle = document.getElementById("autoscroll");\n\t\t\t\t\t\t\t\tif (toggle.checked) {\n\t\t\t\t\t\t\t\t\tlet position = word.offsetTop - 20;\n\t\t\t\t\t\t\t\t\twindow.scrollTo({\n\t\t\t\t\t\t\t\t\t\ttop: position,\n\t\t\t\t\t\t\t\t\t\tbehavior: \'smooth\'\n\t\t\t\t\t\t\t\t\t});\n\t\t\t\t\t\t\t\t}\n\n\t\t\t\t\t\t\t}\n\t\t\t\t\t\t}\n\t\t\t\t\t}\n\t\t\t\t}\n\t\t\t})\n\t\t}\n\t\tfunction jumptoTime(timepoint, id) {\n\t\t\tevent.preventDefault();\n\t\t\thistory.pushState(null, null, "#" + id);\n\t\t\tplayer.seekTo(timepoint);\n\t\t\tplayer.playVideo();\n\t\t}\n\t</script>\n</head>\n\n<body>\n\t<h2>' + \
       video_title + \
       '</h2>\n\t<i>Click on a part of the transcription, to jump to its video, and get an anchor to it in the address\n\t\tbar<br><br></i>\n\t<div id="player-div">\n\t\t<div id="player"></div>\n\t\t<div><label for="autoscroll">auto-scroll: </label>\n\t\t\t<input type="checkbox" id="autoscroll" checked>\n\t\t</div>\n\t</div>\n  '

postS = '\t</body>\n</html>'
html = [preS]  # list of HTML fragments, joined when the file is written
txt = []       # plain-text transcript lines
gidx = -1
for g in groups:
    # Start of this group within the full recording, in milliseconds.
    shift = re.findall(r'[0-9]+:[0-9]+:[0-9]+\.[0-9]+', string=g[0])[0]
    shift = millisec(shift)
    shift = max(shift, 0)

    gidx += 1

    with open('temp\\' + str(gidx) + '.json') as infile:
        captions = json.load(infile)['segments']

    if captions:
        speaker = g[0].split()[-1]
        boxclr = def_boxclr
        spkrclr = def_spkrclr
        if speaker in speakers:
            speaker, boxclr, spkrclr = speakers[speaker]

        html.append(f'<div class="e" style="background-color: {boxclr}">\n')
        html.append('<p  style="margin:0;padding: 5px 10px 10px 10px;word-wrap:normal;white-space:normal;">\n')
        html.append(f'<span style="color:{spkrclr};font-weight: bold;">{speaker}</span><br>\n\t\t\t\t')

        for c in captions:
            start = shift + c['start'] * 1000.0
            start = start / 1000.0  # back to seconds; YouTube seeks in seconds
            end = (shift + c['end'] * 1000.0) / 1000.0
            txt.append(f'[{timeStr(start)} --> {timeStr(end)}] [{speaker}] {c["text"]}\n')

            for i, w in enumerate(c['words']):
                if w['word'] == "":  # skip empty words
                    continue
                start = (shift + w['start'] * 1000.0) / 1000.0
                # end = (shift + w['end'] * 1000.0) / 1000.0
                html.append(
                    f'<a href="#{timeStr(start)}" id="{"{:.1f}".format(round(start * 5) / 5)}" class="lt" onclick="jumptoTime({int(start)}, this.id)">{w["word"]}</a><!--\n\t\t\t\t-->')
        # html.append('\n')
        html.append('</p>\n')
        html.append('</div>\n')
html.append(postS)

with open("Transcript.txt", "w", encoding='utf-8') as file:
    s = "".join(txt)
    file.write(s)
    print('Transcript saved to Transcript.txt:')

with open("Transcript.html", "w",
          encoding='utf-8') as file:  # TODO: proper html embed tag when video/audio from file
    s = "".join(html)
    file.write(s)
    print('Transcript saved to Transcript.html:')

Transcript saved to Transcript.txt:
Transcript saved to Transcript.html:
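
The title and the transcribed words are inserted into the page as-is, so characters such as `<` or `&` in either would be interpreted as markup. A hedged sketch of escaping them with the standard library's `html.escape` follows. `word_anchor` is a hypothetical helper that mirrors the anchor built in the loop above, except that it uses the snapped time as the `href` fragment for brevity.

```python
from html import escape  # imported by name because the notebook uses `html` as a list


def word_anchor(word, start):
    """Build the clickable anchor for one word that starts at `start` seconds."""
    anchor_id = "{:.1f}".format(round(start * 5) / 5)  # snap to 0.2 s steps
    return (f'<a href="#{anchor_id}" id="{anchor_id}" class="lt" '
            f'onclick="jumptoTime({int(start)}, this.id)">{escape(word)}</a>')


print(word_anchor(" R&D", 12.34))
# <a href="#12.4" id="12.4" class="lt" onclick="jumptoTime(12, this.id)"> R&amp;D</a>
```

The same `escape` call could be applied to `video_title` before it is concatenated into the page header.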


Remove the downloaded files

In [91]:
import os
dir_name = "temp"
test = os.listdir(dir_name)
for item in test:
    if item.endswith(".wav"):
        os.remove(os.path.join(dir_name, item))
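
The loop above removes only the `.wav` files, so the per-group `.json` files and `diarization.txt` remain in `temp`. If those should go as well, a sketch along the following lines could be used; `remove_intermediate_files` and its default suffix list are assumptions for illustration.

```python
from pathlib import Path


def remove_intermediate_files(dir_name="temp", suffixes=(".wav", ".json", ".txt")):
    """Delete files in dir_name whose extension is in suffixes; return their names."""
    removed = []
    for path in sorted(Path(dir_name).iterdir()):
        if path.is_file() and path.suffix in suffixes:
            path.unlink()
            removed.append(path.name)
    return removed
```

Returning the removed names makes it easy to log what was deleted, for example with `print(remove_intermediate_files())`.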