In [2]:
from src.utils.audio_utils import record_audio_chunk, concatenate_stream, download_mp3, start_continuous_recording, enhance_audio_signal, speach_activity_detection


from src.utils.system_utils import create_folder_structure, delete_chunk

#### Feature extraction: MFCC

In [None]:
import librosa
import librosa.feature
import os

# Path to the folder containing audio slice files
slices_folder = 'audio/chunks/'

# Collect the .wav slices in sorted order. os.listdir() returns entries in an
# arbitrary, filesystem-dependent order, so sorting keeps the order of
# mfcc_features (and every index derived from it later) stable between runs.
slice_files = sorted(f for f in os.listdir(slices_folder) if f.endswith(".wav"))

# One MFCC matrix per slice, appended in the same order as slice_files
mfcc_features = []

# Loop through each slice file and extract MFCC features
for slice_file in slice_files:
    slice_path = os.path.join(slices_folder, slice_file)
    # sr=None asks librosa to keep the file's native sampling rate
    audio_samples, sample_rate = librosa.load(slice_path, sr=None)

    # 13 MFCC coefficients per frame for the current slice
    mfcc = librosa.feature.mfcc(y=audio_samples, sr=sample_rate, n_mfcc=13)
    mfcc_features.append(mfcc)

### Step 2: Testing different clustering methods

#### Agglomerative Clustering with Cosine Similarity
We will use hierarchical agglomerative clustering with cosine similarity to cluster the audio features. This allows us to handle an unknown number of speakers.

In [None]:
import numpy as np
from scipy.cluster.hierarchy import linkage, fcluster, dendrogram
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt

from scipy.spatial.distance import pdist

# mfcc_features is a list of MFCC arrays that are joined along axis 1 (time).
# Transpose the result so every row is one frame (a sample) and every column
# is one MFCC coefficient (a feature): that is the (n_samples, n_features)
# layout StandardScaler and the distance computation below expect.
feature_matrix = np.concatenate(mfcc_features, axis=1).T

# Standardize each MFCC coefficient across all frames
scaler = StandardScaler()
normalized_feature_matrix = scaler.fit_transform(feature_matrix)

# Pairwise cosine distance (1 - cosine similarity) in condensed form.
# A plain dot product is not a cosine similarity unless the rows are
# L2-normalized; pdist takes care of that. linkage() also needs the condensed
# vector: a square 2-D array would be interpreted as raw observations.
# NOTE: memory grows quadratically with the number of frames; subsample or
# pool frames per slice before running this on long recordings.
cosine_dist = pdist(normalized_feature_matrix, metric='cosine')

# Perform hierarchical agglomerative clustering.
# 'average' linkage is valid for arbitrary dissimilarities, whereas 'ward'
# is only correctly defined for Euclidean distances.
linked = linkage(cosine_dist, method='average')

# Plot the dendrogram (optional; consider truncate_mode='lastp' for many frames)
plt.figure(figsize=(10, 7))
dendrogram(linked, orientation='top', distance_sort='descending', show_leaf_counts=True)
plt.title('Dendrogram')
plt.xlabel('Sample Index')
plt.ylabel('Distance')
plt.show()

# Cut the tree at a cosine-distance threshold instead of fixing the number
# of clusters, so the number of speakers does not need to be known up front
distance_threshold = 0.4  # You can experiment with different thresholds
cluster_labels = fcluster(linked, distance_threshold, criterion='distance')

# Print the cluster labels (one label per frame)
print("Cluster labels:", cluster_labels)

#### DBSCAN 
DBSCAN is particularly useful when the number of clusters is not known in advance. It can find clusters of arbitrary shape and explicitly labels outliers as noise instead of forcing them into a cluster.

In [None]:
import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler

# Join the MFCC arrays along axis 1 (time), then transpose so that every row
# is one frame (a sample) and every column is one MFCC coefficient (a
# feature). Without the transpose the coefficients themselves would be
# clustered instead of the frames.
feature_matrix = np.concatenate(mfcc_features, axis=1).T

# Standardize each MFCC coefficient across all frames
scaler = StandardScaler()
normalized_feature_matrix = scaler.fit_transform(feature_matrix)

# Initialize DBSCAN with appropriate parameters
eps = 0.5  # Adjust the epsilon (neighborhood distance) based on your data
min_samples = 5  # Adjust the minimum number of samples in a neighborhood

dbscan = DBSCAN(eps=eps, min_samples=min_samples, metric='euclidean')

# Fit DBSCAN; samples it considers noise receive the label -1
cluster_labels = dbscan.fit_predict(normalized_feature_matrix)

# Print the cluster labels (one label per frame)
print("Cluster labels:", cluster_labels)

#### Step 3: Cluster Validation (Silhouette Score - Same as before)
##### Calculate the Silhouette Score to evaluate the quality of the clusters.

In [None]:
from sklearn.metrics import silhouette_score

# Calculate the Silhouette Score in the same standardized feature space the
# clustering was fitted on; scoring the raw features would measure distances
# on a different scale than the one that produced the labels.
labels_array = np.asarray(cluster_labels)

# DBSCAN uses -1 for noise. Noise is not a cluster, so drop those samples.
clustered = labels_array != -1
n_clusters = len(set(labels_array[clustered]))
n_scored = int(clustered.sum())

# silhouette_score is only defined for 2 <= n_clusters <= n_samples - 1 and
# raises ValueError otherwise, so report that case instead of crashing.
if 2 <= n_clusters <= n_scored - 1:
    silhouette_avg = silhouette_score(normalized_feature_matrix[clustered], labels_array[clustered])
    print(f"Average Silhouette Score: {silhouette_avg}")
else:
    silhouette_avg = None
    print(f"Silhouette Score is undefined for {n_clusters} cluster(s) over {n_scored} sample(s).")

#### Step 4: Speaker Diarization with Speaker Tracking
##### To perform speaker diarization and remember speakers across different time slices, we will use a tracking mechanism based on the cluster labels.

In [None]:
# Maps each cluster label (treated as a speaker id) to the list of positions
# in cluster_labels at which that label occurs
speakers_dict = {}

# Walk the labels in order and file every position under its speaker
for idx, label in enumerate(cluster_labels):
    # setdefault creates the empty list the first time a label is seen
    speakers_dict.setdefault(label, []).append(idx)

# Report the diarization result, one line per speaker
for speaker in speakers_dict:
    segments = speakers_dict[speaker]
    print(f"Speaker {speaker}: Segments {segments}")

#### Create path structure

In [None]:
import os
import shutil

folder_path = 'test'
subfolder_name = 'audio'
full_subfolder_path = os.path.join(folder_path, subfolder_name)

# Start from a clean slate: remove the whole parent folder if it is there.
# The check has to look at the same path that gets deleted; testing only the
# subfolder would keep stale content around whenever 'test' exists but
# 'test/audio' does not.
# WARNING: this permanently deletes everything below folder_path.
if os.path.isdir(folder_path):
    shutil.rmtree(folder_path)

# Create the 'test' folder and 'audio' subfolder
os.makedirs(full_subfolder_path)

print("Folder structure created successfully.")

##### delete file

In [None]:
import os

# Location of the file to remove: <parent_folder>/<subfolder>/<file_to_delete>
parent_folder, subfolder = 'test', 'audio'
file_to_delete = 'example.txt'  # file to delete
file_path = os.path.join(parent_folder, subfolder, file_to_delete)

# Handle the missing-file case first, otherwise remove the file and report it
if not os.path.exists(file_path):
    print(f"File '{file_to_delete}' does not exist.")
else:
    os.remove(file_path)
    print(f"File '{file_to_delete}' has been deleted.")

#### Speech detection

We need to make sure that the detected speaker turns line up with the transcriptions.

In [11]:
from pydub import AudioSegment
from pydub.silence import detect_nonsilent
import os
import pandas as pd
from src.utils.audio_utils import speach_activity_detection 

# f = 'audio/chunks/recorded_audio_20230819_231924.wav'
# enhance_audio_signal(audio_path=f)

# adjust target amplitude

# Despite its name, segments_folder points at a single cropped speaker
# segment (.wav file), which is what gets passed on as `filename`
segments_folder = 'test/audio/temp/temp_speaker_segements/SPEAKER_01/recorded_audio_crop_0.wav'
df = speach_activity_detection(filename=segments_folder)

# Show the 'stop' value of the first row.
# Assumes df is a pandas DataFrame with 'start' and 'stop' columns and at
# least one row (confirm against speach_activity_detection).
df[['start', 'stop']].to_numpy()[0, 1]

0.8

#### K-means clustering

In [None]:
segments_folder = 'audio/chunks/'

# Full path of every .wav file found directly inside segments_folder
files_path_list = [
    os.path.join(segments_folder, filename)
    for filename in os.listdir(segments_folder)
    if filename.endswith('.wav')
]


import numpy as np
import librosa
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
import os

def extract_mfcc(audio_file, sr=22050, n_mfcc=20):
    """Load an audio file and return its MFCC features, one row per frame.

    Args:
        audio_file: Path of the audio file to load.
        sr: Target sampling rate handed to ``librosa.load``. The default
            matches librosa's own default; pass ``None`` to keep the file's
            native rate.
        n_mfcc: Number of MFCC coefficients per frame. The default matches
            librosa's own default.

    Returns:
        The transposed MFCC matrix, so that time runs along the rows
        (axis 0) and the coefficients along the columns (axis 1).
    """
    samples, sample_rate = librosa.load(audio_file, sr=sr)
    # Use the rate librosa actually returned: it differs from `sr` when sr is None
    mfcc = librosa.feature.mfcc(y=samples, sr=sample_rate, n_mfcc=n_mfcc)
    return mfcc.T

# Directory containing your 10-second audio files
audio_dir = 'audio/chunks/'

# List audio files in the directory. Sorted, because os.listdir() order is
# arbitrary and the row order of all_mfcc should be stable between runs.
audio_files = sorted(
    os.path.join(audio_dir, filename)
    for filename in os.listdir(audio_dir)
    if filename.endswith('.wav')
)
if not audio_files:
    # np.vstack([]) would otherwise fail with an unrelated-looking error
    raise FileNotFoundError(f"No .wav files found in '{audio_dir}'")

# Extract MFCC features for all audio files and stack them frame by frame
all_mfcc = np.vstack([extract_mfcc(audio_file) for audio_file in audio_files])

# Perform PCA to reduce dimensionality for visualization
pca = PCA(n_components=2)
reduced_mfcc = pca.fit_transform(all_mfcc)

# Perform K-means clustering.
# NOTE: clustering runs on the 2-D PCA projection, i.e. exactly the space
# that is plotted below, not on the full MFCC vectors.
# random_state makes labels and colours reproducible; n_init is pinned so
# the result does not depend on the scikit-learn version's default.
num_clusters = 4  # You can adjust the number of clusters as needed
kmeans = KMeans(n_clusters=num_clusters, n_init=10, random_state=42)
cluster_labels = kmeans.fit_predict(reduced_mfcc)

# Plot the clusters
plt.figure(figsize=(8, 6))
scatter = plt.scatter(reduced_mfcc[:, 0], reduced_mfcc[:, 1], c=cluster_labels, cmap='rainbow')
plt.legend(*scatter.legend_elements(), title="Clusters")
plt.title('K-means Clustering of Audio Segments')
plt.xlabel('Principal Component 1')
plt.ylabel('Principal Component 2')
plt.show()

#### Diarization test with pyAudioAnalysis

In [None]:
from pyAudioAnalysis import audioSegmentation as aS
import matplotlib.pyplot as plt
import numpy as np

# Load your audio file
audio_file = "audio/chunks/recorded_audio_20230820_030026.wav"

# Perform speaker diarization
# NOTE(review): the loop below treats the result as a sequence of
# (start, end) pairs. Confirm this against the installed pyAudioAnalysis
# version; speaker_diarization may return per-window labels instead.
result = aS.speaker_diarization(audio_file, n_speakers=2)

# Extract the segments from the result
segments = result

# Detect overlapping segments: an overlap exists when a segment ends after
# the next one has already started
overlapping_segments = []

for i in range(len(segments) - 1):
    # Show the pair that is actually being compared in this iteration
    print(f"segement 1: {segments[i]} \n and segment 2: {segments[i + 1]} \n =================================================================")
    end_time_speaker1, start_time_speaker2 = segments[i][1], segments[i + 1][0]

    if end_time_speaker1 > start_time_speaker2:
        overlapping_segments.append((start_time_speaker2, end_time_speaker1))

# Plot the waveform of the audio
# NOTE(review): confirm that aS.readAudioFile exists in the installed version
# and that it returns the samples first; the time axis below also assumes a
# 44100 Hz sampling rate rather than reading it from the file.
audio, _ = aS.readAudioFile(audio_file)
plt.figure(figsize=(12, 6))
plt.plot(np.linspace(0, len(audio) / 44100, num=len(audio)), audio)
plt.xlabel("Time (s)")
plt.title("Audio Waveform")

# Highlight overlapping segments
for segment in overlapping_segments:
    plt.axvspan(segment[0], segment[1], color='red', alpha=0.5)

# Display the plot
plt.show()

# Print or visualize overlapping segments
for i, segment in enumerate(overlapping_segments):
    print(f"Overlap {i+1} at {segment[0]} to {segment[1]} seconds.")
