# In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
import neat

# --- Input files ---
# Excel workbook read with pandas.read_excel.
DATA_PATH = "salesdaily-data.xlsx"
# Configuration file handed to neat.Config.
CONFIG_PATH = "NEAT.txt"

# --- Forecast setup ---
# Column of the workbook that is forecast.
TARGET_COL = "M01AE"
# NOTE(review): presumably the number of lag inputs (L) and the forecast
# horizon (H); neither name is referenced by the visible code — confirm.
L, H = 2, 1

# --- Date windows (both bounds are compared inclusively) ---
TRAIN_START = "2014-01-02"
TRAIN_END = "2019-06-30"
TEST_START = "2019-07-01"
TEST_END = "2019-10-08"

# --- Evolution settings ---
# Number of single-generation evolution steps to run.
N_GENERATIONS = 100
# Seed for the NumPy generator that samples training batches.
RANDOM_SEED = 42

# --- Fitness settings ---
# Fraction of the training rows drawn for each generation's fitness batch.
BATCH_FRACTION = 0.35
# Weight of the mean-squared-output penalty subtracted from the fitness.
L2_PENALTY = 1e-4
# Small constant added to the naive baseline's MSE before dividing by it.
EPS = 1e-8

# Load the workbook, parse the timestamps and order the rows chronologically,
# keeping only the timestamp and the target column.
df = pd.read_excel(DATA_PATH)
df["datetime"] = pd.to_datetime(df["datetime"])
df = (
    df.sort_values("datetime")
    .reset_index(drop=True)
    .loc[:, ["datetime", TARGET_COL]]
    .copy()
)

# Calendar features, the two previous-row values of the target, and the
# target itself as float. Lags are taken by row position, not by date.
df = df.assign(
    month=df["datetime"].dt.month.astype(float),      # 1..12
    dow=df["datetime"].dt.dayofweek.astype(float),    # 0..6
    lag1=df[TARGET_COL].shift(1),
    lag2=df[TARGET_COL].shift(2),
    y=df[TARGET_COL].astype(float),
)

# The first rows have no lag values; drop every row with a missing input/target.
df = df.dropna(subset=["lag1", "lag2", "y"]).reset_index(drop=True)

# Boolean masks for the train and test windows (inclusive at both ends).
train_mask = df["datetime"].between(
    pd.to_datetime(TRAIN_START), pd.to_datetime(TRAIN_END)
)
test_mask = df["datetime"].between(
    pd.to_datetime(TEST_START), pd.to_datetime(TEST_END)
)

# Independent copies so later column assignments do not touch `df`.
train_df = df[train_mask].copy()
test_df = df[test_mask].copy()

# -----------------------------
# Scale using TRAIN only
# -----------------------------
def scale_month(m):
    """Rescale a month number so that 1 maps to 0.0 and 12 maps to 1.0."""
    months_since_january = m - 1.0
    return months_since_january / 11.0
def scale_dow(d):
    """Rescale a day-of-week index so that 0 maps to 0.0 and 6 maps to 1.0."""
    scaled = d / 6.0
    return scaled

# Calendar features are rescaled with fixed constants, identically for both splits.
for frame in (train_df, test_df):
    frame["month_s"] = scale_month(frame["month"])
    frame["dow_s"] = scale_dow(frame["dow"])

# Standardisation statistics come from the training target only; the small
# constant keeps the divisor non-zero for a constant series.
y_mean = train_df["y"].mean()
y_std = train_df["y"].std() + 1e-8

# The target and both lags share the same mean/std, in both splits.
for frame in (train_df, test_df):
    for raw_name in ("y", "lag1", "lag2"):
        frame[raw_name + "_s"] = (frame[raw_name] - y_mean) / y_std

FEATURE_COLS = ["month_s", "dow_s", "lag1_s", "lag2_s"]
TARGET_SCALED = "y_s"

# Training arrays (all in scaled units).
X_train = train_df[FEATURE_COLS].to_numpy(dtype=float)
y_train = train_df[TARGET_SCALED].to_numpy(dtype=float)

# Naive baseline (scaled): predict y(t) ≈ lag1_s
naive_train = train_df["lag1_s"].to_numpy(dtype=float)

# Test arrays: scaled inputs, but the actual target stays in original units.
X_test = test_df[FEATURE_COLS].to_numpy(dtype=float)
y_test = test_df["y"].to_numpy(dtype=float)  # unscaled actual
test_dates = test_df["datetime"].to_numpy()

# Naive baseline on the test window, in original units (diagnostic only).
naive_test_unscaled = test_df["lag1"].to_numpy(dtype=float)

# -----------------------------
# NEAT: baseline-relative fitness
# -----------------------------
# Generator used by eval_genomes to draw each generation's training batch;
# seeded so the sequence of batches is repeatable from run to run.
rng = np.random.default_rng(RANDOM_SEED)


def eval_genomes(genomes, config):
    """
    Assign a baseline-relative fitness to every genome of one generation.

    A single random batch of training rows (drawn without replacement) is
    shared by all genomes in the call, so their fitness values are comparable.

    Define:
      mse_g     = MSE(genome_pred, y_true) on the batch
      mse_naive = MSE(lag1, y_true) on the same batch

    Fitness:
      improvement = 1 - (mse_g / (mse_naive + eps))
      fitness     = improvement - L2_PENALTY * mean(genome_pred ** 2)
    - If genome matches baseline => ~0
    - Better than baseline => positive
    - Worse than baseline => negative

    The penalty term is a tiny L2 cost on output magnitude for stability.

    Args:
        genomes: iterable of (genome_id, genome) pairs; each genome's
            ``fitness`` attribute is overwritten.
        config: neat configuration used to build each feed-forward network.

    Raises:
        ValueError: if the training set is empty.
    """
    n = len(y_train)
    if n == 0:
        raise ValueError("Training set is empty; cannot evaluate genomes.")

    # Keep the 256-row floor, but never ask for more rows than exist:
    # sampling without replacement raises ValueError when size > n.
    batch_size = min(n, max(256, int(BATCH_FRACTION * n)))
    idx = rng.choice(n, size=batch_size, replace=False)

    Xb = X_train[idx]
    yb = y_train[idx]
    nb = naive_train[idx]

    # EPS keeps the ratio finite when the naive predictor is perfect on the batch.
    mse_naive = np.mean((nb - yb) ** 2) + EPS

    for _, genome in genomes:
        net = neat.nn.FeedForwardNetwork.create(genome, config)

        # One forward pass per batch row; only the first output is used.
        preds = np.fromiter(
            (net.activate(x)[0] for x in Xb), dtype=float, count=batch_size
        )

        mse_g = np.mean((preds - yb) ** 2)

        # baseline-relative improvement
        improvement = 1.0 - (mse_g / mse_naive)

        # tiny stability penalty (discourages extreme outputs)
        penalty = L2_PENALTY * float(np.mean(preds ** 2))

        genome.fitness = float(improvement - penalty)


# -----------------------------
# Run evolution (progress bar over generations)
# -----------------------------
def run_neat():
    """
    Evolve a NEAT population for up to N_GENERATIONS generations.

    The population is advanced one generation per ``run`` call so that the
    progress bar ticks once per generation. Evolution stops early when the
    population reports that it did not advance, i.e. the configured fitness
    threshold was reached.

    Side effects:
        Seeds the standard-library ``random`` module with RANDOM_SEED so that
        the evolutionary search itself is repeatable (the NumPy generator only
        controls batch sampling).

    Returns:
        (winner, config): the genome returned by the last ``run`` call (per
        the neat-python documentation, the best genome seen) and the loaded
        configuration. ``winner`` is None when N_GENERATIONS is 0.
    """
    # Function-scope import keeps this block self-contained; neat-python draws
    # its mutations/crossovers from the stdlib `random` module.
    import random

    random.seed(RANDOM_SEED)

    config = neat.Config(
        neat.DefaultGenome,
        neat.DefaultReproduction,
        neat.DefaultSpeciesSet,
        neat.DefaultStagnation,
        str(CONFIG_PATH),
    )

    p = neat.Population(config)
    p.add_reporter(neat.StdOutReporter(False))
    p.add_reporter(neat.StatisticsReporter())

    winner = None
    for _ in tqdm(range(N_GENERATIONS), desc="NEAT evolution (generations)"):
        generation_before = p.generation
        winner = p.run(eval_genomes, 1)
        # NOTE(review): Population.run stops before reproducing (and before
        # bumping the generation counter) when the fitness threshold is met;
        # without this check the same population would just be re-evaluated
        # for every remaining iteration. Confirm against the installed version.
        if p.generation == generation_before:
            break

    return winner, config


# Evolve the population; keep the returned genome and the loaded configuration.
winner, config = run_neat()


# -----------------------------
# Predict on TEST (progress bar)
# -----------------------------
winner_net = neat.nn.FeedForwardNetwork.create(winner, config)

# One forward pass per test row; only the first network output is kept.
pred_test_s = np.array(
    [winner_net.activate(row)[0] for row in tqdm(X_test, desc="Predicting on test")],
    dtype=float,
)

# Undo the train-set standardisation to get predictions in original units.
pred_test = pred_test_s * y_std + y_mean


# -----------------------------
# Plot: test-window actuals against the NEAT forecast and the lag-1 baseline
# -----------------------------
fig, ax = plt.subplots(figsize=(10, 4))

# All three series are in original (unscaled) units and share the date axis.
ax.plot(test_dates, y_test, label="Actual")
ax.plot(test_dates, pred_test, label="NEAT Predicted")
ax.plot(test_dates, naive_test_unscaled, label="Naive (lag1)", alpha=0.7)

ax.set_title(f"NEAT Forecast — {TARGET_COL} (L=2, H=1) Test")
ax.set_xlabel("Date")
ax.set_ylabel("Sales")
ax.legend()

fig.tight_layout()
plt.show()
