In [1]:
import torch
import matplotlib.pyplot as plt
import numpy as np
from IPython.display import Audio
from ipywidgets import interact, IntSlider
from scipy.io import wavfile
from matplotlib.animation import FuncAnimation, FFMpegWriter
import os
import subprocess
from scipy.signal import find_peaks
from scipy.io.wavfile import write

SAMPLE_RATE = 44100  # Sample rate in Hz
DURATION = 3  # Duration in seconds
NUM_FRAMES = int(SAMPLE_RATE * DURATION)  # Total number of audio samples

# Time stamp (in seconds) of every sample: 0, 1/SAMPLE_RATE, 2/SAMPLE_RATE, ...
# Built from the sample index so consecutive samples are exactly 1/SAMPLE_RATE
# apart. linspace(0, DURATION, NUM_FRAMES) includes the endpoint, which
# stretches the spacing to DURATION / (NUM_FRAMES - 1) and slightly detunes
# every tone synthesised from `t`.
t = torch.arange(NUM_FRAMES, dtype=torch.float32) / SAMPLE_RATE

In [None]:
# Code for STFT and interactive plot

# STFT settings read by STFT_spectrogram (passed straight to torch.stft).
n_fft = 4096       # FFT size
hop_length = 64    # hop between consecutive STFT frames, in samples
win_length = 1024  # analysis window length, in samples
window = torch.hann_window(window_length=win_length)  # Hann window of win_length samples

def STFT_spectrogram(soundwave):
    """Compute the magnitude spectrogram of ``soundwave``.

    The signal is squeezed and handed to ``torch.stft`` together with the
    module-level ``n_fft``, ``hop_length``, ``win_length`` and ``window``.

    Args:
        soundwave: Tensor holding the audio signal.

    Returns:
        tuple: ``(spectrogram, max_value, num_columns)`` where ``spectrogram``
        is the element-wise absolute value of the complex STFT, ``max_value``
        is its largest entry as a Python number, and ``num_columns`` is the
        size of its second dimension.
    """
    complex_stft = torch.stft(
        soundwave.squeeze(),
        n_fft=n_fft,
        hop_length=hop_length,
        win_length=win_length,
        window=window,
        return_complex=True,
    )
    magnitudes = complex_stft.abs()
    return magnitudes, magnitudes.max().item(), magnitudes.shape[1]

def get_tensor(audio):
    """Convert raw audio samples into a mono, peak-normalised float tensor.

    Args:
        audio: Array-like of samples. Inputs with more than one dimension are
            averaged over dimension 1 (assumes a (samples, channels) layout,
            as produced by ``scipy.io.wavfile.read`` — TODO confirm for
            other sources).

    Returns:
        torch.Tensor: float32 tensor scaled so that its largest absolute
        value is 1. Empty or all-zero input is returned without scaling,
        because dividing by a zero peak would fill the result with NaNs.
    """
    audio_tensor = torch.tensor(audio, dtype=torch.float32)
    if audio_tensor.ndimension() > 1:
        audio_tensor = torch.mean(audio_tensor, dim=1)

    # torch.max raises on an empty tensor, so bail out before looking for a peak.
    if audio_tensor.numel() == 0:
        return audio_tensor

    peak = torch.max(torch.abs(audio_tensor))
    if peak > 0:  # Silent input has no peak to normalise against
        audio_tensor = audio_tensor / peak
    return audio_tensor

# Generate the plot and return number of frames
def interactive_STFT_plot(soundwave):
    """Show a slider-driven plot of individual spectrogram columns.

    The spectrogram comes from ``STFT_spectrogram``. The slider selects one
    column, which is plotted against its row index (first 250 rows shown)
    with a y-range of 1.1x the spectrogram's global maximum.

    Args:
        soundwave: Signal forwarded to ``STFT_spectrogram``.

    Returns:
        int: Number of spectrogram columns (also printed).
    """
    magnitudes, peak_value, total_columns = STFT_spectrogram(soundwave)
    y_upper = peak_value * 1.1

    def show_column(column):
        # NOTE(review): the y-axis is labelled 'dB' but the plotted values are
        # linear STFT magnitudes, and the x-axis is the row (bin) index rather
        # than a frequency in Hz — confirm the intended labels.
        fig, ax = plt.subplots(figsize=(8, 4))
        ax.plot(magnitudes[:, column])
        ax.set_xlim(0, 250)
        ax.set_ylim(0, y_upper)
        ax.set_xlabel('Frequency')
        ax.set_ylabel('dB')
        ax.set_title(f'Column {column} of Spectrogram')
        plt.show()

    column_slider = IntSlider(min=0, max=total_columns - 1, step=1, value=0)
    interact(show_column, column=column_slider)
    plt.show()
    print(f'num_columns: {total_columns}')
    return total_columns

# Compare two spectrograms and return number of frames
def interactive_STFT_plot_compare(soundwave,soundwave2):
    """Overlay matching spectrogram columns of two signals behind one slider.

    Both spectrograms come from ``STFT_spectrogram``. The slider is limited
    to the columns that exist in BOTH spectrograms, and the y-range covers
    the larger of the two maxima. Previously the second signal's values
    overwrote the first's, so a longer second signal let the slider index
    past the end of the first spectrogram, and a louder first signal was
    clipped.

    Args:
        soundwave: First signal.
        soundwave2: Second signal, drawn on top of the first.

    Returns:
        int: Number of columns available for comparison (the smaller of the
        two column counts); also printed.
    """
    spectrogram, max_value, num_columns = STFT_spectrogram(soundwave)
    spectrogram2, max_value2, num_columns2 = STFT_spectrogram(soundwave2)

    shared_columns = min(num_columns, num_columns2)
    y_upper = max(max_value, max_value2) * 1.1
    slider = IntSlider(min=0, max=shared_columns-1, step=1, value=0)

    def update_plot(column):
        # NOTE(review): the y-axis is labelled 'dB' but the plotted values are
        # linear STFT magnitudes, and the x-axis is the row (bin) index —
        # confirm the intended labels.
        plt.figure(figsize=(8, 4))
        plt.plot(spectrogram[:, column])
        plt.plot(spectrogram2[:, column])
        plt.xlim(0, 250)
        plt.ylim(0, y_upper)
        plt.xlabel('Frequency')
        plt.ylabel('dB')
        plt.title(f'Column {column} of Spectrogram')
        plt.show()

    interact(update_plot, column=slider)
    plt.show()
    print(f'num_columns: {shared_columns}')
    return shared_columns

In [None]:
# Code for mp4 animations with/without sound
def plot_spectrogram_frame(soundwave, column):
    """Render one spectrogram column off-screen and return it as an image.

    Args:
        soundwave: Signal forwarded to ``STFT_spectrogram``.
        column: Index of the spectrogram column to draw.

    Returns:
        numpy.ndarray: uint8 RGB image of the rendered figure with shape
        (height, width, 3).

    Note:
        The STFT is recomputed on every call, so rendering many columns of
        the same signal repeats that work each time.
    """
    spectrogram, max_value, _ = STFT_spectrogram(soundwave)

    fig, ax = plt.subplots(figsize=(8, 4))
    try:
        # NOTE(review): the y-axis is labelled 'dB' but the values are linear
        # STFT magnitudes, and the x-axis is the row (bin) index — confirm.
        ax.plot(spectrogram[:, column])
        ax.set_xlim(0, 250)
        ax.set_ylim(0, max_value * 1.1)
        ax.set_xlabel('Frequency')
        ax.set_ylabel('dB')
        ax.set_title(f'Column {column} of Spectrogram')

        fig.canvas.draw()
        # canvas.tostring_rgb() was deprecated in Matplotlib 3.8 and later
        # removed; buffer_rgba() exposes the rendered pixels as an
        # (H, W, 4) buffer. Drop alpha and copy so the frame outlives the figure.
        frame = np.asarray(fig.canvas.buffer_rgba())[..., :3].copy()
    finally:
        # Always release the figure, even if plotting fails (e.g. bad column).
        plt.close(fig)

    return frame

def save_spectrogram_as_mp4(num_columns, duration, audio_tensor, output_dir='animations', filename='sound.mp4'):
    """Render every spectrogram column as a video frame and write an MP4.

    Args:
        num_columns: Number of spectrogram columns (= video frames) to render.
        duration: Length of the sound in seconds; the frame rate is chosen as
            ``num_columns / duration`` so the video lasts as long as the sound.
        audio_tensor: Signal forwarded to ``plot_spectrogram_frame``.
        output_dir: Directory for the video; created if missing.
        filename: File name of the video inside ``output_dir``.

    Returns:
        str: Path of the written video file.

    Raises:
        ValueError: If ``num_columns`` or ``duration`` is not positive.
    """
    if num_columns <= 0:
        raise ValueError('num_columns must be positive')
    if duration <= 0:
        raise ValueError('duration must be positive')

    # Calculate FPS to match the duration of the sound
    fps = num_columns / duration

    # Get each frame.
    # NOTE(review): plot_spectrogram_frame recomputes the STFT on every call
    # and all frames are held in memory at once — slow and memory-hungry for
    # long sounds.
    frames = [plot_spectrogram_frame(audio_tensor, column) for column in range(num_columns)]

    # Save as MP4
    fig, ax = plt.subplots()
    try:
        im = ax.imshow(frames[0])
        ax.axis('off')

        def update(i):
            im.set_data(frames[i])
            return [im]

        ani = FuncAnimation(fig, update, frames=len(frames), blit=True)

        # Ensure the directory exists
        os.makedirs(output_dir, exist_ok=True)

        # Save the animation to the specified path
        output_path = os.path.join(output_dir, filename)
        ani.save(output_path, writer=FFMpegWriter(fps=fps))
    finally:
        # Close the helper figure so it is neither leaked nor shown as a
        # stray static image in the notebook output.
        plt.close(fig)

    print(f'Animation saved to {output_path}')

    return output_path

def combine_audio_and_video(video_path, audio_path, output_path, overwrite=False):
    """Mux a video file and an audio file into one output file with ffmpeg.

    The video stream is copied as-is and the audio is encoded as AAC.

    Args:
        video_path: Path of the input video.
        audio_path: Path of the input audio.
        output_path: Path of the combined file; its directory is created if
            missing.
        overwrite: When True an existing ``output_path`` is replaced
            (ffmpeg ``-y``). When False (default) ffmpeg is told never to
            overwrite (``-n``), so it exits with an error instead of waiting
            on an interactive "Overwrite?" prompt that cannot be answered
            from a notebook.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    output_parent = os.path.dirname(output_path)
    if output_parent:
        os.makedirs(output_parent, exist_ok=True)

    command = [
        'ffmpeg',
        '-y' if overwrite else '-n',
        '-i', video_path,
        '-i', audio_path,
        '-c:v', 'copy',
        '-c:a', 'aac',
        '-strict', 'experimental',
        output_path
    ]
    # Arguments are passed as a list (no shell), so paths are not shell-interpreted.
    subprocess.run(command, check=True)
    print(f'Combined video and audio saved to {output_path}')

In [None]:
# Code for locating peaks of frequencies
def find_actual_frequencies(soundwave, column, prominence=8):
    """Detect spectral peaks in one spectrogram column.

    Args:
        soundwave: Signal forwarded to ``STFT_spectrogram``.
        column: Index of the spectrogram column to analyse.
        prominence: Minimum peak prominence handed to
            ``scipy.signal.find_peaks``; higher values are pickier.

    Returns:
        tuple: ``(frequencies, amplitudes)`` — two equally long lists of
        plain Python numbers. Frequencies are in Hz, converted from the bin
        index as ``bin * SAMPLE_RATE / n_fft``. Amplitudes are the
        spectrogram magnitudes at those bins. If no peak is found both lists
        hold a single ``0`` placeholder.

    Note:
        The STFT is recomputed on every call.
    """
    spectrogram, _, _ = STFT_spectrogram(soundwave)

    column_data = spectrogram[:, column].numpy()
    peaks, _ = find_peaks(column_data, prominence=prominence)

    if len(peaks) == 0:
        # If no peaks are found, return a placeholder peak at zero frequency
        return [0], [0]

    # The spectrogram already holds magnitudes, so no further abs() is
    # needed. tolist() yields plain floats instead of 0-d tensors, which
    # keeps the result uniform for plotting and np.max.
    frequencies = (peaks * SAMPLE_RATE / n_fft).tolist()
    amplitudes = column_data[peaks].tolist()

    return frequencies, amplitudes


def plot_peaks_frame(soundwave, column):
    """Render the detected peaks of one spectrogram column as an image.

    Peaks come from ``find_actual_frequencies``. Each is drawn as a dot with
    a thin stem down to the x-axis, on fixed axes of 0-1000 Hz by 0-500.

    Args:
        soundwave: Signal forwarded to ``find_actual_frequencies``.
        column: Index of the spectrogram column to draw.

    Returns:
        numpy.ndarray: uint8 RGB image of the rendered figure with shape
        (height, width, 3).
    """
    frequencies, amplitudes = find_actual_frequencies(soundwave, column)

    fig, ax = plt.subplots(figsize=(8, 4))
    try:
        ax.scatter(frequencies, amplitudes, s=20, c='b', alpha=0.5)

        # Draw thin lines connecting each scatter point to the x-axis
        for freq, amp in zip(frequencies, amplitudes):
            ax.plot([freq, freq], [0, amp], linewidth=0.5, color='black')

        ax.set_xlim(0, 1000)
        ax.set_ylim(0, 500)  # Fixed range; np.max(amplitudes) * 1.1 would auto-scale per frame
        ax.set_xlabel('Frequency (Hz)')
        ax.set_ylabel('Amplitude')
        ax.set_title(f'Detected Frequencies - Frame {column}')

        fig.canvas.draw()
        # canvas.tostring_rgb() was deprecated in Matplotlib 3.8 and later
        # removed; buffer_rgba() exposes the rendered pixels as an
        # (H, W, 4) buffer. Drop alpha and copy so the frame outlives the figure.
        frame = np.asarray(fig.canvas.buffer_rgba())[..., :3].copy()
    finally:
        # Always release the figure, even if plotting fails.
        plt.close(fig)

    return frame

def interactive_peak_plot(soundwave):
    """Browse the detected peaks of each spectrogram column with a slider.

    Args:
        soundwave: Signal forwarded to ``STFT_spectrogram`` and
            ``plot_peaks_frame``.

    Returns:
        int: Number of spectrogram columns (also printed).
    """
    _, _, num_columns = STFT_spectrogram(soundwave)
    slider = IntSlider(min=0, max=num_columns-1, step=1, value=0)

    def update_plot(column):
        # plot_peaks_frame draws off-screen, closes its figure and returns
        # the pixels, so the returned image must be displayed explicitly —
        # otherwise moving the slider shows nothing.
        frame = plot_peaks_frame(soundwave, column)
        fig, ax = plt.subplots(figsize=(8, 4))
        ax.imshow(frame)
        ax.axis('off')
        plt.show()

    interact(update_plot, column=slider)
    plt.show()
    print(f'num_columns: {num_columns}')
    return num_columns

In [None]:
def save_peak_frequencies_as_mp4(num_columns, duration, audio_tensor, output_dir='animations', filename='peak_frequencies.mp4'):
    """Render the detected peaks of every column as video frames and write an MP4.

    Args:
        num_columns: Number of spectrogram columns (= video frames) to render.
        duration: Length of the sound in seconds; the frame rate is chosen as
            ``num_columns / duration`` so the video lasts as long as the sound.
        audio_tensor: Signal forwarded to ``plot_peaks_frame``.
        output_dir: Directory for the video; created if missing.
        filename: File name of the video inside ``output_dir``.

    Returns:
        str: Path of the written video file.

    Raises:
        ValueError: If ``num_columns`` or ``duration`` is not positive.
    """
    if num_columns <= 0:
        raise ValueError('num_columns must be positive')
    if duration <= 0:
        raise ValueError('duration must be positive')

    # Calculate FPS to match the duration of the sound
    fps = num_columns / duration

    # NOTE(review): every plot_peaks_frame call goes through
    # find_actual_frequencies, which recomputes the STFT, and all frames are
    # held in memory at once — slow and memory-hungry for long sounds.
    frames = [plot_peaks_frame(audio_tensor, column) for column in range(num_columns)]

    # Save as MP4
    fig, ax = plt.subplots()
    try:
        im = ax.imshow(frames[0])
        ax.axis('off')

        def update(i):
            im.set_data(frames[i])
            return [im]

        ani = FuncAnimation(fig, update, frames=len(frames), blit=True)

        # Ensure the directory exists
        os.makedirs(output_dir, exist_ok=True)

        # Save the animation to the specified path
        output_path = os.path.join(output_dir, filename)
        ani.save(output_path, writer=FFMpegWriter(fps=fps))
    finally:
        # Close the helper figure so it is neither leaked nor shown as a
        # stray static image in the notebook output.
        plt.close(fig)

    print(f'Animation saved to {output_path}')

    return output_path


In [7]:
def get_peaks_all_frame(soundwave, num_columns):
    """Collect the detected peaks of the first ``num_columns`` spectrogram columns.

    Args:
        soundwave: Signal forwarded to ``find_actual_frequencies``.
        num_columns: Number of columns to analyse, starting at column 0.

    Returns:
        tuple: ``(all_frequencies, all_amplitudes)`` — two lists with one
        entry per column, each entry being that column's list of peak
        frequencies / amplitudes as returned by ``find_actual_frequencies``.
    """
    per_column = [
        find_actual_frequencies(soundwave, column_index)
        for column_index in range(num_columns)
    ]
    all_frequencies = [column_freqs for column_freqs, _ in per_column]
    all_amplitudes = [column_amps for _, column_amps in per_column]
    return all_frequencies, all_amplitudes

def find_max_frequency_and_amplitude(all_frequencies, all_amplitudes):
    """Return the largest frequency and largest amplitude across all frames.

    Args:
        all_frequencies: List of per-frame lists of frequencies.
        all_amplitudes: List of per-frame lists of amplitudes.

    Returns:
        tuple: ``(max_frequency, max_amplitude)``. Each value starts at 0.0
        and is only raised by strictly larger entries; empty frames are
        skipped.
    """
    def largest_over_frames(frames):
        # Running maximum with a floor of 0.0.
        best = 0.0
        for values in frames:
            if not values:
                continue
            frame_best = np.max(values)
            if frame_best > best:
                best = frame_best
        return best

    return largest_over_frames(all_frequencies), largest_over_frames(all_amplitudes)

def save_peak_as_mp4(num_columns, duration, audio_tensor, output_dir='animations', filename='peak_frequencies.mp4'):
    """Animate the detected peaks of every column on shared axes and write an MP4.

    Axis limits are the same for all frames: 1.2x the largest detected
    frequency and 1.2x the largest detected amplitude.

    Args:
        num_columns: Number of spectrogram columns (= video frames) to render.
        duration: Length of the sound in seconds; the frame rate is chosen as
            ``num_columns / duration`` so the video lasts as long as the sound.
        audio_tensor: Signal forwarded to ``get_peaks_all_frame``.
        output_dir: Directory for the video; created if missing.
        filename: File name of the video inside ``output_dir``.

    Returns:
        str: Path of the written video file.

    Raises:
        ValueError: If ``num_columns`` or ``duration`` is not positive.
    """
    if num_columns <= 0:
        raise ValueError('num_columns must be positive')
    if duration <= 0:
        raise ValueError('duration must be positive')

    # Calculate FPS to match the duration of the sound
    fps = num_columns / duration

    # Get all frequencies and amplitudes
    all_frequencies, all_amplitudes = get_peaks_all_frame(audio_tensor, num_columns)

    # Find maximum frequency and amplitude
    max_frequency, max_amplitude = find_max_frequency_and_amplitude(all_frequencies, all_amplitudes)

    # If nothing was detected both maxima are 0, which would give identical
    # lower and upper axis limits; fall back to a unit range in that case.
    x_upper = max_frequency * 1.2 if max_frequency > 0 else 1.0
    y_upper = max_amplitude * 1.2 if max_amplitude > 0 else 1.0

    # Generate frames for animation
    frames = [
        _render_peak_frame(frequencies, amplitudes, frame_index, x_upper, y_upper)
        for frame_index, (frequencies, amplitudes) in enumerate(zip(all_frequencies, all_amplitudes))
    ]

    # Save as MP4
    fig, ax = plt.subplots()
    try:
        im = ax.imshow(frames[0])
        ax.axis('off')

        def update(i):
            im.set_data(frames[i])
            return [im]

        ani = FuncAnimation(fig, update, frames=len(frames), blit=True)

        # Ensure the directory exists
        os.makedirs(output_dir, exist_ok=True)

        # Save the animation to the specified path
        output_path = os.path.join(output_dir, filename)
        ani.save(output_path, writer=FFMpegWriter(fps=fps))
    finally:
        # Close the helper figure so it is neither leaked nor shown as a
        # stray static image in the notebook output.
        plt.close(fig)

    print(f'Animation saved to {output_path}')

    return output_path


def _render_peak_frame(frequencies, amplitudes, frame_index, x_upper, y_upper):
    """Draw one frame's peaks as dots with stems; return an (H, W, 3) uint8 image."""
    fig, ax = plt.subplots(figsize=(8, 4))
    try:
        ax.scatter(frequencies, amplitudes, s=20, c='b', alpha=0.5)

        for freq, amp in zip(frequencies, amplitudes):
            ax.plot([freq, freq], [0, amp], linewidth=0.5, color='black')

        ax.set_xlim(0, x_upper)
        ax.set_ylim(0, y_upper)

        ax.set_xlabel('Frequency (Hz)')
        ax.set_ylabel('Amplitude')
        ax.set_title(f'Detected Frequencies - Frame {frame_index}')

        fig.canvas.draw()
        # canvas.tostring_rgb() was deprecated in Matplotlib 3.8 and later
        # removed; buffer_rgba() exposes the rendered pixels as an
        # (H, W, 4) buffer. Drop alpha and copy so the frame outlives the figure.
        return np.asarray(fig.canvas.buffer_rgba())[..., :3].copy()
    finally:
        plt.close(fig)


In [None]:
def extract_frequency_amplitude(spectrogram, prominence=7, min_amplitude=0.05):
    """Find the spectral peaks of every spectrogram column.

    Row indices are mapped to Hz on an evenly spaced grid from 0 to
    ``SAMPLE_RATE / 2`` with one point per spectrogram row.

    Args:
        spectrogram: 2-D tensor of magnitudes, rows = frequency bins,
            columns = frames.
        prominence: Minimum peak prominence for ``scipy.signal.find_peaks``.
        min_amplitude: Peaks whose magnitude is below this value are dropped.

    Returns:
        tuple: ``(frequencies, amplitudes)`` — two lists with one NumPy array
        per column, holding the frequencies (Hz) and magnitudes of the peaks
        kept in that column.
    """
    bin_count, frame_count = spectrogram.shape
    bin_frequencies = torch.linspace(0, SAMPLE_RATE / 2, bin_count).numpy()

    frequencies, amplitudes = [], []
    for frame_index in range(frame_count):
        frame_magnitudes = spectrogram[:, frame_index].numpy()
        peak_bins, _ = find_peaks(frame_magnitudes, prominence=prominence)

        peak_frequencies = bin_frequencies[peak_bins]
        peak_amplitudes = frame_magnitudes[peak_bins]

        # Keep only peaks that reach the amplitude floor
        loud_enough = peak_amplitudes >= min_amplitude
        frequencies.append(peak_frequencies[loud_enough])
        amplitudes.append(peak_amplitudes[loud_enough])

    return frequencies, amplitudes

def track_frequencies_and_amplitudes(frequencies, amplitudes, num_frames, num_samples, sample_rate, hop_length, match_threshold=25):
    """Turn per-frame peaks into per-sample frequency/amplitude trajectories.

    One trajectory is built for every distinct frequency that occurs in any
    frame. In each frame the trajectory takes the frequency and amplitude of
    the peak closest to its anchor frequency if that peak is within
    ``match_threshold`` Hz. Otherwise it stays at the anchor frequency with
    zero amplitude. The frame-rate trajectories are then linearly
    interpolated to ``num_samples`` points.

    Args:
        frequencies: List with one NumPy array of peak frequencies per frame.
        amplitudes: List with one NumPy array of peak amplitudes per frame,
            aligned with ``frequencies``.
        num_frames: Number of frames to process.
        num_samples: Number of output samples per trajectory.
        sample_rate: Audio sample rate in Hz.
        hop_length: Hop between frames in samples; ``hop_length /
            sample_rate`` is used as the time between frames.
        match_threshold: Maximum distance in Hz between a trajectory's anchor
            and a frame's peak for them to be matched.

    Returns:
        list: ``(interpolated_freqs, interpolated_amps)`` tuples, one per
        distinct frequency, in ascending order of frequency. Empty when no
        frame contains a peak.
    """
    frame_duration = hop_length / sample_rate
    time_points = np.arange(num_frames) * frame_duration
    sample_times = np.linspace(0, (num_frames - 1) * frame_duration, num_samples, endpoint=False)

    # Collect all unique frequencies. np.concatenate raises on an empty list,
    # so a signal without any detected peak yields no trajectories instead.
    detected = [f for f in frequencies if f.size > 0]
    if not detected:
        return []
    all_freqs = np.unique(np.concatenate(detected))  # sorted, de-duplicated

    # Build frequency and amplitude trajectories
    trajectories = []
    for freq in all_freqs:
        # Default for every frame: sit at the anchor frequency, silent.
        freq_trajectory = np.full(num_frames, freq, dtype=float)
        amp_trajectory = np.zeros(num_frames)
        for frame in range(num_frames):
            frame_freqs = frequencies[frame]
            if frame_freqs.size == 0:
                continue
            # Find the closest frequency in this frame
            closest_idx = np.argmin(np.abs(frame_freqs - freq))
            if np.abs(frame_freqs[closest_idx] - freq) < match_threshold:
                freq_trajectory[frame] = frame_freqs[closest_idx]
                amp_trajectory[frame] = amplitudes[frame][closest_idx]

        # Interpolate trajectories from frame rate to sample rate
        interpolated_freqs = np.interp(sample_times, time_points, freq_trajectory)
        interpolated_amps = np.interp(sample_times, time_points, amp_trajectory)
        trajectories.append((interpolated_freqs, interpolated_amps))

    return trajectories

def recreate_sound_from_frequencies_amplitudes(frequencies, amplitudes, num_frames, duration, sample_rate, hop_length):
    """Resynthesise a signal as a sum of sinusoids following tracked peaks.

    Args:
        frequencies: List with one NumPy array of peak frequencies per frame.
        amplitudes: List with one NumPy array of peak amplitudes per frame.
        num_frames: Number of analysis frames.
        duration: Length of the output in seconds.
        sample_rate: Output sample rate in Hz.
        hop_length: Hop between analysis frames in samples.

    Returns:
        numpy.ndarray: ``int(sample_rate * duration)`` samples with a
        fade-in/fade-out applied and scaled to a peak of 1 (left as zeros if
        the signal is silent).
    """
    num_samples = int(sample_rate * duration)
    reconstructed_signal = np.zeros(num_samples)

    # Track and interpolate frequencies and amplitudes
    trajectories = track_frequencies_and_amplitudes(frequencies, amplitudes, num_frames, num_samples, sample_rate, hop_length)

    # Generate signal with phase continuity. Each oscillator's phase only
    # advances on samples where its amplitude is positive, so the running
    # phase is the cumulative sum of the per-sample phase steps with inactive
    # samples contributing zero. This matches a sample-by-sample loop without
    # iterating over every sample in Python.
    for freq_trajectory, amp_trajectory in trajectories:
        active = amp_trajectory > 0
        phase_steps = np.where(active, 2 * np.pi * freq_trajectory / sample_rate, 0.0)
        phases = np.cumsum(phase_steps)
        reconstructed_signal += np.where(active, amp_trajectory * np.sin(phases), 0.0)

    # Apply fade-in and fade-out envelope (0.3 s each). The fades are
    # shortened to half the signal for clips under 0.6 s and skipped when
    # they would be empty; unguarded slice assignment fails in both cases.
    fade_length = min(int(0.3 * sample_rate), num_samples // 2)
    if fade_length > 0:
        envelope = np.ones(num_samples)
        envelope[:fade_length] = np.linspace(0, 1, fade_length)
        envelope[-fade_length:] = np.linspace(1, 0, fade_length)
        reconstructed_signal *= envelope

    # Normalize
    max_val = np.max(np.abs(reconstructed_signal)) if num_samples > 0 else 0
    if max_val > 0:
        reconstructed_signal /= max_val

    return reconstructed_signal

def sine_recreate_sound(audio_tensor, n_fft=4096, hop_length=64):
    """Analyse a signal's spectral peaks and resynthesise it from sinusoids.

    Args:
        audio_tensor: Audio signal tensor. Its duration is taken as
            ``len(audio_tensor) / SAMPLE_RATE``.
        n_fft: FFT size used for the analysis STFT.
        hop_length: Hop between STFT frames in samples; also used to place
            the frames in time during resynthesis.

    Returns:
        numpy.ndarray: The resynthesised signal, as returned by
        ``recreate_sound_from_frequencies_amplitudes``.

    Note:
        NOTE(review): the module-level SAMPLE_RATE is assumed throughout; a
        file recorded at a different rate would be analysed with the wrong
        frequency scale — confirm against the caller.
    """
    num_samples = len(audio_tensor)
    duration = num_samples / SAMPLE_RATE

    # Run the STFT here with this function's own n_fft / hop_length.
    # STFT_spectrogram reads the module-level settings, so routing through it
    # silently ignores the arguments and can desynchronise the frame timing
    # used during resynthesis. win_length and window are the module-level
    # settings.
    stft = torch.stft(audio_tensor.squeeze(), n_fft, hop_length, win_length, window, return_complex=True)
    spectrogram = torch.abs(stft)
    num_columns = spectrogram.shape[1]

    frequencies, amplitudes = extract_frequency_amplitude(spectrogram, prominence=7, min_amplitude=0.05)
    sound = recreate_sound_from_frequencies_amplitudes(
        frequencies, amplitudes, num_columns, duration, SAMPLE_RATE, hop_length
    )
    return sound

In [None]:
# Nested phase-modulation test tone: each stage's output is added to the
# phase of the next sine.
A = torch.sin(2 * np.pi * 220 * t)
B = 1.2* torch.sin(2 * np.pi * 437 * t + A)
C = 2.2* torch.sin(2 * np.pi * 6345 * t + B)
# C is rebound here: the 6345 Hz stage above becomes the phase offset of the
# final 132 Hz sine.
C = torch.sin(2 * np.pi * 132 * t + C)
interactive_STFT_plot(C)
# NOTE(review): the spectrogram above is of C, but the player below is given
# B — confirm which signal is meant to be heard.
Audio(B, rate=SAMPLE_RATE)

In [None]:
# Load a recorded sound, convert it to a mono peak-normalised tensor
# (get_tensor) and play it at the file's own sample rate.
file_path = './DexedSounds/Sahara-1.wav'
sample_rate, audio = wavfile.read(file_path)

audio_tensor = get_tensor(audio)

Audio(audio_tensor, rate = sample_rate)


In [23]:
# Resynthesise the current `audio_tensor` from its detected spectral peaks and
# play the result.
# NOTE(review): playback uses the SAMPLE_RATE constant rather than the
# `sample_rate` read from the file; the two agree only if the file was
# recorded at SAMPLE_RATE Hz — confirm.
reconstructed_signal = sine_recreate_sound(audio_tensor)
Audio(reconstructed_signal, rate = SAMPLE_RATE)

In [None]:
# Load a sound, inspect its spectrogram, render its peak animation and mux the
# original audio into the video.
file_path = './DexedSounds/Analog1-14.wav'
sample_rate, audio = wavfile.read(file_path)

audio_tensor = get_tensor(audio)
num_columns = interactive_STFT_plot(audio_tensor)

duration = len(audio) / sample_rate
print(duration)

# Use the path returned by the writer. It saves into 'animations/' relative to
# the working directory, which a hard-coded './DexedSounds/animations/...'
# path does not match.
video_path = save_peak_frequencies_as_mp4(num_columns, duration, audio_tensor, filename='analog_peak.mp4')

audio_path = file_path  # The soundtrack is the file that was analysed
output_path = './DexedSounds/analog_peak_final.mp4'
combine_audio_and_video(video_path, audio_path, output_path)