# Automatic speech recognition with diarization

This tutorial guides you through performing automatic speech recognition (ASR) and speaker diarization.

This is a very common use case: you have a single audio file and want to separate the speakers automatically so that the conversation can be displayed turn by turn.



## Prepare audio file

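Put your audio file (for example `audio.wav`) in the same directory as this notebook, and set `RAW_FILE` in the Configuration section to its file name.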

## Environment Setup

First you need to create a new virtual environment using python 3.10. This is the version that we have tested.

Check that your notebook kernel is pointing to the correct env

In [None]:
!pip --version

In [None]:
!pip install pydub pyannote.audio speechbrain diart
!pip install --upgrade google-cloud-speech
!pip install speechbrain

## Configuration

In [None]:
################################
# YOU MUST UPDATE THESE CONFIG
################################

# Google Cloud project ID; passed to speech_v2() to build the recognizer path.
PROJECT_ID = "XXX"
# NOTE(review): not referenced by any cell visible in this notebook - confirm it is still needed.
PROJECT_NUMBER = "XXX"
RAW_FILE = "audio_002.wav"  # This is your audio file to be processed.
PARTITION_DIR = "call_partition"  # Directory that receives the per-segment audio clips.
# Token passed as `use_auth_token` when the pyannote diarization pipeline is loaded.
HF_TOKEN = "XXX"


## Let's rock it

In [None]:
import os

# NOTE(review): LOCATION is not read by any other cell visible in this notebook.
LOCATION = "global"

# Path of the 16 kHz copy of RAW_FILE: same directory, file name prefixed with
# "16k_". The prefix is applied to the file name only, so a RAW_FILE such as
# "data/call.wav" maps to "data/16k_call.wav" instead of "16k_data/call.wav".
TARGET_AUDIO_FILE = os.path.join(
    os.path.dirname(RAW_FILE), "16k_" + os.path.basename(RAW_FILE)
)
print(TARGET_AUDIO_FILE)

# exist_ok=True makes re-running this cell safe when the directory is already there.
os.makedirs(PARTITION_DIR, exist_ok=True)
print(f"Directory '{PARTITION_DIR}' created successfully.")

In [None]:
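# The metadata printout is optional, but this cell also sets `file_path`,
# which later cells read - run it at least once.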
from pydub.utils import mediainfo

def extract_audio_metadata(file_path):
    """Return the ``mediainfo`` metadata for ``file_path``.

    Any exception is printed as ``Error: ...`` and ``None`` is returned
    instead of propagating it.
    """
    try:
        return mediainfo(file_path)
    except Exception as err:
        print(f"Error: {err}")
    return None

# Example usage.
# NOTE: `file_path` is read again by later cells (diarize_audio and the RTTM
# loading cell), so this assignment has to run before them.
file_path = RAW_FILE
metadata = extract_audio_metadata(file_path)
if metadata:
    print("Audio Metadata:")
    # Only the sample rate is reported.
    if "sample_rate" in metadata:
        print(f"sample_rate: {metadata['sample_rate']}")


Resample the audio.

In [None]:
from pydub import AudioSegment

def convert_to_16k(audio_file):
    """Resample ``audio_file`` to 16 kHz and export it as WAV.

    :param audio_file: Path to the input audio file.
    :return: Path of the exported file: the input's directory joined with the
        input's file name prefixed by ``16k_``.
    """
    # Prefix the file name rather than the whole path, so an input such as
    # "data/call.wav" is written to "data/16k_call.wav" and not to the
    # non-existent directory "16k_data/".
    folder, file_name = os.path.split(audio_file)
    output_file = os.path.join(folder, "16k_" + file_name)

    # Load the original audio and set its frame rate to 16000.
    audio_16k = AudioSegment.from_file(audio_file).set_frame_rate(16000)

    # Export the result.
    audio_16k.export(output_file, format="wav")

    return output_file


convert_to_16k(RAW_FILE)

The following cell may take up to 10 minutes to run on a CPU. You can speed it up by using a GPU.

In [None]:
import os
import speechbrain
from pyannote.audio import Pipeline

# Model: pretrained pyannote speaker-diarization pipeline, loaded once and
# reused by diarize_audio(). HF_TOKEN is passed as the auth token.
pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token=HF_TOKEN,
)


def diarize_audio(audio_file, num_speakers=2):
    """Run speaker diarization on ``audio_file`` and save the result as RTTM.

    :param audio_file: Path to the audio file to diarize.
    :param num_speakers: Number of speakers handed to the pipeline. Defaults
        to 2, the value that used to be hard-coded.
    :return: The diarization object returned by the pipeline.
    """
    # Run the pipeline on the audio file.
    diarization = pipeline(audio_file, num_speakers=num_speakers)

    # NOTE(review): the RTTM name is derived from the notebook-level
    # `file_path`, not from `audio_file`. The cell that reads the RTTM back
    # builds its path from `file_path` as well, so the behavior is kept here;
    # confirm whether naming the file after `audio_file` was intended.
    base_name, _ = os.path.splitext(file_path)
    print(f"{base_name=}")

    # Dump the diarization output to disk in RTTM format.
    new_file_path = f"{base_name}.rttm"
    with open(new_file_path, "w") as rttm:
        diarization.write_rttm(rttm)
    return diarization


diarize_audio(TARGET_AUDIO_FILE)


You should now see a generated `.rttm` file. This file lists the start time and duration of each speaker segment.

Let's see what the content looks like.

In [None]:
def process_rttm(input_data):
    """
    Parse RTTM diarization output into a list of segments.

    Each RTTM line is split on whitespace; by 0-based field position:
        3: start
        4: duration
        7: speaker

    :param input_data: One of
        - an iterable of RTTM lines (e.g. the result of ``f.readlines()``),
        - the RTTM text as one newline-separated string, or
        - the legacy form ``str(list_of_lines)``, which contains no real
          newline and is therefore split on ``","``.
    :return: A list of ``[idx, start, duration, speaker]`` where ``idx`` is
        the running segment index and the other values are the strings found
        in the input. Lines with fewer than 8 fields (blank lines, an empty
        file) are skipped instead of raising IndexError.
    """
    if isinstance(input_data, str):
        text = input_data.strip()
        # Real newlines mean plain RTTM text; otherwise fall back to the
        # comma-separated legacy form so existing callers keep working.
        lines = text.splitlines() if "\n" in text else text.split(",")
    else:
        lines = input_data

    results = []
    for line in lines:
        fields = line.split()
        if len(fields) < 8:
            continue
        results.append([len(results), fields[3], fields[4], fields[7]])

    return results


# Build the RTTM path with os.path.splitext(), the same way diarize_audio()
# names the file it writes. Splitting on the first "." instead would return a
# different stem for paths such as "./audio.wav" or "call.v2.wav".
RTTM_FILE = os.path.splitext(file_path)[0] + ".rttm"

print(RTTM_FILE)
with open(RTTM_FILE) as f:
    spk_diar = f.readlines()

# process_rttm() is given the string form of the list of lines.
results = process_rttm(str(spk_diar))
results

## Break audio

Now break the audio file into per-segment clips using the RTTM data.

In [None]:
import subprocess

def cut_audio(input_file, start_time, duration, output_file):
    """
    Cut a portion of the audio file with ffmpeg, copying the stream as is.

    :param input_file: Path to the input audio file.
    :param start_time: Start of the cut, in a form ffmpeg accepts for ``-ss``:
        seconds (``12.5`` or ``'12.5'``, as found in RTTM files) or
        ``'hh:mm:ss'``.
    :param duration: Length of the cut, in the same forms, passed to ``-t``.
    :param output_file: Path to the output audio file; an existing file is
        overwritten (``-y``).
    :raises subprocess.CalledProcessError: If ffmpeg exits with a non-zero
        status (``check=True``).
    """
    command = [
        'ffmpeg',
        '-y',
        '-loglevel',
        'error',
        '-i', input_file,
        # subprocess only accepts string arguments; str() lets callers pass
        # numeric times as well as the strings parsed from an RTTM file.
        '-ss', str(start_time),
        '-t', str(duration),
        '-c', 'copy',
        output_file,
    ]

    subprocess.run(command, check=True)

def break_audio(audio_file, metadatas, output_dir:str):
    """Write one clip of ``audio_file`` per entry of ``metadatas``.

    :param audio_file: Path to the audio file to cut.
    :param metadatas: Iterable of ``(idx, start_time, duration, speaker)``.
    :param output_dir: Directory for the clips; created when it is missing.

    Each clip is named ``{idx:02}-{start_time}-{duration}-{speaker}.wav`` and
    produced by ``cut_audio``.
    """
    # Report whether the output directory had to be created.
    if os.path.exists(output_dir):
        print(f"Directory '{output_dir}' already exists.")
    else:
        os.makedirs(output_dir)
        print(f"Directory '{output_dir}' was created.")

    for idx, start_time, duration, speaker in metadatas:
        output_file = f"{output_dir}/{idx:02}-{start_time}-{duration}-{speaker}.wav"
        print(output_file)
        cut_audio(audio_file, start_time, duration, output_file)

# Cut TARGET_AUDIO_FILE into one clip per entry of `results`, written to PARTITION_DIR.
break_audio(TARGET_AUDIO_FILE, results, PARTITION_DIR)

Let's sort the sequences.

In [None]:
import os

def _segment_sort_key(file_name):
    """Order clip names by their leading numeric index; other names sort last."""
    prefix = file_name.split("-", 1)[0]
    if prefix.isascii() and prefix.isdigit():
        return (0, int(prefix), file_name)
    return (1, 0, file_name)


# List the files in the directory in segment order. Clip names start with the
# segment index padded to two digits, so a plain string sort would put
# "100-..." before "11-..." once there are more than 99 segments; sorting on
# the numeric prefix keeps the conversation in order.
files = sorted(
    (f for f in os.listdir(PARTITION_DIR) if os.path.isfile(os.path.join(PARTITION_DIR, f))),
    key=_segment_sort_key,
)

# Print the sorted list of file names
print(files)


Now you should have all the files (rttm and call_partition).


## Speech to text (Optional)

This step lets you view the transcription directly in the notebook.

Remember to update `language_codes` in the cell below if your audio is not in Indian English (`en-IN`).

In [None]:
from google.cloud.speech_v2 import SpeechClient
from google.cloud.speech_v2.types import cloud_speech


def speech_v2(
    project_id: str,
    audio_file: str,
    language_codes: list[str] | None = None,
    model: str = "telephony",
) -> list[str]:
    """Transcribe an audio file with the project's default ``global`` recognizer.

    The whole file is read and sent inline in a single ``recognize`` call.
    NOTE(review): synchronous recognition is presumably limited in audio
    length/size - confirm the clips are short enough.

    :param project_id: Google Cloud project ID used in the recognizer path.
    :param audio_file: Path to the audio file to transcribe.
    :param language_codes: Language codes for recognition. Defaults to
        ``["en-IN"]``, the value that used to be hard-coded.
    :param model: Recognition model name. Defaults to ``"telephony"``.
    :return: The first alternative's transcript of every result, in response
        order. Results that carry no alternatives are skipped.
    """
    client = SpeechClient()

    # Reads a file as bytes
    with open(audio_file, "rb") as f:
        content = f.read()

    config = cloud_speech.RecognitionConfig(
        auto_decoding_config=cloud_speech.AutoDetectDecodingConfig(),
        language_codes=language_codes or ["en-IN"],
        model=model,
    )

    request = cloud_speech.RecognizeRequest(
        recognizer=f"projects/{project_id}/locations/global/recognizers/_",
        config=config,
        content=content,
    )

    # Transcribes the audio into text
    response = client.recognize(request=request)

    # Guard on `alternatives` so a result without any does not raise IndexError.
    return [
        result.alternatives[0].transcript
        for result in response.results
        if result.alternatives
    ]



In [None]:
# !gcloud auth application-default login

In [None]:
def process_all_audio(audio_dir:str, limit=10):
    """Transcribe the clips in ``audio_dir`` and print ``speaker : transcripts``.

    Clip names are expected to look like ``{idx}-{start}-{duration}-{speaker}.wav``.
    The files are read from ``audio_dir`` itself and handled in order of their
    numeric index; names that do not match the pattern are skipped.

    :param audio_dir: Directory containing the per-segment clips.
    :param limit: Maximum number of clips to transcribe. Defaults to 10, the
        value that used to be hard-coded; pass ``None`` for all clips.
    """
    clips = []
    for name in os.listdir(audio_dir):
        if not os.path.isfile(os.path.join(audio_dir, name)):
            continue
        # Drop the extension first so the speaker label does not end in ".wav".
        parts = os.path.splitext(name)[0].split('-', 3)
        if len(parts) != 4 or not (parts[0].isascii() and parts[0].isdigit()):
            continue
        clips.append((int(parts[0]), name, parts[3]))
    clips.sort()
    if limit is not None:
        clips = clips[:limit]

    for _, name, speaker in clips:
        try:
            result = speech_v2(PROJECT_ID, os.path.join(audio_dir, name))
        except Exception as err:
            # Best effort: report the failing clip and continue with the rest,
            # rather than printing an unbound or stale `result`.
            print(f"{speaker} : transcription failed for {name}: {err}")
            continue

        print(speaker, ":", result)


# Print speaker and transcription for the partitioned clips (at most 10 by default).
process_all_audio(PARTITION_DIR)