In [57]:
import gc
from dataclasses import dataclass
from pathlib import Path
from typing import Callable

from audio_similarity import AudioSimilarity
from tqdm.notebook import tqdm
from sklearn.cluster import DBSCAN, HDBSCAN
import json
import plotly.express as px
import polars as pl
import multiprocessing
from tqdm.contrib.concurrent import process_map
from functools import partial

In [2]:
# Get files
def discover_audio(*audio_files_root_dir: Path) -> set[Path]:
    return set().union(*(set(map(lambda x: x.absolute(), i.rglob(f"*.{ext}"))) for ext in ['mp3', 'ogg'] for i in audio_files_root_dir))

# audio_files = discover_audio(Path('calmyleon/RAIN'), Path('calmyleon/RAIN2'), Path('calmyleon/RAIN3'), Path('calmyleon/RAIN4'), Path('calmyleon/RAIN5'), Path('shuteye.ai'))
audio_files = discover_audio(Path('calmyleon/RAIN'), Path('shuteye.ai'))
print(f"Total of {len(audio_files)} audio files")

Total of 37 audio files


In [3]:
@dataclass(frozen=True, eq=True, order=True, unsafe_hash=True, slots=True)
class AudioSimilarityResult:
    audio_file_a: Path
    audio_file_b: Path
    zcr_similarity: float
    rhythm_similarity: float
    chroma_similarity: float
    spectral_contrast_similarity: float
    perceptual_similarity: float
    swass: float
    
    def to_dict(self):
        return {
            'audio_file_a': self.audio_file_a,
            'audio_file_b': self.audio_file_b,
            'zcr_similarity': self.zcr_similarity,
            'rhythm_similarity': self.rhythm_similarity,
            'chroma_similarity': self.chroma_similarity,
            'spectral_contrast_similarity': self.spectral_contrast_similarity,
            'perceptual_similarity': self.perceptual_similarity,
            'swass': self.swass
        }
    

def measure_similarity(audio_file_a: Path, audio_file_b: Path, verbose: bool = False) -> AudioSimilarityResult:
    sample_rate = 44100
    weights = {
        'zcr_similarity': 0.25,
        'rhythm_similarity': 0.25,
        'chroma_similarity': 0.25,
        'spectral_contrast_similarity': 0.25,
        'perceptual_similarity': 0.0
    }
    similarity = AudioSimilarity(str(audio_file_a), str(audio_file_b), sample_rate, weights, verbose=verbose)
    metrics = similarity.stent_weighted_audio_similarity(metrics='all')
    
    del similarity
    gc.collect()
    
    return AudioSimilarityResult(
        audio_file_a=audio_file_a,
        audio_file_b=audio_file_b,
        **metrics
    )

In [15]:
# Measure similarity between all pairs of tracks
#progress_bar = tqdm(total=len(audio_files) ** 2)

similarities = [AudioSimilarityResult(**i) for i in json.loads(Path('similarities.json').read_text())]

def create_job(audio_files: tuple[Path, Path]) -> AudioSimilarityResult:
    return measure_similarity(audio_file_a=audio_files[0], audio_file_b=audio_files[1])

def compute_similarities(audio_files: set[Path]) -> list[AudioSimilarityResult]:
    similarities_by_file = {(Path(i.audio_file_a), Path(i.audio_file_b)) for i in similarities}
    matrix = [(i, j) for i in audio_files for j in audio_files if (i, j) not in similarities_by_file]
    with multiprocessing.Pool(multiprocessing.cpu_count()//2) as pool:
        return list(tqdm(pool.imap(create_job, matrix), total=len(matrix)))

similarities += compute_similarities(audio_files)
print(f"Total of {len(similarities)} similarities")

0it [00:00, ?it/s]

Total of 1369 similarities


In [18]:
# Write similarities to a file
_ = Path('similarities.json').write_text(json.dumps([AudioSimilarityResult.to_dict(s) for s in similarities], default=str))

In [70]:
# Create a function to cluster similarities
def cluster_similarities(similarity_matrix: pl.DataFrame, min_samples: int = 4) -> dict[int, list[Path]]:
    dbscan = HDBSCAN(min_samples=min_samples, metric='precomputed')
    # Generate distance matrix from similarities
    X = similarity_matrix.select(pl.exclude('a')).to_numpy()
    dbscan.fit(X)
    clusters = {i: [] for i in range(-1, len(set(dbscan.labels_)))}
    for i, label in enumerate(dbscan.labels_):
        clusters[label].append(similarity_matrix['a'][i])
    return clusters

In [74]:
file_similarities = {(i.audio_file_a, i.audio_file_b): i for i in similarities}

In [75]:
def get_similarity_df(score_func: Callable[[AudioSimilarityResult], float]):
    return pl.DataFrame([dict(a=Path(key[0]).name, b=Path(key[1]).name, score=score_func(value)) for key, value in file_similarities.items()])

def get_similarity_matrix(similarity_df: pl.DataFrame):
    return similarity_df.pivot(values='score', index='a', columns='b')

In [76]:
# Cluster similarities
similarity_matrix = get_similarity_matrix(get_similarity_df(lambda x: x.swass))
index_to_name = {i: name for i, name in enumerate(similarity_matrix.columns)}
clusters = cluster_similarities(similarity_matrix)
print(f"Total of {len(clusters)} clusters. Total files: {len(similarity_matrix.columns)}")

ValueError: The precomputed distance matrix is expected to be symmetric, however its values appear to be asymmetric. Please verify that the distance matrix was constructed correctly.

In [80]:
# How similar are RAIN files to each other? How similar are RAIN files to shuteye.ai files?
score_df = []
for score_field_name in ['zcr_similarity', 'rhythm_similarity', 'chroma_similarity', 'spectral_contrast_similarity', 'perceptual_similarity', 'swass']:
    similarity_df = get_similarity_df(lambda x: getattr(x, score_field_name)).filter(pl.col('a') < pl.col('b'))
    
    rain_to_rain_similarity = similarity_df.filter(pl.col('a').str.contains('RAIN') & pl.col('b').str.contains('RAIN'))
    sample_rain_file = rain_to_rain_similarity['a'][0]
    rain_to_shuteye_similarity = similarity_df.filter(
        ((pl.col('a').str.contains('RAIN')) & (~pl.col('b').str.contains('RAIN'))) |
        ((pl.col('b').str.contains('RAIN')) & (~pl.col('a').str.contains('RAIN')))
    )
    shuteye_to_shuteye_similarity = similarity_df.filter((~pl.col('a').str.contains('RAIN')) & (~pl.col('b').str.contains('RAIN')))
    
    score_df.append(
        rain_to_rain_similarity.select('score').with_columns(
            dataset=pl.lit('RAIN to RAIN'),
            metric=pl.lit(score_field_name)
        )
    )
    score_df.append(
        rain_to_shuteye_similarity.select('score').with_columns(
            dataset=pl.lit('RAIN to shuteye.ai'),
            metric=pl.lit(score_field_name)
        )
    )
    score_df.append(
        shuteye_to_shuteye_similarity.select('score').with_columns(
            dataset=pl.lit('shuteye.ai to shuteye.ai'),
            metric=pl.lit(score_field_name)
        )
    )
    
score_df = pl.concat(score_df)
# Draw as box plot. Each box plot will represent a different metric. color will depend on the dataset.
fig = px.box(score_df.to_pandas(), x='metric', y='score', color='dataset', title='Similarity between RAIN files and shuteye.ai files')
fig.show()