In [None]:
import matplotlib.pyplot as plt
import numpy as np
from scipy.fft import fft, fftfreq, fftshift

# Load the recorded samples from disk. Later cells index this array as
# data[channel, sample] (assumes a 2-D array with at least 8 rows — TODO confirm).
data = np.load("../tenth_thread.npy")

# Folding period in milliseconds (presumably the Vela pulsar, per the name —
# verify the value against the ephemeris used for this observation).
VELA_PERIOD = 89.4  # ms

In [None]:
# STFT framing parameters (all lengths are in samples).
OVERLAP = 2**4  # samples shared by consecutive segments
SEGMENT = 2**5  # samples per FFT segment
# NOTE: int() truncates the exact rate 51_200_000 / 27 to a whole number of Hz.
SAMPLING_RATE = int(51_200_000 / 27)  # Hz

# Hann taper applied to every segment before the FFT.
window = np.hanning(SEGMENT)

# Number of complete segments that fit in one channel when consecutive
# segments start (SEGMENT - OVERLAP) samples apart. The "+ 1" counts the
# segment starting at sample 0; without it the last complete segment is
# dropped. max() keeps the count at zero when the data is shorter than SEGMENT.
n_segments = max(0, (data.shape[1] - SEGMENT) // (SEGMENT - OVERLAP) + 1)

# Frequency of each FFT bin, reordered to ascending (negative -> positive)
# order with the same fftshift that is imported from scipy.fft.
frequencies_axis = fftshift(fftfreq(SEGMENT, 1 / SAMPLING_RATE))

# Start time (in seconds) of each segment.
times = np.arange(n_segments) * (SEGMENT - OVERLAP) / SAMPLING_RATE

In [None]:
# Magnitude spectrogram of the first 8 channels, indexed as
# output[channel, segment, frequency bin].
output = np.empty((8, n_segments, SEGMENT))

hop = SEGMENT - OVERLAP  # samples between consecutive segment starts

for channel in range(8):
    # View of every length-SEGMENT window of this channel (no copy); keep the
    # windows that start on a multiple of `hop`, and at most n_segments of them
    # so the result always matches the pre-allocated output shape.
    frames = np.lib.stride_tricks.sliding_window_view(data[channel], SEGMENT)
    frames = frames[::hop][:n_segments]

    # Taper each frame, transform along the sample axis, and move the zero
    # frequency bin to the centre. `axes=-1` is required: fftshift would
    # otherwise also roll the segment (time) axis.
    spectra = fftshift(fft(frames * window, axis=-1), axes=-1)

    output[channel] = np.abs(spectra)

In [None]:
n_channels = 8

# Physical axis limits so the tick values match the "[s]" / "[Hz]" labels;
# without an extent, imshow numbers the axes by array index.
spectrogram_extent = [times[0], times[-1], frequencies_axis[0], frequencies_axis[-1]]

# Use one colour scale for every panel: the single colorbar drawn below is
# only meaningful if all images share the same limits.
amp_min = output[:n_channels].min()
amp_max = output[:n_channels].max()

fig, axs = plt.subplots(n_channels, 1, figsize=(20, 16), sharex=True, sharey=True)

for i in range(n_channels):
    ax = axs[i]
    im = ax.imshow(
        output[i].T,  # transpose so frequency runs along y and time along x
        aspect="auto",
        cmap="inferno",
        origin="lower",
        vmin=amp_min,
        vmax=amp_max,
        extent=spectrogram_extent,
    )
    ax.set_ylabel(f"Channel {i + 1} Frequency [Hz]")
    ax.set_xlabel("Time [s]")

fig.colorbar(im, ax=axs, label="Amplitude")

plt.suptitle("Frequency vs Time for Multiple Channels", fontsize=16)

plt.show()

In [None]:
n_channels = 8

fig, axs = plt.subplots(n_channels, 1, figsize=(20, 16), sharex=True, sharey=True)

for i in range(n_channels):
    ax = axs[i]
    # Sum over the frequency axis: one amplitude value per segment start time.
    ax.plot(times, output[i].sum(axis=1))
    # The curve is a summed amplitude, not a frequency, so label it as such.
    ax.set_ylabel(f"Channel {i + 1} Summed Amplitude")
    ax.set_xlabel("Time [s]")

plt.suptitle("Summed Amplitude vs Time for Multiple Channels", fontsize=16)

plt.show()

In [None]:
n_channels = 8

# Sum over channels (axis 0), then over frequency bins, and divide by the
# channel count: the channel-averaged summed amplitude of every segment.
mean_summed_amplitude = output.sum(axis=0).sum(axis=1) / n_channels

plt.plot(times, mean_summed_amplitude)
# This curve is not tied to a single channel, so the label must not depend on
# a loop index left over from a previously executed cell.
plt.ylabel("Channel-averaged Summed Amplitude")
plt.xlabel("Time [s]")

plt.suptitle("Channel-averaged Summed Amplitude vs Time", fontsize=16)

plt.show()

In [None]:
# Length of one folding period expressed in raw samples:
#   period [s] / sample spacing [s]
# VELA_PERIOD is given in milliseconds, hence the division by 1000.
seconds_between_samples = 27 / 51_200_000
vela_period_seconds = VELA_PERIOD / 1000
samples_89ms = vela_period_seconds / seconds_between_samples
samples_89ms

In [None]:
# Whole number of STFT hops (SEGMENT - OVERLAP samples each) that fit in one
# folding period; the fractional remainder is discarded by the floor division.
samples_89ms // (SEGMENT - OVERLAP)

In [None]:
# Number of spectrogram rows (STFT hops) per folding period.
# NOTE(review): the floor division drops the fractional part of the period, so
# the folded phase drifts by that remainder on every fold — confirm this is
# acceptable for the number of folds in the data.
FOLD_SEGMENT = int(samples_89ms // (SEGMENT - OVERLAP))
if FOLD_SEGMENT <= 0:
    # A zero-length fold would otherwise never advance through the data.
    raise ValueError("Folding period is shorter than one STFT hop")

# Complete folding periods available in the spectrogram; trailing rows that do
# not fill a whole period are ignored.
num_folds = output.shape[1] // FOLD_SEGMENT
if num_folds == 0:
    # Averaging zero folds would silently produce NaN everywhere.
    raise ValueError("Spectrogram is shorter than one folding period")

rows_used = num_folds * FOLD_SEGMENT

# Average the periods on top of each other, one channel at a time so the
# reshape stays a view of `output` instead of copying the whole array.
output_folded = np.empty((n_channels, FOLD_SEGMENT, SEGMENT))
for channel in range(n_channels):
    folds = output[channel, :rows_used].reshape(num_folds, FOLD_SEGMENT, SEGMENT)
    output_folded[channel] = folds.mean(axis=0)

In [None]:
def average_3d_array(arr, fold_length):
    """
    Fold a 3-D array along its middle axis and average the folds.

    The middle axis is cut into consecutive chunks of ``fold_length`` rows and
    the chunks are averaged element-wise. Trailing rows that do not fill a
    complete chunk are discarded.

    Parameters:
    - arr: np.ndarray of shape (D, N, M)
    - fold_length: number of rows in one fold

    Returns:
    - np.ndarray of shape (D, fold_length, M)
    """
    n_outer, n_rows, n_cols = arr.shape
    n_folds = n_rows // fold_length

    # Keep only the rows that belong to a complete fold.
    usable_rows = n_folds * fold_length
    stacked = arr[:, :usable_rows, :].reshape(n_outer, n_folds, fold_length, n_cols)

    # Axis 1 enumerates the folds, so averaging over it collapses them.
    return stacked.mean(axis=1)


# Fold the spectrogram with the reshape-based helper, using FOLD_SEGMENT rows
# per fold.
output_folded_second_method = average_3d_array(
    arr=output,
    fold_length=FOLD_SEGMENT,
)

In [None]:
# Start times of the rows within one fold, used to give the x axis real units
# (without an extent, imshow numbers the axes by array index).
fold_times = times[:FOLD_SEGMENT]
folded_extent = [fold_times[0], fold_times[-1], frequencies_axis[0], frequencies_axis[-1]]

# One colour scale for every panel so the single shared colorbar is valid.
amp_min = output_folded_second_method[:n_channels].min()
amp_max = output_folded_second_method[:n_channels].max()

fig, axs = plt.subplots(n_channels, 1, figsize=(20, 16), sharex=True, sharey=True)

for i in range(n_channels):
    ax = axs[i]
    im = ax.imshow(
        output_folded_second_method[i].T,  # frequency on y, time on x
        aspect="auto",
        origin="lower",
        vmin=amp_min,
        vmax=amp_max,
        extent=folded_extent,
    )
    ax.set_ylabel(f"Channel {i + 1} Frequency [Hz]")
    ax.set_xlabel("Time [s]")

fig.colorbar(im, ax=axs, label="Amplitude")

plt.suptitle("Frequency vs Time for Multiple Channels (Folded)", fontsize=16)

plt.show()

In [None]:
n_channels = 8

# Sum the folded spectrogram over channels and frequency bins, then divide by
# the channel count: the channel-averaged summed amplitude across one fold.
folded_profile = output_folded.sum(axis=0).sum(axis=1) / n_channels

plt.plot(times[0:FOLD_SEGMENT], folded_profile)
# The curve is not tied to one channel, so the label must not depend on a
# loop index left over from a previously executed cell.
plt.ylabel("Channel-averaged Summed Amplitude")
plt.xlabel("Time [s]")

plt.suptitle("Channel-averaged Summed Amplitude vs Time (Folded)", fontsize=16)

plt.show()

In [None]:
n_channels = 8

# Start times of the rows within one fold, used to give the x axis real units
# (without an extent, imshow numbers the axes by array index).
fold_times = times[:FOLD_SEGMENT]
folded_extent = [fold_times[0], fold_times[-1], frequencies_axis[0], frequencies_axis[-1]]

fig, ax = plt.subplots(1, 1)

ax.imshow(
    # Channel-averaged folded spectrogram, transposed so frequency is on y.
    output_folded.sum(axis=0).T / n_channels,
    aspect="auto",
    cmap="inferno",
    origin="lower",
    extent=folded_extent,
)
# This image averages all channels, so the label must not mention a channel
# number taken from a loop index left over from a previously executed cell.
ax.set_ylabel("Frequency [Hz]")
ax.set_xlabel("Time [s]")

plt.suptitle("Channel-averaged Frequency vs Time (Folded)", fontsize=16)

plt.show()

In [None]:
# Start times of the rows within one fold, used to give the x axis real units
# (without an extent, imshow numbers the axes by array index).
fold_times = times[:FOLD_SEGMENT]
folded_extent = [fold_times[0], fold_times[-1], frequencies_axis[0], frequencies_axis[-1]]

# One colour scale for every panel so the single shared colorbar is valid.
amp_min = output_folded[:n_channels].min()
amp_max = output_folded[:n_channels].max()

fig, axs = plt.subplots(n_channels, 1, figsize=(20, 16), sharex=True, sharey=True)

for i in range(n_channels):
    ax = axs[i]
    im = ax.imshow(
        output_folded[i].T,  # frequency on y, time on x
        aspect="auto",
        origin="lower",
        vmin=amp_min,
        vmax=amp_max,
        extent=folded_extent,
    )
    ax.set_ylabel(f"Channel {i + 1} Frequency [Hz]")
    ax.set_xlabel("Time [s]")

fig.colorbar(im, ax=axs, label="Amplitude")

plt.suptitle("Frequency vs Time for Multiple Channels (Folded)", fontsize=16)

plt.show()

In [None]:
# Sanity check: display the dimensions of the folded spectrogram array.
output_folded.shape

In [None]:
## use scipy
from scipy.signal import ShortTimeFFT
from scipy.signal.windows import gaussian

# Gaussian analysis window: length and standard deviation are in samples.
STFT_WINDOW_LENGTH = 1024
STFT_HOP = 512  # samples between consecutive STFT slices
g_std = 512

w = gaussian(M=STFT_WINDOW_LENGTH, std=g_std, sym=True)  # symmetric Gaussian window

# "centered" FFT mode puts the zero-frequency bin in the middle of the spectrum.
SFT = ShortTimeFFT(win=w, hop=STFT_HOP, fs=SAMPLING_RATE, fft_mode="centered")

# STFT of the first channel only.
Sx = SFT.stft(x=data[0])

In [None]:
# Number of samples in the channel that was transformed.
N = len(data[0])

# Sample spacing and per-sample time axis of the input signal.
T_x = 1 / SAMPLING_RATE
t_x = np.arange(N) * T_x


fig1, ax1 = plt.subplots(figsize=(6.0, 4.0))  # enlarge plot a bit
t_lo, t_hi = SFT.extent(N)[:2]  # time range of plot
ax1.set_title(
    rf"STFT ({SFT.m_num * SFT.T:g}$\,s$ Gaussian window, "
    + rf"$\sigma_t={g_std * SFT.T}\,$s)"
)
ax1.set(
    xlabel=f"Time $t$ in seconds ({SFT.p_num(N)} slices, "
    + rf"$\Delta t = {SFT.delta_t:g}\,$s)",
    ylabel=f"Freq. $f$ in Hz ({SFT.f_pts} bins, "
    + rf"$\Delta f = {SFT.delta_f:g}\,$Hz)",
    xlim=(t_lo, t_hi),
)

# Magnitude of the STFT, drawn with the time/frequency extent reported by SFT.
im1 = ax1.imshow(
    abs(Sx), origin="lower", aspect="auto", extent=SFT.extent(N), cmap="viridis"
)
fig1.colorbar(im1, label="Magnitude $|S_x(t, f)|$")

# Shade areas where window slices stick out to the side:
for t0_, t1_ in [
    (t_lo, SFT.lower_border_end[0] * SFT.T),
    (SFT.upper_border_begin(N)[0] * SFT.T, t_hi),
]:
    ax1.axvspan(t0_, t1_, color="w", linewidth=0, alpha=0.2)

# Mark signal borders with vertical lines. Only the first line carries a
# legend label so the legend has exactly one entry; previously legend() was
# called with no labelled artists, which warns and draws nothing useful.
for k, t_ in enumerate([0, N * SFT.T]):
    ax1.axvline(
        t_,
        color="y",
        linestyle="--",
        alpha=0.5,
        label="signal border" if k == 0 else "_nolegend_",
    )
ax1.legend()
fig1.tight_layout()
plt.show()