# **Pyannote speaker-diarization-3.1**
Credits: Delik ([Hugging Face](https://huggingface.co/Delik), [GitHub](https://github.com/D3lik))

Join our server to talk about open-source AI!

 [![Discord](https://img.shields.io/discord/1198701940511617164?color=%23738ADB&label=Discord&style=for-the-badge)](https://discord.gg/osai)


In [None]:
# @title Install requirements
# Notebook cell: installs the packages needed by the diarization cell.
from IPython.display import clear_output
# The leading `!` runs the command in a shell from the notebook;
# pyannote-audio is pinned to version 3.1.1.
!pip install pyannote-audio==3.1.1 wavio
# Wipe the pip output from the cell so only the message below remains visible.
clear_output()
print("Finished Installing")

In [None]:
# @title Run audio diarization
import torch
import os
from pyannote.audio import Pipeline
from google.colab import files
# Form inputs. The three speaker counts are passed to process_audio at the end
# of this cell; diarize_audio forwards only the values greater than 0 to the
# pipeline, so 0 means "do not pass this hint".
num_speakers = 2 # @param {type:"slider", min:0, max:10, step:1}
min_speakers = 2 # @param {type:"slider", min:0, max:10, step:1}
max_speakers  = 2 # @param {type:"slider", min:0, max:10, step:1}
audio_path = "Path to your audio" # @param {type:"string"}
api = "Your huggingface api key" # @param {type:"string"}
# The token is stored in the process environment and read back from there
# when the pipeline is loaded below.
os.environ["api"] = api
# Load the pretrained pipeline and move it to the GPU when CUDA is available,
# otherwise to the CPU.
try:
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token=os.environ["api"]
    )
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    pipeline.to(device)
except Exception as e:
    # Any failure is reported and leaves `pipeline` as None; save_audio and
    # diarize_audio check for None before doing any work.
    print(f"Error initializing pipeline: {e}")
    pipeline = None

def save_audio(audio_path):
    """Return ``audio_path`` unchanged, or an error message if no pipeline is loaded.

    Despite the name, nothing is written to disk: the path is passed through
    as-is when the module-level ``pipeline`` is available.
    """
    return audio_path if pipeline is not None else "Error: Pipeline not initialized"

def diarize_audio(temp_file, num_speakers, min_speakers, max_speakers):
    """Run the module-level ``pipeline`` on ``temp_file`` and return its text form.

    Each of the three speaker counts is forwarded to the pipeline as a keyword
    argument only when it is greater than 0.

    Returns:
        ``str()`` of the pipeline result, or a string starting with ``"Error"``
        when the pipeline is not initialized or processing raises.
    """
    if pipeline is None:
        return "Error: Pipeline not initialized"

    try:
        requested = {
            "num_speakers": num_speakers,
            "min_speakers": min_speakers,
            "max_speakers": max_speakers,
        }
        # Drop the hints left at 0 so they are not passed to the pipeline.
        speaker_hints = {key: count for key, count in requested.items() if count > 0}
        result = pipeline(temp_file, **speaker_hints)
    except Exception as e:
        return f"Error processing audio: {e}"

    return str(result)

def timestamp_to_seconds(timestamp):
    """Convert an ``H:M:S`` timestamp string to seconds.

    Args:
        timestamp: Text with exactly three colon-separated numeric fields,
            e.g. ``"00:01:02.500"``.

    Returns:
        The number of seconds as a float, or ``None`` (after printing a
        message) when the text does not have three numeric fields.
    """
    try:
        h, m, s = map(float, timestamp.split(':'))
        return 3600 * h + 60 * m + s
    except ValueError as e:
        print(f"Error converting timestamp to seconds: '{timestamp}'. Error: {e}")
        return None

def generate_labels_from_diarization(diarization_output):
    """Write ``start<TAB>end<TAB>label`` rows to ``labels.txt`` from diarization text.

    Each input line is expected to look like
    ``[ 00:00:01.500 -->  00:00:03.000] A SPEAKER_00``: the two timestamps are
    converted to seconds and the last whitespace-separated token is used as
    the label. Lines that cannot be parsed, including lines whose timestamps
    cannot be converted, are reported and skipped.

    Args:
        diarization_output: Multi-line text, one segment per line.

    Returns:
        The path ``'labels.txt'`` when at least one row was written, otherwise
        ``None`` (also when the file cannot be written).
    """
    successful_lines = 0
    labels_path = 'labels.txt'
    try:
        with open(labels_path, 'w') as outfile:
            for line in diarization_output.strip().split('\n'):
                try:
                    # Drop the leading '[' before splitting on the arrow; the
                    # label is taken from the untouched line below.
                    parts = line.strip()[1:-1].split(' --> ')
                    start_time = parts[0].strip()
                    end_time = parts[1].split(']')[0].strip()
                    label = line.split()[-1].strip()
                    start_seconds = timestamp_to_seconds(start_time)
                    end_seconds = timestamp_to_seconds(end_time)
                    # timestamp_to_seconds signals failure with None; treat it
                    # as a bad line instead of writing "None" into the file.
                    if start_seconds is None or end_seconds is None:
                        raise ValueError("unparsable timestamp")
                    outfile.write(f"{start_seconds}\t{end_seconds}\t{label}\n")
                    successful_lines += 1
                except Exception as e:
                    print(f"Error processing line: '{line.strip()}'. Error: {e}")
        print(f"Processed {successful_lines} lines successfully.")
        return labels_path if successful_lines > 0 else None
    except Exception as e:
        print(f"Cannot write to file '{labels_path}'. Error: {e}")
        return None

def process_audio(audio, num_speakers=0, min_speakers=0, max_speakers=0):
    """Diarize ``audio`` and, on success, write the labels file.

    Returns:
        A ``(text, label_file)`` pair. ``text`` is the diarization output or
        an error message starting with ``"Error"``; ``label_file`` is whatever
        ``generate_labels_from_diarization`` returns, or ``None`` when
        diarization reported an error.
    """
    source = save_audio(audio)
    text = diarize_audio(source, num_speakers, min_speakers, max_speakers)
    # Failures are reported as strings prefixed with "Error".
    if text.startswith("Error"):
        return text, None
    return text, generate_labels_from_diarization(text)

# Run the diarization with the form values and show the result (or the error text).
diarization_result, label_file = process_audio(audio_path, num_speakers, min_speakers, max_speakers)
print(diarization_result)
# process_audio returns None for the labels file when diarization failed or no
# line could be parsed; only trigger the browser download when a file exists.
if label_file is not None:
    files.download(label_file)
    print("Files downloaded to device")
else:
    print("No labels file was generated; nothing to download")