In [3]:
import json
import sys
from pathlib import Path

import essentia
# there are two operating modes in essentia which (mostly) have the same algorithms
# they are accessible via two submodules:
# import essentia.standard
# import essentia.streaming
import pandas as pd
# Make the in-repo package directory importable from this notebook.
module_path = Path("../src/technotaggr/")
# Guard the append so re-running this cell does not add duplicate sys.path entries.
if str(module_path) not in sys.path:
    sys.path.append(str(module_path))


In [21]:
# Locations used by the rest of the notebook, all relative to the repository root.
root = Path("..")
data = root.joinpath("song_data", "music-library-analysis-dataset")
models = root.joinpath("src", "technotaggr", "models")

# Sanity check: search the repository tree for the reference track.
print(list(root.rglob("*Funken.aiff")))
print(root, data, models, sep="\n")


[PosixPath('../song_data/music-library-analysis-dataset/1-01 Funken.aiff')]
..
../song_data/music-library-analysis-dataset
../src/technotaggr/models


In [13]:
# Show the absolute location of the reference track.
print((data / "1-01 Funken.aiff").resolve())

/Users/mishkin/Desktop/gitclones/technotaggr/song_data/music-library-analysis-dataset/1-01 Funken.aiff


In [15]:
# Map "name_<i>" -> file name for every entry of the dataset folder except '.DS_Store'.
# NOTE: <i> is the position in the directory listing, so the skipped '.DS_Store'
# entry still consumes an index and leaves a gap in the keys.
names = {
    f"name_{position}": entry.name
    for position, entry in enumerate(data.iterdir())
    if entry.name != '.DS_Store'
}
display(names)

{'name_0': '37 - Neverland (Aparde Remix).mp3',
 'name_1': 'Intensity_Fluctuations_-_Ø_[Phase]_(Setaoc_Mass_remix).mp3',
 'name_2': 'Artaphine - Toru Ikemoto - _080 [Artaphine Premiere].mp3',
 'name_4': '01 Testify.flac',
 'name_5': '01 Heal My Soul.aiff',
 'name_6': '01 Power to the Soul.aiff',
 'name_7': 'Aus_der_Tiefe_der_Zeit_-_Obscure_Shape,_SHDW_(Original_Mix).mp3',
 'name_8': '01. Eh Wut.flac',
 'name_9': '1-02 Tide.aiff',
 'name_10': 'Last_Charms_-_Hyden_(Alarico_Remix)_(SYEP009).mp3',
 'name_11': '03 For Marco.aiff',
 'name_12': '10 - DJ HEARTSTRING - VISION OF ECSTASY.mp3',
 'name_13': 'Kolter - Bob Marley - Could you be loved (Kolter Edit).aiff',
 'name_14': 'Cyclo_-_Traumer_(Original_Mix).mp3',
 'name_15': 'Uncertain, Alarico - Absence (Original Mix).aiff',
 'name_16': '1-01 Funken.aiff',
 'name_17': '1-02 Bashment Boogie.aiff',
 'name_18': 'in_aeternam.aiff',
 'name_19': 'What To Do (&ME Remix) - Guy Gerber .aiff',
 'name_20': '1-02 We Will Bring It Back.aiff',
 'name_21':

In [None]:
from essentia.standard import MonoLoader, TensorflowPredictMusiCNN, TensorflowPredict2D

# name = "1-01 Funken.aiff"
audio_path = Path(data / names["name_0"]).as_posix()

# Model graph locations are converted to plain strings, as the other model-loading
# cells in this notebook do, because `graphFilename` is a string parameter.
embedded_model_path = (
    models / "feature-extractors" / "musicnn" / "msd-musicnn-1" / "msd-musicnn-1.pb"
).as_posix()
classifier_model_path = (
    models / "classification-heads" / "mood_happy" / "mood_happy-msd-musicnn-1.pb"
).as_posix()

# Load the track as mono at 16 kHz and run the MusiCNN graph up to "model/dense/BiasAdd".
audio = MonoLoader(filename=audio_path, sampleRate=16000, resampleQuality=4)()
embedding_model = TensorflowPredictMusiCNN(graphFilename=embedded_model_path, output="model/dense/BiasAdd")
embeddings = embedding_model(audio)

# Feed the embeddings to the mood_happy classification head.
model = TensorflowPredict2D(graphFilename=classifier_model_path, output="model/Softmax")
predictions = model(embeddings)

[   INFO   ] TensorflowPredict: Successfully loaded graph file: `/Users/mishkin/Desktop/gitclones/technotaggr/src/technotaggr/models/feature-extractors/musicnn/msd-musicnn-1/msd-musicnn-1.pb`
I0000 00:00:1764563767.179129 23145599 mlir_graph_optimization_pass.cc:425] MLIR V1 optimization pass is not enabled
[   INFO   ] TensorflowPredict: Successfully loaded graph file: `/Users/mishkin/Desktop/gitclones/technotaggr/src/technotaggr/models/classification-heads/mood_happy/mood_happy-msd-musicnn-1.pb`


In [None]:
# Inspect the Python type of the `embedding_model` object created in the inference cell.
print(type(embedding_model))

In [None]:
# Show the classifier output followed by its array shape.
print(predictions, predictions.shape, sep="\n")

In [None]:
print(audio.shape)  # number of samples
# 16000 is the sample rate the audio was loaded with in the inference cell.
num_samples = int(audio.shape[0])
print(f"duration is {num_samples / 16000}")

In [None]:
# NOTE(review): `models_json` was not defined in any visible cell, so this cell raised
# NameError on a fresh kernel. It is assumed here that the metadata JSON is stored in
# the same folder as the MusiCNN graph file — TODO confirm the actual location.
models_json = Path(embedded_model_path).parent

# Read the model's metadata (schema, class names, ...) from its JSON description.
with open(Path(models_json / "msd-musicnn-1.json").as_posix(), 'r', encoding="utf-8") as json_file:
    metadata = json.load(json_file)

print(metadata['schema'])

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# No `output` is given, so the algorithm's default output layer is used; the result
# is plotted below as per-patch tag activations.
# str() makes the path acceptable whether `embedded_model_path` is a Path or a str.
activations = TensorflowPredictMusiCNN(graphFilename=str(embedded_model_path))(audio)

fig, ax = plt.subplots(1, 1, figsize=(10, 10))
ax.matshow(activations.T, aspect='auto')
# One y tick per class name from the model metadata.
ax.set_yticks(range(len(metadata['classes'])))
ax.set_yticklabels(metadata['classes'])
ax.set_xlabel('patch number')
ax.xaxis.set_ticks_position('bottom')
ax.set_title(f'Tag activations for {names["name_0"]}')
plt.show()


In [None]:
# NOTE: this rebinds `embeddings`, which the inference cell computed from the
# "model/dense/BiasAdd" layer; here the "model/dense_1/BiasAdd" layer is read instead.
# str() makes the path acceptable whether `embedded_model_path` is a Path or a str.
embeddings = TensorflowPredictMusiCNN(
    graphFilename=str(embedded_model_path), output='model/dense_1/BiasAdd'
)(audio)

fig, ax = plt.subplots(1, 1, figsize=(10, 4))
ax.matshow(embeddings.T, aspect='auto')
ax.xaxis.set_ticks_position('bottom')
ax.set_xlabel('patch number')
ax.set_title('Embeddings')
plt.show()
print(embeddings.shape)

### Getting Sample Rates

In [None]:
# NOTE(review): `librosa` is not imported in any visible cell of this notebook;
# it must be imported before this cell can run on a fresh kernel.

# Audio file extensions to inspect (compared case-insensitively, so '.MP3' also matches).
AUDIO_SUFFIXES = {'.wav', '.aiff', '.aif', '.aifc', '.flac', '.mp3'}

# Map file name -> native sample rate for every audio file in the dataset folder.
# The listing is sorted so the displayed dictionary has a stable order.
sample_rates = {}
for f in sorted(data.iterdir()):
    if f.suffix.lower() in AUDIO_SUFFIXES:
        sample_rates[f.name] = librosa.get_samplerate(f.as_posix())

display(sample_rates)

In [None]:
# 1 Read the Essentia paper on MusiCNN -> get a general idea of the architecture, intermediate features, and output
# 2 get the standalone musicnn repo -> go through the musicnn notebook tutorial

# Testing Model Outputs

### DEAM Arousal/Valence Classification

In [None]:
audio_path = Path(data / names["name_5"]).as_posix()

# Use the same MusiCNN graph location as the mood_happy inference cell; the flat
# `models / "msd-musicnn-1.pb"` path did not match that directory layout.
embedded_model_path = Path(
    models / "feature-extractors" / "musicnn" / "msd-musicnn-1" / "msd-musicnn-1.pb"
).as_posix()
# NOTE(review): this classifier path is still flat under `models`, unlike the
# "classification-heads/<task>/" layout used for mood_happy — confirm where the file lives.
classifier_model_path = Path(models / "deam-msd-musicnn-2.pb").as_posix()

# Load the track as mono at 16 kHz and extract embeddings from "model/dense/BiasAdd".
audio = MonoLoader(filename=audio_path, sampleRate=16000, resampleQuality=4)()
embedding_model = TensorflowPredictMusiCNN(graphFilename=embedded_model_path, output="model/dense/BiasAdd")
embeddings = embedding_model(audio)

# Run the DEAM head on the embeddings and show its raw output and shape.
model = TensorflowPredict2D(graphFilename=classifier_model_path, output="model/Identity")
predictions = model(embeddings)
print(predictions)
print(predictions.shape)

- Get the BPM of the song
- Calculate the duration of 4 beats
- That duration is 1 bar (assuming 4/4 time)
- Use the bar length as the patch size for the Essentia model

In [None]:
def getPatchSizeforBar(audio_file):
    """Placeholder for computing the model patch size that spans one bar.

    Not implemented yet: `audio_file` is ignored and None is returned.
    """
    return None

### View Patch size for bar
- Display the beat locations derived from the global BPM

In [None]:
# load in audio file at desired sample rate
# 

### Assess BPM of song

In [None]:
from essentia.standard import MonoLoader, TempoCNN, RhythmExtractor2013
import numpy as np
import matplotlib.pyplot as plt

sr = 11025         # sample rate of the TempoCNN input and of the plotted waveform
rhythm_sr = 44100  # RhythmExtractor2013 is documented to require 44100 Hz input
duration = 5       # seconds of audio shown in each panel

audio_path = Path(data / names["name_1"]).as_posix()
print(audio_path)

# Load the audio BEFORE slicing or printing it. The original cell used `audio_11khz`
# ahead of its assignment, which fails on a fresh kernel (or silently uses stale data).
audio_11khz = MonoLoader(filename=audio_path, sampleRate=sr, resampleQuality=4)()
print(audio_11khz)
audio_slice = audio_11khz[:sr * duration]

fig, axs = plt.subplots(3, figsize=(10, 8))
fig.text(0.5, 0.04, "Time(samples)", ha='center')
axs[0].set_title("Audio waveform and the estimated beat positions")


def _plot_tempo_grid(ax, global_bpm):
    """Plot `audio_slice` on `ax` with a red line every beat period of `global_bpm`.

    The grid starts at sample 0 and assumes a constant tempo. Returns the number of
    samples per beat, or None (no grid drawn) when `global_bpm` is not positive.
    """
    ax.plot(audio_slice)
    if not global_bpm > 0:
        return None
    samples_per_beat = sr / (global_bpm / 60)
    for marker in np.arange(0, len(audio_slice), step=samples_per_beat):
        ax.axvline(x=marker, color='red', alpha=.5)
    return samples_per_beat


# NOTE(review): both TempoCNN graphs are looked up directly under `models`, unlike the
# nested layout used for the MusiCNN models — confirm where these files live.
classifier_model_path_1 = Path(models / 'deepsquare-k16-3.pb').as_posix()
global_bpm_1, local_bpm, local_probs = TempoCNN(graphFilename=classifier_model_path_1)(audio_11khz)
print('song BPM from deepsquare: {}'.format(global_bpm_1))
print(local_bpm)
spb_1 = _plot_tempo_grid(axs[0], global_bpm_1)

classifier_model_path_2 = Path(models / 'deeptemp-k16-3.pb').as_posix()
global_bpm_2, local_bpm, local_probs = TempoCNN(graphFilename=classifier_model_path_2)(audio_11khz)
print('song BPM from deeptemp: {}'.format(global_bpm_2))
print(local_bpm)
spb_2 = _plot_tempo_grid(axs[1], global_bpm_2)

# RhythmExtractor2013 gets its own 44.1 kHz copy of the signal; the returned beat
# positions are used as seconds below, so they still map onto the 11 kHz waveform via `sr`.
audio_44khz = MonoLoader(filename=audio_path, sampleRate=rhythm_sr, resampleQuality=4)()
bpm, beats, beats_confidence, _, beats_intervals = RhythmExtractor2013(method="multifeature")(audio_44khz)
print("BPM:", bpm)
# print("Beat positions (sec.):", beats)
print("Beat estimation confidence:", beats_confidence)

axs[2].plot(audio_slice)
# Draw only the beats inside the displayed window. A plain loop cannot run past the
# end of `beats` (the manual `next()` loop raised StopIteration for short tracks)
# and no longer draws an artificial marker at 0.
for beat in beats:
    if beat > duration:
        break
    axs[2].axvline(x=beat * sr, color='red', alpha=.5)


In [None]:
from pathlib import Path
from collections.abc import Mapping
import mutagen
from mutagen import File
from mutagen.id3 import ID3
from mutagen.mp3 import MP3
from mutagen.aiff import AIFF
from mutagen.flac import FLAC
from mutagen.wave import WAVE
from mutagen.mp4 import MP4

def _parse_number(x):
    if x is None:
        return None
    if isinstance(x, (int, float)):
        return float(x)
    s = str(x).strip()
    low = s.lower()
    for sep in ["bpm=", "=", ":", ";", ","]:
        if sep in low:
            s = low.split(sep)[-1].strip()
            break
    for token in [" bpm", " beats/min", " beats per minute"]:
        if s.lower().endswith(token):
            s = s[:-len(token)].strip()
    for split_on in ["/", " "]:
        if split_on in s:
            s = s.split(split_on)[0].strip()
    try:
        return float(s)
    except ValueError:
        return None

def _get_bpm_from_id3(id3: ID3):
    """Look up a BPM value in ID3 tags.

    The 'TBPM' frame is tried first; after that, every 'TXXX' frame whose
    description is one of the known BPM labels. Returns the first value that
    `_parse_number` can parse, or None.
    """
    if not id3:
        return None

    # Dedicated BPM frame.
    frame = id3.get('TBPM')
    if frame and getattr(frame, 'text', None):
        bpm = _parse_number(frame.text[0])
        if bpm is not None:
            return bpm

    # User-defined text frames that carry a BPM under a descriptive label.
    accepted = ('bpm', 'tempo', 'beats_per_minute', 'beats per minute')
    for user_frame in id3.getall('TXXX') or []:
        description = getattr(user_frame, 'desc', '') or ''
        if description.lower() not in accepted:
            continue
        values = getattr(user_frame, 'text', [])
        if not values:
            continue
        bpm = _parse_number(values[0])
        if bpm is not None:
            return bpm
    return None

def _get_bpm_from_vorbis(tags):
    if not tags:
        return None
    preferred_keys = [
        "bpm", "tempo", "beats_per_minute", "beats per minute",
        "initial bpm", "bpm (beats per minute)"
    ]
    for want in preferred_keys:
        for k in tags.keys():
            if k.lower() == want:
                vals = tags.get(k) or []
                if not isinstance(vals, list):
                    vals = [vals]
                for v in vals:
                    n = _parse_number(v)
                    if n is not None:
                        return n
    for k in tags.keys():
        kl = k.lower()
        if "bpm" in kl or kl == "tempo":
            vals = tags.get(k) or []
            if not isinstance(vals, list):
                vals = [vals]
            for v in vals:
                n = _parse_number(v)
                if n is not None:
                    return n
    return None

def _as_mapping(obj):
    """Return obj if it's mapping-like (dict-ish), else None."""
    if isinstance(obj, Mapping):
        return obj
    if hasattr(obj, "keys") and hasattr(obj, "get") and hasattr(obj, "__contains__"):
        return obj
    return None

def _get_bpm_from_riff_info(tags):
    """Look up a BPM value in RIFF INFO style tags (keys 'IBPM', 'BPM', 'BPM ').

    Only mapping-like `tags` are inspected; anything else yields None.
    Returns the first parsable value, or None.
    """
    tagmap = _as_mapping(tags)
    if not tagmap:
        return None

    for key in ('IBPM', 'BPM', 'BPM '):
        if key not in tagmap:
            continue
        raw = tagmap.get(key)
        # A non-empty list contributes its first element; other values are used as-is.
        value = raw[0] if isinstance(raw, list) and raw else raw
        bpm = _parse_number(value)
        if bpm is not None:
            return bpm
    return None

def getBPM(audio_file):
    """Read the BPM stored in an audio file's metadata tags.

    Args:
        audio_file: path of the file, passed to mutagen's `File`.

    Returns:
        The BPM as a float, or None when `File` returns None or no parsable
        BPM tag is found. Exceptions raised by `File` are not caught here.
    """
    audio = File(audio_file)
    if audio is None:
        return None

    tags = getattr(audio, 'tags', None)

    # MP4 (M4A): 'tmpo' atom (list of ints). Without it, fall through to the
    # generic lookup at the bottom.
    if isinstance(audio, MP4):
        vals = tags.get('tmpo') if tags else None
        if vals:
            return float(vals[0])

    # MP3 and AIFF (often ID3)
    if isinstance(audio, (MP3, AIFF)):
        return _get_bpm_from_id3(tags)

    # WAV: could have ID3 or RIFF INFO
    if isinstance(audio, WAVE):
        # 1) ID3-in-WAV
        if isinstance(tags, ID3):
            n = _get_bpm_from_id3(tags)
            if n is not None:
                return n
        # 2) RIFF INFO (mapping-like only). Do NOT pass audio.info (WaveStreamInfo)
        return _get_bpm_from_riff_info(tags)

    # FLAC: Vorbis comments
    if isinstance(audio, FLAC):
        return _get_bpm_from_vorbis(tags)

    # Other containers (e.g., OGG/Opus) – try Vorbis-like handling.
    # Only mapping-like tags are passed on: the previous `_as_mapping(tags) or tags`
    # handed non-mapping tags to code that calls `.keys()` on them.
    return _get_bpm_from_vorbis(_as_mapping(tags))


In [None]:
# make dataframe of all predictions, and save to results
import mutagen
import math

sr = 11025         # sample rate of the TempoCNN input
rhythm_sr = 44100  # RhythmExtractor2013 is documented to require 44100 Hz input

# NOTE(review): both TempoCNN graphs are looked up directly under `models`, unlike the
# nested layout used for the MusiCNN models — confirm where these files live.
classifier_model_path_1 = Path(models / 'deepsquare-k16-3.pb').as_posix()
classifier_model_path_2 = Path(models / 'deeptemp-k16-3.pb').as_posix()

# Build each algorithm once instead of re-creating it (and reloading the graph) per file.
tempo_deepsquare = TempoCNN(graphFilename=classifier_model_path_1)
tempo_deeptemp = TempoCNN(graphFilename=classifier_model_path_2)
rhythm_extractor = RhythmExtractor2013(method="multifeature")

columns = ['TempoCNN-deepsquare', 'Tempo-CNN-deeptempt', 'RhythmExtractor2013', 'Original']
records = []

# Iterate over the stored file names directly. The keys of `names` have a gap where
# '.DS_Store' was skipped, so `range(len(names))` needed a hardcoded `pos == 3`
# exception and never reached the last file.
for file_name in names.values():
    audio_path = Path(data / file_name).as_posix()
    audio_11khz = MonoLoader(filename=audio_path, sampleRate=sr, resampleQuality=4)()
    audio_44khz = MonoLoader(filename=audio_path, sampleRate=rhythm_sr, resampleQuality=4)()

    global_bpm_1, _, _ = tempo_deepsquare(audio_11khz)
    global_bpm_2, _, _ = tempo_deeptemp(audio_11khz)
    bpm, _, _, _, _ = rhythm_extractor(audio_44khz)

    records.append({
        'TempoCNN-deepsquare': global_bpm_1,
        'Tempo-CNN-deeptempt': global_bpm_2,
        # NOTE(review): ceil always rounds up; consider round() if nearest-integer BPM is intended.
        'RhythmExtractor2013': math.ceil(bpm),
        # BPM read from the file's own tags (None when absent).
        'Original': getBPM(audio_path),
    })

# `columns` keeps the four columns even when no file was processed.
results = pd.DataFrame(records, columns=columns)
display(results)


In [None]:
# Wrap `results` in a DataFrame and display it.
# NOTE: the previous cell already performs this same conversion and display, so when
# that cell has run, `results` is already a DataFrame and this only shows the table again.
results = pd.DataFrame(results)
display(results)

In [None]:
# Inspect the Python type of `embedding_model`.
# NOTE: the same statement already appears in an earlier cell of this notebook.
print(type(embedding_model))