### Get Audio from Spotify ID:
Below is an implementation that allows you to get an audio track as a numpy array from its spotify track ID.
To clean up the project and prevent large storage size, uncomment the try block which cleans up the 'temp_audio.wav' file

In [None]:
import os
from dotenv import load_dotenv
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
import yt_dlp
import librosa
import numpy as np
import logging
import matplotlib.pyplot as plt
from IPython.display import Audio, display
import soundfile as sf

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

load_dotenv()

def get_audio_array(track_id, duration=30, sr=44100, start_time=0):
    """
    Get audio as numpy array from Spotify track ID using YouTube, starting from a specific time.
    
    Parameters:
        track_id (str): Spotify track ID
        duration (float): Duration of audio to extract in seconds (default: 30)
        sr (int): Sample rate (default: 44100)
        start_time (float): Start time in seconds (default: 0)
    
    Returns:
        numpy.ndarray: Audio array, or None if extraction fails
    """
    # Initialize Spotify client
    try:
        auth_manager = SpotifyClientCredentials(
            client_id=os.getenv("SPOTIFY_CLIENT_ID"),
            client_secret=os.getenv("SPOTIFY_CLIENT_SECRET")
        )
        sp = spotipy.Spotify(auth_manager=auth_manager)
        
        # Get track info from Spotify
        track = sp.track(track_id)
        track_name = track['name']
        artist_name = track['artists'][0]['name']
        logger.info(f"Fetching: {track_name} by {artist_name}")
    except Exception as e:
        logger.error(f"Error fetching Spotify track {track_id}: {e}")
        return None
    
    # YouTube download options
    ydl_opts = {
        'format': 'bestaudio/best',
        'outtmpl': 'temp_audio.%(ext)s',
        'quiet': False,  # Set to False for debugging
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
        }],
    }
    
    # Download audio from YouTube
    search_query = f"{track_name} {artist_name} official audio"
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(f"ytsearch1:{search_query}", download=True)
            if not info or 'entries' not in info or not info['entries']:
                logger.error(f"No valid YouTube video found for query: {search_query}")
                return None
        
        # Verify file exists
        if not os.path.exists('temp_audio.wav'):
            logger.error("Download succeeded but temp_audio.wav was not created")
            return None
        
        # Load audio with offset and duration
        audio, original_sr = librosa.load(
            'temp_audio.wav', 
            sr=sr, 
            mono=True, 
            offset=start_time, 
            duration=duration
        )
        
        # Ensure exact duration by padding if necessary
        target_samples = duration * sr
        if len(audio) < target_samples:
            audio = np.pad(audio, (0, target_samples - len(audio)))
        elif len(audio) > target_samples:
            audio = audio[:target_samples]  # Shouldn't happen with duration param, but included for safety
            
        logger.info(f"Audio loaded: {len(audio)/sr:.2f}s, {sr}Hz, shape: {audio.shape}, start_time: {start_time}s")
        return audio
    
    except Exception as e:
        logger.error(f"Error downloading or processing audio for {track_id}: {e}")
        try:
            os.remove('temp_audio.wav')
        except:
            pass
        return None

def get_track_name(track_id):
    """Helper function to get track name for filename"""
    try:
        auth_manager = SpotifyClientCredentials(
            client_id=os.getenv("SPOTIFY_CLIENT_ID"),
            client_secret=os.getenv("SPOTIFY_CLIENT_SECRET")
        )
        sp = spotipy.Spotify(auth_manager=auth_manager)
        track = sp.track(track_id)
        track_name = track['name'].replace(' ', '_').replace('/', '_')  # Make filename safe
        artist_name = track['artists'][0]['name'].replace(' ', '_').replace('/', '_')
        return f"{artist_name}_{track_name}"
    except:
        return "audio"

# Example usage
track_id = "1zV72LFybhhjlhGwXPDZc4"  # Example Spotify track ID
track_name = get_track_name(track_id)
audio = get_audio_array(track_id, duration=30, sr=44100, start_time=20)


### DSP Pipeline

##### 1. Convert to Mono
The first step in the signal processing pipeline is to convert the audio to mono, which librosa does when we load the audio file into an array.

##### 2. Harmonic and Percussive Source Separation (HPSS)
Splitting the audio signal into its harmonic and percussive components allows for cleaner signals to be analyzed. For key estimation, only the harmonic content is used which prevents the percussive frequencies that do not correspond to the key of the audio from effecting the estimation. Similarly, for tempo estimation, only the percussive signal is analyzed as teh percussion tends to show clearer rhythmic patterns which the model will more easily be able to detect and ascribe a BPM label to.

In [None]:
def perform_dsp(audio, sr=44100):
    """
    Perform DSP analysis with proper time scaling and layout.
    Returns harmonic and percussive audio arrays.
    """
    # Verify audio properties
    duration = len(audio) / sr
    print(f"Processing audio: {duration:.2f}s, {len(audio)} samples")
    
    # Compute STFT with appropriate parameters
    n_fft = 2048
    hop_length = 512
    
    stft = librosa.stft(audio, n_fft=n_fft, hop_length=hop_length)
    
    # Harmonic-percussive separation with tuned parameters
    harmonic, percussive = librosa.decompose.hpss(stft, kernel_size=(13, 31))
    
    # Reconstruct time-domain signals
    harmonic_audio = librosa.istft(harmonic, hop_length=hop_length, length=len(audio))
    percussive_audio = librosa.istft(percussive, hop_length=hop_length, length=len(audio))
    
    # Normalize for consistency (prevent clipping)
    harmonic_audio = harmonic_audio / np.max(np.abs(harmonic_audio)) if np.max(np.abs(harmonic_audio)) != 0 else harmonic_audio
    percussive_audio = percussive_audio / np.max(np.abs(percussive_audio)) if np.max(np.abs(percussive_audio)) != 0 else percussive_audio
    
    # Plot (existing code)
    fig = plt.figure(figsize=(12, 10))
    
    gs = plt.GridSpec(4, 2, figure=fig, height_ratios=[1, 1, 1, 0.1], hspace=0.3)
    
    ax1 = fig.add_subplot(gs[0, :])
    img1 = librosa.display.specshow(
        librosa.amplitude_to_db(np.abs(stft), ref=np.max),
        y_axis='log', x_axis='time', sr=sr, hop_length=hop_length, ax=ax1
    )
    ax1.set(title=f'Full Spectrogram ({duration:.1f}s)')
    ax1.label_outer()
    
    # Harmonic spectrogram
    ax2 = fig.add_subplot(gs[1, :])
    img2 = librosa.display.specshow(
        librosa.amplitude_to_db(np.abs(harmonic), ref=np.max(np.abs(stft))),
        y_axis='log', x_axis='time', sr=sr, hop_length=hop_length, ax=ax2
    )
    ax2.set(title='Harmonic Component')
    ax2.label_outer()
    
    # Percussive spectrogram
    ax3 = fig.add_subplot(gs[2, :])
    img3 = librosa.display.specshow(
        librosa.amplitude_to_db(np.abs(percussive), ref=np.max(np.abs(stft))),
        y_axis='log', x_axis='time', sr=sr, hop_length=hop_length, ax=ax3
    )
    ax3.set(title='Percussive Component')
    ax3.set_xlabel('Time (seconds)')
    
    cax = fig.add_subplot(gs[3, :])
    fig.colorbar(img1, cax=cax, orientation='horizontal', format='%+2.0f dB')
    cax.set_xlabel('Power (dB)')
    
    plt.subplots_adjust(top=0.95, bottom=0.08, left=0.08, right=0.92, hspace=0.4)
    
    plt.show()
    
    print(f"STFT shape: {stft.shape}")
    print(f"Frequency bins: {stft.shape[0]}, Time frames: {stft.shape[1]}")
    
    # Return the reconstructed audio
    return harmonic_audio, percussive_audio

def simple_separation_playback(audio, sr=44100, track_name="audio"):
    """
    Simple version for quick harmonic/percussive separation and playback
    Saves files to project directory instead of embedding in notebook
    """
    # STFT parameters
    n_fft = 4096
    hop_length = n_fft // 4
    
    # Compute STFT and separate
    stft = librosa.stft(audio, n_fft=n_fft, hop_length=hop_length)
    harmonic_stft, percussive_stft = librosa.decompose.hpss(stft)
    
    # Reconstruct audio
    harmonic_audio = librosa.istft(harmonic_stft, hop_length=hop_length, length=len(audio))
    percussive_audio = librosa.istft(percussive_stft, hop_length=hop_length, length=len(audio))
    
    # Normalize for playback
    harmonic_audio = harmonic_audio / np.max(np.abs(harmonic_audio))
    percussive_audio = percussive_audio / np.max(np.abs(percussive_audio))
    
    # Save to files in project directory
    harmonic_filename = f"harmonic_{track_name}.wav"
    percussive_filename = f"percussive_{track_name}.wav"
    
    sf.write(harmonic_filename, harmonic_audio, sr)
    sf.write(percussive_filename, percussive_audio, sr)
    
    print(f"Harmonic Component (melody) saved as: {harmonic_filename}")
    print(f"Percussive Component (drums) saved as: {percussive_filename}")
    
    # Still provide playback in notebook
    print("\nPlayback in notebook:")
    print("Harmonic Component (melody):")
    display(Audio(harmonic_audio, rate=sr))
    
    print("Percussive Component (drums):")
    display(Audio(percussive_audio, rate=sr))
    
    return harmonic_audio, percussive_audio

if audio is not None:
    harmonic_audio, percussive_audio = perform_dsp(audio, sr=44100)
    harmonic, percussive = simple_separation_playback(audio, track_name=track_name) # comment this line to improve performance - this is just to check the hpss output
        

else:
    print("Failed to get audio")

##### 3. Constant Q-Transform Chroma Spectrogram

In [None]:
def chroma_cqt_spec(audio, sr=44100):
    """
    Perform DSP analysis with proper time scaling and layout
    """
    
    chroma_cqt = librosa.feature.chroma_cqt(y=audio, sr=sr)
    
    plt.figure(figsize=(10, 4))
    librosa.display.specshow(chroma_cqt, x_axis='time', y_axis='chroma', cmap='coolwarm')
    plt.colorbar(label='Intensity')
    plt.title('Constant-Q Chroma Spectrogram')
    plt.tight_layout()
    plt.show()

if harmonic_audio is not None:
    chroma_cqt_spec(harmonic_audio, sr=44100)

##### 4. Pitch-Shifting Augmentation
It will undoubtedly be the case that the dataset will not have equal representation of all keys, to mitigate this, we can artificially create more datapoints in less common keys by pitch shifting songs, while ensuring that the tempo information remains constant. **it will have to be the case that the whole usable dataset is parsed to run analystics of which keys are more prevelant so that more datapoints can be generated for underrepresnted keys (to be implemented)**. The following implementation outlines the pitch shifting technique

In [None]:
harmonic_audio_shifted = librosa.effects.pitch_shift(y=harmonic_audio, sr=44100, n_steps=6)
chroma_cqt_spec(harmonic_audio_shifted, sr=44100)

In [None]:
def extract_features(audio, sr=44100, n_fft=4096, hop_length=512):
    """
    Extracts pitch and key-relevant DSP features from an audio signal.
    Designed to be used on the harmonic component (after HPSS).
    Returns a dictionary of feature arrays for supervised ML.
    Parameters
    ----------
    audio : np.ndarray
        Mono audio signal (preferably harmonic component)
    sr : int
        Sample rate
    n_fft : int
        FFT size for analysis
    hop_length : int
        Hop length for STFT-based operations
    Returns
    -------
    features : dict
        {
        "chroma_mean": (12,),
        "chroma_std": (12,),
        "key_template_corr": (24,),
        "pitch_hist": (12,)
        }
    """
    features = {}
    # --- Tuning estimation ---
    tuning = librosa.estimate_tuning(y=audio, sr=sr, n_fft=n_fft)

    # Compute STFT once and keep phase
    stft = librosa.stft(audio, n_fft=n_fft, hop_length=hop_length)
    S, phase = np.abs(stft), np.exp(1j * np.angle(stft))
    
    # HPSS decomposition
    H, _ = librosa.decompose.hpss(S)

    # Reconstruct harmonic audio with original phase
    harmonic = librosa.istft(H * phase, hop_length=hop_length, length=len(audio))

    # --- Chroma CQT (tuning-corrected & normalized) ---
    chroma = librosa.feature.chroma_cqt(y=harmonic, sr=sr, tuning=tuning, hop_length=hop_length)
    chroma = librosa.util.normalize(chroma, norm=1, axis=0)

    # --- Beat-synchronous chroma aggregation ---
    onset_env = librosa.onset.onset_strength(y=audio, sr=sr, hop_length=hop_length)
    _, beats = librosa.beat.beat_track(onset_envelope=onset_env, sr=sr, hop_length=hop_length)
    if len(beats) > 0:
        beat_chroma = librosa.util.sync(chroma, beats, aggregate=np.median)
    else:
        beat_chroma = chroma  # fallback if beat tracking fails
    features["chroma_mean"] = np.mean(beat_chroma, axis=1)
    features["chroma_std"] = np.std(beat_chroma, axis=1)
    # --- Key-template correlation (Krumhansl & Kessler profiles) ---
    krum_major = np.array([6.35, 2.23, 3.48, 2.33, 4.38, 4.09,
                           2.52, 5.19, 2.39, 3.66, 2.29, 2.88])
    krum_minor = np.array([6.33, 2.68, 3.52, 5.38, 2.60, 3.53,
                           2.54, 4.75, 3.98, 2.69, 3.34, 3.17])
    krum_major /= np.linalg.norm(krum_major)
    krum_minor /= np.linalg.norm(krum_minor)
    corrs = []
    for k in range(12):
        corrs.append(np.dot(features["chroma_mean"], np.roll(krum_major, k)))
    for k in range(12):
        corrs.append(np.dot(features["chroma_mean"], np.roll(krum_minor, k)))
    features["key_template_corr"] = np.array(corrs)
    # --- Predominant pitch histogram ---
    f0, voiced_flag, voiced_probs = librosa.pyin(
        harmonic,
        fmin=librosa.note_to_hz("C2"),
        fmax=librosa.note_to_hz("C7"),
        sr=sr
    )
    if f0 is not None:
        semitones = (12 * np.log2(f0 / 440.0) + 69) % 12
        semitones = semitones[~np.isnan(semitones)]
        hist, _ = np.histogram(semitones, bins=np.arange(13))
        hist = hist / (np.sum(hist) + 1e-12)
        features["pitch_hist"] = hist
    else:
        features["pitch_hist"] = np.zeros(12)
    
    # --- Plotting ---
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))
    
    # Plot 1: Pitch class distribution
    note_names = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B']
    axes[0].bar(note_names, features["pitch_hist"])
    axes[0].set_xlabel("Pitch Class")
    axes[0].set_ylabel("Probability")
    axes[0].set_title("Pitch Class Distribution")
    axes[0].grid(axis='y', alpha=0.3)
    
    # Plot 2: Key correlation (Major vs Minor)
    major_corrs = features["key_template_corr"][:12]
    minor_corrs = features["key_template_corr"][12:]
    
    key_labels = [f"{note}\nMaj" for note in note_names] + [f"{note}\nMin" for note in note_names]
    colors = ['#3498db'] * 12 + ['#e74c3c'] * 12  # Blue for major, red for minor
    
    axes[1].bar(range(24), features["key_template_corr"], color=colors, alpha=0.7)
    axes[1].set_xlabel("Key")
    axes[1].set_ylabel("Correlation")
    axes[1].set_title("Krumhansl-Kessler Key Profile Correlation")
    axes[1].set_xticks(range(24))
    axes[1].set_xticklabels(key_labels, rotation=45, ha='right', fontsize=8)
    axes[1].grid(axis='y', alpha=0.3)
    axes[1].axhline(y=0, color='k', linestyle='-', linewidth=0.5)
    
    # Add legend
    from matplotlib.patches import Patch
    legend_elements = [Patch(facecolor='#3498db', alpha=0.7, label='Major'),
                      Patch(facecolor='#e74c3c', alpha=0.7, label='Minor')]
    axes[1].legend(handles=legend_elements, loc='upper right')
    
    # Find and annotate the most likely key
    best_idx = np.argmax(features["key_template_corr"])
    best_key = key_labels[best_idx].replace('\n', ' ')
    axes[1].annotate(f'Most likely: {best_key}', 
                     xy=(best_idx, features["key_template_corr"][best_idx]),
                     xytext=(10, 10), textcoords='offset points',
                     bbox=dict(boxstyle='round,pad=0.5', fc='yellow', alpha=0.7),
                     arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=0'))
    
    plt.tight_layout()
    plt.show()
    
    return features

features = extract_features(harmonic_audio, sr=44100)
for k, v in features.items():
    print(f"{k}: {np.shape(v)}")