# Audio Transcription

## Audio Track of Interview - 2 Speakers

In [None]:
import librosa

In [None]:
# Interview recording to load.
audio_path = 'interview.mp3'
# sr=None asks librosa to keep the file's native sampling rate rather than
# resampling; `y` is the decoded waveform and `sr` its sampling rate.
y, sr = librosa.load(audio_path, sr=None)

In [None]:
from IPython.display import Audio

# Render an inline audio player for the loaded waveform, passing the sampling
# rate returned by librosa so playback speed matches the source file.
Audio(data=y, rate=sr)

## Pyannote Audio Transcription

In [None]:
#!pip install --upgrade pyannote.audio

This is a **gated model**! Before it can be downloaded, you must request access on both of the pages below, using the Hugging Face account that your HF token belongs to:
* https://huggingface.co/pyannote/speaker-diarization-3.1
* https://huggingface.co/pyannote/segmentation-3.0

## Processing

If you have fully set up CUDA with your NVIDIA card, you may want to consider using your GPU. Note: This course does not provide installation or setup support for CUDA, due to the difficulty of helping someone install it without access to their computer.

```python
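import torch
# Run this after `diarization_pipeline` has been loaded in the "Pipeline" section below.
diarization_pipeline.to(torch.device("cuda"))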
```

## Pipeline

In [None]:

from huggingface_hub import login
# Authenticate with the Hugging Face Hub. Called without arguments, so the
# access token is requested interactively instead of being written in the notebook.
login()

In [None]:
%%time
from pyannote.audio import Pipeline

# Load the gated speaker-diarization pipeline from the Hugging Face Hub.
# SECURITY: never hard-code an access token in a notebook -- anything committed
# or shared this way is exposed and must be revoked on huggingface.co.
# `use_auth_token=True` makes the download use the token stored by the
# `login()` call in the previous cell.
diarization_pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token=True,
)

In [None]:
%%time
# This will take a very long time on less powerful computers!
from pyannote.audio.pipelines.utils.hook import ProgressHook
# The hook is handed to the pipeline call so it can report progress while the
# diarization runs; the context manager scopes the hook to this single run.
with ProgressHook() as hook:
    # Run speaker diarization on the interview file; the result is kept in
    # `diarization` for the cells that follow.
    diarization = diarization_pipeline("interview.mp3", hook=hook)

In [None]:
# Show the diarization result as this cell's output.
diarization

In [None]:
# Per-label summary of the result. NOTE(review): pyannote.core documents
# `chart()` as listing labels by total duration -- confirm for the installed version.
diarization.chart()

In [None]:
# Display the discretized form of the diarization. The value is only shown
# here; it is not assigned to a variable.
diarization.discretize()

In [None]:
# Print one line per diarization track: the turn's start and end time
# (formatted to one decimal place) and the speaker label assigned to it.
for turn, _, speaker in diarization.itertracks(yield_label=True):
    print(f"start={turn.start:.1f}s stop={turn.end:.1f}s speaker_{speaker}")

### Function to combine total start and stop speaking times

In [None]:
def consolidate_speaker_segments(diarization):
    """Merge consecutive turns by the same speaker into single segments.

    The tracks are visited in the order ``itertracks`` yields them. Every
    uninterrupted run of turns carrying the same speaker label is collapsed
    into one ``(speaker, start, end)`` tuple, where ``start`` is the start of
    the run's first turn and ``end`` is the latest end time of any turn in the
    run (i.e. when that speaker actually stopped talking).

    Args:
        diarization: Object exposing ``itertracks(yield_label=True)`` that
            yields ``(turn, track, speaker)`` triples, where ``turn`` has
            ``start`` and ``end`` attributes.

    Returns:
        A list of ``(speaker, start, end)`` tuples in the order the runs
        occur; an empty list when the diarization yields no tracks. Times are
        in whatever unit the turns use.
    """
    consolidated_segments = []
    current_speaker = None
    segment_start = None
    segment_end = None

    for turn, _, speaker in diarization.itertracks(yield_label=True):
        if speaker != current_speaker:
            if current_speaker is not None:
                # Close the previous run at the time its own speaker stopped,
                # not at the next speaker's start: using the latter would pad
                # the run with silence or cut it short when turns overlap.
                consolidated_segments.append((current_speaker, segment_start, segment_end))
            current_speaker = speaker
            segment_start = turn.start
            segment_end = turn.end
        else:
            # A later turn of the same run may be nested inside an earlier
            # one, so keep the furthest end seen so far.
            segment_end = max(segment_end, turn.end)

    # Flush the final run, which has no following speaker change to close it.
    if current_speaker is not None:
        consolidated_segments.append((current_speaker, segment_start, segment_end))

    return consolidated_segments

In [None]:
# Example usage with the provided diarization object: collapse the per-turn
# result into one (speaker, start, end) tuple per uninterrupted speaker run.
segments = consolidate_speaker_segments(diarization)

In [None]:
# Show the consolidated (speaker, start, end) tuples as this cell's output.
segments

In [None]:
# Print one line per consolidated segment: speaker label, then start and stop
# time formatted to one decimal place.
for speaker, start, end in segments:
    print(f"speaker_{speaker} start={start:.1f}s stop={end:.1f}s")

In [None]:
# Install pydub, which the next cell imports to cut the recording into segments.
# NOTE(review): IPython's `%pip install` targets the running kernel's
# environment and may be the safer form here -- confirm before switching.
!pip install pydub

In [None]:
from pydub import AudioSegment
import os

def split_audio_segments(audio_file, segments, output_dir='segmented_audio'):
    """Cut *audio_file* into one MP3 file per segment and save them in *output_dir*.

    Each output file is named
    ``{idx:02d}_SPEAKER{label}_START{start}_STOP{end}.mp3``, where ``idx`` is
    the segment's position in *segments*, ``label`` is the part of the speaker
    string after its last underscore, and ``start``/``end`` are rounded to
    whole numbers. The path of every written file is printed.

    Args:
        audio_file: Path of the audio file to load with ``AudioSegment.from_file``.
        segments: Iterable of ``(speaker, start, end)`` tuples. ``speaker`` is
            a string; ``start`` and ``end`` are multiplied by 1000 to obtain
            the millisecond offsets used to slice the audio, so they are
            expected to be in seconds.
        output_dir: Directory that receives the files; created if missing.
    """
    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Load the audio file
    audio = AudioSegment.from_file(audio_file)

    # Iterate over the segments and export each one
    for idx, (speaker, start, end) in enumerate(segments):
        # Calculate start and end in milliseconds
        start_ms = start * 1000
        end_ms = end * 1000

        # Extract the segment
        segment = audio[start_ms:end_ms]

        # Create the output file name
        speaker_label = speaker.split('_')[-1]  # Get speaker identifier
        output_file = os.path.join(output_dir, f"{idx:02d}_SPEAKER{speaker_label}_START{start:.0f}_STOP{end:.0f}.mp3")

        # Export the segment. export() hands back the open output file object;
        # close it right away so a handle is not leaked for every segment.
        exported = segment.export(output_file, format="mp3")
        exported.close()
        print(f"Exported {output_file}")

In [None]:
# Write one MP3 per consolidated segment into the default 'segmented_audio' directory.
split_audio_segments('interview.mp3',segments)

# Speech Transcription

In [None]:
# Use a pipeline as a high-level helper
from transformers import pipeline

# Name the checkpoint explicitly instead of relying on the task's implicit
# default, so the transcription stays reproducible if that default changes.
# model = https://huggingface.co/facebook/wav2vec2-base-960h
pipe = pipeline(
    "automatic-speech-recognition",
    model="facebook/wav2vec2-base-960h",
)

In [None]:
import os
import re

def process_segmented_files(directory='segmented_audio'):
    """Transcribe every segment file in *directory* and print the transcripts.

    Files are expected to be named ``<index>_<speaker>_<start>_<stop>.mp3``
    (four underscore-separated fields). They are processed in ascending
    ``<index>`` order so the transcript follows the conversation, and for each
    one the speaker field, the start/stop fields and the recognised text are
    printed. Sub-directories and files whose names do not have exactly four
    fields (for example ``.DS_Store``) are skipped without being transcribed.

    Transcription uses the module-level ``pipe`` callable, which must already
    be defined; it is called with the file path and must return a mapping
    with a ``'text'`` entry.

    Args:
        directory: Folder containing the segment files.

    Raises:
        FileNotFoundError: If *directory* does not exist.
    """
    # Ensure the directory exists
    if not os.path.exists(directory):
        raise FileNotFoundError(f"Directory '{directory}' does not exist.")

    def _segment_order(name):
        # Order by the numeric index prefix; a plain string sort would put
        # "100_..." before "11_...". Names without a numeric prefix go last.
        prefix = name.split('_', 1)[0]
        if prefix.isdecimal():
            return (0, int(prefix), name)
        return (1, 0, name)

    # os.listdir() returns entries in arbitrary order, so sort explicitly.
    for file in sorted(os.listdir(directory), key=_segment_order):
        file_path = os.path.join(directory, file)
        if not os.path.isfile(file_path):
            continue

        fields = file.split('_')
        if len(fields) != 4:
            # Not a segment file: skip it before spending time transcribing.
            continue
        num, speaker, time_start, time_stop = fields
        time_stop = time_stop.replace(".mp3", '')

        text = pipe(file_path)['text']
        print(f"{speaker}-- {time_start}sec {time_stop}sec:\n{text}")
        print('\n\n')

In [None]:
# Transcribe and print every segment file found in the default 'segmented_audio' directory.
process_segmented_files()