# Road towards Anticipatory Learning Classifier Systems

**TODO:** Describe the history of LCS, XCS, ACS, ACS2, YACS and other related systems. Mention why XCS is not considered, and contrast the Michigan and Pittsburgh approaches.

In [2]:
import gym
import gym_maze  # noqa: F401
import numpy as np
import matplotlib.pyplot as plt

import pathlib
import random
from itertools import groupby

from src.decorators import get_from_cache_or_run
from lcs import Perception

from src.visualization import PLOT_DPI
from src.utils import build_plots_dir_path, build_cache_dir_path

# Project root is taken to be three levels above the current working directory
# (assumes the notebook is run from its own folder - TODO confirm).
root_dir = pathlib.Path().cwd().parent.parent.parent
# Name of the current working directory; used below as a per-chapter sub-folder.
cwd_dir_name = pathlib.Path().cwd().name

# Destination folders for saved figures and for cached experiment results.
plot_dir = build_plots_dir_path(root_dir) / cwd_dir_name
cache_dir = build_cache_dir_path(root_dir) / cwd_dir_name

plt.ioff()  # turn off interactive plotting

<matplotlib.pyplot._IoffContext at 0x7fe4f04054f0>

## ACS2

In [19]:
import lcs.agents.acs2 as acs2
from lcs.metrics import population_metrics

from typing import Optional, Tuple

def maze_metrics(agent, env):
    """Gather the metrics recorded for a maze experiment.

    :param agent: agent whose ``population`` attribute is measured
    :param env: environment handed through to ``population_metrics``
    :return: a new dict holding everything ``population_metrics`` reports
    """
    return dict(population_metrics(agent.population, env))

# Keyword arguments forwarded to ``acs2.Configuration`` for the Maze5 experiment.
acs2_base_params = {
    'classifier_length': 8,  # presumably one attribute per neighbouring maze cell - TODO confirm
    'number_of_possible_actions': 8,  # matches the eight movement directions drawn in the policy plot
    'biased_exploration': 0,
    'metrics_trial_frequency': 1,  # presumably collect metrics on every trial - TODO confirm
    'user_metrics_collector_fcn': maze_metrics
}

def run_acs2_explore_exploit(env, explore_trials, exploit_trials, **config):
    """Run an ACS2 agent in explore mode, then a second agent in exploit mode.

    The second agent is created with a shallow copy of the population built by
    the first one, and both agents share the same configuration.

    :param env: environment passed to ``explore`` and ``exploit``
    :param explore_trials: number of trials for the explore phase
    :param exploit_trials: number of trials for the exploit phase
    :param config: keyword arguments forwarded to ``acs2.Configuration``
    :return: ``((explore_agent, explore_metrics), (exploit_agent, exploit_metrics))``
    """
    # ``copy`` is not imported anywhere in the notebook; bring it into scope
    # here so the function works on a fresh kernel.
    from copy import copy

    cfg = acs2.Configuration(**config)

    # explore phase
    agent = acs2.ACS2(cfg)
    metrics_explore = agent.explore(env, explore_trials)

    # exploit phase - start from a copy of the explored population
    agent_exploit = acs2.ACS2(cfg, copy(agent.population))
    metrics_exploit = agent_exploit.exploit(env, exploit_trials)

    return (agent, metrics_explore), (agent_exploit, metrics_exploit)

def find_best_classifier(population: acs2.ClassifiersList, situation: Perception) -> Optional[acs2.Classifier]:
    """Pick the fittest change-anticipating classifier matching ``situation``.

    :param population: classifiers to search
    :param situation: perception used to form the match set
    :return: the matching classifier with the highest ``fitness`` among those
        for which ``does_anticipate_change()`` is true, or ``None`` if there
        is no such classifier
    """
    candidates = [
        cl
        for cl in population.form_match_set(situation)
        if cl.does_anticipate_change()
    ]

    if not candidates:
        return None

    return max(candidates, key=lambda candidate: candidate.fitness)

def build_fitness_and_action_matrices(env, population) -> Tuple:
    """Build per-cell fitness and action-label matrices for a maze.

    Every cell of ``env.env.maze.matrix`` is visited and filled depending on
    its code:

    * ``0`` (path)   - fitness and action arrow of the best classifier found
      by ``find_best_classifier``; ``-1`` and ``'?'`` when there is none,
    * ``1`` (wall)   - fitness ``0`` and label ``'\\#'``,
    * ``9`` (reward) - the largest value currently in the fitness matrix plus
      500, and label ``'R'``.

    :param env: wrapped maze environment exposing ``env.env.maze``
    :param population: classifier population queried for each path cell
    :return: tuple ``(fitness, action)`` - a float matrix and a string matrix,
        both shaped like the maze matrix
    """
    original = env.env.maze.matrix

    # Cast to float: keeping the maze's own dtype would truncate fractional
    # fitness values if that dtype is integral.
    fitness = original.astype(float)
    action = original.copy().astype(str)

    action_lookup = {
        0: u'↑', 1: u'↗', 2: u'→', 3: u'↘',
        4: u'↓', 5: u'↙', 6: u'←', 7: u'↖'
    }

    for index, x in np.ndenumerate(original):
        if x == 0:  # path
            perception = env.env.maze.perception(index)
            best_cl = find_best_classifier(population, perception)

            if best_cl:
                fitness[index] = best_cl.fitness
                action[index] = action_lookup[best_cl.action]
            else:
                fitness[index] = -1
                action[index] = '?'

        elif x == 1:  # wall
            fitness[index] = 0
            # Raw string: the backslash escapes '#' when the label is later
            # wrapped in '$...$'; a plain '\#' is an invalid escape sequence.
            action[index] = r'\#'

        elif x == 9:  # reward
            # add 500 to make it more distinguishable; the maximum is taken
            # over the matrix as filled so far
            fitness[index] = fitness.max() + 500
            action[index] = 'R'

    return fitness, action

def plot_policy(env, fitness_matrix, action_matrix):
    """Draw the maze policy as a heat map and save it to the plots directory.

    The fitness matrix is rendered as a red-scale image and every cell is
    annotated with the matching entry of ``action_matrix``. The figure is
    written to ``{plot_dir}/acs2-maze5-policy.png``.

    :param env: wrapped maze environment; only the shape of
        ``env.env.maze.matrix`` is used
    :param fitness_matrix: 2-D array of per-cell values to colour by
    :param action_matrix: 2-D array of per-cell text labels
    """
    fig, ax = plt.subplots(1, 1, figsize=(14, 8))

    # NumPy shapes are (rows, columns), i.e. (y, x). Unpacking in this order
    # keeps the axes correct for non-square mazes as well.
    max_y, max_x = env.env.maze.matrix.shape

    # Render maze as image
    ax.imshow(fitness_matrix, interpolation='nearest', cmap='Reds', aspect='auto', extent=[0, max_x, max_y, 0])

    # Add labels to each cell
    for (y, x), val in np.ndenumerate(action_matrix):
        ax.text(x + 0.4, y + 0.5, "${}$".format(val))

    ax.set_title("Policy in Maze5 environment", fontsize=24)
    ax.set_xlabel('x', fontsize=18)
    ax.set_ylabel('y', fontsize=18)
    ax.set_xlim(0, max_x)
    ax.set_ylim(max_y, 0)  # inverted so that row 0 is drawn at the top
    ax.set_xticks(range(0, max_x))
    ax.set_yticks(range(0, max_y))
    ax.grid(True)
    fig.savefig(f'{plot_dir}/acs2-maze5-policy.png', dpi=PLOT_DPI)

@get_from_cache_or_run(cache_path=f'{cache_dir}/acs2_maze5.dill')
def run_acs2_in_maze5():
    """Run the ACS2 explore/exploit experiment in the Maze5 environment.

    Uses 5000 explore trials, 100 exploit trials and ``acs2_base_params``.

    :return: tuple ``(env, explore_phase, exploit_phase)`` where each phase is
        the ``(agent, metrics)`` pair produced by ``run_acs2_explore_exploit``
    """
    maze5 = gym.make('Maze5-v0')
    explore_result, exploit_result = run_acs2_explore_exploit(
        maze5,
        explore_trials=5000,
        exploit_trials=100,
        **acs2_base_params
    )
    return maze5, explore_result, exploit_result

# Run computation (the decorator is expected to reuse a cached result when one exists)
env_, explore_, exploit_ = run_acs2_in_maze5()

# Plot the policy
# explore_ is an (agent, metrics) pair, so explore_[0] is the agent trained in the explore phase.
fitness_matrix, action_matrix = build_fitness_and_action_matrices(env_, explore_[0].population)
plot_policy(env_, fitness_matrix, action_matrix)

:::{figure-md} maze5-fig
:class: full-width
<img src="../../_static/plots/2_selected_topics/acs2-maze5-policy.png">

Policy learned in the Maze5 environment. The saturation of the red color reflects the fitness of the best classifier in each cell.
:::

## MACS

In [20]:
import lcs.agents.macs.macs as macs

# Single Maze228 environment instance shared by the MACS and ACS2 runs below.
maze228_env = gym.make('Maze228-v0')

def _calculate_rotating_maze_knowledge(agent: macs.MACS, env):
    """Return the fraction of environment transitions the agent predicts exactly.

    A transition ``(p0, a, p1)`` from ``env.env.transitions`` counts as covered
    when ``agent.get_anticipations`` yields exactly one anticipation for
    ``(p0, a)`` and that anticipation equals ``p1``.

    :param agent: MACS agent being evaluated
    :param env: wrapped environment exposing ``env.env.transitions``
    :return: number of covered transitions divided by the number of transitions
    """
    def _is_covered(start, action, end):
        # Perceptions are built from the string form of every attribute.
        start_perception = Perception(list(map(str, start)))
        end_perception = Perception(list(map(str, end)))
        predicted = list(agent.get_anticipations(start_perception, action))

        # accurate classifiers: a single, correct anticipation
        return len(predicted) == 1 and predicted[0] == end_perception

    all_transitions = env.env.transitions
    hits = sum(1 for p0, a, p1 in all_transitions if _is_covered(p0, a, p1))

    return hits / len(all_transitions)

def _macs_metrics(agent: macs.MACS, env):
    """Collect the metrics recorded for a MACS agent.

    :param agent: MACS agent being measured
    :param env: environment used for the knowledge computation
    :return: dict with the population size (``pop``), the number of entries in
        ``agent.desirability_values`` (``situations``), the number of accurate
        classifiers for actions 0, 1 and 2 (``0_cls``, ``1_cls``, ``2_cls``)
        and the transition coverage (``knowledge``)
    """
    classifiers = agent.population

    def _accurate_count(action):
        # Number of accurate classifiers advocating the given action.
        return len([cl for cl in classifiers if cl.action == action and cl.is_accurate])

    collected = {}
    collected['pop'] = len(classifiers)
    collected['situations'] = len(agent.desirability_values)
    collected['0_cls'] = _accurate_count(0)
    collected['1_cls'] = _accurate_count(1)
    collected['2_cls'] = _accurate_count(2)
    collected['knowledge'] = _calculate_rotating_maze_knowledge(agent, env)
    return collected

@get_from_cache_or_run(cache_path=f'{cache_dir}/macs_maze228.dill')
def run_macs_in_maze228(env):
    """Run a MACS agent in explore mode for 100 trials.

    The configuration uses nine-attribute classifiers and three actions. The
    first eight attributes may take the values '0', '1' or '9', the last one
    '0' or '9'. ``_macs_metrics`` is registered as the metrics collector.

    :param env: environment to explore
    :return: tuple ``(agent, metrics)``
    """
    settings = dict(
        classifier_length=9,
        number_of_possible_actions=3,
        feature_possible_values=[{'0', '1', '9'}] * 8 + [{'0', '9'}],
        estimate_expected_improvements=True,
        metrics_trial_frequency=10,
        user_metrics_collector_fcn=_macs_metrics,
    )

    learner = macs.MACS(macs.Configuration(**settings))
    history = learner.explore(env, 100)

    return learner, history

# run computations (the decorator is expected to reuse a cached result when one exists)
macs_agent, macs_metrics = run_macs_in_maze228(maze228_env)

# visualization
# NOTE(review): the seed presumably makes reset() place the agent next to the
# reward, as the variable name below suggests - this assumes the environment
# draws from Python's global `random` module; TODO confirm.
random.seed(129)  # found experimentally
maze228_env.reset()
maze228_env.render()

# Perception of the agent in its position right after the reset.
left_from_reward = maze228_env.env.maze.perception()

print(f'Perception: {"".join(left_from_reward)}')


[30m■[0m [30m■[0m [30m■[0m [30m■[0m [30m■[0m [30m■[0m [30m■[0m
[30m■[0m [37m□[0m [30m■[0m [37m□[0m [30m■[0m [37m□[0m [30m■[0m
[30m■[0m [37m□[0m [37m□[0m [37m□[0m [37m□[0m [37m□[0m [30m■[0m
[30m■[0m [37m□[0m [30m■[0m [30m■[0m [37m□[0m [37m□[0m [30m■[0m
[30m■[0m [37m□[0m [30m■[0m [37m□[0m [37m□[0m [37m□[0m [30m■[0m
[30m■[0m [37m□[0m [37m□[0m [37m□[0m [31mA[0m [33m$[0m [30m■[0m
[30m■[0m [30m■[0m [30m■[0m [30m■[0m [30m■[0m [30m■[0m [30m■[0m
Perception: 009111000


In [21]:
print(f'Total MACS population size: {len(macs_agent.population)}')
# Classifiers matching the perception, sorted by action because groupby only
# merges consecutive items with an equal key.
s = sorted(macs_agent.population.form_match_set(left_from_reward), key=lambda cl: cl.action)

# Human-readable action names, indexed by the numeric action.
action_mapping = ['STEP_AHEAD', 'ROTATE_LEFT', 'ROTATE_RIGHT']

# For every action print the agent's anticipations, followed by the matching
# classifiers ordered by their effect.
for action, group_cl in groupby(s, key=lambda cl: cl.action):
    print(f'\nAction: {action_mapping[action]}, anticipation: {list((macs_agent.get_anticipations(left_from_reward, action)))}')
    for cl in sorted(group_cl, key=lambda cl: cl.effect):
        print(f'\t{cl.condition} {cl.action} {cl.effect}')

Total MACS population size: 211

Action: STEP_AHEAD, anticipation: [0 0 0 9 0 0 0 1 0]
	0#9###### 0 0????????
	#0###10## 0 ?0???????
	0#9##1### 0 ?0???????
	#0####### 0 ??0??????
	0#9###### 0 ???9?????
	0######## 0 ????0????
	0#####0## 0 ?????0???
	#######0# 0 ??????0??
	##9###### 0 ??????0??
	##91#10## 0 ???????1?
	0######## 0 ????????0

Action: ROTATE_LEFT, anticipation: [0 0 0 0 9 1 1 1 0]
	######0## 1 0????????
	#######0# 1 ?0???????
	0######## 1 ??0??????
	#0####### 1 ???0?????
	##9###### 1 ????9????
	###1##### 1 ?????1???
	####1#### 1 ??????1??
	#####1### 1 ???????1?
	######### 1 ????????0

Action: ROTATE_RIGHT, anticipation: [9 1 1 1 0 0 0 0 0]
	##9###### 2 9????????
	###1##### 2 ?1???????
	####1#### 2 ??1??????
	#####1### 2 ???1?????
	######0## 2 ????0????
	#######0# 2 ?????0???
	0######## 2 ??????0??
	#0####### 2 ???????0?
	######### 2 ????????0


For comparison, the ACS2 agent is run in the same environment.

In [22]:
def _acs2_metrics(agent: acs2.ACS2, env):
    """Collect the metrics recorded for an ACS2 agent.

    :param agent: ACS2 agent being measured
    :param env: unused; kept to match the metrics-collector signature
    :return: dict with the population size (``pop``) and the number of
        classifiers for which ``is_reliable()`` is true (``rel``)
    """
    classifiers = agent.population
    reliable_count = sum(1 for cl in classifiers if cl.is_reliable())

    return {'pop': len(classifiers), 'rel': reliable_count}

@get_from_cache_or_run(cache_path=f'{cache_dir}/acs2_maze228.dill')
def run_acs2_in_maze228(env):
    """Run an ACS2 agent in explore mode for 5000 trials.

    The configuration uses nine-attribute classifiers and three actions, sets
    ``do_ga=False`` and registers ``_acs2_metrics`` as the metrics collector,
    invoked with a trial frequency of 1.

    :param env: environment to explore
    :return: tuple ``(agent, metrics)``
    """
    settings = dict(
        classifier_length=9,
        number_of_possible_actions=3,
        metrics_trial_frequency=1,
        user_metrics_collector_fcn=_acs2_metrics,
        do_ga=False,
    )

    learner = acs2.ACS2(acs2.Configuration(**settings))
    history = learner.explore(env, 5000)

    return learner, history

# Run ACS2 on the same environment instance that was used for MACS above.
acs2_agent, acs2_metrics = run_acs2_in_maze228(maze228_env)

In [23]:
# All reliable classifiers in the whole ACS2 population (used only for the count).
reliable_classifiers = [cl for cl in acs2_agent.population if cl.is_reliable()]

print(f'Total reliable population for ACS2: {len(reliable_classifiers)}')
# List only the reliable classifiers that match the same perception that was
# inspected for MACS.
for cl in acs2_agent.population.form_match_set(left_from_reward):
    if cl.is_reliable():
        print(f'{cl.condition} {cl.action} {cl.effect}')

Total reliable population for ACS2: 402
##911#00# 1 ##009#11#
##9111#0# 0 ##0900#1#
009#11### 2 911#00###


---

**Software packages used**

In [24]:
# Report the software packages of the current session for reproducibility.
import session_info
session_info.show()