### Cross Entropy for Rule Updating
---
#### Concept before Implementation:
- When an a priori rule shows a high cross entropy over a given time period, the rule provides less confidence during that period.
- This observation can lead to either updating the rule or telling users that they should wait until there is less noise between these two assets to go ahead and buy, sell, or stay in the current position. 
- POTENTIAL ISSUE: Sliding windows for this will be hard to deal with. How do we know what period we should measure cross-entropy in? 
- POTENTIAL SOLUTION: Try cross entropy with multiple fixed-size windows. After this, use an attention model built on an LSTM to mathematically compute the time window, and finally compare the results of these findings.

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import numpy.typing as npt
import numba

In [2]:
import tensorflow as tf
from tensorflow.keras import layers, models, callbacks

In [15]:
from sklearn.metrics import f1_score, roc_auc_score

In [2]:
def cross_entropy_histogram(
    returns_A: npt.NDArray[np.float64],
    returns_B: npt.NDArray[np.float64],
    epsilon: float = 1e-11
) -> float:
    """
    Compute cross entropy H(p, q) where p is the distribution of returns_A
    and q is the distribution of returns_B, approximated by normalized histograms.

    The number of bins follows Sturges' rule on the length of returns_A, and
    the bin edges derived from returns_A are reused for returns_B.

    returns_A, returns_B: 1D arrays of returns in the given time window.
    epsilon: small value to avoid log(0).

    Raises ValueError if either input is empty.

    NOTE: this function is intentionally not decorated with ``numba.njit``;
    numba's ``np.histogram`` does not accept the ``density`` keyword, so the
    jitted version could never compile.
    NOTE(review): the histograms are density-normalised (they integrate to 1),
    not probability masses (which would sum to 1), so the value depends on the
    bin width. This matches the other cross-entropy helper in this file.
    """
    returns_A = np.asarray(returns_A, dtype=np.float64).ravel()
    returns_B = np.asarray(returns_B, dtype=np.float64).ravel()

    # np.shape() returns a tuple; the bin rule needs the element count.
    n_rows_a = returns_A.size
    if n_rows_a == 0 or returns_B.size == 0:
        raise ValueError("returns_A and returns_B must be non-empty.")

    bins = int(np.ceil(np.log2(n_rows_a + 1)))
    hist_A, bin_edges = np.histogram(returns_A, bins=bins, density=True)

    # Normalise B by hand so that a B sample lying entirely outside A's range
    # yields q == 0 everywhere instead of the NaN produced by 0/0.
    counts_B, _ = np.histogram(returns_B, bins=bin_edges)
    widths = np.diff(bin_edges)
    in_range = counts_B.sum()
    if in_range > 0:
        hist_B = counts_B / widths / in_range
    else:
        hist_B = np.zeros_like(widths)

    ce = -np.sum(hist_A * np.log(hist_B + epsilon))
    return float(ce)

#### Notes with this method: 
- The bucket (bin) size is determined by the return streams, specifically stream A, so we need to find a new method to determine the window size.

___

In [3]:
def _ce_matrix(
    a: npt.NDArray[np.float64],
    b: npt.NDArray[np.float64],
    c: npt.NDArray[np.float64],
    epsilon: float
) -> npt.NDArray[np.float64]:
    """
    Build the 3×3 directed cross‑entropy matrix for three 1‑D windows.

    All three windows are histogrammed on one shared set of bin edges; the
    number of bins follows Sturges' rule applied to ``len(a)`` and the edges
    span the pooled data of the three windows.

    Entry ``[i, j]`` is ``-sum(p_i * log(p_j + epsilon))`` where ``p_i`` and
    ``p_j`` are the density‑normalised histograms of windows i and j (in the
    order a, b, c). The diagonal therefore holds each window's own entropy
    term.
    """
    # Sturges' rule on the window length gives the shared bin count.
    bin_count = int(np.ceil(np.log2(len(a) + 1)))
    pooled = np.concatenate((a, b, c))
    edges = np.histogram(pooled, bins=bin_count, density=False)[1]

    # One density histogram per window, all on the same edges.
    densities = [
        np.histogram(window, bins=edges, density=True)[0]
        for window in (a, b, c)
    ]

    M = np.empty((3, 3), dtype=np.float64)
    for row, p in enumerate(densities):
        for col, q in enumerate(densities):
            M[row, col] = -np.sum(p * np.log(q + epsilon))

    return M

### More than 2 assets 

In [5]:
# @numba.njit
def rolling_cross_entropy_histogram_3(
    returns_A: npt.NDArray[np.float64],
    returns_B: npt.NDArray[np.float64],
    returns_C: npt.NDArray[np.float64],
    window: int = 7,
    epsilon: float = 1e-11
) -> npt.NDArray[np.float64]:
    """
    Slide a fixed-length window over three return series and compute the
    3×3 cross‑entropy matrix for every window position.

    Parameters
    ----------
    returns_A, returns_B, returns_C : 1‑D arrays of equal length T
        Return series for the three assets.
    window : int, default 7
        Number of consecutive observations in each window (at least 2).
    epsilon : float
        Small constant to avoid log(0).

    Returns
    -------
    result : ndarray, shape (T‑window+1, 3, 3)
        result[k] is the matrix computed from observations
        k … k+window‑1 (inclusive).

    Raises
    ------
    ValueError
        If the series lengths differ, if window < 2, or if window > T.
    """
    T = len(returns_A)
    if len(returns_B) != T or len(returns_C) != T:
        raise ValueError("All three return arrays must have the same length.")
    if window < 2:
        raise ValueError("window must be at least 2.")
    if window > T:
        raise ValueError("window larger than length of series.")

    n_windows = T - window + 1
    out = np.empty((n_windows, 3, 3), dtype=np.float64)

    # Index windows by their first observation rather than their last.
    for start in range(n_windows):
        stop = start + window
        out[start] = _ce_matrix(
            returns_A[start:stop],
            returns_B[start:stop],
            returns_C[start:stop],
            epsilon,
        )

    return out

In [5]:
# Load the "analysis" sheet of the workbook into a DataFrame.
data_pd = pd.read_excel("Cross_Entropy_Updating.xlsx", sheet_name= "analysis")

In [6]:
# Drop the first column and convert the remaining columns to a NumPy array.
# NOTE(review): the first column is presumably a date/index column — confirm.
data_np = data_pd.iloc[:, 1:].to_numpy()

In [7]:
# Rolling 3×3 cross-entropy matrices with a window of 28 observations, using
# the first three columns of data_np as assets A, B and C.
results = rolling_cross_entropy_histogram_3(data_np[:, 0], data_np[:, 1], data_np[:, 2], 28)

In [8]:
# Keep row 2 of every 3×3 matrix: the cross-entropies of asset C's window
# distribution against A, B and C (one output column each).
results_df = pd.DataFrame(results[:, 2])

In [9]:
# Copy the table to the system clipboard.
results_df.to_clipboard()

In [7]:
# Load the "python" sheet of the review workbook into a DataFrame.
# NOTE(review): later cells treat its columns as one cross-entropy series per
# entry of WINDOWS, in the same order — confirm against the spreadsheet.
data_ce_pd = pd.read_excel("ce_review_1.xlsx", sheet_name= "python")

In [8]:
# Convert the whole sheet (all columns) to a NumPy array.
data_ce_np = data_ce_pd.to_numpy()

In [9]:
def make_dataset_np(
    series: np.ndarray,          # 1‑D array of length T
    lookback: int,               # L
    horizon: int,                # k
    alpha: float                 # quantile for divergence threshold
) -> tuple[np.ndarray, np.ndarray]:
    """
    Turn a 1‑D series into supervised (window, label) pairs.

    For every day ``i`` in ``[lookback, T - horizon)`` the sample is the
    ``lookback`` values strictly before ``i`` and the label is 1 when
    ``series[i + horizon] - series[i]`` exceeds the ``alpha``‑quantile of all
    such differences, else 0.

    Parameters
    ----------
    series   : shape (T,)
    lookback : number of past days fed to the network (>= 1)
    horizon  : number of days ahead used to define the label (>= 1)
    alpha    : quantile for the divergence threshold (e.g. 0.80)

    Returns
    -------
    X : ndarray, shape (samples, lookback, 1), dtype float64
    y : ndarray, shape (samples,)  dtype int8, values in {0, 1}

    Raises
    ------
    ValueError
        If ``series`` is not one‑dimensional, if ``lookback`` or ``horizon``
        is smaller than 1, or if the series is too short.

    NOTE(review): the threshold is a quantile of the differences over the
    *whole* series, so labels in an early (training) segment depend on values
    from a later (validation) segment.
    """
    series = np.asarray(series, dtype=np.float64)
    if series.ndim != 1:
        raise ValueError("series must be one-dimensional")
    # horizon == 0 would turn series[:-horizon] into an empty slice, and a
    # negative horizon would pair the wrong ends of the series.
    if lookback < 1 or horizon < 1:
        raise ValueError("lookback and horizon must be positive integers")
    T = series.size
    if T <= lookback + horizon:
        raise ValueError("series too short for given lookback & horizon")

    # 1) ΔH over the horizon  (length = T‑horizon)
    delta = series[horizon:] - series[:-horizon]

    # 2) binary label based on α‑quantile
    thresh = np.quantile(delta, alpha)
    y_full = (delta > thresh).astype(np.int8)        # length T‑horizon

    # 3) build rolling windows without a Python loop: row j of `windows` is
    #    series[j:j + lookback], i.e. the features for day i = j + lookback.
    n_samples = T - horizon - lookback
    windows = np.lib.stride_tricks.sliding_window_view(series, lookback)
    X = windows[:n_samples, :, None].copy()          # add channel dim, own memory
    y = y_full[lookback:lookback + n_samples].copy()

    return (X, y)


In [10]:
# Window lengths being compared; entry j is paired with column j of data_ce_np.
WINDOWS = [7, 14, 28, 60]           # same order as the columns

In [11]:
# Hyper-parameters passed to make_dataset_np for every window length.
LOOKBACK = 30     # past values per sample
HORIZON  = 5      # steps ahead used to define the label
ALPHA    = 0.80   # quantile used as the label threshold

# Build one (X, y) dataset per window length, keyed by the window length.
datasets = {}
for j, w in enumerate(WINDOWS):          # j = column index
    X, y = make_dataset_np(
        data_ce_np[:, j],                # 1‑D slice for that window length
        LOOKBACK,
        HORIZON,
        ALPHA
    )
    datasets[w] = (X, y)

In [12]:
def ts_split(
    X: np.ndarray,
    y: np.ndarray,
    val_frac: float = 0.20
) -> tuple[tuple[np.ndarray, np.ndarray], tuple[np.ndarray, np.ndarray]]:
    """
    Chronological train/validation split with no shuffling.

    The first ``int(len(X) * (1 - val_frac))`` samples form the training set
    and the remaining, most recent samples form the validation set.

    Returns ``((X_train, y_train), (X_val, y_val))``.
    """
    n_train = int((1.0 - val_frac) * len(X))
    train_part = (X[:n_train], y[:n_train])
    val_part = (X[n_train:], y[n_train:])
    return train_part, val_part

In [13]:
# Chronological train/validation split of every per-window dataset, keyed by
# window length: splits[w] == ((X_train, y_train), (X_val, y_val)).
splits: dict[int, tuple[tuple[np.ndarray, np.ndarray], tuple[np.ndarray, np.ndarray]]] = {
    w: ts_split(*datasets[w]) for w in WINDOWS
}

In [14]:
def build_lstm(input_shape: tuple) -> tf.keras.Model:
    """
    Build and compile a single-layer LSTM binary classifier.

    input_shape = (LOOKBACK, n_channels)

    Architecture: LSTM(32) → Dropout(0.2) → Dense(1, sigmoid), compiled with
    the Adam optimizer, binary cross-entropy loss and an AUC metric named
    "auc".
    """
    net = models.Sequential()
    net.add(layers.Input(shape=input_shape))
    net.add(layers.LSTM(32, return_sequences=False))
    net.add(layers.Dropout(0.2))
    net.add(layers.Dense(1, activation="sigmoid"))

    auc_metric = tf.keras.metrics.AUC(name="auc")
    net.compile(
        optimizer="adam",
        loss="binary_crossentropy",
        metrics=[auc_metric],
    )
    return net

In [17]:
# Train one single-channel LSTM per window length and score it on that
# window's chronological validation split.

# Re-derive LOOKBACK from the data (axis 1 of X is the lookback length).
LOOKBACK = datasets[WINDOWS[0]][0].shape[1]   # same for all

# NOTE: this rebinds `results`, which an earlier cell used for the rolling
# cross-entropy array.
results = {}

for w, ((X_tr, y_tr), (X_val, y_val)) in splits.items():
    model = build_lstm((LOOKBACK, 1))
    # Stop after 10 epochs without improvement of the callback's default
    # monitored quantity and roll back to the best weights seen.
    es = callbacks.EarlyStopping(patience=10, restore_best_weights=True)

    model.fit(
        X_tr, y_tr,
        validation_data=(X_val, y_val),
        epochs=100,
        batch_size=32,
        verbose=0,
        callbacks=[es]
    )

    # Predicted probabilities on the validation split, flattened to 1-D.
    p_val = model.predict(X_val, verbose=0).ravel()
    auc   = roc_auc_score(y_val, p_val)
    # F1 uses a fixed 0.5 probability cut-off.
    # NOTE(review): with ALPHA = 0.80 positives are a minority, so a 0.5
    # cut-off may predict no positives at all (F1 = 0) — consider tuning it.
    f1    = f1_score(y_val, (p_val > 0.5).astype(int))

    results[w] = {"AUC": auc, "F1": f1}
    print(f"{w:>2}-day window →  AUC={auc:.3f}   F1={f1:.3f}")

 7-day window →  AUC=0.759   F1=0.104
14-day window →  AUC=0.591   F1=0.000
28-day window →  AUC=0.745   F1=0.000
60-day window →  AUC=0.597   F1=0.000


In [29]:
def build_lstm_attn(input_shape: tuple, lstm_units: int = 32) -> tf.keras.Model:
    """
    Build and compile an LSTM classifier with a channel-wise attention branch.

    Parameters
    ----------
    input_shape : tuple
        (LOOKBACK, n_channels); the last entry is taken as the channel count.
    lstm_units : int, default 32
        Number of units in the LSTM encoder.

    Returns
    -------
    A compiled Keras model (Adam, binary cross-entropy, AUC metric "auc")
    producing one sigmoid output per sample.

    Notes
    -----
    The attention scores are computed from the *last time step only*, so the
    softmax weights are a distribution over channels, not over time steps.
    The layers named "att_dense" and "attn" are looked up by name elsewhere
    in this notebook.
    """
    n_channels = input_shape[-1]
    
    seq_in = layers.Input(shape=input_shape)    # shape: (LOOKBACK, n_channels)
    
    # 1) LSTM encoder on the time dimension
    lstm_out = layers.LSTM(lstm_units, return_sequences=False)(seq_in)  # (batch, lstm_units)
    
    # 2) Channel-wise attention:
    # (a) Get the last time step values per channel
    last_step = layers.Lambda(lambda t: t[:, -1, :])(seq_in)  # (batch, n_channels)
    
    # (b) Compute raw attention scores with a Dense layer (name it "att_dense")
    att_dense = layers.Dense(n_channels, name="att_dense")(last_step)  # (batch, n_channels)
    
    # (c) Apply softmax to get attention weights (Activation layer)
    att_soft = layers.Activation("softmax", name="attn")(att_dense)  # (batch, n_channels)
    
    # (d) Weighted sum of the last-step channel values using the attention weights
    context = layers.Lambda(
        lambda x: tf.reduce_sum(x[0] * x[1], axis=1, keepdims=True)
    )([last_step, att_soft])  # shape: (batch, 1)
    
    # 3) Concatenate the LSTM output and the context scalar.
    merged = layers.Concatenate()([lstm_out, context])  # (batch, lstm_units + 1)
    
    # 4) Final classification layer.
    out = layers.Dense(1, activation="sigmoid")(merged)
    
    model = models.Model(seq_in, out)
    model.compile(optimizer="adam",
                  loss="binary_crossentropy",
                  metrics=[tf.keras.metrics.AUC(name="auc")])
    return model


In [24]:
# Stack the per-window feature arrays along a new last axis so that each
# window length becomes one input channel: (samples, LOOKBACK, len(WINDOWS)).
X_all = np.stack([datasets[w][0][..., 0] for w in WINDOWS], axis=-1)
# Labels are taken from the FIRST window length only.
# NOTE(review): make_dataset_np derives labels from the series it is given,
# so labels of the other windows can differ from these — confirm this is
# the intended target.
y_all = datasets[WINDOWS[0]][1]

In [26]:
def ts_split(X, y, val_frac=0.2):
    """
    Split X and y chronologically: the leading ``1 - val_frac`` share is the
    training part, the trailing share is the validation part.

    Returns ``((X_train, y_train), (X_val, y_val))``.
    """
    boundary = int((1 - val_frac) * len(X))
    head = slice(None, boundary)
    tail = slice(boundary, None)
    return (X[head], y[head]), (X[tail], y[tail])

# Chronological split of the multi-channel dataset with the default val_frac.
(X_tr, y_tr), (X_val, y_val) = ts_split(X_all, y_all)

In [30]:
# Train the attention model on all window lengths at once (one channel per
# entry of WINDOWS). Requires LOOKBACK and WINDOWS from earlier cells.
model = build_lstm_attn((LOOKBACK, len(WINDOWS)))
es = tf.keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)

# X_all should have shape (samples, LOOKBACK, len(WINDOWS))
# NOTE(review): the validation part is chosen by Keras via validation_split
# rather than by ts_split — confirm it matches X_val used in later cells.
model.fit(
    X_all, y_all,
    validation_split=0.2,
    epochs=100,
    batch_size=32,
    callbacks=[es],
    verbose=0
)

# Inspect the trained kernel of the "att_dense" layer. These are the layer's
# weights (get_weights()[0]), not the per-sample softmax attention
# probabilities produced by the "attn" layer.
att_dense_weights = model.get_layer("att_dense").get_weights()[0]
print("Attention dense weights:", att_dense_weights)

Attention dense weights: [[-0.47787336 -0.7813181   0.40576404  0.27786258]
 [ 0.50365853 -0.40947962 -0.66947925  0.4415727 ]
 [ 0.5782949  -0.8263428  -0.6641926  -0.655264  ]
 [ 0.72733355 -0.6088978  -0.1244367  -0.61146486]]


In [32]:
# Build a sub-model that maps the model input to the output of the "attn"
# layer, i.e. the per-sample softmax attention probabilities.
att_model = tf.keras.Model(inputs=model.input,
                           outputs=model.get_layer("attn").output)

# Use the sub-model to predict the attention probabilities for the validation set.
att_values = att_model.predict(X_val)

# Print the attention distribution for the first few validation samples.
# The row count is defined once so the printed label always matches the
# number of rows shown (previously the label said 5 while 15 were printed).
n_preview = 15
print(f"Attention distributions for the first {n_preview} samples:")
print(att_values[:n_preview])


11/11 ━━━━━━━━━━━━━━━━━━━━ 0s 4ms/step
Attention distributions for the first 5 samples:
[[0.0000000e+00 1.0000000e+00 0.0000000e+00 0.0000000e+00]
 [1.0000000e+00 0.0000000e+00 0.0000000e+00 6.1773922e-38]
 [0.0000000e+00 0.0000000e+00 0.0000000e+00 1.0000000e+00]
 [0.0000000e+00 1.0000000e+00 0.0000000e+00 1.0352684e-37]
 [0.0000000e+00 1.0000000e+00 0.0000000e+00 1.1190406e-32]
 [0.0000000e+00 1.0000000e+00 0.0000000e+00 1.2712054e-34]
 [0.0000000e+00 1.0000000e+00 0.0000000e+00 9.6068732e-36]
 [1.0000000e+00 0.0000000e+00 0.0000000e+00 0.0000000e+00]
 [0.0000000e+00 1.0000000e+00 0.0000000e+00 1.3140769e-26]
 [0.0000000e+00 1.0000000e+00 0.0000000e+00 0.0000000e+00]
 [0.0000000e+00 5.5701137e-14 0.0000000e+00 1.0000000e+00]
 [0.0000000e+00 6.0750152e-14 0.0000000e+00 1.0000000e+00]
 [9.3916673e-37 0.0000000e+00 0.0000000e+00 1.0000000e+00]
 [2.2225791e-37 7.1192921e-12 0.0000000e+00 1.0000000e+00]
 [8.3341433e-07 1.6233813e-16 0.0000000e+00 9.99999

In [34]:
# Wrap the attention probabilities in a DataFrame (one row per validation
# sample, one column per channel).
attention_df = pd.DataFrame(att_values)

In [36]:
# Copy the attention table to the system clipboard.
attention_df.to_clipboard()

In [33]:
# Display the shape of the attention array as the cell's output.
att_values.shape

(345, 4)

In [None]:

def ts_split(X: np.ndarray, y: np.ndarray, val_frac: float = 0.20):
    """
    Chronological hold-out split (no shuffling).

    Everything before index ``int(len(X) * (1 - val_frac))`` is training
    data; everything from that index onward is validation data.

    Returns ``((X_train, y_train), (X_val, y_val))``.
    """
    train_share = 1.0 - val_frac
    cut = int(len(X) * train_share)
    X_train, X_val = X[:cut], X[cut:]
    y_train, y_val = y[:cut], y[cut:]
    return (X_train, y_train), (X_val, y_val)

In [None]:
# ------------------------------------------
# Parameters for retraining loop
# ------------------------------------------
STEP_DAYS  = 20          # retrain every 20 samples (adjust as needed)
EXPAND_WIN = True        # if True, use all data up to the retraining point
MIN_TRAIN  = 250         # minimal training size (e.g. about one year of data)

# Here we assume X_all.shape[0] == number of samples (time steps)
# We'll simulate a date index as just the sample indices.
dates = np.arange(X_all.shape[0])

# Choose cut points starting after MIN_TRAIN samples, stepping every STEP_DAYS.
cut_points = dates[MIN_TRAIN::STEP_DAYS]

# Dictionary to hold the retrained models (keyed by cut point).
# It must NOT be called `models`: that name is the imported
# `tensorflow.keras.models` module, which build_lstm_attn uses via
# `models.Model(...)`. Rebinding it to a dict makes the first call to
# build_lstm_attn fail with AttributeError.
retrained_models = {}

for cut in cut_points:
    # ------------------------------------------
    # 1) Slice the data up to (but not including) the cut point
    # ------------------------------------------
    if EXPAND_WIN:
        X_cut = X_all[:cut]
        y_cut = y_all[:cut]
    else:
        # For a fixed-length sliding window, adjust the window length (e.g., 500 samples)
        window_size = 500
        if cut < window_size:
            continue
        X_cut = X_all[cut - window_size:cut]
        y_cut = y_all[cut - window_size:cut]

    # ------------------------------------------
    # 2) Split the data into training and validation sets
    # ------------------------------------------
    (X_tr, y_tr), (X_val, y_val) = ts_split(X_cut, y_cut, val_frac=0.20)

    # ------------------------------------------
    # 3) Build and train the model (using the updated build_lstm_attn)
    # ------------------------------------------
    model = build_lstm_attn((LOOKBACK, len(WINDOWS)))  # updated attention model
    es = tf.keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)

    model.fit(
        X_tr, y_tr,
        validation_data=(X_val, y_val),
        epochs=100,
        batch_size=32,
        verbose=0,
        callbacks=[es]
    )

    # ------------------------------------------
    # 4) Inspect the attention branch via the Dense layer
    # ------------------------------------------
    # Kernel of the Dense layer named "att_dense", shape (input_dim, n_channels).
    att_dense_weights = model.get_layer("att_dense").get_weights()[0]
    # Average the kernel over its input dimension to get one number per
    # output channel. This summarises the layer's weights; it is not the
    # per-sample softmax attention emitted by the "attn" layer.
    mean_att = att_dense_weights.mean(axis=0)

    print(f"Cut index {cut:4d} → Mean attention weights per channel: {dict(zip(WINDOWS, np.round(mean_att, 3)))}")

    # Optionally, you can also create a sub-model to get the actual softmax outputs:
    # att_model = tf.keras.Model(model.input, model.get_layer("attn").output)
    # att_values = att_model.predict(X_val)
    # print("Sample attention outputs (first validation sample):", att_values[0])

    retrained_models[cut] = model

print("Retraining complete. Models are stored in the `retrained_models` dictionary keyed by retraining index.")