In [1]:
!pip install pyannote.audio
!pip install accelerate
!pip install pydub
!pip install whisper-timestamped

## Speaker diarization

In [2]:
import torch
from pyannote.audio import Pipeline
from pydub import AudioSegment
import datetime
import whisper_timestamped as whisper
import time
import os
import gc

  torchaudio.set_audio_backend("soundfile")


Importing the dtw module. When using in academic works please cite:
  T. Giorgino. Computing and Visualizing Dynamic Time Warping Alignments in R: The dtw Package.
  J. Stat. Soft., doi:10.18637/jss.v031.i07.



In [3]:
# Inicializar el modelo de diarización de pyannote
diarization_pipeline = Pipeline.from_pretrained(
"pyannote/speaker-diarization-3.1",
use_auth_token="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")
diarization_pipeline.to(torch.device("cuda"))

device = "cuda:0" if torch.cuda.is_available() else "cpu"

model = whisper.load_model("large-v3", device=device)


In [4]:

def conversion_diarizacion_transcripcion(input_file,model,nombre_salida):
  conversiones = [
      ("output_wav_pcm.wav", "pcm_s16le", None)       # WAV sin compresión
  ]

  def format_time(seconds):
    """ Convertir segundos a formato de tiempo HH:MM:SS """
    return str(datetime.timedelta(seconds=int(seconds)))


  for output_file, codec, bitrate in conversiones:
      if bitrate:
          # Comando para conversiones con bitrate específico
          command = f"ffmpeg -i {input_file} -acodec {codec} -ar 44100 -b:a {bitrate} {output_file}"
      else:
          # Comando para conversiones sin bitrate específico (sin pérdida)
          command = f"ffmpeg -i {input_file} -acodec {codec} -ar 44100 {output_file}"
      os.system(command)
      print(f"Archivo {output_file} creado.")



    # Ejecutar diarización
  inicio_diarizacion = time.time()

  diarization = diarization_pipeline("output_wav_pcm.wav", num_speakers=2)
  final_diarizacion = time.time()

  tiempo_diarizacion=final_diarizacion-inicio_diarizacion


  # Cargar y configurar el modelo de Whisper para el archivo completo
  inicio_transcripcion = time.time()

  audio = whisper.load_audio("output_wav_pcm.wav")
  transcription_result = whisper.transcribe(model, "output_wav_pcm.wav", language="es", vad=True,detect_disfluencies=True,
                                               beam_size=5, best_of=5, temperature=(0.0, 0.2, 0.4, 0.6, 0.8, 1.0))
  final_transcripcion = time.time()

  tiempo_transcripcion=final_transcripcion-inicio_transcripcion



    # Preparar el archivo de salida y procesar la diarización y transcripción
  with open(f"{nombre_salida}.txt", "w", encoding='utf-8') as output_file:
        last_speaker = None
        last_start = 0
        last_end = 0
        last_transcription = ""
        total_confidence = 0
        segments = transcription_result['segments']

        for i, segment in enumerate(segments):
            start_time = segment['start']
            end_time = segment['end']
            transcript_text = segment['text'].strip()
            segment_confidence = segment['confidence']
            total_confidence += segment_confidence

            # Buscar el hablante que más se solapa con este segmento
            speaker_label = None
            max_overlap = 0
            for turn, _, speaker in diarization.itertracks(yield_label=True):
                overlap = min(end_time, turn.end) - max(start_time, turn.start)
                if overlap > max_overlap:
                    max_overlap = overlap
                    speaker_label = speaker

            # Si no hay hablante identificado y estamos en el primer o último segmento, asignamos el último o el primer hablante conocido
            if not speaker_label:
                if i == 0:
                    speaker_label = last_speaker  # Use the last known speaker for the first segment if none identified
                elif i == len(segments) - 1:
                    speaker_label = last_speaker  # Use the last known speaker for the last segment if none identified

            # Combinar segmentos si el hablante es el mismo y no hay pausa significativa
            if speaker_label == last_speaker and (start_time - last_end) < 1:
                last_end = end_time
                last_transcription += " " + transcript_text
            else:
                if last_speaker is not None:
                    # Escribir el segmento previo
                    output_file.write(f"{format_time(last_start)} - {format_time(last_end)} [{last_speaker}]: {last_transcription}\n")
                last_speaker = speaker_label
                last_start = start_time
                last_end = end_time
                last_transcription = transcript_text
        # Calcular y mostrar la confianza promedio por segmento para toda la transcripción
        average_confidence = total_confidence /  len(transcription_result['segments']) if transcription_result['segments'] else 0
        print(f"Diarization and transcription completed. Average segment confidence: {average_confidence:.2f}")

        del diarization
        del audio
        del transcription_result
        del output_file
        gc.collect()

  return(average_confidence,tiempo_diarizacion,tiempo_transcripcion)

In [5]:
# OS BASED PATH
# base = Path(__file__).resolve().parent
# Notebook Path
base = os.path.abspath('')

audio_path = os.path.join(base, "14838701_2.WAV")
transcribe_path = os.path.join(base, "transcribe.txt")

input_file = "14838701_2.WAV"
nombre_salida = "transcribe"

average_confidence,tiempo_diarizacion,tiempo_transcripcion = conversion_diarizacion_transcripcion(input_file,model,nombre_salida)
print(average_confidence)
print(tiempo_diarizacion)
print(tiempo_transcripcion)

Archivo output_wav_pcm.wav creado.


100%|██████████| 7108/7108 [00:23<00:00, 296.78frames/s]


Diarization and transcription completed. Average segment confidence: 0.46
0.4641363636363636
12.728769540786743
50.675360679626465


## Keyword find

Preproceso para unir, todos los textos de un speaker

In [6]:
import re

with open("transcribe.txt", "r", encoding="UTF-8") as f:
    transcribe = f.readlines()

# Keyword preprocessing
speaker1 = [line for line in transcribe if "SPEAKER_00" in line]
speaker2 = [line for line in transcribe if "SPEAKER_01" in line]

# Join the lines of each speaker
speaker1 = " ".join(speaker1)
speaker2 = " ".join(speaker2)

# Remove the strings that match the pattern 00:00:00 - 0:00:26 [SPEAKER_00]:
pattern = r"\d{1,2}:\d{2}:\d{2} - \d{1,2}:\d{2}:\d{2} \[SPEAKER_\d{2}\]:"

speaker1 = re.sub(pattern, "", speaker1)
speaker2 = re.sub(pattern, "", speaker2)

speaker1 = speaker1.split("\n")
speaker2 = speaker2.split("\n")

Keyword find per item

Load spanish pipeline

In [None]:
!python -m spacy download es_core_news_lg
# !python -m spacy download es_core_news_md

In [33]:
import spacy
from spacy import displacy

spanish_nlp = spacy.load("es_core_news_lg")
nlp = spacy.blank("es", vocab=spanish_nlp.vocab)

#use spacy in each item of the speaker list
speaker1 = [nlp(line) for line in speaker1]
speaker2 = [nlp(line) for line in speaker2]

# Find type of conversation using spacy
# Find the most common entity in the conversation
speaker1_entities = [ent.label_ for line in speaker1 for ent in line.ents]
speaker2_entities = [ent.label_ for line in speaker2 for ent in line.ents]

print(speaker1_entities)
print(speaker2_entities)

['PER', 'PER', 'PER', 'PER', 'PER', 'LOC', 'ORG', 'MISC', 'PER']
['LOC', 'PER', 'PER', 'PER', 'LOC']


In [34]:
displacy.serve(speaker1[5], style="ent")


Using the 'ent' visualizer
Serving on http://0.0.0.0:5000 ...

Shutting down server on port 5000.
