<a href="https://colab.research.google.com/github/sebbe2407/dataset/blob/main/transcribe_latest.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Notes on usage:

- Make sure to [change runtime to GPU](https://www.tutorialspoint.com/google_colab/google_colab_using_free_gpu.htm). 
- The transcript will be saved in Files, which you can find in the menu on the left.
- Change the number of speakers below if different from two.
- Pick a bigger model if you want more accuracy and a smaller model if you want the program to run faster ([more info](https://github.com/openai/whisper#available-models-and-languages)).
- If you know the language being spoken is English, then change language to 'English' as this improves performance.


High level overview of what's happening here:


1.   I'm using OpenAI's Whisper model to separate the audio into segments and generate transcripts.
2.   I'm then generating a speaker embedding for each segment.
3.   Then I'm using agglomerative clustering on the embeddings to identify the speaker for each segment.   

Let me know if I can make it better!


In [None]:
# upload audio file
# files.upload() opens Colab's upload dialog; the first key of the returned
# mapping is taken as the path of the audio file to process.
from google.colab import files
uploaded = files.upload()
path = next(iter(uploaded.keys()))

KeyboardInterrupt: ignored

In [None]:
# Notebook settings. The trailing #@param annotations turn each assignment
# into a Colab form field, so they must stay on the same line as the value.
num_speakers = 8 #@param {type:"integer"}

language = "de" #@param ["any", "English", "de"]

model_size = 'large' #@param ['tiny', 'base', 'small', 'medium', 'large']

# Every size except 'large' gets the '.en' suffix when the audio is English.
model_name = (
    f"{model_size}.en"
    if (language == 'English' and model_size != 'large')
    else model_size
)


In [None]:
# Mount Google Drive at /content/drive so that files stored there can be
# opened by path from the cells below.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install -q git+https://github.com/openai/whisper.git > /dev/null
!pip install -q git+https://github.com/pyannote/pyannote-audio > /dev/null

import whisper
import datetime

import subprocess

import torch
import pyannote.audio
from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding
# Pretrained speaker-embedding model used to compute one embedding per segment.
# Run on the GPU when one is available and fall back to the CPU otherwise, so
# the cell does not fail outright on a CPU-only runtime.
embedding_model = PretrainedSpeakerEmbedding(
    "speechbrain/spkrec-ecapa-voxceleb",
    device=torch.device("cuda" if torch.cuda.is_available() else "cpu"))

from pyannote.audio import Audio
from pyannote.core import Segment

import wave
import contextlib

from sklearn.cluster import AgglomerativeClustering
import numpy as np

Downloading (…)ain/hyperparams.yaml:   0%|          | 0.00/1.92k [00:00<?, ?B/s]

Downloading embedding_model.ckpt:   0%|          | 0.00/83.3M [00:00<?, ?B/s]

Downloading (…)an_var_norm_emb.ckpt:   0%|          | 0.00/1.92k [00:00<?, ?B/s]

Downloading classifier.ckpt:   0%|          | 0.00/5.53M [00:00<?, ?B/s]

Downloading (…)in/label_encoder.txt:   0%|          | 0.00/129k [00:00<?, ?B/s]

In [None]:
# Audio file to process (located on the mounted Google Drive).
path = '/content/drive/MyDrive/Breakout room Zentralschweiz-converted.mp3'
# Later cells open the audio with the `wave` module, so anything that is not
# already a .wav file is converted with ffmpeg first. The extension test is
# case-insensitive so that e.g. "clip.WAV" is not re-encoded needlessly.
if not path.lower().endswith('.wav'):
  # check=True raises CalledProcessError when ffmpeg fails, instead of
  # silently continuing with a missing or stale audio.wav.
  subprocess.run(['ffmpeg', '-i', path, 'audio.wav', '-y'], check=True)
  path = 'audio.wav'

In [None]:
# Load the checkpoint selected in the settings cell. model_name (not
# model_size) is used so that the '.en' suffix computed there takes effect.
model = whisper.load_model(model_name)

100%|█████████████████████████████████████| 2.87G/2.87G [01:09<00:00, 44.4MiB/s]


In [None]:
# Forward the language chosen in the settings cell to Whisper. "any" maps to
# None, which leaves language detection to Whisper; "English" maps to the
# code "en"; any other value (e.g. "de") is passed through unchanged.
whisper_language = {"any": None, "English": "en"}.get(language, language)
result = model.transcribe(path, language=whisper_language)
# Per-segment entries; later cells read their "start" and "end" fields.
segments = result["segments"]

In [None]:
# Show only the transcript text; printing the whole result dict also dumps
# every segment entry and buries the transcript in the cell output.
print(result["text"])

{'text': ' Herzlich willkommen in der Zentralschweiz-Gruppe. Guten Tag auch an Herrn Professor Kurzyder. Es freut mich, dass Sie es rechtzeitig geschafft haben. Ich teile gleich kurz die Slides, sodass wir die Wirksamkeit der Capitelo 291 Daten gemeinsam besprechen können. Können Sie meine Slides sehen? Könnte mir jemand kurz antworten, weil ich Sie nicht sehe? Super, vielen Dank. Als erstes würde mich interessieren, was Ihre Interpretation der PFS-Daten in der Gesamtpopulation ist, sowie auch in der Population mit einer Alteration im Signalweg, wie auch exploratorisch in der nicht alterierten Population? Frau Professor Leo? Das haben wir ja im Prinzip vorhin auch schon angeschaut, oder? Also ich denke, dass sowohl in der Gesamtpopulation als auch in der alterierten Population das PFS sehr ähnlich ist, oder wir haben eine sehr ähnliche Hazard Ratio, die in der alterierten Population noch ein bisschen besser ist. Aber sogar in der exploratorischen Analyse sehen wir da ein gutes Signal. 

In [None]:
# Length of the audio in seconds: frame count divided by the frame rate,
# both read from the WAV header.
with wave.open(path, 'rb') as wav_file:
  rate = wav_file.getframerate()
  frames = wav_file.getnframes()
duration = frames / float(rate)

In [None]:
import re

def reformat_transcript(transcript):
    """Rewrite "S<n> <timecode> <text>" lines with #00:<timecode>-0# / -1# tags.

    Each input line that starts with a speaker marker ("S" plus digits),
    followed by one whitespace character, a run of digits/colons and some
    text, becomes two output lines: an opening tag line and a
    "S<n>: <text> <closing tag>" line. Lines that do not match are dropped.

    Args:
        transcript: the full transcript as one newline-separated string.

    Returns:
        The reformatted entries joined with newlines ("" if nothing matched).
    """
    line_re = re.compile(r'S(\d+)\s([\d:]+)(.+)')
    rendered = []
    for raw_line in transcript.split('\n'):
        hit = line_re.match(raw_line)
        if hit is None:
            continue
        number, stamp, body = hit.groups()
        rendered.append(f"#00:{stamp}-0#\nS{number}: {body.strip()} #00:{stamp}-1#")
    return '\n'.join(rendered)

# Read the content from the .txt file
try:
    with open("transcript.txt", "r", encoding="utf-8") as f:
        transcript = f.read()
except FileNotFoundError:
    # The transcript has not been written yet: report it instead of aborting
    # the cell, so the duration computation below still runs.
    print("transcript.txt not found - skipping reformatting.")
else:
    # Reformat the transcript
    formatted_transcript = reformat_transcript(transcript)
    print(formatted_transcript)

# Length of the audio in seconds (frame count / frame rate from the WAV header).
with contextlib.closing(wave.open(path,'r')) as f:
  frames = f.getnframes()
  rate = f.getframerate()
  duration = frames / float(rate)

FileNotFoundError: ignored

In [None]:
audio = Audio()

def segment_embedding(segment):
  """Compute the speaker embedding for one transcribed segment.

  Args:
    segment: mapping with "start" and "end" entries (in the same unit as the
      global `duration`) that delimit the clip inside the file at `path`.

  Returns:
    The result of calling `embedding_model` on the cropped clip.
  """
  start = segment["start"]
  # Whisper overshoots the end timestamp in the last segment
  end = min(duration, segment["end"])
  clip = Segment(start, end)
  waveform, _ = audio.crop(path, clip)
  # NOTE(review): audio.crop is expected to return a (channel, time) tensor,
  # and the embedding model appears to assert single-channel input (the
  # embeddings cell failed with AssertionError on this recording). Downmix
  # multi-channel clips to mono by averaging the channels.
  if waveform.shape[0] > 1:
    waveform = waveform.mean(dim=0, keepdim=True)
  # [None] adds the leading batch dimension.
  return embedding_model(waveform[None])

In [None]:
# One 192-wide row per segment, filled in order with the segment's embedding.
n_segments = len(segments)
embeddings = np.zeros((n_segments, 192))
for idx in range(n_segments):
  embeddings[idx] = segment_embedding(segments[idx])

# Replace NaN entries so that the clustering step receives finite values.
embeddings = np.nan_to_num(embeddings)

AssertionError: ignored

In [None]:
# Group the segment embeddings into speakers and tag every segment with a
# 1-based speaker label ("S1", "S2", ...).
n_segments = len(segments)
if n_segments >= 2:
  # Agglomerative clustering cannot produce more clusters than there are
  # samples, so cap the requested number of speakers for very short recordings.
  clustering = AgglomerativeClustering(min(num_speakers, n_segments)).fit(embeddings)
  labels = clustering.labels_
else:
  # Clustering needs at least two samples; zero or one segment trivially
  # belongs to a single speaker.
  clustering = None
  labels = np.zeros(n_segments, dtype=int)
for i in range(n_segments):
  segments[i]["speaker"] = 'S' + str(labels[i] + 1)

In [None]:
# Swap the "S1:" and "S2:" speaker tags in transcript.txt, rewriting the file
# in place.
with open("transcript.txt", "r", encoding="utf-8") as f:
    lines = f.readlines()

for idx, content in enumerate(lines):
    # Each line goes through at most one branch, so a tag that was just
    # swapped is never swapped back.
    if "S1:" in content:
        lines[idx] = content.replace("S1:", "S2:")
    elif "S2:" in content:
        lines[idx] = content.replace("S2:", "S1:")

with open("transcript.txt", "w", encoding="utf-8") as f:
    f.write("".join(lines))


# From here on: verification checks only

In [None]:
# Check for correct change of speakers
# Prints every line whose two-character speaker marker equals the marker of
# the previous non-empty line (i.e. the speaker did not change), and stops at
# the first line whose marker is neither 'S1' nor 'S2'.
# Reads transcript.txt, the file name used by the other cells.
with open("transcript.txt", "r", encoding="utf-8") as f:
    text = f.read()

lines = text.split("\n")

# str.split always returns at least one element, so an empty file is [""].
if lines == [""]:
    print("Error: input file is empty.")
else:
    speaker = None  # marker of the previous non-empty line
    for i, line in enumerate(lines):
        if len(line) > 0:
            current_speaker = line[:2]
            if current_speaker not in ['S1', 'S2']:
                print(f"Error: Invalid speaker marker '{current_speaker}' in line {i}.")
                break
            if current_speaker != speaker:
                speaker = current_speaker
            else:
                # Same speaker twice in a row: show the offending line.
                print(line)
        else:
            print(f"Line {i} is empty. Skipping...\n")

Line 47 is empty. Skipping...

Line 103 is empty. Skipping...

Line 114 is empty. Skipping...

Line 140 is empty. Skipping...

Line 154 is empty. Skipping...

Line 206 is empty. Skipping...



In [None]:
# Check for correct timestamp format
# Reports every non-empty line that contains a tag of the form #d:dd:dd#
# (single-digit first field), which this check treats as malformed.
# Reads transcript.txt, the file name used by the other cells.
import re

with open("transcript.txt", "r", encoding="utf-8") as f:
    text = f.read()

lines = text.split("\n")

# str.split always returns at least one element, so an empty file is [""].
if lines == [""]:
    print("Error: input file is empty.")
else:
    for i, line in enumerate(lines):
        if len(line) > 0:
            if re.search(r'#\d:\d\d:\d\d#', line):
                print(f"Incorrect timestamp format in line {i}: {line}")
        else:
            print(f"Line {i} is empty. Skipping...\n")

Line 1 is empty. Skipping...

Line 4 is empty. Skipping...

Line 53 is empty. Skipping...

Line 109 is empty. Skipping...

Line 120 is empty. Skipping...

Line 146 is empty. Skipping...

Line 211 is empty. Skipping...

