### This example demonstrates the use of experience replay with XCSF
Uses the [cart-pole](https://gymnasium.farama.org/environments/classic_control/cart_pole/) problem from OpenAI gymnasium (v.0.28.1)

```
$ pip install gymnasium[classic-control]

```

**Note:** 

These hyperparameters do not result in consistently optimal performance.

Normalising the inputs and reward, and using prioritised experience replay may increase performance.

In [1]:
from __future__ import annotations

import json
import random
from collections import deque

from matplotlib import rcParams
import matplotlib.pyplot as plt
import imageio
import gymnasium as gym
import numpy as np
from IPython.display import display, Image
from tqdm import tqdm

import xcsf

RANDOM_STATE: int = 101
random.seed(RANDOM_STATE)
np.random.seed(RANDOM_STATE)

### Initialise OpenAI Gym problem environment

In [2]:
env = gym.make("CartPole-v1", render_mode="rgb_array")
X_DIM: int = int(env.observation_space.shape[0])
N_ACTIONS: int = int(env.action_space.n)

### Initialise XCSF

In [3]:
xcs = xcsf.XCS(
    x_dim=X_DIM,
    y_dim=N_ACTIONS,
    n_actions=1,
    omp_num_threads=12,
    random_state=RANDOM_STATE,
    pop_init=False,
    max_trials=1,  # one trial per fit()
    pop_size=200,
    theta_del=100,
    e0=0.001,
    alpha=1,
    beta=0.05,
    ea={
        "select_type": "roulette",
        "theta_ea": 100,
        "lambda": 2,
    },
    condition={
        "type": "neural",
        "args": {
            "layer_0": {  # hidden layer
                "type": "connected",
                "activation": "selu",
                "evolve_weights": True,
                "evolve_neurons": True,
                "n_init": 1,
                "n_max": 100,
                "max_neuron_grow": 1,
            },
            "layer_1": {  # output layer
                "type": "connected",
                "activation": "linear",
                "evolve_weights": True,
                "n_init": 1,
            },
        },
    },
    prediction={
        "type": "rls_quadratic",
    },
)

GAMMA: float = 0.95  # discount rate for delayed reward
epsilon: float = 1  # initial probability of exploring
EPSILON_MIN: float = 0.1  # the minimum exploration rate
EPSILON_DECAY: float = 0.98  # the decay of exploration after each batch replay
REPLAY_TIME: int = 1  # perform replay update every n episodes

print(json.dumps(xcs.internal_params(), indent=4))

{
    "version": "1.3.0",
    "x_dim": 4,
    "y_dim": 2,
    "n_actions": 1,
    "omp_num_threads": 12,
    "random_state": 101,
    "population_file": "",
    "pop_init": false,
    "max_trials": 1,
    "perf_trials": 1000,
    "pop_size": 200,
    "loss_func": "mae",
    "set_subsumption": false,
    "theta_sub": 100,
    "e0": 0.001,
    "alpha": 1,
    "nu": 5,
    "beta": 0.05,
    "delta": 0.1,
    "theta_del": 100,
    "init_fitness": 0.01,
    "init_error": 0,
    "m_probation": 10000,
    "stateful": true,
    "compaction": false,
    "ea": {
        "select_type": "roulette",
        "theta_ea": 100,
        "lambda": 2,
        "p_crossover": 0.8,
        "err_reduc": 1,
        "fit_reduc": 0.1,
        "subsumption": false,
        "pred_reset": false
    },
    "condition": {
        "type": "neural",
        "args": {
            "layer_0": {
                "type": "connected",
                "activation": "selu",
                "n_inputs": 4,
                "n_init

### Execute experiment

In [None]:
total_steps: int = 0  # total number of steps performed
MAX_EPISODES: int = 2000  # maximum number of episodes to run
N: int = 100  # number of episodes to average performance
memory: deque[tuple[np.ndarray, int, float, np.ndarray, bool]] = deque(maxlen=50000)
scores: deque[float] = deque(maxlen=N)  # used to calculate moving average

# for rendering an episode as a gif
SAVE_GIF: bool = True
SAVE_GIF_EPISODES: int = 50

frames: list[list[float]] = []
fscore: list[float] = []
ftrial: list[int] = []


def replay(replay_size: int = 5000) -> None:
    """Performs experience replay updates"""
    batch_size: int = min(len(memory), replay_size)
    batch = random.sample(memory, batch_size)
    for state, action, reward, next_state, done in batch:
        y_target = reward
        if not done:
            prediction_array = xcs.predict(next_state.reshape(1, -1))[0]
            y_target += GAMMA * np.max(prediction_array)
        target = xcs.predict(state.reshape(1, -1))[0]
        target[action] = y_target
        xcs.fit(
            state.reshape(1, -1), target.reshape(1, -1), warm_start=True, verbose=False
        )


def egreedy_action(state: np.ndarray) -> int:
    """Selects an action using an epsilon greedy policy"""
    if np.random.rand() < epsilon:
        return random.randrange(N_ACTIONS)
    prediction_array = xcs.predict(state.reshape(1, -1))[0]
    # break ties randomly
    best_actions = np.where(prediction_array == prediction_array.max())[0]
    return int(np.random.choice(best_actions))


def episode(episode_nr: int, create_gif: bool) -> tuple[float, int]:
    """Executes a single episode, saving to memory buffer"""
    episode_score: float = 0
    episode_steps: int = 0
    state: np.ndarray = env.reset()[0]
    while True:
        action = egreedy_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        episode_steps += 1
        episode_score += reward
        memory.append((state, action, reward, next_state, done))
        if create_gif:
            frames.append(env.render())
            fscore.append(episode_score)
            ftrial.append(episode_nr)
        if done:
            if create_gif:
                for _ in range(100):
                    frames.append(frames[-1])
                    fscore.append(fscore[-1])
                    ftrial.append(ftrial[-1])
            break
        state = next_state
    return episode_score, episode_steps


# learning episodes
for ep in range(MAX_EPISODES):
    gif: bool = False
    if SAVE_GIF and ep % SAVE_GIF_EPISODES == 0:
        gif = True
    # execute a single episode
    ep_score, ep_steps = episode(ep, gif)
    # perform experience replay updates
    if ep % REPLAY_TIME == 0:
        replay()
    # display performance
    total_steps += ep_steps
    scores.append(ep_score)
    mean_score = np.mean(scores)
    print(
        f"episodes={ep} "
        f"steps={total_steps} "
        f"score={mean_score:.2f} "
        f"epsilon={epsilon:.5f} "
        f"error={xcs.error():.5f} "
        f"msize={xcs.mset_size():.2f}"
    )
    # is the problem solved?
    if ep > N and mean_score > env.spec.reward_threshold:
        print(
            f"solved after {ep} episodes: "
            f"mean score {mean_score:.2f} > {env.spec.reward_threshold:.2f}"
        )
        break
    # decay the exploration rate
    if epsilon > EPSILON_MIN:
        epsilon *= EPSILON_DECAY

episodes=0 steps=37 score=37.00 epsilon=1.00000 error=0.66732 msize=1.02
episodes=1 steps=51 score=25.50 epsilon=0.98000 error=0.71819 msize=1.06
episodes=2 steps=65 score=21.67 epsilon=0.96040 error=0.70016 msize=3.15
episodes=3 steps=74 score=18.50 epsilon=0.94119 error=0.61321 msize=4.93
episodes=4 steps=95 score=19.00 epsilon=0.92237 error=0.60354 msize=6.93
episodes=5 steps=116 score=19.33 epsilon=0.90392 error=0.62890 msize=9.01
episodes=6 steps=131 score=18.71 epsilon=0.88584 error=0.50549 msize=11.07
episodes=7 steps=144 score=18.00 epsilon=0.86813 error=0.59151 msize=14.92
episodes=8 steps=160 score=17.78 epsilon=0.85076 error=0.36281 msize=16.91
episodes=9 steps=182 score=18.20 epsilon=0.83375 error=0.38654 msize=20.90
episodes=10 steps=198 score=18.00 epsilon=0.81707 error=0.41125 msize=25.53
episodes=11 steps=217 score=18.08 epsilon=0.80073 error=0.39090 msize=29.26
episodes=12 steps=232 score=17.85 epsilon=0.78472 error=0.29265 msize=33.93
episodes=13 steps=250 score=17.86

episodes=105 steps=24646 score=245.30 epsilon=0.11988 error=0.05638 msize=170.13
episodes=106 steps=25146 score=250.15 epsilon=0.11748 error=0.05087 msize=195.02
episodes=107 steps=25646 score=255.02 epsilon=0.11513 error=0.04691 msize=189.09
episodes=108 steps=26146 score=259.86 epsilon=0.11283 error=0.15545 msize=186.05
episodes=109 steps=26531 score=263.49 epsilon=0.11057 error=0.08577 msize=178.33
episodes=110 steps=27031 score=268.33 epsilon=0.10836 error=0.06463 msize=198.15
episodes=111 steps=27432 score=272.15 epsilon=0.10619 error=0.10311 msize=188.33
episodes=112 steps=27885 score=276.53 epsilon=0.10407 error=0.05344 msize=190.00
episodes=113 steps=28330 score=280.80 epsilon=0.10199 error=0.08789 msize=180.88
episodes=114 steps=28830 score=285.66 epsilon=0.09995 error=0.08909 msize=189.55
episodes=115 steps=29330 score=290.53 epsilon=0.09995 error=0.05468 msize=194.62
episodes=116 steps=29759 score=294.61 epsilon=0.09995 error=0.17719 msize=198.12
episodes=117 steps=30259 sco

episodes=207 steps=61239 score=355.93 epsilon=0.09995 error=0.01995 msize=187.37


### Final exploit episode

In [None]:
epsilon = 0
ep_score, ep_steps = episode(ep, SAVE_GIF)
print(f"score = {ep_score}, steps = {ep_steps}")

### Render the learning episodes

In [None]:
if SAVE_GIF:
    # add score and episode nr
    rcParams["font.family"] = "monospace"
    bbox = dict(boxstyle="round", fc="0.8")
    annotated_frames = list()
    bar = tqdm(total=len(frames), position=0, leave=True)
    for i in range(len(frames)):
        fig = plt.figure(dpi=90)
        fig.set_size_inches(3, 3)
        ax = fig.add_subplot(111)
        plt.imshow(frames[i])
        plt.axis("off")
        strial = str(ftrial[i])
        sscore = str(int(fscore[i]))
        text = f"episode = {strial:3s}, score = {sscore:3s}"
        ax.annotate(text, xy=(0, 100), xytext=(-40, 1), fontsize=12, bbox=bbox)
        fig.canvas.draw()
        annotated_frames.append(np.asarray(fig.canvas.renderer.buffer_rgba()))
        plt.close(fig)
        bar.refresh()
        bar.update(1)
    bar.close()
    # write gif
    imageio.mimsave("animation.gif", annotated_frames, duration=30)
    display(Image(open("animation.gif", "rb").read()))

In [None]:
env.close()  # close Gym