In [None]:
# Install tools
# speechbrain (used for speaker embedding)
!pip install -qq torch==1.11.0 torchvision==0.12.0 torchaudio==0.11.0 torchtext==0.12.0
!pip install -qq speechbrain==0.5.12

# pyannote.audio (used for speaker diarization)
!pip install -qq pyannote.audio==2.1.1

# OpenAI whisper (used for automatic speech recognition)
!pip install -qq git+https://github.com/openai/whisper.git 
  

In [None]:
# Import tools
import subprocess #if stripping audio from video file
import datetime #to print time of start and end of analysis
import os

from huggingface_hub import notebook_login
notebook_login()

from pyannote.audio import Pipeline

from pyannote.audio import Audio

import whisper


In [None]:
from pydub import AudioSegment

In [None]:
# Shared configuration for the cells below
audio = Audio(sample_rate=16000, mono=True)
model_size = "large-v2"
base_model = whisper.load_model(model_size)

# Every speaker except the first uses the same (box colour, speaker colour) pair.
_shared_colours = ('#e1ffc7', 'darkgreen')

# Diarization label -> (display name, box colour, speaker colour).
# Edit the display names here to give the speakers real names.
speakers = {
    'SPEAKER_00': ('Talare_1', 'white', 'darkorange'),
    'SPEAKER_01': ('Talare_2', *_shared_colours),
    'SPEAKER_02': ('Talare_3', *_shared_colours),
    'SPEAKER_03': ('Talare_4', *_shared_colours),
    'SPEAKER_04': ('Talare_5', *_shared_colours),
    'SPEAKER_05': ('Talare_6', *_shared_colours),
    'SPEAKER_06': ('Talare_7', *_shared_colours),
    'SPEAKER_07': ('Talare_8', *_shared_colours),
    'SPEAKER_08': ('Talare_9', *_shared_colours),
    'SPEAKER_09': ('Talare_10', *_shared_colours),
    'SPEAKER_10': ('Talare_11', *_shared_colours),
}

# Colours used for labels that are missing from `speakers`
def_boxclr, def_spkrclr = 'white', 'orange'

In [None]:
# Function for isolating audio

def isolate_audio(file):
    """Extract the audio of ``file`` into ``input_audio.wav`` with ffmpeg.

    Args:
        file: Path of the audio or video file to convert.

    Returns:
        The fixed output path ``'input_audio.wav'`` (relative to the current
        working directory). An existing file of that name is overwritten.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status,
            so a failed conversion is reported here instead of surfacing later
            as a missing or stale wav file.
    """
    output_path = 'input_audio.wav'
    # '-y' overwrites the output without asking; it is a global option, so it
    # is placed before the input/output arguments.
    subprocess.run(['ffmpeg', '-y', '-i', file, output_path], check=True)
    return output_path

In [None]:
def just_text(audio_file, file_path):
    """Transcribe ``audio_file`` with the global whisper model and save the plain text.

    The transcript is written next to the source file: the extension of
    ``file_path`` is replaced by ``_text.txt``.

    Args:
        audio_file: Path of the audio file passed to ``base_model.transcribe``.
        file_path: Path of the original input file; only used to derive the
            name of the output file.
    """
    result = base_model.transcribe(audio_file)
    output_path = os.path.splitext(file_path)[0] + '_text.txt'
    # Explicit UTF-8 so non-ASCII characters are written the same way on
    # every platform instead of depending on the locale's default encoding.
    with open(output_path, 'w', encoding='utf-8') as file:
        file.write(result["text"])


In [None]:
# Function for speaker diarization
def speaker_dz(audio_file):
    """Run speaker diarization on ``audio_file`` and save it to ``diarization.txt``.

    The pretrained pipeline is loaded on the first call and cached on the
    function object, so later calls (one per input file) reuse it instead of
    loading the model again.

    Args:
        audio_file: Path of the audio file handed to the diarization pipeline.

    Raises:
        RuntimeError: If the pretrained pipeline could not be loaded.
    """
    pipeline = getattr(speaker_dz, '_pipeline', None)
    if pipeline is None:
        pipeline = Pipeline.from_pretrained('pyannote/speaker-diarization@2.1',
                                            use_auth_token=True)
        # NOTE(review): from_pretrained is believed to return None (after
        # printing a hint) when the gated model cannot be downloaded - confirm
        # against the installed pyannote.audio version.
        if pipeline is None:
            raise RuntimeError(
                "Could not load 'pyannote/speaker-diarization@2.1'; check the "
                "Hugging Face login and that the model's terms were accepted.")
        speaker_dz._pipeline = pipeline

    who_speaks_when = pipeline(audio_file,
                               num_speakers=None,  # these values can be
                               min_speakers=None,  # provided by the user
                               max_speakers=None)  # when they are known

    # The textual form (one line per speaker turn) is what transcribe() parses.
    with open("diarization.txt", "w") as text_file:
        text_file.write(str(who_speaks_when))


In [None]:
def millisec(timeStr):
    """Convert a ``HH:MM:SS.mmm`` timestamp string to whole milliseconds.

    The result is rounded to the nearest millisecond. Plain truncation loses a
    millisecond whenever the float product lands just below the exact value
    (for example ``1.001 * 1000 == 1000.9999999999999``).

    Shorter forms (``MM:SS.mmm`` and ``SS.mmm``) are accepted as well.

    Args:
        timeStr: Colon-separated timestamp; every field but the last must be
            an integer, the last one may carry a fractional part.

    Returns:
        The timestamp as an ``int`` number of milliseconds.
    """
    *whole_fields, seconds = timeStr.split(":")
    # Fold hours/minutes into a count of minutes (base-60 positional value).
    minutes = 0
    for field in whole_fields:
        minutes = minutes * 60 + int(field)
    return round((minutes * 60 + float(seconds)) * 1000)

In [None]:
# Preparing audio files according to dz
import re

def transcribe(audio_file, language='sv', model='large-v2'):
    """Cut ``audio_file`` into speaker turns and run the whisper CLI on each one.

    Reads ``diarization.txt`` from the current working directory, merges
    consecutive lines of the same speaker into groups, exports each group as
    ``<index>.wav`` and then calls the ``whisper`` command line tool on every
    exported clip.

    Args:
        audio_file: Path of the wav file the diarization was computed on.
        language: Value passed to the whisper CLI ``--language`` option.
        model: Value passed to the whisper CLI ``--model`` option.

    Returns:
        The list of groups; each group is a list of diarization lines that
        belong to one exported clip.

    Raises:
        subprocess.CalledProcessError: If a whisper invocation fails.
    """
    timestamp = re.compile(r'[0-9]+:[0-9]+:[0-9]+\.[0-9]+')

    with open('diarization.txt') as dz_file:
        dzs = dz_file.read().splitlines()

    groups = []
    g = []
    lastend = 0

    for d in dzs:
        stamps = timestamp.findall(d)
        if len(stamps) < 2:
            # Blank or malformed line: nothing to cut, so ignore it.
            continue

        # The speaker label is the last whitespace-separated token of a line.
        if g and (g[0].split()[-1] != d.split()[-1]):      # speaker changed
            groups.append(g)
            g = []

        g.append(d)
        end = millisec(stamps[1])

        if lastend > end:       # segment engulfed by a previous segment
            groups.append(g)
            g = []
        else:
            lastend = end
    if g:
        groups.append(g)
    print(*groups, sep='\n')

    # Export one clip per group: from the start of its first line to the end
    # of its last line.
    recording = AudioSegment.from_wav(audio_file)
    for gidx, g in enumerate(groups):
        clip_start = millisec(timestamp.findall(g[0])[0])
        clip_end = millisec(timestamp.findall(g[-1])[1])
        print(clip_start, clip_end)
        recording[clip_start:clip_end].export(str(gidx) + '.wav', format='wav')

    # Same mechanism as the ffmpeg call (plain subprocess, no IPython magic);
    # check=True reports a failed transcription instead of silently moving on.
    for i in range(len(groups)):
        subprocess.run(['whisper', str(i) + '.wav',
                        '--language', language, '--model', model],
                       check=True)
    return groups

In [None]:
def _format_timestamp(total_ms):
    """Render a millisecond count as ``HH:MM:SS.mmm``."""
    hours, rest = divmod(int(total_ms), 3600 * 1000)
    minutes, rest = divmod(rest, 60 * 1000)
    return '{0:02d}:{1:02d}:{2:06.3f}'.format(hours, minutes, rest / 1000.0)


def write(groups, file_path):
    """Merge the per-clip whisper subtitles into one speaker-labelled transcript.

    For every group the matching subtitle file is read, each caption is
    shifted by the start time of the group in the full recording, and one line
    ``[start --> end] [speaker] text`` is emitted per caption. The result is
    written next to the source file, with the extension of ``file_path``
    replaced by ``.txt``.

    Args:
        groups: Groups of diarization lines as returned by ``transcribe``;
            group ``i`` corresponds to the clip ``i.wav``.
        file_path: Path of the original input file; only used to derive the
            name of the output file.
    """
    import webvtt  # third-party (webvtt-py); kept as a local import

    timestamp = re.compile(r'[0-9]+:[0-9]+:[0-9]+\.[0-9]+')
    txt = []

    for gidx, g in enumerate(groups):
        # Start of the clip in the original recording, in milliseconds.
        shift = max(int(millisec(timestamp.findall(g[0])[0])), 0)

        # NOTE(review): the subtitle name depends on the installed whisper
        # version - presumably older releases write '<n>.wav.vtt' and newer
        # ones '<n>.vtt'; confirm against the whisper CLI in use.
        vtt_path = str(gidx) + '.wav.vtt'
        if not os.path.exists(vtt_path):
            vtt_path = str(gidx) + '.vtt'

        # Diarization label, replaced by its display name when one is configured.
        speaker = g[0].split()[-1]
        if speaker in speakers:
            speaker = speakers[speaker][0]

        # One output line per caption (not just the last caption of a group).
        for caption in webvtt.read(vtt_path):
            startStr = _format_timestamp(shift + int(millisec(caption.start)))
            endStr = _format_timestamp(shift + int(millisec(caption.end)))
            txt.append(f'[{startStr} --> {endStr}] [{speaker}] {caption.text}\n')

    # Explicit UTF-8 so non-ASCII characters do not depend on the locale.
    with open(os.path.splitext(file_path)[0] + '.txt', 'w', encoding='utf-8') as file:
        file.write("".join(txt))

In [None]:
def clean_up():
    """Delete the intermediate files the pipeline left in the working directory.

    Only files produced by the pipeline itself are removed:

    * ``input_audio.wav`` and ``diarization.txt``
    * the per-turn clips ``<n>.wav``
    * transcription outputs named ``<n>.wav.<ext>`` or ``<n>.<ext>`` with
      ``<ext>`` one of txt, vtt, srt, tsv, json

    Everything else in the directory (the notebook, helper modules, notes,
    sub-directories) is left alone, so running the pipeline cannot wipe
    unrelated files.
    """
    fixed_names = {'input_audio.wav', 'diarization.txt'}
    segment_artifact = re.compile(r'\d+(\.wav)?\.(wav|txt|vtt|srt|tsv|json)')

    # Get the current working directory
    cwd = os.getcwd()
    for item in os.listdir(cwd):
        item_path = os.path.join(cwd, item)
        # Directories (for example "Data") are never touched.
        if not os.path.isfile(item_path):
            continue
        if item in fixed_names or segment_artifact.fullmatch(item):
            os.remove(item_path)

In [None]:
# Function that iterates over directory "Data"
def iterate():
    """Run the whole pipeline on every input file in the ``Data`` directory.

    For each file: extract the audio, write the plain transcript, diarize,
    transcribe per speaker turn and write the speaker-labelled transcript.
    Intermediate files are cleaned up after every file, whether it succeeded
    or not, so a failure cannot leave stale clips behind for the next file.
    A failing file is reported and skipped; the remaining files are still
    processed. The time per file and the total runtime are printed.
    """
    start = datetime.datetime.now()

    try:
        filenames = sorted(os.listdir("Data"))
    except OSError as e:
        print(f'An error has occured: {e}')
        filenames = []

    for filename in filenames:
        file_path = os.path.join("Data", filename)
        # Skip sub-directories and the .txt transcripts that earlier runs
        # wrote next to the inputs; neither is an audio/video source.
        if not os.path.isfile(file_path) or filename.lower().endswith('.txt'):
            continue

        start_1 = datetime.datetime.now()
        try:
            try:
                audio_file = isolate_audio(file_path)
                just_text(audio_file, file_path)
                speaker_dz(audio_file)
                groups = transcribe(audio_file)
                write(groups, file_path)
            finally:
                clean_up()
        except Exception as e:
            print(f'An error has occured while processing {filename}: {e}')
            continue

        delta_1 = datetime.datetime.now() - start_1
        print("Transkription for " + filename + " was " + str(delta_1))

    # `start` is only assigned once, so this really is the total runtime.
    delta = datetime.datetime.now() - start
    print("Runtime for everything was " + str(delta))

In [None]:
# Run the pipeline on every file found in the "Data" directory
iterate()