In [None]:
import os
import numpy as np
import librosa
import ffmpeg
from pydub import AudioSegment
from scipy.signal import find_peaks
from tempfile import TemporaryDirectory
import pyogg
import soundfile
import subprocess

In [None]:
def load_audio(file_path):
    return AudioSegment.from_file(file_path, format="flac")

def compute_spectral_centroids(audio, sr=22050, frame_length=2048, hop_length=512):
    samples = np.array(audio.get_array_of_samples()).astype(np.float32)
    y = librosa.util.normalize(samples)
    centroids = librosa.feature.spectral_centroid(y=y, sr=sr, hop_length=hop_length)[0]
    return centroids

def segment_audio(centroids, threshold=1.5):
    # Compute difference of spectral centroid across frames
    diff = np.abs(np.diff(centroids))
    peaks, _ = find_peaks(diff, height=threshold * np.std(diff))
    return peaks

def assign_bitrates(centroids, peaks, low=64000, mid=96000, high=128000):
    bitrates = []
    split_centroids = np.split(centroids, peaks)
    for seg in split_centroids:
        avg = np.mean(seg)
        if avg < 2000:
            bitrates.append(low)
        elif avg < 4000:
            bitrates.append(mid)
        else:
            bitrates.append(high)
    return bitrates

def split_audio(audio, peaks, frame_duration_ms):
    segments = []
    start = 0
    for peak in peaks:
        end = peak * frame_duration_ms
        segments.append(audio[start:end])
        start = end
    segments.append(audio[start:])  # final segment
    return segments

def encode_segment_to_aac(segment, bitrate, path):
    segment.export(path, format="wav")
    aac_path = path.replace(".wav", ".m4a")
    ffmpeg.input(path).output(aac_path, audio_bitrate=f"{bitrate}k", acodec="aac").run(overwrite_output=True, quiet=True)
    return aac_path

def concatenate_aac_segments(paths, output_path):
    txt_list_path = os.path.join(os.path.dirname(paths[0]), "concat_list.txt")
    with open(txt_list_path, "w") as f:
        for p in paths:
            f.write(f"file '{p}'\n")
    ffmpeg.input(txt_list_path, format='concat', safe=0).output(output_path, acodec='copy').run(overwrite_output=True)

def vbr_compress(input_flac, output_path):
    ffmpeg.input(input_flac).output(output_path, acodec='aac', audio_bitrate='0', compression_level='5').run(overwrite_output=True)

In [None]:
def adaptive_encode(input_flac, output_adaptive, output_vbr):
    with TemporaryDirectory() as tempdir:
        print("[*] Loading audio...")
        audio = load_audio(input_flac)
        centroids = compute_spectral_centroids(audio)
        peaks = segment_audio(centroids)
        frame_duration_ms = len(audio) / len(centroids)
        segments = split_audio(audio, peaks, frame_duration_ms)
        bitrates = assign_bitrates(centroids, peaks)

        print("[*] Encoding segments...")
        segment_paths = []
        for i, (segment, br) in enumerate(zip(segments, bitrates)):
            temp_wav = os.path.join(tempdir, f"seg_{i}.wav")
            aac_path = encode_segment_to_aac(segment, br, temp_wav)
            segment_paths.append(aac_path)

        print("[*] Concatenating segments...")
        concatenate_aac_segments(segment_paths, output_adaptive)

        print("[*] Encoding baseline VBR...")
        vbr_compress(input_flac, output_vbr)

        print("[✓] Done.")
        

In [None]:
# audio = load_audio("HIT THE FLOOR.flac")
raw, sr = librosa.load("HIT THE FLOOR.flac", sr=None)

# May 7th

## Approach

1. Try splitting one song in two and then stitch and listen through
2. Try using genre data to create an ideal predictor
3. Envision an ideal predictor, and what it would contribute to the algorithm?
4. Compute waveform metrics for the []
5. 

In [None]:
# Step 1
half = len(audio) // 2
audio1 = audio[:half]
audio2 = audio[half:]

In [None]:
audio1.export("first_half.aac", format="adts")
audio2.export("second_half.aac", format="adts")

In [None]:
audio.export("full_audio.aac", format="adts")

In [None]:
import subprocess

# Create the concat list file
with open("aac_list.txt", "w") as f:
    f.write("file 'first_half.aac'\n")
    f.write("file 'second_half.aac'\n")

# Run ffmpeg command
subprocess.run([
    "ffmpeg", "-f", "concat", "-safe", "0",
    "-i", "aac_list.txt", "-c", "copy", "stitched_output.aac"
])

In [None]:
import numpy as np
import librosa
import zlib


def load_compress_audio(
    input_path: str,
    output_path: str,
    sr_target: int = 16000,
    n_fft: int = 1024,
    hop_length: int = 512,
    keep_bins: int = 100
):
    """
    Compress an audio file into a proprietary format that mimics AAC compression.

    Parameters:
        input_path (str): Path to the input audio file (e.g., .flac, .wav).
        output_path (str): Path to save the compressed binary file (.mycmp).
        sr_target (int): Target sample rate for downsampling.
        n_fft (int): FFT window size.
        hop_length (int): Hop length for STFT.
        keep_bins (int): Number of frequency bins to keep (controls compression level).
    """
    y, sr = librosa.load(input_path, sr=sr_target)
    stft = librosa.stft(y, n_fft=n_fft, hop_length=hop_length)
    magnitude = np.abs(stft)

    # Normalize and keep only lower frequency bins
    compressed = magnitude[:keep_bins, :]
    max_val = np.max(compressed) or 1.0  # Avoid divide by zero
    quantized = np.round(compressed / max_val * 255).astype(np.uint8)

    # Pack header + audio data
    header = np.array([sr_target, n_fft, hop_length, keep_bins, compressed.shape[1]], dtype=np.int32)
    data = np.concatenate([header.view(np.uint8), quantized.flatten()])

    # Compress and save
    packed = zlib.compress(data)
    with open(output_path, 'wb') as f:
        f.write(packed)

    return y, sr


def decompress_audio(
    input_path: str
) -> np.ndarray:
    """
    Decompress a proprietary audio format back into a waveform.

    Parameters:
        input_path (str): Path to the compressed binary file (.mycmp).

    Returns:
        np.ndarray: Reconstructed audio waveform (mono).
    """
    with open(input_path, 'rb') as f:
        packed = f.read()

    data = zlib.decompress(packed)

    # Extract header
    header_size = 5 * 4  # 5 int32s
    header = np.frombuffer(data[:header_size], dtype=np.int32)
    sr, n_fft, hop_length, keep_bins, n_frames = header

    # Extract and reshape audio data
    quantized = np.frombuffer(data[header_size:], dtype=np.uint8)
    magnitude = quantized.astype(np.float32).reshape((keep_bins, n_frames)) / 255.0

    # Reconstruct full STFT frame (zero-pad high frequencies)
    stft_shape = (n_fft // 2 + 1, n_frames)
    full_stft = np.zeros(stft_shape, dtype=np.float32)
    full_stft[:keep_bins, :] = magnitude

    # Inverse STFT
    waveform = librosa.istft(full_stft, hop_length=hop_length)
    return waveform


In [None]:
input_audio = "HIT THE FLOOR.flac"
compressed_file = "example.mycmp"
output_audio = "reconstructed.wav"

# Compress
x, sr = load_compress_audio(input_audio, compressed_file)

# Decompress
y = decompress_audio(compressed_file)


import soundfile as sf

sf.write(output_audio, y, samplerate=sr)

In [None]:
y = np.pad(y, (0, x.shape[0] - y.shape[0]), constant_values=[0, 0])

In [None]:
x - y

In [None]:
y

In [None]:
x

In [None]:
y[:35]

# Similar Attempt with PyOgg

## Strategies

What PyOgg Currently Does with the Opus Codec



1. 

In [None]:
# 

## Viewing the Waveform

In [None]:
raw

In [None]:
raw, sr = librosa.load("HIT THE FLOOR.flac", sr=None, mono=False)
print(raw.shape)

In [None]:
n_fft = 1200 // 6
hop_length = 600 // 6
win_length = n_fft // 6
window = 'hann'

raw_spec = librosa.stft(raw, n_fft=n_fft, hop_length=hop_length, win_length=win_length, window=window)

In [None]:
hf_cutoff = 800
split_cutoff = 125

total_spec = np.copy(raw_spec)
raw_spec[:, hf_cutoff:, :] = np.mean(raw_spec[:, hf_cutoff:, :], axis=(0, 1))
raw_hf_spec = np.copy(raw_spec)
raw_spec[:, split_cutoff:, :] = 0
raw_hf_spec[:, :split_cutoff, :] = 0

In [None]:
edited_raw = librosa.istft(raw_spec, n_fft=n_fft, hop_length=hop_length, win_length=win_length, window=window, length=raw.shape[1])
edited_hf = librosa.istft(raw_hf_spec, n_fft=n_fft, hop_length=hop_length, win_length=win_length, window=window, length=raw.shape[1])
edited_total = librosa.istft(total_spec, n_fft=n_fft, hop_length=hop_length, win_length=win_length, window=window, length=raw.shape[1])

In [None]:
raw - (edited_hf + edited_raw)

In [None]:
raw_hf_spec.shape

In [None]:
raw_hf_spec.mean(axis=0)

In [None]:
librosa.display.specshow(raw_hf_spec.mean(axis=0), sr=sr)

In [None]:
# Learning algorithm to strip irrelevancies from HF noise

In [None]:
raw_hf_spec

In [None]:
# use 8 bit prediction to fill in the noise

# Imagine we have a predictor that can predict, based on the tempo of the song (given the )

# Use percentile cutoff to quantize noise

# HF-noise usually spread in sound stage, we neglect this for now, but can arbitrarily be placed further to one side after this algorithm takes place
a_raw_hf_spec = raw_hf_spec.mean(axis=0)
P = 20

# Smoothing quantizes all "noise" or signal below the 75th percentile of 
cutoff = np.percentile(np.abs(a_raw_hf_spec), P)

In [None]:
def nearest_lower_pow10(x):
    return 10 ** np.floor(np.log10(x))


In [None]:
a_raw_hf_spec = np.where(
    a_raw_hf_spec < cutoff,
    nearest_lower_pow10(np.abs(a_raw_hf_spec)),
    np.abs(a_raw_hf_spec)
)


In [None]:
a_raw_hf_spec[a_raw_hf_spec < 1e-9] = 0

In [None]:
a_raw_hf_spec

In [None]:
raw_hf_spec.shape

In [None]:
edited_a_hf = librosa.istft(a_raw_hf_spec, n_fft=n_fft, hop_length=hop_length, win_length=win_length, window=window, length=raw.shape[1])

In [None]:
edited_a_hf

In [None]:
soundfile.write("edited_hit_the_floor.wav", (edited_raw + edited_hf).T, sr)
soundfile.write("q_edited_hit_the_floor.wav", (edited_raw + edited_a_hf).T, sr)
soundfile.write("hit_the_floor.wav", raw.T, sr)

In [None]:
# SNR

def calculate_snr_from_arrays(clean, noisy):
    """
    Assumes:
    - clean: NumPy array of clean signal
    - noisy: NumPy array of noisy or degraded signal
    - Both are same shape and dtype
    """
    clean = clean.astype(np.float32)
    noisy = noisy.astype(np.float32)

    noise = clean - noisy
    signal_power = np.sum(clean ** 2)
    noise_power = np.sum(noise ** 2)

    print(signal_power)
    print(noise_power)

    snr = 10 * np.log10((signal_power) / (noise_power))  # Avoid division by zero
    return snr

In [None]:
calculate_snr_from_arrays(edited_a_hf + edited_raw, raw)

In [None]:
# Compress to see where gains are made by removing c

In [None]:
subprocess.run([
    "ffmpeg", "-y",
    "-i", "hit_the_floor.wav",
    "-c:a", "libopus",
    "-b:a", "96k",  # bitrate
    "hit_the_floor.opus"
])
print("")

In [None]:
subprocess.run([
    "ffmpeg", "-y",
    "-i", "edited_hit_the_floor.wav",
    "-c:a", "libopus",
    "-b:a", "96k",  # bitrate
    "edited_hit_the_floor.opus"
])
print("")

In [None]:
subprocess.run([
    "ffmpeg", "-y",
    "-i", "q_edited_hit_the_floor.wav",
    "-c:a", "libopus",
    "-b:a", "96k",  # bitrate
    "q_edited_hit_the_floor.opus"
])
print("")

In [None]:
raw_spec.size

In [None]:
raw.size

In [None]:
raw_spec

# Low Freq Compression

In [None]:
from IPython.display import Audio

In [None]:
raw_spec.shape

In [None]:
import numpy as np

def log_bin_quantize(array):
    """
    Vectorized log-bin quantization of a NumPy array.
    Returns a single array of the same shape, where each value is
    quantized to mantissa × 10^exponent, with mantissa having 3 sig. digits.
    """
    abs_array = np.abs(array)

    # Avoid log10(0) by masking zeros
    mask_nonzero = np.all([abs_array < 0.001, abs_array > 0])

    exponents = np.zeros_like(array, dtype=int)
    mantissas = np.zeros_like(array, dtype=float)

    # Compute exponents and mantissas
    exponents[mask_nonzero] = np.floor(np.log10(abs_array[mask_nonzero])).astype(int)
    mantissas[mask_nonzero] = abs_array[mask_nonzero] / (10. ** exponents[mask_nonzero])

    # Quantize mantissas to 3 significant digits
    mantissas = np.floor(mantissas * 1000) / 1000

    # Reconstruct quantized values (preserving sign)
    quantized = np.zeros_like(array, dtype=float)
    quantized[mask_nonzero] = mantissas[mask_nonzero] * (10. ** exponents[mask_nonzero])
    quantized[~mask_nonzero] = array[~mask_nonzero]
    # quantized[~mask_nonzero] = 0.0  # Handle zeros explicitly


    return quantized

In [None]:
spec = np.abs(total_spec.mean(axis=0))

In [None]:
len(np.unique(spec))

In [None]:
qspec = log_bin_quantize(spec)

In [None]:
qspec

In [None]:
len(np.unique(qspec))

In [None]:
Audio(data=raw, rate=sr)

In [None]:
clipped = librosa.istft(qspec, n_fft=n_fft//4, hop_length=hop_length//4, win_length=win_length//4, window=window, length=raw.shape[1])
Audio(data=clipped, rate=sr)

In [None]:
rp = np.max(raw_spec)
librosa.display.specshow(librosa.amplitude_to_db(np.abs(raw_spec.mean(axis=0)), ref=rp),
                         y_axis='log', x_axis='time')

In [None]:
spec = total_spec.mean(axis=0)

In [None]:
spec.shape

In [None]:
anchors = np.zeros((spec.shape[0], 0))

In [None]:
import numpy as np

def compress_spectrogram_with_anchors(spectrogram, threshold=2, match_tolerance=1e-2):
    """
    Compress spectrogram columns by anchoring based on Euclidean distance from 0.
    
    Returns:
    - anchors: dict of {int_distance: [(unique_id, column)]}
    - compressed: list of (either full column or reference (anchor_id, unique_id))
    """
    anchors = {}
    compressed = []
    next_id = 0  # Unique ID counter for columns

    for t in range(spectrogram.shape[1]):
        col = spectrogram[:, t]
        dist = np.linalg.norm(col)
        anchor_key = int(round(dist))

        found = False
        # Check nearby keys in anchor space
        for nearby_key in range(anchor_key - threshold, anchor_key + threshold + 1):
            if nearby_key in anchors:
                for uid, ref_col in anchors[nearby_key]:
                    diff = np.linalg.norm(col - ref_col)
                    if diff <= match_tolerance:
                        # Store a reference instead of full column
                        compressed.append(("ref", nearby_key, uid))
                        found = True
                        break
            if found:
                break

        if not found:
            # New anchor column, store full column and register in anchor map
            if anchor_key not in anchors:
                anchors[anchor_key] = []
            anchors[anchor_key].append((next_id, col.copy()))
            compressed.append(("full", anchor_key, next_id, col.copy()))
            next_id += 1

    return anchors, compressed

In [None]:
compress_spectrogram_with_anchors(spec, threshold=1000, match_tolerance=)

In [None]:
anchors

# Stem-Split Compression

In [None]:
from IPython.display import Audio

In [None]:
# We use librosa to split the stems

total_harm, total_perc = librosa.decompose.hpss(total_spec, margin=2)
rp = np.max(np.abs(total_spec))

In [None]:
librosa.display.specshow(librosa.amplitude_to_db(np.abs(total_harm[:, :, :300].mean(axis=0)), ref=rp),
                         y_axis='log', x_axis='time')

In [None]:
librosa.display.specshow(librosa.amplitude_to_db(np.abs(total_perc[:, :, :300].mean(axis=0)), ref=rp),
                         y_axis='log', x_axis='time')

In [None]:
y_harmonic = librosa.istft(total_harm, n_fft=n_fft, hop_length=hop_length, win_length=win_length, window=window, length=raw.shape[1])
Audio(data=y_harmonic, rate=sr)

In [None]:
y_harmonic

In [None]:
y_percussive = librosa.istft(total_perc, n_fft=n_fft, hop_length=hop_length, win_length=win_length, window=window, length=raw.shape[1])
Audio(data=y_percussive, rate=sr)

In [None]:
Audio(data=y_percussive+y_harmonic, rate=sr)

In [None]:
soundfile.write("harm.wav", y_harmonic.T, samplerate=sr)
soundfile.write("perc.wav", y_percussive.T, samplerate=sr)

In [None]:
subprocess.run([
    "ffmpeg", "-y",
    "-i", "perc.wav",
    "-c:a", "libopus",
    "-b:a", "96k",  # bitrate
    "perc.opus"
])
print("")

subprocess.run([
    "ffmpeg", "-y",
    "-i", "harm.wav",
    "-c:a", "libopus",
    "-b:a", "96k",  # bitrate
    "harm.opus"
])
print("")

# VAE-VQ

# Metrics

In [None]:
# Compute SNR Global

In [None]:
# Compute SNR segment wise