# Install and Import Packages


In [None]:
%pip install -U openai-whisper
%pip install pyannote.audio

In [1]:
import whisper
import datetime

import subprocess

import torch
import pyannote.audio
from pyannote.audio.pipelines.speaker_verification import PretrainedSpeakerEmbedding

from pyannote.audio import Audio
from pyannote.core import Segment

import wave
import contextlib

from sklearn.cluster import AgglomerativeClustering
import numpy as np
import subprocess
import os

The torchaudio backend is switched to 'soundfile'. Note that 'sox_io' is not supported on Windows.
The torchaudio backend is switched to 'soundfile'. Note that 'sox_io' is not supported on Windows.


# Util Functions


## Convert MP3 into WAV
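Uncomment and run the first two lines of the cell below if you are using a virtual environment; they append the ffmpeg folder to `PATH` so that the `ffmpeg` command can be found.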

In [2]:
#ffmpeg_path = "C:\\ProgramData\\ffmpeg"
#os.environ["PATH"] += os.pathsep + ffmpeg_path

def mp3_to_wav(folder_path):
    """Convert every MP3 directly inside `folder_path` to a 16 kHz WAV file.

    Each `<name>.mp3` (extension matched case-insensitively) is converted by
    ffmpeg to `<name>.wav` in the same folder, and the MP3 is deleted once
    the conversion has succeeded. Sub-folders are not visited.

    Args:
        folder_path: Directory containing the MP3 files.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status
            (the MP3 is kept in that case).
        FileNotFoundError: If the `ffmpeg` executable cannot be found.
    """
    # os.listdir returns a snapshot list, so deleting files while looping is safe.
    for file_name in os.listdir(folder_path):
        stem, extension = os.path.splitext(file_name)
        # Compare case-insensitively so "CALL.MP3" is converted as well.
        if extension.lower() != ".mp3":
            continue

        mp3_path = os.path.join(folder_path, file_name)
        wav_path = os.path.join(folder_path, stem + ".wav")

        # "-y" overwrites a WAV left behind by an earlier (possibly interrupted)
        # run instead of failing because the output already exists.
        # "-ar 16000" resamples the audio to 16 kHz.
        subprocess.run(
            ["ffmpeg", "-y", "-i", mp3_path, "-ar", "16000", wav_path],
            check=True,
        )

        # Only reached when ffmpeg succeeded (check=True raises otherwise).
        os.remove(mp3_path)

## Load Whisper and Pyannote Audio model

In [3]:
# Notebook-level settings (the "#@param" markers are Colab form annotations).
num_speakers = 2 #@param {type:"integer"}
language = 'English' #@param ['any', 'English']
model_size = "tiny" #@param ["tiny", "base", "small", "medium", "large", "large-v2"]

# Whisper publishes English-only checkpoints ("<size>.en") for the tiny, base,
# small and medium sizes; the large models are multilingual only, so the
# suffix is added for every size except those.
model_name = model_size
if language == 'English' and not model_size.startswith('large'):
  model_name += '.en'

# Load the checkpoint selected above (previously `model_size` was passed here,
# so the ".en" suffix computed in `model_name` was silently ignored).
model = whisper.load_model(model_name)
# Multilingual "medium" checkpoint, loaded in addition to `model`.
model_medium = whisper.load_model('medium')
# Speaker-embedding model; runs on the GPU when CUDA is available, else on CPU.
embedding_model = PretrainedSpeakerEmbedding(
    "speechbrain/spkrec-ecapa-voxceleb",
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
)

## Other functions to transcribe text with Speech Diarization

In [4]:
def get_duration(path):
  """Return the playing time, in seconds, of the WAV file at `path`.

  The duration is the number of audio frames divided by the frame rate, both
  read from the file header through the standard `wave` module.
  """
  reader = wave.open(path, 'r')
  # contextlib.closing guarantees the reader is closed even if a read fails.
  with contextlib.closing(reader):
    frame_count = reader.getnframes()
    frame_rate = reader.getframerate()
  return frame_count / float(frame_rate)
  
def make_embeddings(path, segments, duration):
  """Build one embedding row per segment of the audio file at `path`.

  Args:
    path: Audio file the segments refer to.
    segments: Sequence of segment dicts, each passed to `segment_embedding`.
    duration: Total audio length in seconds, forwarded to `segment_embedding`.

  Returns:
    A NumPy array of shape (len(segments), 192) in which NaN values have been
    replaced by `np.nan_to_num`.
  """
  # 192 is hard-coded as the row width; presumably the output size of the
  # speaker-embedding model -- TODO confirm against the model in use.
  matrix = np.zeros(shape=(len(segments), 192))
  for row, current_segment in enumerate(segments):
    matrix[row] = segment_embedding(path, current_segment, duration)
  cleaned = np.nan_to_num(matrix)
  return cleaned

# Shared pyannote Audio helper (default settings) used to crop clips from files.
audio = Audio()

def segment_embedding(path, segment, duration):
  """Return the speaker embedding for one transcript segment.

  Args:
    path: Audio file to crop from.
    segment: Dict with "start" and "end" times (same unit as `duration`).
    duration: Total length of the audio; used as an upper bound for the end.

  Returns:
    Whatever `embedding_model` returns for the cropped waveform.
  """
  # Whisper overshoots the end timestamp in the last segment, so the end is
  # capped at the real duration of the file.
  clip = Segment(segment["start"], min(duration, segment["end"]))
  cropped = audio.crop(path, clip)
  waveform = cropped[0]  # crop() yields (waveform, sample_rate); the rate is unused
  # Indexing with None prepends an axis -- presumably the batch axis the
  # embedding model expects; TODO confirm.
  return embedding_model(waveform[None])

def add_speaker_labels(segments, embeddings, num_speakers):
  """Cluster `embeddings` and tag every segment with a speaker name in place.

  Args:
    segments: List of segment dicts; each gains a "speaker" key.
    embeddings: One embedding row per segment, in the same order.
    num_speakers: Number of clusters passed to AgglomerativeClustering.

  Returns:
    None. `segments` is modified in place.
  """
  fitted = AgglomerativeClustering(num_speakers).fit(embeddings)
  cluster_ids = fitted.labels_
  for position, entry in enumerate(segments):
    # Cluster ids start at 0; the displayed speaker numbers start at 1.
    speaker_number = cluster_ids[position] + 1
    entry["speaker"] = 'SPEAKER ' + str(speaker_number)

def time(secs):
  """Convert a number of seconds into a `datetime.timedelta`.

  The value is first rounded to a whole number of seconds with the built-in
  `round`, so the result prints as H:MM:SS without a fractional part.

  NOTE: this function shares its name with the standard-library `time`
  module, which the visible cells of this notebook do not import.
  """
  whole_seconds = round(secs)
  return datetime.timedelta(seconds=whole_seconds)

def get_output(segments):
  """Join the text of all segments into one newline-separated transcript.

  Args:
    segments: Iterable of segment dicts, each with a "text" entry.

  Returns:
    A string with one line per segment followed by one extra blank line.
    For an empty `segments` the result is a single newline.
  """
  lines = []
  for segment in segments:
    text = segment["text"]
    # Segment text usually begins with a single space. Drop it only when it is
    # really there; slicing unconditionally would eat the first real character
    # of a segment that has no leading space.
    if text.startswith(' '):
      text = text[1:]
    lines.append(text + '\n')
  # A single join avoids rebuilding the growing string on every iteration.
  return ''.join(lines) + '\n'
def write_file(text, file_path):
    """Write `text` to `file_path`, creating parent directories as needed.

    Args:
        text: String content to write; any existing file is overwritten.
        file_path: Destination path. May be a bare file name, in which case
            the file is created in the current working directory.

    Raises:
        OSError: If a directory cannot be created or the file cannot be opened.
    """
    directory = os.path.dirname(file_path)
    # dirname() is '' for a bare file name and os.makedirs('') raises, so only
    # create directories when there is one. exist_ok=True removes the race
    # between checking for the directory and creating it.
    if directory:
        os.makedirs(directory, exist_ok=True)
    # Explicit UTF-8 so non-ASCII transcripts do not depend on (or fail with)
    # the platform's default encoding.
    with open(file_path, 'w', encoding='utf-8') as file:
        file.write(text)
        
def transcribe(path, num_speakers, model=model):
  """Transcribe a WAV file and label its segments with speakers.

  Args:
    path: Path of the WAV file to transcribe.
    num_speakers: Requested number of speakers; rounded and clamped to the
      range 1..number of segments before clustering.
    model: Object with a `transcribe(path)` method returning a dict that has
      a "segments" list. The default is the module-level `model` as it was
      when this function was defined (default values are bound at definition
      time, so re-run this cell after reloading `model`).

  Returns:
    The transcript text produced by `get_output`, or the string
    "Audio duration too long" when the file is longer than four hours.
  """
  duration = get_duration(path)
  # Refuse anything longer than 4 hours (4 * 60 * 60 seconds).
  if duration > 4 * 60 * 60:
    return "Audio duration too long"

  result = model.transcribe(path)
  segments = result["segments"]

  # No segments at all (e.g. silent audio): previously num_speakers was
  # clamped to 0 and clustering was attempted on an empty matrix, which fails.
  if not segments:
    return get_output(segments)

  num_speakers = min(max(round(num_speakers), 1), len(segments))
  if len(segments) == 1:
    # A single segment cannot be clustered; it trivially belongs to speaker 1.
    segments[0]['speaker'] = 'SPEAKER 1'
  else:
    embeddings = make_embeddings(path, segments, duration)
    add_speaker_labels(segments, embeddings, num_speakers)
  return get_output(segments)


# Main Function

In [9]:
# Sub-folder (with leading slash) selecting which classification to process;
# change this to run a different set of recordings.
classification = '/not_interested'
audio_path = './asset/testing_audio' + classification
result_path = './asset/testing_result' + classification

# Convert any MP3 recordings in the folder to WAV first (the MP3s are deleted).
mp3_to_wav(audio_path)

output = ''
# sorted() gives a reproducible processing order across runs and platforms.
for file_name in sorted(os.listdir(audio_path)):
    file_path = os.path.join(audio_path, file_name)
    stem, extension = os.path.splitext(file_name)
    # transcribe() reads the file with the wave module, so skip directories
    # and anything that is not a .wav file instead of crashing on it.
    if not os.path.isfile(file_path) or extension.lower() != '.wav':
        continue
    file_result_path = os.path.join(result_path, stem + '.txt')
    output = transcribe(file_path, num_speakers, model=model_medium)
    write_file(output, file_result_path)


KeyboardInterrupt: 