In [1]:
import numpy as np
from pyannote.audio import Pipeline, Model, Inference
import os
from dotenv import load_dotenv
import random
import torch
from scipy.spatial.distance import cdist
from sklearn.cluster import KMeans, AgglomerativeClustering, DBSCAN, MeanShift
from sklearn.metrics import rand_score, adjusted_mutual_info_score, silhouette_score, davies_bouldin_score
import seaborn as sns
import matplotlib.pyplot as plt
from copy import deepcopy

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Load variables from a local .env file into the process environment.
load_dotenv()
# Hugging Face access token; os.getenv returns None when the variable is unset.
token = os.getenv("HUGGING_FACE_TOKEN")
# Directory that is scanned for the audio recordings to embed.
directory  = "audio_snimci"

In [3]:
def insert_files_in_list(list_files: list, directory: str):
  """Append the path of every regular file directly inside `directory` to `list_files`.

  Sub-directories are skipped. Entries are visited in sorted name order,
  because os.listdir returns them in arbitrary order and the downstream
  results should not depend on the filesystem.

  Args:
    list_files: list that is extended in place with "<directory>/<name>" paths.
    directory: path of the directory to scan (not recursive).

  Returns:
    None; the result is delivered through `list_files`.
  """
  for f in sorted(os.listdir(directory)):
    path = os.path.join(directory, f)
    if os.path.isfile(path):
      list_files.append(path)


def function_heatmap(embedding: list):
  """Plot and return the pairwise cosine-distance matrix of `embedding`.

  Args:
    embedding: sequence of equal-length 1-D embedding vectors.

  Returns:
    An n x n list of lists where entry [i][j] is the cosine distance between
    embedding[i] and embedding[j].

  Side effect: the matrix is drawn with sns.heatmap.
  """
  n = len(embedding)
  # One vectorised cdist call computes all n*n distances at once instead of
  # issuing a separate call for every pair.
  matrix = cdist(embedding, embedding, metric="cosine").tolist() if n else []
  sns.heatmap(matrix)
  return matrix
def distribution(speaker_dictionaries,matrix):
  """Plot histograms of same-speaker versus different-speaker distances.

  Each entry of `speaker_dictionaries` is a directory path. The number of
  entries in that directory is taken as the size of that speaker's block of
  consecutive rows/columns in `matrix`, in the order the directories are given.
  NOTE(review): this assumes `matrix` was built from the files of these
  directories in the same order - confirm against the caller.

  Args:
    speaker_dictionaries: iterable of directory paths, one per speaker.
    matrix: square 2-D distance matrix (list of lists or array).

  Side effects: prints the (start, end) index range of every speaker and shows
  a figure with the same-speaker distances in green and the remaining
  distances in red.
  """
  cordinates_for_speaker = []
  previous_count = 0
  for speaker in speaker_dictionaries:
    count = len(os.listdir(speaker))
    cordinates_for_speaker.append((previous_count, previous_count + count))
    previous_count += count
  print(cordinates_for_speaker)

  np_matrix = np.array(matrix, dtype=float)
  # A boolean mask marks the diagonal blocks. Overwriting them with a sentinel
  # value of 2 would also discard genuine distances of 2.0, which is a valid
  # cosine distance (opposite vectors).
  same_speaker = np.zeros(np_matrix.shape, dtype=bool)
  for l_cord, r_cord in cordinates_for_speaker:
    same_speaker[l_cord:r_cord, l_cord:r_cord] = True
  from_same_group = np_matrix[same_speaker]
  different_group = np_matrix[~same_speaker]

  plt.hist(from_same_group,bins=20,color='green',alpha=0.5)
  plt.hist(different_group,bins=10,color='red',alpha=0.5)
  plt.show()

def clusterisation_result(list_files_name: list, prediction: list):
  """Print the files assigned to each cluster, one cluster per line.

  Args:
    list_files_name: file names, aligned index-by-index with `prediction`.
    prediction: cluster label of every file; each label is converted with int().

  Output lines have the form "<label>: [<file>, ...]". Labels are printed in
  ascending order, files in sorted order, and a file name appears at most
  once per label.
  """
  labels = [int(raw) for raw in prediction]

  groups = {}
  for file_name, label in sorted(zip(list_files_name, labels)):
    members = groups.setdefault(label, [])
    if file_name not in members:
      members.append(file_name)

  for label in sorted(groups):
    print(f"{label}: {list(groups[label])}")


# Lower bound for the midpoint of a gap that may be chosen as the cut-off.
minimal_treshold = 0.4
def calculate_threshold(i,embedding):
  """Pick a cosine-distance cut-off for the cluster seeded by vector `i`.

  The distances from `i` to every vector in `embedding` are sorted, and the
  cut-off is the midpoint of the widest gap between consecutive distances
  whose midpoint lies above `minimal_treshold`. The gap between the two
  smallest distances is never a candidate. If no gap qualifies, the midpoint
  of the two smallest distances is used.
  NOTE(review): that fallback can be far below `minimal_treshold` - confirm
  this is intended.

  Args:
    i: 1-D embedding vector used as the reference point.
    embedding: sequence of 1-D embedding vectors.

  Returns:
    The cut-off distance, capped at 0.8. With fewer than two vectors there is
    no pair of distances to take a midpoint of, so `minimal_treshold` is
    returned instead of raising IndexError.
  """
  if len(embedding) < 2:
    return minimal_treshold
  # One cdist call yields the distance from `i` to every vector.
  cosine_dist = sorted(cdist([i], embedding, metric='cosine')[0])
  index = 0
  max_v = 0
  # `k` (not `i`) so the reference vector parameter is not shadowed.
  for k in range(1, len(cosine_dist)-1):
    diff = cosine_dist[k+1] - cosine_dist[k]
    midpoint = (cosine_dist[k] + cosine_dist[k+1]) / 2
    if diff > max_v and midpoint > minimal_treshold:
      index = k
      max_v = diff
  treshold = (cosine_dist[index+1] + cosine_dist[index]) / 2
  return treshold if treshold < 0.8 else 0.8


def my_algorithm_v2(list_files_new,embedding):
  """Greedily cluster embeddings by cosine distance and print the clusters.

  Embeddings are visited in order. Every embedding that has no label yet
  becomes the seed of a new cluster: a cut-off is obtained from
  calculate_threshold, and every still-unlabelled embedding whose cosine
  distance to the seed is below that cut-off joins the cluster. The grouping
  is printed through clusterisation_result.

  Args:
    list_files_new: file names, aligned index-by-index with `embedding`.
    embedding: sequence of 1-D embedding vectors.

  Raises:
    ValueError: if the two sequences have different lengths.
  """
  print("My algorithm")
  n = len(list_files_new)
  if len(embedding) != n:
    raise ValueError("list_files_new and embedding must have the same length")
  prediction = [-1]*n
  cluster=0
  for i,audio_first in enumerate(embedding):
    if prediction[i]>-1:
      continue
    treshold = calculate_threshold(audio_first,embedding)
    # The seed always belongs to its own cluster. Relying on the strict
    # "< treshold" test would leave it at -1 when the cut-off is 0 or its
    # self-distance is NaN.
    prediction[i] = cluster
    # One cdist call yields the distance from the seed to every embedding.
    distances = cdist([audio_first], embedding, metric='cosine')[0]
    for j, dist in enumerate(distances):
      if prediction[j] == -1 and dist < treshold:
        prediction[j] = cluster
    cluster+=1

  clusterisation_result(list_files_new, prediction)



In [4]:
# Load the pretrained "pyannote/embedding" model, authenticating with the
# Hugging Face token read from the environment.
model = Model.from_pretrained("pyannote/embedding",
                              use_auth_token=token)
# window="whole": presumably yields one embedding for the entire file rather
# than one per sliding window - confirm against the pyannote.audio docs.
inference = Inference(model, window="whole")

/home/lenovo/Desktop/Ognjen/ClickerProject/venv/lib/python3.12/site-packages/pytorch_lightning/utilities/migration/migration.py:208: You have multiple `ModelCheckpoint` callback states in this checkpoint, but we found state keys that would end up colliding with each other after an upgrade, which means we can't differentiate which of your checkpoint callbacks needs which states. At least one of your `ModelCheckpoint` callbacks will not be able to reload the state.
Lightning automatically upgraded your loaded checkpoint from v1.2.7 to v2.5.1.post0. To apply the upgrade to your files permanently, run `python -m pytorch_lightning.utilities.upgrade_checkpoint ../../../../.cache/torch/pyannote/models--pyannote--embedding/snapshots/4db4899737a38b2d618bbd74350915aa10293cb2/pytorch_model.bin`
Lightning automatically upgraded your loaded checkpoint from v1.2.7 to v2.5.1.post0. To apply the upgrade to your files permanently, run `python -m pytorch_lightning.utilities.upgrade_checkpoint ../../../.

Model was trained with pyannote.audio 0.0.1, yours is 3.3.2. Bad things might happen unless you revert pyannote.audio to 0.x.
Model was trained with torch 1.8.1+cu102, yours is 2.7.0+cu118. Bad things might happen unless you revert torch to 1.x.
Model was trained with pyannote.audio 0.0.1, yours is 3.3.2. Bad things might happen unless you revert pyannote.audio to 0.x.
Model was trained with torch 1.8.1+cu102, yours is 2.7.0+cu118. Bad things might happen unless you revert torch to 1.x.


/home/lenovo/Desktop/Ognjen/ClickerProject/venv/lib/python3.12/site-packages/pytorch_lightning/core/saving.py:195: Found keys that are not in the model state dict but in the checkpoint: ['loss_func.W']


In [5]:
list_files = []

insert_files_in_list(list_files,directory)
# Directory listing order is arbitrary; sort so that the fixed-seed shuffle
# below produces the same order on every run.
list_files.sort()
# Fail here with a clear message instead of an opaque unpacking error at zip(*combined).
assert list_files, f"no audio files found in {directory!r}"
embedding = [inference(file) for file in list_files]
combined = list(zip(list_files,embedding))
# Shuffle files and embeddings together so each name stays paired with its
# embedding; a dedicated seeded generator keeps Restart & Run All reproducible.
random.Random(42).shuffle(combined)

list_files_new, embedding_new = zip(*combined)

In [6]:
# Cluster the shuffled recordings; the files of each cluster are printed below.
my_algorithm_v2(list_files_new,embedding_new)

My algorithm
0: ['audio_snimci/1.wav', 'audio_snimci/3.wav']
1: ['audio_snimci/2.wav', 'audio_snimci/4.wav']
