## Example curve prediction script
In this notebook we will try to predict the function coefficients.

In [None]:
%matplotlib widget

import numpy as np
import pandas as pd
import tensorflow as tf

from tensorflow import keras

from sklearn.model_selection import train_test_split
from matplotlib import pyplot as plt
from sklearn import preprocessing

from generate_TF import GenerateTF, get_freq
from scipy.optimize import curve_fit
from prettytable import PrettyTable

from pytorchClassifiers import get_keras_nn, plot_history

avg_pool1d = keras.layers.AveragePooling1D



In [None]:
# Load the data
df = pd.read_pickle('./data/tf-ampl-response-82000-noise0.1.pkl')
df.head()

In [None]:
# Extract the target variables
phase = df.pop('phase')
gain = df.pop('gain')

# All fmax, np should be equal
fmax = df.iloc[0].fmax
NP = df.iloc[0].np
df.drop(columns=['fmax', 'np'], inplace=True)

In [None]:
# target_orig is the vector with the originale phase, gain labels
target_orig = np.array((phase, gain), dtype=np.float32).T

target_scaler = preprocessing.StandardScaler().fit(target_orig)
# target is scaled, better for training
target = target_scaler.transform(target_orig)


In [None]:
# our dataset is 3D
values = np.zeros((len(df), len(df.iloc[0].real), 3), dtype=np.float32)
print(values.shape)
for index, row in df.iterrows():
    # print(row)
    values[index, :, 0] = row.real
    values[index, :, 1] = row.imag
    values[index, :, 2] = row.amplitude
data = values

In [None]:
# Split in train and test
X_train_orig, X_test_orig, y_train, y_test, y_train_orig, y_test_orig = train_test_split(
    data, target, target_orig, test_size=0.2, random_state=0)

# further divide X_test in test + validate
X_test_orig, X_validate_orig, y_test, y_validate, y_test_orig, y_validate_orig = \
    train_test_split(X_test_orig, y_test, y_test_orig, test_size=0.4, random_state=1)

X_train = X_train_orig[:, :, :2]
X_test = X_test_orig[:, :, :2]
X_validate = X_validate_orig[:, :, :2]

print(X_train.shape)
print(X_test.shape)
print(X_validate.shape)
print(y_train.shape)
print(y_test.shape)
print(y_validate.shape)


In [None]:
# load the models
model1 = keras.models.load_model('models/keras/regression/phase_best')
model2 = keras.models.load_model('models/keras/regression/gain_best')


In [None]:
def curve_fit_deluxe(func, freq, sample, trim_edges=0, kernel_size=1, stride=1, **kwargs):
    # center crop sample
    if trim_edges > 0:
        freq, sample = freq[trim_edges:-trim_edges], sample[trim_edges:-trim_edges]
    # prepare the shapes for avg_pooling
    freq = freq.reshape(1, -1, 1)
    sample = sample.reshape(1, -1, 1)
    # perform average pooling
    freq = avg_pool1d(pool_size=kernel_size, strides=stride)(freq).numpy().flatten()
    sample = avg_pool1d(pool_size=kernel_size, strides=stride)(sample).numpy().flatten()
    # pass to curve_fit
    return curve_fit(func, freq, sample, **kwargs)

In [None]:
# Get curve fit predictions
gen_tf = GenerateTF(fb_attn_index=3, with_noise=False)
freq = gen_tf.frequency.astype(np.float32)
y_optimizer = []
for sample in X_validate_orig[:, :, 2]:
    popt, _ = curve_fit_deluxe(gen_tf, freq, sample, trim_edges=130, kernel_size=4, stride=1,
                               bounds=([-20, 1e-4], [20, 1e-2]), method='trf')
    y_optimizer.append(popt)
y_optimizer = np.array(y_optimizer)


In [None]:
# Get model's predictions
y_nn_phase = model1.predict(X_validate).flatten()
y_nn_gain = model2.predict(X_validate).flatten()
y_nn = np.array([y_nn_phase, y_nn_gain]).T


In [None]:
# Descale predictions
y_nn_descaled = target_scaler.inverse_transform(y_nn)
y_nn_phase_descaled = y_nn_descaled[:, 0]
y_nn_gain_descaled = y_nn_descaled[:, 1]

phase_loss = model1.evaluate(X_validate, y_validate[:, 0])
gain_loss = model2.evaluate(X_validate, y_validate[:, 1])


In [None]:
# I want to make a heatmap, x axis phase, y axis gain, z: accuracy
x = np.unique(target_orig[:, 0])
y = np.unique(target_orig[:, 1])

# Z phase shows the correctly predicted phase per gain
z_phase = np.zeros((2, len(y)), dtype=float)
# Z gain shows the correctly predicted phase per phase
z_gain = np.zeros((2, len(px)), dtype=float)

# phase_one_off_acc = 0
# gain_one_off_acc = 0

# Tolerance for phase: 0.5
# tolerance for gain: 0.0001

for i in range(len(X_validate)):
    true_p, true_g = y_validate[i].numpy()
    pred_p, pred_g = y_nn_descaled[i].numpy()
    z_phase[1][true_g] += 1
    # Increase by 1 if predicted phase is equal to true phase
    z_phase[0][true_g] += np.abs(true_p - pred_p) < 0.5

    z_gain[1][true_p] += 1
    z_gain[0][true_p] += np.abs(true_g - pred_g) < 0.5

    # if np.abs(true_p - pred_p) < 2:
    #     phase_one_off_acc += 1

    # if np.abs(true_g - pred_g) < 2:
    #     gain_one_off_acc += 1


phase_acc = np.sum(z_phase[0]) / np.sum(z_phase[1])
gain_acc = np.sum(z_gain[0]) / np.sum(z_gain[1])
z_phase[0] /= z_phase[1]
z_gain[0] /= z_gain[1]

# caclulate 1 degree off accuracy
# phase_one_off_acc /= len(y_validate)
# gain_one_off_acc /= len(y_validate)


In [None]:
import matplotlib.pyplot as plt

fig, (ax1, ax2) = plt.subplots(nrows=2)
plt.sca(ax1)
bars = plt.bar(np.arange(len(z_phase[0])), z_phase[0], width=0.6, edgecolor='black', color='tab:blue',
               label=f'Phase accuracy per gain, AVG: {phase_acc:.3f}')
ax1.bar_label(bars, fmt='%.3f', rotation='vertical')
# for i, v in enumerate(z_phase[0]):
#     ax1.text()
plt.ylim(0, 1.05)
plt.xlabel('Gain')
plt.ylabel('Accuracy')
plt.legend(loc='lower right')

plt.sca(ax2)
plt.bar(np.arange(len(z_gain[0])), z_gain[0], width=0.6, edgecolor='black', color='tab:blue',
        label=f'Gain accuracy per phase, AVG: {gain_acc:.3f}')
plt.ylim(0, 1.05)
plt.xlabel('Phase')
plt.ylabel('Accuracy')
plt.legend(loc='lower right')
plt.tight_layout()
# plt.title('Gain prediction accuracy, per phase')


In [None]:
from sklearn.metrics import r2_score, mean_squared_error

r2_nn = r2_score(y_validate_orig, y_nn_descaled,
                   multioutput='raw_values')
mse_nn = mean_squared_error(y_validate_orig, y_nn_descaled,
                               multioutput='raw_values')

r2_opt = r2_score(y_validate_orig, y_optimizer,
                  multioutput='raw_values')
mse_opt = mean_squared_error(y_validate_orig, y_optimizer,
                              multioutput='raw_values')

print('R2\tPhase\tGain')
print('NeuralNet: ', r2_nn)
print('Optimizer:', r2_opt)

print('MSE\tPhase\tGain')
print('NeuralNet: ', mse_nn)
print('Optimizer:', mse_opt)



In [None]:
plt.figure(figsize=(7, 6))

table = PrettyTable()
table.field_names = ["idx", "param", "original", "NN", "Opt"]

gen_tf = GenerateTF(fb_attn_index=3, with_noise=False)
freq = gen_tf.frequency.astype(np.float32)

for idx in np.random.choice(np.arange(0, len(X_validate)), size=3):
    try:
        popt, _ = curve_fit_deluxe(gen_tf, freq, X_validate_orig[idx, :, 2], trim_edges=130, kernel_size=4, stride=1,
                                   bounds=([-20, 1e-4], [20, 1e-2]), method='trf')
    except:
        print(f'Scipy curve fit failed for idx: {idx}')
        continue

    table.add_row([idx, 'phase', y_validate_orig[idx]
                  [0], y_nn_descaled[idx][0], popt[0]])
    table.add_row([idx, 'gain', y_validate_orig[idx][1],
                  y_nn_descaled[idx][1], popt[1]])

    p = plt.plot(
        freq, gen_tf(freq, *(y_validate_orig[idx])), label=f'real_{idx}', ls='-')
    plt.plot(freq, gen_tf(
        freq, *(y_nn_descaled[idx])), label=f'NN_{idx}', ls='--', color=p[0].get_color())
    plt.plot(freq, gen_tf(freq, *popt),
             label=f'opt_{idx}', ls=':', color=p[0].get_color())
    # plt.plot(x, gen_tf(x, *poptModel), label=f'opt+model_{idx}', ls='-.', color=p[0].get_color())
print(table)
plt.legend(ncol=3)
plt.tight_layout()
