<a href="https://colab.research.google.com/github/lsteffenel/M2Atmo_et_Climat/blob/main/12_Comparaison_TS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import random
from math import sin, cos
import matplotlib.pyplot as plt

def get_time_series_data(length):
    a = .2
    b = 300
    c = 20
    ls = 5
    ms = 20
    gs = 100

    ts = []

    for i in range(length):
        ts.append(b + a * i + ls * sin(i / 5) + ms * cos(i / 24) + gs * sin(i / 120) + c * random.random())

    return ts



In [None]:
import numpy as np
from sklearn.model_selection import train_test_split

def get_time_series_datasets(features, ts_len):
    ts = get_time_series_data(ts_len)
    X = []
    Y = []
    for i in range(features + 1, ts_len):
        X.append(ts[i - (features + 1):i - 1])
        Y.append([ts[i]])

    X = np.array(X, dtype=np.float32)
    Y = np.array(Y, dtype=np.float32)

    X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.3, shuffle=False)
    X_val, X_test, Y_val, Y_test = train_test_split(X_test, Y_test, test_size=0.5, shuffle=False)

    return X_train, X_val, X_test, Y_train, Y_val, Y_test


In [None]:
import numpy as np
from tensorflow import keras

class DummyPredictor(keras.Model):
    def call(self, x):
        x_np = x.numpy() if hasattr(x, "numpy") else np.asarray(x)
        last_values = x_np[:, -1].reshape(-1, 1)  # dernière valeur de chaque ligne, format (batch, 1)
        return last_values


In [None]:
from scipy import interpolate
import numpy as np
from tensorflow import keras

class InterpolationPredictor(keras.Model):
    def call(self, x):
        # x est normalement un tf.Tensor ou numpy array
        x_np = x.numpy() if hasattr(x, "numpy") else np.asarray(x)
        last_values = []
        for v in x_np:
            xi = np.arange(0, len(v))
            interp = interpolate.interp1d(xi, v, fill_value='extrapolate')
            # On extrapole juste après la dernière valeur
            pred = interp(len(v))
            last_values.append([pred])
        return np.array(last_values, dtype=np.float32)



In [None]:
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import numpy as np
from tensorflow import keras

class HwesPredictor(keras.Model):
    def call(self, x):
        # x : tf.Tensor or np.ndarray (shape: batch, time_steps)
        x_np = x.numpy() if hasattr(x, "numpy") else np.asarray(x)
        last_values = []
        for r in x_np:
            model = ExponentialSmoothing(r)
            results = model.fit()
            forecast = results.forecast()
            last_values.append([forecast[0]])
        return np.array(last_values, dtype=np.float32)


In [None]:
from tensorflow import keras
from tensorflow.keras import layers

def build_fcnn_sequential(n_inp, l_1, l_2, n_out):
    model = keras.Sequential([
        layers.Dense(l_1, activation='relu', input_shape=(n_inp,)),
        layers.Dense(l_2, activation='relu'),
        layers.Dense(n_out)  # Pas d'activation finale
    ])
    return model



In [None]:
import tensorflow as tf
from tensorflow import keras
import random

random.seed(1)
tf.random.set_seed(1)

features = 256
ts_len = 3_000

x_train, x_val, x_test, y_train, y_val, y_test = get_time_series_datasets(features, ts_len)

net = build_fcnn_sequential(n_inp=features, l_1=64, l_2=32, n_out=1)
net.compile(
    optimizer=keras.optimizers.Adam(),
    loss=keras.losses.MeanSquaredError(),
    metrics=[keras.metrics.MeanSquaredError()]
)

# Définition des callbacks
callbacks = [
    #keras.callbacks.EarlyStopping(monitor='val_loss', patience=20, verbose=1, restore_best_weights=True),
    keras.callbacks.ModelCheckpoint('best_model.keras', monitor='val_loss', save_best_only=True, verbose=1)
]

# Entraînement du modèle avec callbacks
history = net.fit(
    x_train, y_train,
    epochs=1000, #10000
    batch_size=len(x_train),
    validation_data=(x_val, y_val),
    verbose=1,
    callbacks=callbacks
)

# Accès aux courbes de perte
training_loss = history.history['loss']
validation_loss = history.history['val_loss']


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf

# Les predict() Keras renvoient déjà des numpy arrays ou tensors convertibles, idem pour vos prédicteurs custom
print('Testing')
print(f'FCNN Loss: {loss_func(net.predict(x_test), y_test)}')
print(f'Dummy Loss: {loss_func(dummy_predictor(x_test), y_test)}')
print(f'Linear Interpolation Loss: {loss_func(interpolation_predictor(x_test), y_test)}')
print(f'HWES Loss: {loss_func(hwes_predictor(x_test), y_test)}')

plt.title("Training progress")
plt.yscale("log")
plt.plot(training_loss, label = 'training loss')
plt.plot(validation_loss, label = 'validation loss')
plt.legend()
plt.show()

plt.title("FCNN on Train Dataset")
plt.plot(y_train, label = 'actual')
plt.plot(net.predict(x_train).flatten(), label = 'predicted')
plt.legend()
plt.show()

plt.title('Test')
plt.plot(y_test, '--', label = 'actual')
plt.plot(net.predict(x_test).flatten(), label = 'FCNN')
plt.plot(hwes_predictor(x_test).flatten(), label = 'HWES')
plt.legend()
plt.show()

test_n = len(y_test)
net_abs_dev = np.abs(net.predict(x_test).flatten() - y_test.flatten())
hwes_abs_dev = np.abs(hwes_predictor(x_test).flatten() - y_test.flatten())

diff_pos = np.clip(hwes_abs_dev - net_abs_dev, 0, None)
diff_min = -np.clip(net_abs_dev - hwes_abs_dev, 0, None)

plt.title('HWES Predictor VS FCNN Predictor')
plt.hlines(0, xmin=0, xmax=test_n, linestyles='dashed')
plt.bar(list(range(test_n)), diff_pos, color='g', label='FCNN Wins')
plt.bar(list(range(test_n)), diff_min, color='r', label='HWES Wins')
plt.legend()
plt.show()
