In [12]:
'''
Uncomment and run this cell once at the start of each session.
'''

!pip install openl3 pydub
!apt-get install -y ffmpeg


Collecting openl3
  Downloading openl3-0.4.2.tar.gz (29 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pydub
  Downloading pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting spotipy
  Downloading spotipy-2.25.1-py3-none-any.whl.metadata (5.1 kB)
Collecting kapre>=0.3.5 (from openl3)
  Downloading kapre-0.3.7.tar.gz (26 kB)
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting resampy<0.3.0,>=0.2.1 (from openl3)
  Downloading resampy-0.2.2.tar.gz (323 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m323.4/323.4 kB[0m [31m14.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting redis>=3.5.3 (from spotipy)
  Downloading redis-5.2.1-py3-none-any.whl.metadata (9.1 kB)
Downloading pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Downloading spotipy-2.25.1-py3-none-any.whl 

In [53]:
import openl3
import librosa
import requests
import numpy as np
import os
import re
import math
from tqdm import tqdm
from datetime import datetime, timedelta
from collections import defaultdict
from pydub import AudioSegment
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.decomposition import PCA
from sklearn.preprocessing import normalize
from google.colab import drive, userdata
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [18]:
def process_seed_track(mp3_path, wav_path, emb_path,
                       input_repr="mel128", content_type="music", embedding_size=512):
  """
  Converts MP3 to WAV and computes OpenL3 embedding. Saves WAV and .npy embedding to specified paths.
  """

  audio = AudioSegment.from_file(mp3_path, format="mp3")
  audio = audio.set_frame_rate(48000).set_channels(1)
  audio.export(wav_path, format="wav")
  print(f"Converted to WAV: {os.path.basename(mp3_path)}")

  audio_data, sr = librosa.load(wav_path, sr=None, mono=True)
  emb, _ = openl3.get_audio_embedding(audio_data, sr,
                                      input_repr=input_repr,
                                      content_type=content_type,
                                      embedding_size=embedding_size)
  emb_mean = np.mean(emb, axis=0)

  np.save(emb_path, emb_mean)
  print(f"Saved embedding: {os.path.basename(emb_path)}")


In [6]:
# define directory paths
base_dir = "/content/drive/My Drive/2024-25/openl3_music/"
mp3_dir = os.path.join(base_dir, "mp3_seeds")
wav_dir = os.path.join(base_dir, "wav_seeds")
emb_seed_dir = os.path.join(base_dir, "seed_embeddings")

In [19]:
# process all MP3s in the seeds folder. only need to run this code once per seed
for filename in os.listdir(mp3_dir):
  if filename.endswith(".mp3"):
    song_name = os.path.splitext(filename)[0]
    mp3_path = os.path.join(mp3_dir, filename)
    wav_path = os.path.join(wav_dir, song_name + ".wav")
    emb_path = os.path.join(emb_seed_dir, song_name + "_embedding.npy")

    process_seed_track(mp3_path, wav_path, emb_path)


Converted to WAV: irwtsayh.mp3
[1m82/82[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 142ms/step
Saved embedding: irwtsayh_embedding.npy
Converted to WAV: amoeba.mp3
[1m72/72[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 145ms/step
Saved embedding: amoeba_embedding.npy
Converted to WAV: projectdreams.mp3
[1m53/53[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 147ms/step
Saved embedding: projectdreams_embedding.npy
Converted to WAV: undercovermartyn.mp3
[1m53/53[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 91ms/step
Saved embedding: undercovermartyn_embedding.npy


# Deezer API testing

In [70]:
def safe_filename(name):
  return re.sub(r'[\\/:"*?<>|]+', '_', name)

def get_deezer_new_albums(limit=20, country_id=0, days_back=5):
  """
  Pulls new releases from Deezer editorial. Default is Global (ID 0).
  Country IDs are Deezer-specific (e.g., 2 = US, 6 = FR, etc.)
  """
  recent_date_cutoff = datetime.now() - timedelta(days=days_back)

  url = f"https://api.deezer.com/editorial/{country_id}/releases"
  r = requests.get(url)
  if r.status_code != 200:
    raise Exception("Failed to fetch releases.")
  data = r.json()['data']

  albums = [{
    'title': album['title'],
    'artist': album['artist']['name'],
    'id': album['id']
  } for album in data[:limit] if datetime.strptime(album['release_date'], '%Y-%m-%d') > recent_date_cutoff]

  print(f"Loaded {len(albums)} albums from Deezer, released in the past {days_back} days.")
  return albums

def get_deezer_tracks_from_album(album_id):
  url = f"https://api.deezer.com/album/{album_id}"
  r = requests.get(url)
  if r.status_code != 200:
    raise Exception("Failed to fetch tracks.")
  data = r.json()
  tracks = data.get('tracks', {}).get('data', [])

  return [{
    'title': safe_filename(track['title']),
    'artist': track['artist']['name'],
    'id': track['id'],
    'preview_url': track['preview']
  } for track in tracks if track.get('preview')]

def compile_previewable_tracks(album_list):
  all_tracks = []
  for album in album_list:
    tracks = get_deezer_tracks_from_album(album['id'])
    all_tracks.extend(tracks)
  print(f"Compiled {len(all_tracks)} previewable tracks from {len(album_list)} albums.")
  return all_tracks

def download_and_convert_preview(track, mp3_dir, wav_dir):
  """
  Downloads preview track from Deezer and converts to WAV.
  """
  mp3_path = os.path.join(mp3_dir, f"{track['artist']} - {track['title']}.mp3")
  wav_path = os.path.join(wav_dir, f"{track['artist']} - {track['title']}.wav")

  r = requests.get(track['preview_url'])
  if r.status_code != 200:
    raise Exception("Failed to download preview.")

  with open(mp3_path, 'wb') as f:
    f.write(r.content)

  audio = AudioSegment.from_file(mp3_path, format="mp3")
  audio = audio.set_frame_rate(48000).set_channels(1)
  audio.export(wav_path, format="wav")
  return wav_path

def compute_openl3_embedding(wav_path, emb_path, model):
  audio, sr = librosa.load(wav_path, sr=None, mono=True)
  emb, _ = openl3.get_audio_embedding(audio, sr,
                                      content_type="music",
                                      embedding_size=512,
                                      model=model,
                                      verbose=0)
  emb_mean = np.mean(emb, axis=0)
  emb_path = os.path.join(emb_path, os.path.basename(wav_path).replace(".wav", "_embedding.npy"))
  np.save(emb_path, emb_mean)
  return emb_mean

def find_similar_tracks(candidate_tracks, candidate_embeddings, seed_embeddings, seed_names, top_k):
  seed_matrix = np.stack(seed_embeddings)

  pca = PCA(n_components=50)
  reduced_embs = normalize(pca.fit_transform(candidate_embeddings))
  reduced_seeds = normalize(pca.transform(seed_matrix))
  similarities = cosine_similarity(reduced_embs, reduced_seeds)

  max_scores = np.max(similarities, axis=1)
  best_seed_idx = np.argmax(similarities, axis=1)

  all_candidates = []
  for i, track in enumerate(candidate_tracks):
    artist, title = track.split(" - ", maxsplit=1)
    all_candidates.append({
      'artist': artist,
      'track': title,
      'similarity': float(max_scores[i]),
      'closest_seed': seed_names[best_seed_idx[i]]
    })

  all_candidates.sort(key=lambda x: x['similarity'], reverse=True)

  # select best matches per seed
  max_per_artist = math.floor(0.2 * top_k)
  per_seed_quota = math.ceil(top_k / len(seed_names))
  selected_tracks = []
  artist_counts = defaultdict(int)
  used_indices = set()

  for j, seed in enumerate(seed_names):
    seed_matches = [(i, similarities[i][j]) for i in range(len(all_candidates))]
    seed_matches.sort(key=lambda x: -x[1])

    count = 0
    for idx, sim in seed_matches:
      if idx in used_indices:
        continue
      artist = all_candidates[idx]['artist']
      if artist_counts[artist] < max_per_artist:
        selected_tracks.append(all_candidates[idx])
        artist_counts[artist] += 1
        used_indices.add(idx)
        count += 1
      if count >= per_seed_quota:
        break

  # fill remaining up to top_k with remaining best matches
  if len(selected_tracks) < top_k:
    for i in range(len(all_candidates)):
      if i in used_indices:
        continue
      artist = all_candidates[i]['artist']
      if artist_counts[artist] < max_per_artist:
        selected_tracks.append(all_candidates[i])
        artist_counts[artist] += 1
        used_indices.add(i)
      if len(selected_tracks) >= top_k:
        break

  selected_tracks.sort(key=lambda x: -x['similarity'])
  return selected_tracks[:top_k]


In [73]:
def run_full_deezer_pipeline(album_list, mp3_dir, wav_dir, track_dir):
  all_tracks = compile_previewable_tracks(album_list)
  candidate_embeddings = []
  valid_tracks = []

  model = openl3.models.load_audio_embedding_model(input_repr="mel128",
                                                   content_type="music",
                                                   embedding_size=512)

  for track in tqdm(all_tracks, desc='Computing embeddings'):
    wav_path = download_and_convert_preview(track, mp3_dir, wav_dir)
    if not wav_path:
      continue
    emb = compute_openl3_embedding(wav_path, track_dir, model)
    candidate_embeddings.append(emb)
    valid_tracks.append(track)

  if not candidate_embeddings:
    print("No valid previews were processed.")
    return []


In [41]:
def load_embeddings_from_dir(embedding_dir):
  embeddings = []
  names = []

  for fname in os.listdir(embedding_dir):
    if fname.endswith(".npy"):
      path = os.path.join(embedding_dir, fname)
      emb = np.load(path)
      embeddings.append(emb)
      name = fname.replace("_embedding.npy", "")
      names.append(name)

  print(f"Loaded {len(embeddings)} embeddings.")
  return embeddings, names


In [23]:
mp3_track_dir = os.path.join(base_dir, "mp3_tracks")
wav_track_dir = os.path.join(base_dir, "wav_tracks")
emb_track_dir = os.path.join(base_dir, "track_embeddings")
seed_embs, seed_names = load_embeddings_from_dir(emb_seed_dir)
print(seed_names)
albums = get_deezer_new_albums(days_back = 7)

Loaded 4 seed embeddings.
['irwtsayh', 'amoeba', 'projectdreams', 'undercovermartyn']
Loaded 15 albums from Deezer, released in the past 7 days.


In [74]:
# Only need to run once per session. Saves embeddings of candidate tracks in a new directory.
run_full_deezer_pipeline(albums, mp3_track_dir, wav_track_dir, emb_track_dir)

Compiled 233 previewable tracks from 15 albums.


Computing embeddings: 100%|██████████| 233/233 [06:46<00:00,  1.75s/it]


In [75]:
track_embs, track_names = load_embeddings_from_dir(emb_track_dir)
st = find_similar_tracks(track_names, track_embs, seed_embs, seed_names, top_k = 10)

Loaded 233 embeddings.


In [78]:
st

[{'artist': 'Allegaeon',
  'track': 'Chaos Theory',
  'similarity': 0.7331178619874356,
  'closest_seed': 'irwtsayh'},
 {'artist': 'Black Sherif',
  'track': 'One',
  'similarity': 0.7238382769840812,
  'closest_seed': 'projectdreams'},
 {'artist': 'KeBlack',
  'track': 'Avec',
  'similarity': 0.6361437303765004,
  'closest_seed': 'projectdreams'},
 {'artist': 'Didi B',
  'track': 'GAWAYA',
  'similarity': 0.5975490319992783,
  'closest_seed': 'projectdreams'},
 {'artist': 'OBOY',
  'track': 'Maybach',
  'similarity': 0.5770761870397767,
  'closest_seed': 'projectdreams'},
 {'artist': 'Black Sherif',
  'track': 'Eye Open',
  'similarity': 0.5746119497239018,
  'closest_seed': 'projectdreams'},
 {'artist': 'Djo',
  'track': 'Golden Line',
  'similarity': 0.5662374132360284,
  'closest_seed': 'amoeba'},
 {'artist': 'Allegaeon',
  'track': 'Imperial',
  'similarity': 0.5647617587147457,
  'closest_seed': 'irwtsayh'},
 {'artist': 'KeBlack',
  'track': 'Boulot',
  'similarity': 0.2608617244

# Cleanup

In [16]:
def clear_directory_contents(dirs, extensions=(".mp3", ".wav", ".npy")):
  for dir_path in dirs:
    if os.path.exists(dir_path):
      deleted = 0
      for fname in os.listdir(dir_path):
        if fname.endswith(extensions):
          os.remove(os.path.join(dir_path, fname))
          deleted += 1
      print(f"Deleted {deleted} files from {dir_path}")
    else:
      print(f"Directory not found: {dir_path}")

In [37]:
directories = [mp3_track_dir, wav_track_dir, emb_track_dir]
clear_directory_contents(directories)

Deleted 233 files from /content/drive/My Drive/2024-25/openl3_music/mp3_tracks
Deleted 233 files from /content/drive/My Drive/2024-25/openl3_music/wav_tracks
Deleted 1 files from /content/drive/My Drive/2024-25/openl3_music/track_embeddings
