In [None]:
#default_exp agents.cartpole

# CartPole Agent
> An agent for solving the CartPole environment

In [None]:
#export

import gym
import time
import logging

from typing import List

# OpenCog
from opencog.logger import log
from opencog.pln import *
from opencog.type_constructors import *
from opencog.utilities import set_default_atomspace

# ROCCA
from rocca.envs.wrappers import GymWrapper
from rocca.agents import OpencogAgent
from rocca.agents.utils import *

from rocca.utils import *
from rocca.agents.core import logger as ac_logger

In [None]:
from tensorboardX import SummaryWriter

## CartPole Wrapper

In [None]:
#export

class CartPoleWrapper(GymWrapper):
    """Gym wrapper translating CartPole observations into Atomese."""

    def __init__(self, env):
        """Wrap `env` and register the action names "Go Left" and "Go Right"."""
        action_list = ["Go Left", "Go Right"]
        super().__init__(env, action_list)

    def labeled_observations(self, space, obs, sbs=""):
        """Translate gym observation to Atomese

        There are 4 observations (taken from CartPoleEnv help)

        Observation               Min             Max
        -----------               ---             ---
        Cart Position             -4.8            4.8
        Cart Velocity             -Inf            Inf
        Pole Angle                -24 deg         24 deg
        Pole Velocity At Tip      -Inf            Inf

        They are represented in atomese as follows

        Evaluation
            Predicate "Cart Position"
            Number CP

        Evaluation
            Predicate "Cart Velocity"
            Number CV

        Evaluation
            Predicate "Pole Angle"
            Number PA

        Evaluation
            Predicate "Pole Velocity At Tip"
            Number PVAT

        Note that the observations are neither tv-set nor
        timestamped. It is up to the caller to do it.

        Parameters
        ----------
        space : not used by this implementation.
        obs : indexable holding the 4 observation values in the order
            listed above (positions 0 to 3).
        sbs : not used by this implementation.

        Returns
        -------
        A python list (not an atomese list) with these 4 Atomese
        observations, in the order listed above.

        """

        # Each raw value is stringified to become the name of a NumberNode.
        cp = NumberNode(str(obs[0]))
        cv = NumberNode(str(obs[1]))
        pa = NumberNode(str(obs[2]))
        pvat = NumberNode(str(obs[3]))

        return [
            EvaluationLink(PredicateNode("Cart Position"), cp),
            EvaluationLink(PredicateNode("Cart Velocity"), cv),
            EvaluationLink(PredicateNode("Pole Angle"), pa),
            # Named after the pole, as documented above (was mislabelled
            # "Cart Velocity At Tip").
            EvaluationLink(PredicateNode("Pole Velocity At Tip"), pvat),
        ]

## Agent Definition

### Fixed Rule Agent

In [None]:
#export

class FixedCartPoleAgent(OpencogAgent):
    """CartPole agent whose plan is a fixed set of 4 hardwired rules."""

    def __init__(self, env: GymWrapper):
        """Build the action space and the goals, then call the parent ctor.

        env: wrapped environment; its `action_list` provides the action
        names, each turned into a parameterless ExecutionLink.
        """
        # Create Action Space. The set of allowed actions an agent can take.
        # TODO take care of action parameters.
        action_space = {ExecutionLink(SchemaNode(a)) for a in env.action_list}

        # Create Goal
        pgoal = EvaluationLink(PredicateNode("Reward"), NumberNode("1"))
        ngoal = EvaluationLink(PredicateNode("Reward"), NumberNode("0"))

        # Call super ctor
        super().__init__(env, action_space, pgoal, ngoal)

    def plan(self, goal, expiry) -> List:
        """Plan the next actions given a goal and its expiry time offset

        Return a python list of cognitive schematics.  Whole cognitive
        schematics are output (instead of action plans) in order to
        make a decision based on their truth values.  Alternatively it
        could return a pair (action plan, tv), where tv has been
        evaluated to take into account the truth value of the context
        as well (which would differ from the truth value of rule in
        case the context is uncertain).

        The format for a cognitive schematic is as follows

        PredictiveImplicationScope <tv>
          <vardecl>
          <expiry>
          And (or SimultaneousAnd?)
            <context>
            Execution
              <action>
              <input> [optional]
              <output> [optional]
          <goal>

        Note that this implementation does not consult `goal` nor
        `expiry`: the same 4 hardwired schematics, all targeting
        Reward = 1 at time offset 1, are returned on every call.

        """

        # For now we provide hardwired rules
        #
        # 1. Push cart to the left if angle is below -0.01
        # 2. Push cart to the right if angle is above 0.01
        #
        # with a high truth value (stv 0.9, 0.1), plus the two
        # complementary rules (pushing toward the opposite side) with a
        # low truth value (stv 0.1, 0.1).
        angle = VariableNode("$angle")
        numt = TypeNode("NumberNode")
        time_offset = to_nat(1)
        pole_angle = PredicateNode("Pole Angle")
        go_right = SchemaNode("Go Right")
        go_left = SchemaNode("Go Left")
        reward = PredicateNode("Reward")
        epsilon = NumberNode("0.01")
        mepsilon = NumberNode("-0.01")
        unit = NumberNode("1")
        hTV = TruthValue(0.9, 0.1)  # High TV
        lTV = TruthValue(0.1, 0.1)  # Low TV

        def schematic(greater, lesser, action, tv):
            """Build one cognitive schematic.

            PredictiveImplicationScope <tv>
              TypedVariable
                Variable "$angle"
                Type "NumberNode"
              <time_offset>
              And (or SimultaneousAnd?)
                Evaluation
                  Predicate "Pole Angle"
                  Variable "$angle"
                GreaterThan
                  <greater>
                  <lesser>
                Execution
                  <action>
              Evaluation
                Predicate "Reward"
                Number "1"
            """
            return PredictiveImplicationScopeLink(
                TypedVariableLink(angle, numt),
                time_offset,
                AndLink(
                    # Context
                    EvaluationLink(pole_angle, angle),
                    GreaterThanLink(greater, lesser),
                    # Action
                    ExecutionLink(action),
                ),
                # Goal
                EvaluationLink(reward, unit),
                # TV
                tv=tv,
            )

        # Angle above 0.01, go right: high TV
        cs_rr = schematic(angle, epsilon, go_right, hTV)

        # Angle below -0.01, go left: high TV
        cs_ll = schematic(mepsilon, angle, go_left, hTV)

        # To cover all possibilities we shouldn't forget the complementary
        # actions, i.e. going left when the pole is falling to the right
        # and such, which should make the situation worse.

        # Angle above 0.01, go left: low TV
        cs_rl = schematic(angle, epsilon, go_left, lTV)

        # Angle below -0.01, go right: low TV
        cs_lr = schematic(mepsilon, angle, go_right, lTV)

        # Ideally we want to return only relevant cognitive schematics
        # (i.e. with contexts probabilistically currently true) for
        # now however we return everything and let the deduction
        # process deal with it, as it should be able to.
        return [cs_ll, cs_rr, cs_rl, cs_lr]

In [None]:
def seed_with(agent, knowledge):  # TODO: figure out how to pass atoms to be inserted.
    """Add 4 hardwired CartPole cognitive schematics to `agent`.

    The agent's atomspace is made the default atomspace, the schematics
    are created in it, and they are added to `agent.cognitive_schematics`.
    The default atomspace is left pointing at `agent.atomspace` afterwards.

    agent: object exposing `atomspace` and `cognitive_schematics`.
    knowledge: currently ignored (see TODO above).
    """
    set_default_atomspace(agent.atomspace)

    # Shared building blocks of every schematic.
    angle_var = VariableNode("$angle")
    number_type = TypeNode("NumberNode")
    lag = to_nat(1)
    pole_angle_pred = PredicateNode("Pole Angle")
    push_right = SchemaNode("Go Right")
    push_left = SchemaNode("Go Left")
    reward_pred = PredicateNode("Reward")
    upper_threshold = NumberNode("0.01")
    lower_threshold = NumberNode("-0.01")
    one = NumberNode("1")
    high_tv = TruthValue(0.9, 0.1)
    low_tv = TruthValue(0.1, 0.1)

    # One row per schematic: the GreaterThan operands (greater, lesser),
    # the action to execute, and the truth value of the rule.
    rule_table = (
        (angle_var, upper_threshold, push_right, high_tv),  # angle > 0.01, go right
        (lower_threshold, angle_var, push_left, high_tv),  # angle < -0.01, go left
        (angle_var, upper_threshold, push_left, low_tv),  # angle > 0.01, go left
        (lower_threshold, angle_var, push_right, low_tv),  # angle < -0.01, go right
    )

    seeded = set()
    for greater, lesser, action, rule_tv in rule_table:
        context_and_action = AndLink(
            # Context
            EvaluationLink(pole_angle_pred, angle_var),
            GreaterThanLink(greater, lesser),
            # Action
            ExecutionLink(action),
        )
        seeded.add(
            PredictiveImplicationScopeLink(
                TypedVariableLink(angle_var, number_type),
                lag,
                context_and_action,
                # Goal
                EvaluationLink(reward_pred, one),
                # TV
                tv=rule_tv,
            )
        )

    agent.cognitive_schematics.update(seeded)  # TODO: the code should update Python-side automatically.

### Learning Agent

In [None]:
#export

class LearningCartPoleAgent(OpencogAgent):
    """CartPole agent that only customizes construction.

    No method other than the constructor is overridden here; everything
    else is inherited from OpencogAgent.
    """

    def __init__(self, env: GymWrapper, log_level="debug"):
        """Build the action space and the goals, then call the parent ctor.

        env: wrapped environment; its `action_list` provides the action names.
        log_level: forwarded unchanged to the parent constructor.
        """
        # The set of allowed actions: one parameterless ExecutionLink per
        # action name.
        # TODO take care of action parameters.
        allowed_actions = set(
            ExecutionLink(SchemaNode(name)) for name in env.action_list
        )

        # Positive goal is Reward = 1, negative goal is Reward = 0.
        reward_pred = PredicateNode("Reward")
        positive_goal = EvaluationLink(reward_pred, NumberNode("1"))
        negative_goal = EvaluationLink(reward_pred, NumberNode("0"))

        super().__init__(
            env,
            allowed_actions,
            positive_goal,
            negative_goal,
            log_level=log_level,
        )

## Experiment

In [None]:
# Create the Gym "CartPole-v1" environment used by the experiments below.
env = gym.make("CartPole-v1")

In [None]:
# Display the environment's action space.
env.action_space

In [None]:
# Display the environment's observation space.
env.observation_space

### Fixed Agent

In [None]:
# Create a new AtomSpace, make it the default one, then wrap the Gym
# environment for the fixed agent.
atomspace = AtomSpace()
set_default_atomspace(atomspace)
wrapped_env = CartPoleWrapper(env)

In [None]:
# TensorBoard logging is left disabled for the fixed agent; uncomment the
# tb_writer lines to enable it.
# tb_writer = SummaryWriter(comment="cartpole-fixed")

cpa = FixedCartPoleAgent(wrapped_env)
cpa.delta = 1.0e-16

# Control loop: keep stepping until step() returns a truthy value, pausing
# and logging the step count after every step that does not end the run.
while True:
    if cpa.step():
        break
    time.sleep(0.1)
    log.debug("step_count = {}".format(cpa.step_count))
    # tb_writer.add_scalar(
    #     "accumulated_reward", cpa.accumulated_reward, cpa.step_count
    # )

log_msg(agent_log, f"The final reward is {cpa.accumulated_reward}.")

In [None]:
# Display the fixed agent's atomspace after the run.
cpa.atomspace

### Learning Agent

In [None]:
# Create a new AtomSpace for the learning experiment, make it the default
# one, then wrap the Gym environment again.
atomspace = AtomSpace()
set_default_atomspace(atomspace)
wrapped_env = CartPoleWrapper(env)

In [None]:
# Create the learning agent with "fine" log level.
agent = LearningCartPoleAgent(wrapped_env, log_level="fine")
agent.delta = 1.0e-16

# Seed the agent with the hardwired schematics. The second argument is a
# placeholder: seed_with currently ignores it.
seed_with(agent, ["fixme"])

In [None]:
tb_writer = SummaryWriter(comment="cartpole-learning-seeded")
ac_logger.setLevel(logging.DEBUG)  # The agents.core logger

epochs = 10  # Number of epochs (learning / interacting episodes)
epoch_len = 200  # Maximum number of agent steps per epoch

try:
    for i in range(epochs):
        wrapped_env.restart()
        agent.reset_action_counter()
        accreward = agent.accumulated_reward  # Keep track of the reward before

        # Learning phase: discover patterns to make more informed decisions
        log_msg(agent_log, f"Learning phase started. ({i + 1}/{epochs})")
        agent.learn()

        # Run agent to accumulate percepta
        log_msg(agent_log, f"Interaction phase started. ({i + 1}/{epochs})")
        for j in range(epoch_len):
            done = agent.step()
            time.sleep(0.01)
            log.debug("step_count = {}".format(agent.step_count))
            if done:
                break

        # Reward gathered during this epoch only
        new_reward = agent.accumulated_reward - accreward
        tb_writer.add_scalar("train/accumulated_reward", new_reward, agent.step_count)
        log_msg(agent_log, "Accumulated reward during {}th epoch = {}".format(i + 1, new_reward))
        log_msg(agent_log, "Action counter during {}th epoch:\n{}".format(i + 1, agent.action_counter))  # TODO: make the action counter look good
finally:
    # Flush pending scalars and release the event file, even if an epoch
    # raised; otherwise the last events may never reach disk.
    tb_writer.close()

log_msg(agent_log, f"The average total reward over {epochs} trials (training): {agent.accumulated_reward / epochs}.")
# TODO: add a separate testing loop and measure average total reward.