In [None]:
import numpy as np
import matplotlib.pyplot as plt
import yaml
import tensorflow as tf
import tensorflow.keras.backend as K

from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.callbacks import Callback, EarlyStopping

%matplotlib inline

In [None]:
def create_replicas(x_data, x_err, n_rep = 100):
    x_dist = np.zeros((n_rep, x_data.shape[0]))
    for i, mean in enumerate(x_data):
        x_dist[:,i] = np.random.normal(loc=mean, scale=x_err[i], size=n_rep)
    return x_dist

In [None]:
def split_trval(x_data, y_data, y_sys, y_stat, perc = 0.3):
    size_val = round(len(x_data)*perc)
    idx = np.random.choice(np.arange(1, len(x_data)-1, 2), size_val, replace=False)
    x_val = x_data[idx]
    y_val = y_data[idx]
    y_val_sys = y_sys[idx]
    y_val_stat = y_stat[idx]
    x_tr = np.delete(x_data, idx)
    y_tr = np.delete(y_data, idx)
    y_tr_sys = np.delete(y_sys, idx)
    y_tr_stat = np.delete(y_stat, idx)

    return x_tr, y_tr, x_val, y_val, y_tr_sys, y_tr_stat, y_val_sys, y_val_stat

In [None]:
def load_data():
    filename = "./data/DATA_CHORUS_0.02.yaml"
    with open(filename, "r") as file:
        input_data = yaml.safe_load(file)
    x = input_data["x"]
    Q2 = np.array(input_data["Q2"])
    F_2 = np.array(input_data["F_2"])
    F_2_err_stat = np.array(input_data["F_2_err_stat"])
    F_2_err_sys = np.array(input_data["F_2_err_sys"])
    F_2_err = F_2_err_stat + F_2_err_sys
    
    Q2_tr, y_tr, Q2_val, y_val, y_tr_sys, y_tr_stat, y_val_sys, y_val_stat = split_trval(Q2, F_2, F_2_err_sys, F_2_err_stat)
    
    return {"x": x, "Q2": Q2, "y": F_2, "y_stat": F_2_err_stat, "y_sys": F_2_err_sys, "Q2_tr": Q2_tr, "y_tr": y_tr, "Q2_val": Q2_val, "y_val": y_val, "y_tr_sys": y_tr_sys, "y_tr_stat": y_tr_stat, "y_val_sys": y_val_sys, "y_val_stat": y_val_stat}

In [None]:
def compute_covmat(data_dict, data_set = ""):
    data_set = "y" + data_set
    ndata = data_dict[data_set].shape[0]
    covmat = np.zeros((ndata, ndata))
    for i in range(ndata):
        for j in range(ndata):
            covmat[i, j] = (
                data_dict[data_set+"_sys"][i] * data_dict[data_set+"_sys"][j]
                + data_dict[data_set][i] * data_dict[data_set][j]
            )
            if i == j:
                covmat[i, j] += data_dict[data_set+"_stat"][i] ** 2
    
    return ndata, covmat

In [None]:
def get_covmat(y_data, y_sys, y_stat):
    ndata = y_data.shape[0]
    covmat = np.zeros((ndata, ndata))
    for i in range(ndata):
        for j in range(ndata):
            covmat[i, j] = (
                y_sys[i] * y_sys[j]
                + y_data[i] * y_data[j]
            )
            if i == j:
                covmat[i, j] += y_stat[i] ** 2
    
    return ndata, covmat

In [None]:
covmat_tr.shape, covmat_val.shape

In [None]:
def chi2_with_covmat(covmat_tr, covmat_val, ndata_tr, ndata_val):
    inverted_tr = np.linalg.inv(covmat_tr)
    inverted_val = np.linalg.inv(covmat_val)
    # Convert numpy array into tensorflow object
    invcovmat_tr = K.constant(inverted_tr)
    invcovmat_val = K.constant(inverted_val)

    def costum_loss(y_true, y_pred):
        
        def custom_loss_tr(y_true, y_pred):
            # (yt - yp) * covmat * (yt - yp)
            tmp = y_true - y_pred

            right_dot = tf.tensordot(invcovmat_tr, K.transpose(tmp), axes=[[1], [1]])
            loss = tf.tensordot(tmp, right_dot, axes=[[0],[0]]) / ndata_tr
            return loss

        def custom_loss_val(y_true, y_pred):
            # (yt - yp) * covmat * (yt - yp)
            tmp = y_true - y_pred

            right_dot = tf.tensordot(invcovmat_val, K.transpose(tmp), axes=[[1], [1]])
            loss = tf.tensordot(tmp, right_dot, axes=[[0],[0]]) / ndata_val
            return loss
        
        return K.in_train_phase(custom_loss_tr(y_true, y_pred), custom_loss_val(y_true, y_pred))
        

    return costum_loss

In [None]:
data_dict = load_data()

In [None]:
# Create the NN model
model = Sequential()
model.add(Dense(units = 1, activation = 'linear', input_shape=[1]))
model.add(Dense(units = 32, activation = 'relu'))
model.add(Dense(units = 32, activation = 'relu'))
model.add(Dense(units = 1, activation = 'linear'))

ndata_tr, covmat_tr = compute_covmat(data_dict, "_tr")
ndata_val, covmat_val = compute_covmat(data_dict, "_val")
model.compile(loss=chi2_with_covmat(covmat_tr, covmat_val, ndata_tr, ndata_val), optimizer="adam", metrics=["accuracy"])
model.outputs[0]._uses_learning_phase = True

# early stopping
es = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=50)

# Display the model
model.summary()

In [None]:
model.fit(data_dict["Q2_tr"], data_dict["y_tr"], validation_data=(data_dict["Q2_val"], data_dict["y_val"]), epochs=100, verbose=0)

In [None]:
data = np.array([1,2,3,4])
y_true = K.constant([2,3])

mask = []
for y in y_true:
    mask.append(np.where(y == data)[0][0])
mask

In [None]:
Q2_grid = np.linspace(data_dict["Q2"][0], data_dict["Q2"][-1], 100)
y_pred = model.predict(Q2_grid)

plt.plot(Q2_grid, y_pred)
plt.scatter(data_dict["Q2"], data_dict["y"])

In [None]:
# Loop over replicas
n_rep = 10
F_2_reps = create_replicas(F_2, F_2_err, n_rep = n_rep)
x_pred = np.linspace(Q2[0], Q2[-1], num=100)
y_pred = []

for y_data in F_2_reps:
    x_tr, y_tr, x_val, y_val = split_trval(Q2, y_data)
    model.fit(x_tr, y_tr, validation_data=(x_val ,y_val), epochs=1000, batch_size=10, verbose=0, callbacks=[es])
    
    y_pred.append(model.predict(x_pred))

In [None]:
p1_high = np.nanpercentile(y_pred,84,axis=0)
p1_low = np.nanpercentile(y_pred,16,axis=0)
p1_mid = (p1_high + p1_low )/2.
p1_error = (p1_high - p1_low )/2.

p1_mid = p1_mid.reshape(-1)
p1_error = p1_error.reshape(-1)

In [None]:
plt.errorbar(Q2, F_2, yerr=F_2_err, label = "Data", fmt="ko", capsize=5)
plt.fill_between(x_pred, y1=p1_mid-p1_error, y2=p1_mid+p1_error, color="red", edgecolor="red", label="Prediction", alpha=0.25)
plt.plot(x_pred, p1_mid, color="red", linestyle="dashed")
plt.legend()
plt.xlabel("$Q^2$ [GeV$^2$]")
plt.ylabel("$F_2$")
plt.title(f"Prediction of $F_2$ at $x={x}$")