# Notes

Transpose and plot a `78,78,3` array.

```
sp.plot_single_stream(istft(x.transpose(1, 2, 0)[:,:,0], fs=100, nperseg=155)[1], label="", fs=100, nperseg=155)
```

In [None]:
import logging
import h5py
from numpy import zeros
from numpy import ones
from numpy.lib.financial import nper
from numpy.random import randn
from numpy.random import randint
from tensorflow.keras.optimizers import Adam
from keras.models import Sequential
from keras import layers

import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from scipy.signal import spectrogram, stft, istft
import numpy as np

In [None]:
def plot_all(do, label, fs=100, nperseg=155, file_path=None):
    """Plot waveforms, spectrograms and STFT magnitudes of a 3-component record.

    The figure is laid out on a 5x6 grid: the left half of the first three
    rows holds one waveform per component, the right half holds the matching
    spectrograms, and the last two rows hold the three STFT magnitude maps.

    Args:
        do: Indexable holding three sample sequences; indices 0, 1 and 2 are
            titled "Vertical", "North" and "East" respectively.
        label: Text used as the figure super-title.
        fs: Sampling frequency handed to ``spectrogram`` and ``stft``.
        nperseg: Segment length handed to ``stft``.
        file_path: When not ``None`` the figure is written to this path and
            closed; otherwise it is left open for interactive display.
    """
    component_names = ("Vertical", "North", "East")
    traces = [do[0], do[1], do[2]]
    grid_shape = (5, 6)

    fig = plt.figure(figsize=(16, 10), dpi=80)
    wave_axes = [plt.subplot2grid(grid_shape, (row, 0), colspan=3) for row in range(3)]
    spec_axes = [plt.subplot2grid(grid_shape, (row, 3), colspan=3) for row in range(3)]
    stft_axes = [
        plt.subplot2grid(grid_shape, (3, col), colspan=2, rowspan=2)
        for col in (0, 2, 4)
    ]

    plt.subplots_adjust(hspace=1, wspace=1)

    # Waveforms: only the first 6000 samples are drawn.
    for name, trace, ax in zip(component_names, traces, wave_axes):
        sns.lineplot(
            data=pd.DataFrame(data=trace[:6000]), ax=ax, linewidth=1, legend=None
        )
        ax.set_title(f"{name} component waveform")
        ax.set(xlabel="Samples", ylabel="Amplitude counts")
        ax.locator_params(nbins=6, axis="y")

    # Spectrograms are computed over the full trace, not the 6000-sample cut.
    for name, trace, ax in zip(component_names, traces, spec_axes):
        freqs, times, power = spectrogram(x=trace, fs=fs)
        ax.set_title(f"{name} component spectrogram")
        mesh = ax.pcolormesh(times, freqs, power, shading="gouraud")
        ax.set(xlabel="Time [sec]", ylabel="Frequency [Hz]")
        fig.colorbar(mesh, ax=ax)

    # STFT magnitudes are plotted against bin indices rather than Hz/seconds.
    for name, trace, ax in zip(component_names, traces, stft_axes):
        # "hann" is the canonical window name; the "hanning" alias is not
        # accepted by recent SciPy releases.
        _, _, zxx = stft(trace, window="hann", fs=fs, nperseg=nperseg)
        magnitude = np.abs(zxx)
        # Derive the index axes from the STFT itself so any fs/nperseg/length
        # combination works, instead of assuming a 78x78 or 64x64 result.
        time_bins = np.arange(magnitude.shape[1])
        freq_bins = np.arange(magnitude.shape[0])
        ax.set_title(f"{name} component STFT")
        mesh = ax.pcolormesh(time_bins, freq_bins, magnitude, shading="auto")
        fig.colorbar(mesh, ax=ax)

    fig.suptitle(label, fontsize=14)

    if file_path is not None:
        fig.savefig(file_path)
        # Close saved figures so repeated calls do not accumulate open figures.
        plt.close(fig)

In [None]:
def make_generator_model(latent_dim):
    """Build the (uncompiled) generator network.

    A latent vector of length ``latent_dim`` is projected to a 3x3x256 volume
    and then passed through three transposed-convolution stages followed by a
    3-channel linear output layer. With "same" padding each transposed
    convolution scales the spatial size by its stride (3 -> 39 -> 78 -> 78).

    Args:
        latent_dim: Length of the latent input vector.

    Returns:
        A ``Sequential`` model; it is not compiled here.
    """
    generator = Sequential()

    # Dense projection of the latent vector, reshaped into a small volume.
    generator.add(
        layers.Dense(3 * 3 * 256, use_bias=False, input_shape=(latent_dim,))
    )
    generator.add(layers.BatchNormalization())
    generator.add(layers.LeakyReLU())
    generator.add(layers.Reshape((3, 3, 256)))

    # (kernel size, strides) of each 64-filter upsampling stage, in order.
    upsampling_stages = (
        ((20, 20), (13, 13)),
        ((10, 10), (2, 2)),
        ((10, 10), (1, 1)),
    )
    for kernel_size, strides in upsampling_stages:
        generator.add(
            layers.Conv2DTranspose(
                64, kernel_size, strides=strides, padding="same", use_bias=False
            )
        )
        generator.add(layers.BatchNormalization())
        generator.add(layers.LeakyReLU())
        generator.add(layers.Dropout(0.1))

    # Output layer: three channels, linear activation.
    generator.add(
        layers.Conv2DTranspose(
            3,
            (3, 3),
            padding="same",
            use_bias=False,
            activation="linear",
        )
    )

    return generator

In [None]:
def make_discriminator_model():
    """Build and compile the discriminator for 78x78x3 inputs.

    The network is a 1x1 convolution followed by three stride-2 convolutions,
    each followed by LeakyReLU and 20% dropout, then a flatten and a single
    sigmoid unit.

    Returns:
        A ``Sequential`` model compiled with binary cross-entropy, Adam
        (learning rate 0.0002, beta_1 0.5) and an accuracy metric.
    """
    discriminator = Sequential()

    # Entry layer: per-pixel 1x1 convolution, no downsampling.
    discriminator.add(
        layers.Conv2D(128, (1, 1), padding="same", input_shape=[78, 78, 3])
    )
    discriminator.add(layers.LeakyReLU())
    discriminator.add(layers.Dropout(0.2))

    # Three downsampling stages that differ only in kernel size.
    for kernel_size in ((13, 13), (2, 2), (1, 1)):
        discriminator.add(
            layers.Conv2D(128, kernel_size, strides=(2, 2), padding="same")
        )
        discriminator.add(layers.LeakyReLU())
        discriminator.add(layers.Dropout(0.2))

    # Classification head: one sigmoid output.
    discriminator.add(layers.Flatten())
    discriminator.add(layers.Dense(1, activation="sigmoid"))

    discriminator.compile(
        loss="binary_crossentropy",
        optimizer=Adam(learning_rate=0.0002, beta_1=0.5),
        metrics=["accuracy"],
    )
    return discriminator

In [None]:
# define the combined generator and discriminator model, for updating the generator
def define_gan(g_model, d_model):
    """Stack generator and discriminator into one model for generator updates.

    Note that this sets ``d_model.trainable = False`` on the object passed in
    before the combined model is compiled.

    Args:
        g_model: Generator model, used as the first stage.
        d_model: Discriminator model, used as the second stage.

    Returns:
        A ``Sequential`` model (generator followed by discriminator) compiled
        with binary cross-entropy and Adam (learning rate 0.0002, beta_1 0.5).
    """
    # Freeze the discriminator before it is wired into the combined model.
    d_model.trainable = False

    combined = Sequential([g_model, d_model])
    combined.compile(
        loss="binary_crossentropy",
        optimizer=Adam(learning_rate=0.0002, beta_1=0.5),
    )
    return combined

In [None]:
def load_real_samples(arr_len=1000, file_path="data/stead_learn_100_hz.hdf5"):
    """Read the leading entries of the training HDF5 file into memory.

    Args:
        arr_len: Number of leading entries to read from each dataset.
        file_path: Location of the HDF5 file. Defaults to the path this
            notebook has always used, so existing calls are unaffected.

    Returns:
        A ``(data, keys, labels)`` tuple holding the first ``arr_len`` entries
        of the file's ``"data"``, ``"keys"`` and ``"labels"`` datasets.
    """
    with h5py.File(file_path, "r") as f:
        # Slicing copies the values out, so they stay usable after the file closes.
        data = f["data"][:arr_len]
        keys = f["keys"][:arr_len]
        labels = f["labels"][:arr_len]
    return data, keys, labels

In [None]:
# select real samples
def generate_real_samples(dataset, n_samples):
    """Draw a random batch of real samples together with "real" labels.

    Indices are drawn uniformly with replacement from the first axis of
    ``dataset``.

    Args:
        dataset: Array indexed along its first axis.
        n_samples: Number of samples to draw.

    Returns:
        A ``(X, y)`` tuple: the selected rows of ``dataset`` and an
        ``(n_samples, 1)`` array of ones.
    """
    n_available = dataset.shape[0]
    picks = randint(0, n_available, n_samples)
    # Label 1 marks a sample as real.
    real_labels = ones((n_samples, 1))
    return dataset[picks], real_labels

In [None]:
# generate points in latent space as input for the generator
def generate_latent_points(latent_dim, n_samples):
    """Sample a batch of complex-valued latent vectors.

    Twice as many standard-normal floats as needed are drawn, and consecutive
    pairs are reinterpreted as the real and imaginary parts of one
    ``complex128`` value.

    NOTE(review): how the generator treats the imaginary part of this input
    is not visible in this function - confirm that complex input is intended.

    Args:
        latent_dim: Length of each latent vector.
        n_samples: Number of latent vectors to produce.

    Returns:
        A ``complex128`` array of shape ``(n_samples, latent_dim)``.
    """
    n_floats = 2 * latent_dim * n_samples
    flat_complex = randn(n_floats).view(np.complex128)
    return flat_complex.reshape(n_samples, latent_dim)

In [None]:
# use the generator to generate n fake examples, with class labels
def generate_fake_samples(g_model, latent_dim, n_samples):
    """Run the generator on fresh latent points and attach "fake" labels.

    Args:
        g_model: Generator model; its ``predict`` method is called.
        latent_dim: Length of each latent vector.
        n_samples: Number of samples to generate.

    Returns:
        A ``(X, y)`` tuple: the generator predictions and an
        ``(n_samples, 1)`` array of zeros.
    """
    latent_batch = generate_latent_points(latent_dim, n_samples)
    generated = g_model.predict(latent_batch)
    # Label 0 marks a sample as generated.
    fake_labels = zeros((n_samples, 1))
    return generated, fake_labels

In [None]:
# create and save a plot of generated images
def save_plot(examples, epoch, fs=100, nperseg=155):
    """Invert up to five generated samples to waveforms and save their plots.

    Each sample is handled the same way as elsewhere in this notebook: the
    channel axis is moved to the front, the inverse STFT is applied per
    channel, and the result is passed to ``plot_all``.

    Args:
        examples: Batch of generated samples, channels last - assumed to be
            ``(n, 78, 78, 3)`` to match the generator output; confirm if the
            generator changes.
        epoch: Zero-based epoch index; ``epoch + 1`` appears in the figure
            title and in the file names.
        fs: Sampling frequency used for the inverse STFT and the plots.
        nperseg: STFT segment length used for the inverse STFT and the plots.
    """
    import os  # local import: the notebook's import cell does not provide os

    # savefig does not create missing directories.
    os.makedirs("out", exist_ok=True)

    for idx, example in enumerate(examples[:5]):
        # istft returns (times, signal); with channels first the signal has
        # one row per channel, which is the layout plot_all indexes into.
        _, waveforms = istft(
            example.transpose(2, 0, 1), window="hann", fs=fs, nperseg=nperseg
        )
        plot_all(
            waveforms,
            f"GAN Event (epoch {epoch+1})",
            fs=fs,
            nperseg=nperseg,
            file_path=f"out/epoch_{epoch+1}_image_{idx}.png",
        )

In [None]:
# evaluate the discriminator, plot generated images, save generator model
def summarize_performance(epoch, g_model, d_model, dataset, latent_dim, n_samples=150):
    """Log discriminator accuracy, save sample plots and save both models.

    Args:
        epoch: Zero-based epoch index; ``epoch + 1`` is used in output names.
        g_model: Generator model.
        d_model: Discriminator model; ``evaluate`` is expected to return a
            ``(loss, accuracy)`` pair.
        dataset: Real samples to draw the evaluation batch from.
        latent_dim: Length of the generator's latent input.
        n_samples: Size of the real and of the fake evaluation batch.
    """
    # Accuracy on a random batch of real samples.
    real_batch, real_targets = generate_real_samples(dataset, n_samples)
    _loss_real, acc_real = d_model.evaluate(real_batch, real_targets, verbose=0)

    # Accuracy on a freshly generated batch.
    fake_batch, fake_targets = generate_fake_samples(g_model, latent_dim, n_samples)
    _loss_fake, acc_fake = d_model.evaluate(fake_batch, fake_targets, verbose=0)

    logging.info(f">Accuracy real: {acc_real * 100}, fake: {acc_fake * 100}")

    # Plot a few of the generated samples evaluated above.
    save_plot(fake_batch, epoch)

    # Save both models under the "out" directory.
    g_model.save(f"out/gen-{epoch+1}")
    d_model.save(f"out/desc-{epoch+1}")

In [None]:
# train the generator and discriminator
def train(g_model, d_model, gan_model, dataset, latent_dim, n_epochs=50, n_batch=128):
    """Alternate discriminator and generator updates for ``n_epochs`` epochs.

    Every batch step updates the discriminator on half a batch of real
    samples and half a batch of generated samples, then updates the generator
    through ``gan_model`` on a full batch of latent points labelled as real.
    After every tenth epoch ``summarize_performance`` is called.

    Args:
        g_model: Generator model.
        d_model: Discriminator model; ``train_on_batch`` is expected to
            return a ``(loss, metric)`` pair.
        gan_model: Combined generator-plus-discriminator model.
        dataset: Real samples, indexed along the first axis.
        latent_dim: Length of the generator's latent input.
        n_epochs: Number of epochs.
        n_batch: Full batch size used for the generator update.
    """
    bat_per_epo = int(dataset.shape[0] / n_batch)
    half_batch = int(n_batch / 2)

    for i in range(n_epochs):
        for j in range(bat_per_epo):
            # Discriminator step on real samples.
            real_batch, real_targets = generate_real_samples(dataset, half_batch)
            d_loss1, _ = d_model.train_on_batch(real_batch, real_targets)

            # Discriminator step on generated samples.
            fake_batch, fake_targets = generate_fake_samples(
                g_model, latent_dim, half_batch
            )
            d_loss2, _ = d_model.train_on_batch(fake_batch, fake_targets)

            # Generator step: latent points are labelled "real" (1), so the
            # loss measures how well the generator passes as real.
            latent_batch = generate_latent_points(latent_dim, n_batch)
            inverted_targets = ones((n_batch, 1))
            g_loss = gan_model.train_on_batch(latent_batch, inverted_targets)

            logging.info(f"Epoch {i + 1}, batch {j + 1}/{bat_per_epo}, {d_loss1=}, {d_loss2=}, {g_loss=}")

        # Only every tenth epoch is evaluated, plotted and saved.
        if (i + 1) % 10 != 0:
            continue
        summarize_performance(i, g_model, d_model, dataset, latent_dim)

In [None]:
# Read the default number of entries (data, keys and labels) for the plotting cells below.
data, keys, labels = load_real_samples()

In [None]:
# Plot one real sample: move the channel axis first, invert the STFT per
# channel, and keep the first 6000 samples of every channel.
idx = 11
# istft(...)[1] has one row per channel (assuming data[idx] is the 78,78,3
# array described in the notes), so the sample cut must be on the last axis;
# a plain [:6000] would slice the channel axis and truncate nothing.
plot_all(istft(data[idx].transpose(2, 0, 1), fs=100, nperseg=155)[1][:, :6000], label=f"{keys[idx]}", fs=100, nperseg=155)

In [None]:
# Sanity check: build a fresh generator (no training is done in this cell) and plot one of its outputs.
test_gen = make_generator_model(100)
# generate_fake_samples returns (samples, labels); one sample is requested here.
test_fake = generate_fake_samples(test_gen, 100, 1)
# NOTE(review): the label reuses keys[idx] from the real-sample cell although the plotted data is generated - confirm intent.
plot_all(istft(test_fake[0][0].transpose(2, 0, 1), fs=100, nperseg=155)[1], label=f"{keys[idx]}", fs=100, nperseg=155)

In [None]:
# size of the latent space
# length of the latent vectors fed to the generator
latent_dim = 100
# build and compile the discriminator
d_model = make_discriminator_model()
# build the generator (it is only compiled as part of the combined model)
g_model = make_generator_model(latent_dim)
# stack generator and discriminator; this also marks d_model as non-trainable
gan_model = define_gan(g_model, d_model)
# load the real samples; keys and labels are not needed for training
dataset, _, _ = load_real_samples()
# run the training loop with its default epoch count and batch size
train(g_model, d_model, gan_model, dataset, latent_dim)