In [42]:
import scipy.io.wavfile as wav
import numpy as np

def read_wave_file(file_path) -> tuple[int, np.ndarray]:
    rate, data = wav.read(file_path)
    if data.ndim == 2:
        data = data[:, 0]
    # convert to float
    if data.dtype == 'int16':
        data = data / 32768.0
    elif data.dtype == 'float32':
        pass
    else:
        raise ValueError("Unknown data type")
    return rate, data

def save_wave_file(file_path, rate, data):
    # Ensure data is in the correct format
    if data.dtype != 'float32':
        data = data.astype('float32')

    # Convert float data to int16 for saving
    data_int16 = (data * 32767).astype('int16')

    # Write the data to a wav file
    wav.write(file_path, rate, data_int16)

# plot the audio data
# use popup window
%matplotlib qt
import matplotlib.pyplot as plt

import sounddevice as sd

In [252]:
def plot_audio(data, rate, seconds=1):
    # If seconds is None, plot the entire audio data
    if seconds is None:
        num_samples = len(data)
        seconds = num_samples / rate
    else:
        num_samples = min(int(seconds * rate), len(data))
        seconds = min(seconds, len(data) / rate)

    # Create a time axis based on the sample rate and the specified duration
    time = np.linspace(0., seconds, num_samples)

    # Plot the audio data for the specified duration
    plt.figure()
    plt.plot(time, data[:num_samples])
    plt.xlabel('Time (s)')
    plt.ylabel('Amplitude')
    plt.title(f'Audio Data (First {seconds} seconds)' if seconds is not None else 'Audio Data (Full)')
    plt.show()

def plot_fft(data, rate, seconds=1, freq_range=(0, 24000)):
    # If seconds is None, perform FFT on the entire audio data
    if seconds is None:
        num_samples = len(data)
    else:
        num_samples = min(int(seconds * rate), len(data))
        seconds = min(seconds, len(data) / rate)
    assert num_samples % 2 == 0
    # Perform FFT on the audio data
    freqs = np.fft.fft(data[:num_samples])
    freqs = np.abs(freqs) / num_samples
    freqs = freqs[:num_samples // 2]
    plt.figure()
    plt.plot(np.linspace(0., rate / 2, num_samples // 2), freqs)
    plt.xlabel('Frequency (Hz)')
    plt.ylabel('Magnitude')
    plt.title(f'FFT of Audio Data (First {seconds} seconds)' if seconds is not None else 'FFT of Audio Data (Full)')
    plt.xlim(*freq_range)
    plt.show()

def plot_wav_file(file_path, seconds=1):
    rate, data = read_wave_file(file_path)
    plot_audio(data, rate, seconds)
    plot_fft(data, rate, seconds)

# plot_audio(data, rate)
# plot_fft(data, rate)

In [6]:
# plot_wav_file("../build/raw_output.wav")
# plot_wav_file("../build/raw_input.wav")

In [138]:
def play_audio(audio: np.array, rate=48000):
    save_wave_file("../build/temp.wav", rate, audio)
    sd.play(audio, rate)
    sd.wait()

def audio_by_function(func: callable, rate=48000, duration=1):
    timepoints = np.linspace(0, duration, int(rate * duration))
    return func(timepoints)

def play_audio_by_function(func: callable, rate=48000, duration=1):
    timepoints = np.linspace(0, duration, int(rate * duration))
    audio = audio_by_function(func, rate, duration)
    play_audio(audio, rate)

def sine_wave(freq_hz, phase=0):
    return lambda time_in_second: np.sin(2 * np.pi * freq_hz * time_in_second + phase)

sine_440hz = sine_wave(440)
sine_880hz = sine_wave(880)

def square_wave(freq):
    return lambda time_in_second: np.sign(np.sin(2 * np.pi * freq * time_in_second))

def cosine_440hz(time_in_second):
    return np.cos(2 * np.pi * 440 * time_in_second)

def multiply_amplitude(amplitude: float, func: callable):
    def wrapper(*args, **kwargs):
        return amplitude * func(*args, **kwargs)
    return wrapper

def sine_wave_samples(freq, cycle, phase, amplitude, rate=48000):
    duration = cycle / freq
    timepoints = np.linspace(0, duration, int(rate * duration))
    return multiply_amplitude(amplitude, sine_wave(freq, phase))(timepoints)

def play_sine_waves(sine_waves: tuple[int, float, float], rate=48000):
    audio = np.zeros(0)
    for freq, cycle, amplitude in sine_waves:
        sine_wave_audio = sine_wave_samples(freq, cycle, 0, amplitude, rate)
        audio = np.concatenate((audio, sine_wave_audio))
    play_audio(audio, rate)

# play_sine_waves([(4800, 4, 0.5), (4800, 4, 1), (2400, 4, 1)] * 500)

# play_audio_by_function(sine_wave(2400))
# play_audio_by_function(sine_wave(4800))
# play_audio_by_function(sine_wave(9600))

In [58]:
rate = 48000
freq = 10
cycles = 1
amplitude = 1
window_size = int(rate / freq * cycles)

one = sine_wave_samples(freq, cycles, 0, amplitude)

def plot_modulation(one, zero, title_prefix):
    # zero_shift = zero * one
    # zero_smooth = np.convolve(zero_shift, np.ones(window_size) / window_size, mode='full')
    # fig, axs = plt.subplots(2, 2)
    # axs[0, 0].plot(one)
    # axs[0, 0].set_title(f"{title_prefix} One")
    # axs[0, 0].set_ylim([-1, 1])
    # axs[0, 1].plot(zero)
    # axs[0, 1].set_title(f"{title_prefix} Zero")
    # axs[0, 1].set_ylim([-1, 1])
    # axs[1, 0].plot(zero_shift)
    # axs[1, 0].set_title(f"{title_prefix} Zero Shift")
    # axs[1, 0].set_ylim([-1, 1])
    # axs[1, 1].plot(zero_smooth)
    # axs[1, 1].set_title(f"{title_prefix} Zero Smooth")
    # axs[1, 1].set_ylim([-1, 1])
    # axs[1, 1].axvline(x=window_size, color='r', linestyle='--')
    # axs[1, 1].annotate(f'Result: {zero_smooth[window_size]:.2e}', xy=(0.5, 0.9), xycoords='axes fraction', fontsize=10, ha='center', va='center', bbox=dict(boxstyle="round,pad=0.3", edgecolor='black', facecolor='white'))
    # plt.show()
    one_shift = one * one
    one_smooth = np.convolve(one_shift, np.ones(window_size) / window_size, mode='full')
    zero_shift = zero * one
    zero_smooth = np.convolve(zero_shift, np.ones(window_size) / window_size, mode='full')
    fig, axs = plt.subplots(2, 3)
    axs[0, 0].plot(one)
    axs[0, 0].set_title(f"{title_prefix} One")
    axs[0, 0].set_ylim([-1, 1])
    axs[0, 1].plot(one_shift)
    axs[0, 1].set_title(f"{title_prefix} One Shift")
    axs[0, 1].set_ylim([-1, 1])
    axs[0, 2].plot(one_smooth)
    axs[0, 2].set_title(f"{title_prefix} One Smooth")
    axs[0, 2].set_ylim([-1, 1])
    axs[0, 2].axvline(x=window_size, color='r', linestyle='--')
    axs[0, 2].annotate(f'Result: {one_smooth[window_size]:.2e}', xy=(0.5, 0.9), xycoords='axes fraction', fontsize=10, ha='center', va='center', bbox=dict(boxstyle="round,pad=0.3", edgecolor='black', facecolor='white'))
    axs[1, 0].plot(zero)
    axs[1, 0].set_title(f"{title_prefix} Zero")
    axs[1, 0].set_ylim([-1, 1])
    axs[1, 1].plot(zero_shift)
    axs[1, 1].set_title(f"{title_prefix} Zero Shift")
    axs[1, 1].set_ylim([-1, 1])
    axs[1, 2].plot(zero_smooth)
    axs[1, 2].set_title(f"{title_prefix} Zero Smooth")
    axs[1, 2].set_ylim([-1, 1])
    axs[1, 2].axvline(x=window_size, color='r', linestyle='--')
    axs[1, 2].annotate(f'Result: {zero_smooth[window_size]:.2e}', xy=(0.5, 0.9), xycoords='axes fraction', fontsize=10, ha='center', va='center', bbox=dict(boxstyle="round,pad=0.3", edgecolor='black', facecolor='white'))
    plt.show()

# ASK, Amplitude Shift Keying
zero_ask = sine_wave_samples(freq, cycles, 0, 0.5)
plot_modulation(one, zero_ask, "ASK")
# one = 0.5
# zero = 0.25

# FSK, Frequency Shift Keying
zero_fsk = sine_wave_samples(freq * 2, cycles * 2, 0, 1)
plot_modulation(one, zero_fsk, "FSK")
# one = 0.5
# zero = 0

# PSK, Phase Shift Keying
zero_psk = sine_wave_samples(freq, cycles, np.pi, 1)
plot_modulation(one, zero_psk, "PSK pi")
# one = 0.5
# zero = -0.5

# PSK, Phase Shift Keying
zero_psk = sine_wave_samples(freq, cycles, np.pi/2, 1)
plot_modulation(one, zero_psk, "PSK pi/2")
# one = 0.5
# zero = 0

The cached device pixel ratio value was stale on window update.  Please file a QTBUG which explains how to reproduce.
The cached device pixel ratio value was stale on window update.  Please file a QTBUG which explains how to reproduce.
The cached device pixel ratio value was stale on window update.  Please file a QTBUG which explains how to reproduce.
The cached device pixel ratio value was stale on window update.  Please file a QTBUG which explains how to reproduce.
qt.qpa.wayland.textinput: virtual void QtWaylandClient::QWaylandTextInputv3::zwp_text_input_v3_leave(wl_surface*) Got leave event for surface 0x0 focused surface 0x58dae9cca540
qt.qpa.wayland.textinput: virtual void QtWaylandClient::QWaylandTextInputv3::zwp_text_input_v3_leave(wl_surface*) Got leave event for surface 0x0 focused surface 0x58daea22aec0
qt.qpa.wayland.textinput: virtual void QtWaylandClient::QWaylandTextInputv3::zwp_text_input_v3_leave(wl_surface*) Got leave event for surface 0x0 focused surface 0x58dae9ab2b

In [82]:
rate = 10000
freq = 10
cycles = 1
amplitude = 1
window_size = int(rate / freq * cycles)

duration = cycles / freq
timepoints = np.linspace(0, duration, int(rate * duration))

one = np.sin(2 * np.pi * freq * timepoints)
zero = np.cos(2 * np.pi * 2 * freq * timepoints)
# one = np.ones(len(timepoints))
carrier = np.cos(2 * np.pi * 200 * timepoints)

one_passband = one * carrier
zero_passband = zero * carrier

one_decode = one_passband * carrier

plot_fft(one_decode, rate)

# plt.plot(one_passband)
# plt.plot(one_decode)
# (one_decode, rate)
print(np.dot(one_decode, np.sin(2 * np.pi * freq * timepoints)))
print(np.dot(one_decode, np.cos(2 * np.pi * freq * timepoints)))

print(np.dot(one_decode, np.sin(2 * np.pi * 2 * freq * timepoints)))
print(np.dot(one_decode, np.cos(2 * np.pi * 2 * freq * timepoints)))

249.74999999999986
-1.5727643521394e-13
2.8395679091495346e-13
-1.2243277947546984e-13


The cached device pixel ratio value was stale on window update.  Please file a QTBUG which explains how to reproduce.
qt.qpa.wayland.textinput: virtual void QtWaylandClient::QWaylandTextInputv3::zwp_text_input_v3_leave(wl_surface*) Got leave event for surface 0x0 focused surface 0x58dae9a30bd0


In [185]:
rate = 20

freq = 10
cycles = 20
amplitude = 1
window_size = int(rate / freq * cycles)

duration = cycles / freq
x = np.linspace(0, duration, int(rate * duration))
x_more = np.linspace(0, duration, int(rate * duration * 10))

wave = sine_wave(freq)(x)

plt.plot(x, wave)
# plt.plot(x_more, sine_wave(freq)(x_more))

# plot_fft(wave, rate)

[<matplotlib.lines.Line2D at 0x7c6ea3d5d0a0>]

In [186]:
t = np.linspace(0, 1, 48000)
# wave = sine_wave(200)(t) * sine_wave(1000)(t)
wave = sine_wave(200)(t) * sine_wave(200, np.pi / 2)(t)
plot_fft(wave, 48000)

In [187]:
plot_wav_file("../build/temp.wav", seconds=None)

In [188]:
def play_and_record(data, rate, discard=True):
    save_wave_file("../build/temp.wav", rate, data)

    cmd = "../build/supersonic/play_and_record -f ../build/temp.wav -i 'UGREEN CM564 USB Audio  Mono:capture_MONO' -o 'USB2.0 Device Analog Stereo:playback_FL'"
    import subprocess
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    p.wait()
    
    rate, record_wave = read_wave_file("raw_input.wav")
    rate, play_wave = read_wave_file("raw_output.wav")
    if discard:
        # discard first 0.5s
        record_wave = record_wave[int(0.5 * rate):]
        play_wave = play_wave[int(0.5 * rate):]
    return record_wave, play_wave, rate

The cached device pixel ratio value was stale on window update.  Please file a QTBUG which explains how to reproduce.


In [189]:
rate = 48000
duration = 10

none_wave = np.zeros(int(rate * duration))

noise_wave, _, rate = play_and_record(none_wave, rate)

KeyboardInterrupt: 

The cached device pixel ratio value was stale on window update.  Please file a QTBUG which explains how to reproduce.
The cached device pixel ratio value was stale on window update.  Please file a QTBUG which explains how to reproduce.
The cached device pixel ratio value was stale on window update.  Please file a QTBUG which explains how to reproduce.
qt.qpa.wayland.textinput: virtual void QtWaylandClient::QWaylandTextInputv3::zwp_text_input_v3_leave(wl_surface*) Got leave event for surface 0x0 focused surface 0x5c0a3b850740
qt.qpa.wayland.textinput: virtual void QtWaylandClient::QWaylandTextInputv3::zwp_text_input_v3_leave(wl_surface*) Got leave event for surface 0x0 focused surface 0x5c0a3baf01d0


In [58]:
# calculate noise power
noise_power = np.sum(noise_wave ** 2) / len(noise_wave) * rate
print(f"noise power: {noise_power}")

noise power: 0.09256891161203384


In [81]:
plot_fft(noise_wave, rate)

qt.qpa.wayland.textinput: virtual void QtWaylandClient::QWaylandTextInputv3::zwp_text_input_v3_leave(wl_surface*) Got leave event for surface 0x0 focused surface 0x5c0a3b80e030


In [99]:
class FrequencyData:
    def __init__(self, freq, wave, signal, noise):
        self.freq = freq
        self.wave = wave
        self.s_div_n = signal / noise
        print(f"freq={freq} S/N={self.s_div_n}")
freq_data_map = {}

In [103]:
def test_freq_SNR(freq):
    rate = 48000
    duration = 1.5
    cycles = duration * freq
    phase = 0
    amplitude = 1

    # print(f"Freq = {freq}, Period (s) = {1 / freq}")

    wave = sine_wave_samples(freq, cycles, phase, amplitude, rate)
    record_wave, play_wave, rate = play_and_record(wave, rate)

    # calculate signal power
    signal_noise_power = np.sum(record_wave ** 2) / len(record_wave) * rate
    signal_power = signal_noise_power - noise_power
    # print(f"freq={freq} signal power: {signal_power}")

    # calculate SNR
    # print(f"freq={freq} S/N={signal_power / noise_power}")
    # print(f"freq={freq} SNR={10 * np.log10(signal_power / noise_power)}")

    freq_data_map[freq] = FrequencyData(freq, record_wave, signal_power, noise_power)

for freq in range(200, 1000, 100):
    test_freq_SNR(freq)

for freq in range(1000, 10000, 500):
    test_freq_SNR(freq)

for freq in range(10000, 15000, 500):
    test_freq_SNR(freq)

for freq in range(15000, 20000, 500):
    test_freq_SNR(freq)

# for freq in [100, 200, 400, 800, 1600, 3200, 6400, 12800]:
#     test_freq_SNR(freq)

freq=200 S/N=2.439788818359375
freq=300 S/N=2.4707531929016113
freq=400 S/N=8.894071578979492
freq=500 S/N=2.5171334743499756
freq=600 S/N=1.8965137004852295
freq=700 S/N=4.5611138343811035
freq=800 S/N=4.237361907958984
freq=900 S/N=5.708857536315918
freq=1000 S/N=9.166345596313477
freq=1500 S/N=5.319309711456299
freq=2000 S/N=9.06489372253418
freq=2500 S/N=17.16437339782715
freq=3000 S/N=10.600259780883789
freq=3500 S/N=20.592689514160156
freq=4000 S/N=15.892398834228516
freq=4500 S/N=18.32355499267578
freq=5000 S/N=16.683053970336914
freq=5500 S/N=16.264638900756836
freq=6000 S/N=16.570907592773438
freq=6500 S/N=15.168131828308105
freq=7000 S/N=9.810857772827148
freq=7500 S/N=4.563968181610107
freq=8000 S/N=4.931657791137695
freq=8500 S/N=21.85814094543457
freq=9000 S/N=31.73781967163086
freq=9500 S/N=50.711936950683594
freq=10000 S/N=26.714826583862305
freq=10500 S/N=52.22666549682617
freq=11000 S/N=26.453899383544922
freq=11500 S/N=60.73630142211914
freq=12000 S/N=21.5073890686035

In [116]:
# plot_fft(freq_data_map[13000].wave, rate)
# plot freq and S/N
freqs = sorted(list(freq_data_map.keys()))
s_div_n = [freq_data_map[freq].s_div_n for freq in freqs]
# plt.plot(freqs, s_div_n)
plt.title("Freq - log2(1 + S/N)")
plt.plot(freqs, np.log2(1 + np.array(s_div_n)))

[<matplotlib.lines.Line2D at 0x7c6ed8ae4470>]

qt.qpa.wayland.textinput: virtual void QtWaylandClient::QWaylandTextInputv3::zwp_text_input_v3_leave(wl_surface*) Got leave event for surface 0x0 focused surface 0x5c0a3bcce420


In [132]:
# plot_audio(freq_data_map[13500].wave, rate)
# plot_fft(freq_data_map[13500].wave, rate)
# plot_audio(sine_wave(13500)(np.linspace(0, 0.2, 48000)), 48000)

x = np.linspace(0, 0.01, int(0.01*48000))
x_fine = np.linspace(0, 0.01, 10000)
y = sine_wave(13500)(x)
y_fine = sine_wave(13500)(x_fine)
# plot graph and point
plt.plot(x, y)
plt.plot(x_fine, y_fine)
plt.scatter(x, y)
plt.show()

qt.qpa.wayland.textinput: virtual void QtWaylandClient::QWaylandTextInputv3::zwp_text_input_v3_leave(wl_surface*) Got leave event for surface 0x0 focused surface 0x5c0a3be4d130


In [114]:
B = 13500 - 8500
C = B * np.log2(1 + freq_data_map[8500].s_div_n)
print(f"Channel Capacity: {C} bps, {C/1000} kbps")

Channel Capacity: 22573.1796875 bps, 22.573179244995117 kbps


In [153]:
# try play square wave
rate = 48000
freq = 500
duration = 1
timepoints = np.linspace(0, duration, int(rate * duration))
wave = square_wave(freq)(timepoints)
record_wave, play_wave, rate = play_and_record(wave, rate)

In [161]:
# try play triangle wave
rate = 48000
freq = 500
duration = 1
timepoints = np.linspace(0, duration, int(rate * duration))
def triangle_wave(freq):
    return lambda time_in_second: np.abs(2 * (time_in_second * freq - np.floor(time_in_second * freq + 0.5)))
wave = triangle_wave(freq)(timepoints)
record_wave, play_wave, rate = play_and_record(wave, rate)

In [261]:
# try play sweep wave
rate = 48000
duration = 5
timepoints = np.linspace(0, duration, int(rate * duration) + 1)
# print(timepoints[1], 1. / rate)
wave = np.zeros(len(timepoints))
count = 0
frequencies = [261.63, 327.04, 392.45]

for freq in frequencies:
    # print(freq)
    count += 1
    wave += sine_wave(freq)(timepoints)
wave /= count
record_wave, play_wave, rate = play_and_record(wave, rate)

In [262]:
# plot_audio(record_wave, rate)
# plot_fft(record_wave, rate, freq_range=(200, 400))
# plot_fft(play_wave, rate)
# plot_audio(play_wave, rate)

qt.qpa.wayland.textinput: virtual void QtWaylandClient::QWaylandTextInputv3::zwp_text_input_v3_leave(wl_surface*) Got leave event for surface 0x0 focused surface 0x5c0a3b9d7ab0
