<img src='http://hilpisch.com/taim_logo.png' width="350px" align="right">

# AI Algorithms

Dr Yves J Hilpisch | The AI Machine

http://aimachine.io | http://twitter.com/dyjh

## Reinforcement Learning

In [None]:
!git clone https://github.com/tpq-classes/ai_in_finance.git
import sys
sys.path.append('ai_in_finance')


In [None]:
import random
import numpy as np
import pandas as pd
from pylab import plt

In [None]:
import warnings; warnings.simplefilter('ignore')

In [None]:
from IPython import display
plt.style.use('seaborn-v0_8')

## Gym Environment

In [None]:
# !pip install --upgrade gymnasium

In [None]:
import gymnasium as gym

In [None]:
gym.__version__

In [None]:
env = gym.make('CartPole-v1', render_mode='rgb_array')

## Action Space

In [None]:
env.action_space  # type of action space

In [None]:
env.action_space.n  # number of possible actions

In [None]:
env.action_space.sample()  # sample action | move left = 0 | move right = 1

In [None]:
env.action_space.sample()  # sample action | move left = 0 | move right = 1

## Observation Space

In [None]:
np.set_printoptions(precision=4, suppress=True)

In [None]:
env.observation_space  # type of observation space

In [None]:
env.observation_space.high.astype(np.float16) # upper bounds for observations

In [None]:
env.observation_space.low.astype(np.float16)  # lower bounds for observations

In [None]:
o, info = env.reset()  # gymnasium returns (observation, info)
o  # [cart position, cart velocity, pole angle, pole angular velocity]

## Taking Action

The following cells take a number of random actions and visualize their effect on the cart and pole. See https://gist.github.com/thomelane/79e97630ba46c45985a946cae4805885

In [None]:
a = env.action_space.sample()  # random action
a

In [None]:
r = env.step(a)  # taking action, capturing new observations
r  # (observation, reward, terminated, truncated, info)
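
Since `env.step()` returns five values, unpacking them into named fields can make the result easier to read. The following is a minimal sketch; `StepResult`, `take_step` and `episode_over` are hypothetical helper names and not part of Gymnasium.

```python
from collections import namedtuple

# hypothetical container for the five values returned by env.step()
StepResult = namedtuple(
    'StepResult', ['obs', 'reward', 'terminated', 'truncated', 'info'])


def take_step(env, action):
    """Take one action and return the result with named fields."""
    return StepResult(*env.step(action))


def episode_over(result):
    """An episode ends on failure (terminated) or on a time limit (truncated)."""
    return result.terminated or result.truncated
```

With the environment from above, `take_step(env, a).reward` would then give the reward of a single step.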

In [None]:
# !pip install pygame

In [None]:
env.reset()
img = plt.imshow(env.render()) # initialize bitmap embedding
for _ in range(200):
    img.set_data(env.render()) # updating the data
    display.display(plt.gcf())
    display.clear_output(wait=True)
    a = env.action_space.sample()  # random action choice
    obs, rew, done, trunc, info = env.step(a)  # taking action
    if done and (_ + 1) < 200:
        print('*** FAILED ***')
        break

## Dimensionality Reduction

By using four weights and taking the dot product between the weights and the four observation values, the observation (state) space can be reduced from 4 dimensions to just 1. See http://kvfrans.com/simple-algoritms-for-solving-cartpole/.

In [None]:
weights = np.random.random(4) * 2 - 1  # 4 random weights ...

In [None]:
weights  # ... between -1 and 1

In [None]:
o, _ = env.reset()

In [None]:
o  # reduction of dimensionality from 4 ...

In [None]:
s = np.dot(weights, o)  # ... to 1
s
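
To make the reduction concrete, here is a small worked example with fixed numbers. The values of `weights_ex` and `obs_ex` are made up for illustration and do not come from an actual run.

```python
import numpy as np

# illustrative values only, not taken from an actual run
weights_ex = np.array([0.5, -0.25, 1.0, 0.1])
obs_ex = np.array([0.02, -0.01, 0.03, 0.04])

# weighted sum: 0.5*0.02 + (-0.25)*(-0.01) + 1.0*0.03 + 0.1*0.04
s_ex = np.dot(weights_ex, obs_ex)
print(round(float(s_ex), 4))  # 0.0465
```

Four observation values collapse into a single number whose sign is all the agent needs later on.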

## Action Rule

The agent behaves according to the following action rule:

In [None]:
if s < 0:  # if single state value is negative ...
    a = 0  # ... move left
else:  # otherwise ...
    a = 1  # ... move right

In [None]:
a
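
The same rule can be written as a small function, which is convenient when it is applied at every step of an episode. This is a sketch only; `choose_action` is a hypothetical name.

```python
import numpy as np


def choose_action(weights, obs):
    """Return 0 (move left) if the weighted sum is negative, else 1 (move right)."""
    return 0 if np.dot(weights, obs) < 0 else 1
```

Note that a weighted sum of exactly zero leads to action 1 (move right), as in the `if`/`else` cell above.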

## Learning Objective

Learn those `weights` that allow the agent to survive 200 steps based on the above action rule.

## Total Reward per Episode

The following function plays a single episode with the given `weights` and the action rule and returns the total reward `trew`.

In [None]:
def run_episode(env, weights):  
    o, info = env.reset()
    trew = 0
    for _ in range(200):
        s = np.dot(weights, o)
        a = 0 if s < 0 else 1
        o, rew, done, trunc, info = env.step(a)
        trew += rew
        if done:
            break
    return trew

In [None]:
run_episode(env, weights)  # 200 means success
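
`run_episode` only checks the first of the two end-of-episode flags. That is sufficient here because `CartPole-v1` truncates an episode only after 500 steps, well beyond the 200 steps used above. A variant with a configurable step limit that also respects truncation might look as follows; `run_episode_capped` and `max_steps` are hypothetical names introduced for this sketch.

```python
import numpy as np


def run_episode_capped(env, weights, max_steps=200):
    """Total reward for one episode under the sign-based action rule."""
    obs, info = env.reset()
    total = 0.0
    for _ in range(max_steps):
        action = 0 if np.dot(weights, obs) < 0 else 1
        obs, reward, terminated, truncated, info = env.step(action)
        total += reward
        if terminated or truncated:  # failure or time limit reached
            break
    return total
```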

## Simple Learning 

The following code draws new random `weights` for each of at most `num_episodes` episodes and stops as soon as one combination makes the agent successful (a total reward of 200, i.e. 200 steps survived).

In [None]:
num_episodes = 5000

In [None]:
%%time
bestweights = None  
besttrew = 0
for _ in range(1, num_episodes + 1):
    print('episode = {}'.format(_), end='\r')
    weights = np.random.rand(4) * 2 - 1
    trew = run_episode(env, weights)
    if trew > besttrew:
        besttrew = trew
        bestweights = weights
        if trew == 200:  # success?
            break

In [None]:
_

In [None]:
bestweights  # learned ('optimal') weights
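
The search loop above is a plain random search. Wrapped in a function with a seeded random number generator, it becomes easier to re-run and compare. The following is a sketch under assumptions: `random_search`, `score_fn`, `target` and `seed` are hypothetical names, and `score_fn` stands for any function that maps a weight vector to a total reward.

```python
import numpy as np


def random_search(score_fn, num_episodes=5000, target=200, seed=None):
    """Sample weights uniformly from [-1, 1) and keep the best-scoring ones."""
    rng = np.random.default_rng(seed)
    best_weights, best_score = None, -np.inf
    episode = 0
    for episode in range(1, num_episodes + 1):
        candidate = rng.random(4) * 2 - 1
        score = score_fn(candidate)
        if score > best_score:
            best_weights, best_score = candidate, score
        if best_score >= target:  # success, stop early
            break
    return best_weights, best_score, episode
```

With the objects defined above, a call could look like `random_search(lambda w: run_episode(env, w), seed=100)`. The seed fixes only the sampled weights; the initial states of the environment remain random unless the environment is seeded as well.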

## Testing the Results

In [None]:
# some episodes with the learned weights
for _ in range(20):
    trew = run_episode(env, bestweights)
    print(trew, end=' | ')
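
Because every episode starts from a slightly different random state, a single successful episode says little about robustness. A short summary over several episodes can help; the sketch below assumes an episode function with the signature `run_episode(env, weights)` as defined earlier, and `evaluate` is a hypothetical name.

```python
import numpy as np


def evaluate(run_episode, env, weights, episodes=20, target=200):
    """Return mean total reward and share of episodes reaching the target."""
    rewards = np.array([run_episode(env, weights) for _ in range(episodes)])
    return rewards.mean(), (rewards >= target).mean()
```

Calling `evaluate(run_episode, env, weights)` would then return the average total reward and the success rate over 20 episodes.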

In [None]:
# a single episode visualized (inline)
o, info = env.reset()
img = plt.imshow(env.render()) # initialize bitmap embedding
for _ in range(200):
    img.set_data(env.render()) # updating the data
    display.display(plt.gcf())
    display.clear_output(wait=True)
    s = np.dot(bestweights, o)
    a = 0 if s < 0 else 1
    o, rew, done, trunc, info = env.step(a)  # taking action
    if done and (_ + 1) < 200:
        print('*** FAILED at STEP {} ***'.format(_ + 1))
        break
else:  # loop finished without break, i.e. 200 steps survived
    print('*** SUCCESS ***')
