In [1]:
import math

# Values taken from game

inner_radius = 5
outer_radius = 15
rider_height = 4
rider_offset_y = 0.6

wheel_radius = 1.1
wheel_mass = 5
wheel_inertia = 70
gravity = 9.81
friction = 0.9

pedaling_acceleration = 3 # radians
lean_speed = math.radians(200)

delta_time = 1/60 # 60 FPS

In [65]:
# Construct classes/helpers for 2d equivalent of the game,
# with numerically equivalent dynamics.

import numpy as np
import math
import random

def lerp(a, b, t):
    return t*b + (1-t)*a

def invlerp(a, b, c):
    return (c-a)/(b-a)

def absolute_angle(x, y):
    angle = math.atan2(y, x)
    if angle < 0:
        angle = math.pi + abs(angle + math.pi)
    return angle

class Loop:
    def __init__(self, inner_radius, outer_radius):
        self.inner_radius = inner_radius
        self.outer_radius = outer_radius
        
    def on_track(self, position):
        r = math.hypot(position[0], position[1])
        return r > self.inner_radius and r < self.outer_radius
        
    def generate_starting_position(self):
        r = random.randrange(self.inner_radius, self.outer_radius)
        return [r, 0]
        
    def draw_canvas(self, canvas):
        canvas.save()
        
        canvas.begin_path()
        canvas.fill_style = '#aaa'
        canvas.arc(0, 0, self.outer_radius, 0, 2 * math.pi, False)
        canvas.arc(0, 0, self.inner_radius, 0, 2 * math.pi, True)
        canvas.fill()
        
        canvas.begin_path()
        canvas.stroke_style = '#000'
        canvas.line_width = 0.1
        canvas.move_to(self.inner_radius, 0)
        canvas.line_to(self.outer_radius, 0)
        canvas.stroke()
        
        canvas.restore()

class Unicycle:
    def __init__(self, position=[0, 0], track=None):
        self.track = track
        self.position = position
        self.heading_angle = math.pi/2
        self.heading = [math.cos(self.heading_angle), math.sin(self.heading_angle)]
        
        # Unicycle state (radians)
        self.rider_lean_angle       = 0 # Lean of rider wrt front of unicycle
        self.unicycle_roll_angle    = 0 # Lean of unicycle wrt track
        self.unicycle_roll_momentum = 0 # Angular momentum of unicycle
        self.wheel_angular_momentum = 0
        
        self.failed = False
        
    def clone(self, to=None):
        # TODO: Be able to evaluate actions w/o stepping unicycle

        if to is None:
            to = Unicycle(self.position, self.track)

        to.position[0] = self.position[0]
        to.position[1] = self.position[1]
        to.track = self.track        

        to.heading_angle = self.heading_angle
        to.heading = self.heading

        to.rider_lean_angle       = self.rider_lean_angle # Lean of rider wrt front of unicycle
        to.unicycle_roll_angle    = self.unicycle_roll_angle # Lean of unicycle wrt track
        to.unicycle_roll_momentum = self.unicycle_roll_momentum # Angular momentum of unicycle
        to.wheel_angular_momentum = self.wheel_angular_momentum

        return to
        
    def draw_canvas(self, canvas):
        canvas.save()
        
        length = 2
        width = 1
        
        canvas.translate(self.position[0], self.position[1])
        canvas.rotate(self.heading_angle)

        # Osculating circle
        if abs(self.unicycle_roll_angle) > 1e-3:
            canvas.begin_path()
            denom = lerp(0, 1, invlerp(0, math.pi/2, abs(self.unicycle_roll_angle)))
            k = math.inf if abs(denom) < 1e-6 else 1/denom
            canvas.begin_path()
            canvas.line_width = 0.3
            canvas.stroke_style = '#ff0'
            canvas.arc(0, math.copysign(k, self.unicycle_roll_angle), k, 0, 2 * math.pi)
            canvas.stroke()
            
        # "Wheel"/body
        canvas.fill_style = '#f00' if self.failed else '#0f0'
        canvas.fill_rect(-length/2, -width/2, length, width) # Align w/ heading, so on x-axis
        
        # Heading arrow
        canvas.begin_path()
        canvas.stroke_style = '#ff0'
        canvas.line_width = 0.3
        canvas.move_to(length/2, 0)
        canvas.line_to(length/2 + 2, 0)
        canvas.stroke()
        
        # Wheel roll helper arrow
        canvas.begin_path()
        canvas.stroke_style = '#00f' if self.unicycle_roll_angle > 0 else '#f00'
        canvas.move_to(0, math.copysign(width/2, self.unicycle_roll_angle))
        canvas.line_to(0, (math.copysign(width/2, self.unicycle_roll_angle) + math.sin(self.unicycle_roll_angle) * 1.5))
        canvas.stroke()
        
        canvas.restore()
        
    def act(self, action, delta_time):
        # Take in action (maps to keyboard presses in game),
        # update rider lean angle, wheel pitch momentum

        # type UnicycleInput = {
        #    leaning: -1 | 0 | 1,
        #    pedaling: -2 | -1 | 0 | 1 | 2,
        # }
        
        # action: [leaning, pedaling]
        
        # Assume that leaning/pedaling in action are already discretized
        leaning = action[0]
        pedaling = action[1]
        
        self.wheel_angular_momentum += pedaling * pedaling_acceleration * delta_time

        dtheta = leaning * lean_speed * delta_time
        self.rider_lean_angle -= dtheta

    def update(self, delta_time):
        # Perform physics update step, according to delta time
        
        # Update roll from leaning
        r_x = math.sin(self.unicycle_roll_angle) * (wheel_radius * 2) \
            + math.sin(self.unicycle_roll_angle + self.rider_lean_angle) * (rider_height/2 + rider_offset_y)
        torque = r_x * gravity * wheel_mass

        self.unicycle_roll_momentum += torque/wheel_inertia * delta_time

        self.unicycle_roll_angle += self.unicycle_roll_momentum * delta_time
        self.unicycle_roll_angle = np.clip(self.unicycle_roll_angle, -math.pi/2, math.pi/2)

        # Update wheel rotation
        dx = self.wheel_angular_momentum * wheel_radius * delta_time
        self.wheel_angular_momentum -= dx * friction

        # Update heading according to curvature
        denom = lerp(0, 1, invlerp(0, math.pi/2, abs(self.unicycle_roll_angle)))
        k = math.inf if abs(denom) < 1e-3 else 1/denom

        dtheta = self.wheel_angular_momentum/k * delta_time
        self.heading_angle -= math.copysign(dtheta, -self.unicycle_roll_angle)

        self.heading[0] = math.cos(self.heading_angle)
        self.heading[1] = math.sin(self.heading_angle)
        
        # Move according to heading
        self.position[0] += self.heading[0] * dx
        self.position[1] += self.heading[1] * dx
        
        # Update failed or not, used to terminate episode
        if not self.failed:
            self.failed = not self.track.on_track(self.position) \
                or abs(self.unicycle_roll_angle) > math.pi/2 - 1e-1
        
    # Steps through timestep given action, returns reward
    def step(self, action, delta_time):
        starting_abs_angle = absolute_angle(self.position[0], self.position[1])

        self.act(action, delta_time)
        self.update(delta_time)

        ending_abs_angle = absolute_angle(self.position[0], self.position[1])

        # Then must have driven backwards from finish line, which ends the episode
        if ending_abs_angle - starting_abs_angle > abs(math.pi):
            self.failed = True
            return 0
        
        # Reward is progress made around circle
        progress = ending_abs_angle - starting_abs_angle

        return progress
            
            
# Abstract all the drawing logic for
# visualizing episodes to this function,
# to then be used in following cells.
def draw(canvas, entities):
    screen_width, screen_height = canvas.width, canvas.height
    
    world_width = 40
    world_height = screen_height/screen_width * world_width
    
    canvas.clear()

    canvas.save()

    canvas.scale(screen_width/world_width, -screen_height/world_height)
    canvas.translate(world_width/2, -world_height/2)

    for entity in entities:
        entity.draw_canvas(canvas)

    canvas.restore()

In [3]:
# Define matrix of all possible actions, an action being both an input for leaning and pedaling.
# To simplify, we'll ignore that the player can hold shift to pedal twice as fast, and just take all
# combinations of leaning forward, backward, or not at all (-1, 0, 1) and of pedaling forward, backward or not at all (-1, 0, 1).
# This results in 3*3=9 combinations, each with two entries (leaning/pedaling), making a 9x2 matrix. 
possible_actions = np.array([[[p, l] for p in [-1, 0, 1]] for l in [-1, 0, 1]]).reshape(9, 2)

In [78]:
# state is a 10x1 feature vector (9 features + bias)
# action is a 2x1 vector
    # type UnicycleInput = {
    #    leaning: -1 | 0 | 1,
    #    pedaling: -2 | -1 | 0 | 1 | 2,
    # }

def get_state(unicycle):
    # [r, cos_theta, sin_theta, cos_heading, sin_heading, rider_lean, lean, lean_momentum, wheel_momentum, 1]
    
    # Collect values from unicycle and normalize them.
    # Use Cartesian coordinates to represent angles, to remove discontinuity.
    x, y = unicycle.position
    r = math.hypot(x, y)
    cos_theta, sin_theta = x/r, y/r
    cos_heading = math.cos(unicycle.heading_angle)
    sin_heading = math.sin(unicycle.heading_angle)
    rider_lean = unicycle.rider_lean_angle/(math.pi/2)
    lean = unicycle.unicycle_roll_angle/(math.pi/2)
    wheel_momentum = unicycle.wheel_angular_momentum/6 # Max asymptotic wheel omega
    lean_momentum = unicycle.unicycle_roll_momentum # Angular vel, so no real meaning for unit
    
    # Flip sign of lean to be consistent? (I didn't bother figuring this out)
    lean *= -1
    lean_momentum *= -1
    
    # Add 1 as a last entry as a bias term an agent can incorporate
    return np.array([r, cos_theta, sin_theta, cos_heading, sin_heading, rider_lean, lean, wheel_momentum, lean_momentum, 1])

u = Unicycle()

def featurize_state_action(unicycle, action, delta_time):
    # Featurize state but incorporate action through
    # cheating by updating the environment. However, this incorporates
    # both the state and the action into a unique feature vector.
    # This should differentiate the features for each action for a given state,
    # making it more possible for the agent to discern between them.

    unicycle.clone(to=u)

    u.step(action, delta_time)
    state = get_state(unicycle)
    
    return state

def score(state):
    # Naive reward: (absolute) angle of ending point wrt finish line.
    # Should maybe encourage the agent to move backwards?
    x, y = state[1], state[2] # Use cos_theta, sin_theta
    angle = math.atan2(y, x)
    if angle < 0:
        angle = math.pi + abs(angle + math.pi)
    return angle

def softmax(x):
    return np.exp(x) / np.sum(np.exp(x), axis=0)

class Agent:
    def __init__(self):
        pass
    
    def policy(self, state):
        r, cos_theta, sin_theta, cos_heading, sin_heading, rider_lean, lean, wheel_momentum, lean_momentum = state

#         # [leaning, pedaling]
#         p = softmax([abs(min(0, lean)), 1e-3, max(0, lean)])
#         leaning = -1
#         if lean_momentum > 0.05:
#             leaning = -1
#         elif lean_momentum < -0.05:
#             leaning = 1
            
#         # return [np.random.choice([-1, 0, 1], p=p), 1]
#         return [leaning, 1]

# Agent that acts greedily according to a linear action-value (Q) function
# defined by a weight matrix `w` (the value of an action is a unique linear
# combination of all features). 
class GreedyLinearAgent:
    def __init__(self, w, unicycle, delta_time):
        # 9x10 weight matrix: separate weight vector for each action.
        self.w = w
        self.delta_time = delta_time
        self.unicycle = unicycle
    
    def policy(self, state):
        featurized_action_value_pairs = np.array([
            featurize_state_action(self.unicycle, action, self.delta_time)
             for action in possible_actions
        ])
        action_values = np.sum(np.multiply(self.w, featurized_action_value_pairs), axis=1)
        
        action = possible_actions[np.argmax(action_values)]
            
        return action

action_indices = np.arange(len(possible_actions))

class EpsGreedyLinearAgent:
    def __init__(self, w, unicycle, delta_time, eps):
        self.w = w
        self.unicycle = unicycle
        self.eps = eps
        self.gla = GreedyLinearAgent(w, unicycle, delta_time)
        self.delta_time = delta_time
        
    def replay(self, state, action, ret):
        # TODO: Clean this up
        action_index = 0
        for i, a in enumerate(possible_actions):
            if abs(a[0]-action[0]) < 1e-3 and abs(a[1]-action[1]) < 1e-3:
                action_index = i
                break

        target = ret
        featurized = featurize_state_action(self.unicycle, action, self.delta_time)
        current = self.w[action_index] @ featurized
        alpha = 0.5
        self.w[action_index] -= alpha * (target - current) * featurized
        
    def policy(self, state):
        self.gla.w = self.w
        greedy_action = self.gla.policy(state)

        if np.random.randn() < (1 - self.eps):
            return greedy_action

        random_action_index = np.random.choice(action_indices)
        random_action = possible_actions[random_action_index]

        return random_action
        

In [83]:
# Episode rollout loop example.
# Each iteration, featurize the state of the unicycle,
# pass it to the policy (in this case something dumb),
# step the unicycle one timestep through that action,
# then draw to the canvas to visualize the policy.

from time import sleep
from ipycanvas import Canvas, hold_canvas

canvas = Canvas(width=400, height=300)
display(canvas)

track = Loop(inner_radius, outer_radius)

agent = Agent()
unicycle = Unicycle(position=track.generate_starting_position(), track=track)

def policy(state):
    leaning = np.clip(math.floor(np.sum(state)/3) - 1, -1, 1)
    pedaling = 2 - leaning

    return [leaning, pedaling]

n = 0
while True:
    with hold_canvas(): # wrap in hold_canvas() for performant canvas updates
        if unicycle.failed: # .failed becomes true whenever unicycle gets off-track or tips over
            break

        state = get_state(unicycle)
        action = policy(state)
        unicycle.step([-1, 1], delta_time)
        
        n += 1

        draw(canvas, [track, unicycle])

        if n == 50:
            break
            
        # Optional
        sleep(delta_time)
print(n)

Canvas(height=300, width=400)

50


In [64]:
# https://proceedings.neurips.cc/paper_files/paper/2013/file/7504adad8bb96320eb3afdd4df6e1f60-Paper.pdf
# Use action-value function (Q) defined as a unique linear combination
# of the feature vector for each action. Weights are passed in, where the CEM
# algorithm is done to update the family of agents each iteration.

from time import sleep
from ipycanvas import Canvas, hold_canvas
import matplotlib.pyplot as plt

# canvas = Canvas(width=400, height=300)
# display(canvas)

track = Loop(inner_radius, outer_radius)

# Shape of input/output
n_actions = 9
n_state_features = 10

# Hyperparameters for initial (meta-)weights generation
start_means_mean = 0
start_means_stdev = 0.7

start_stdevs_mean = 0
start_stdevs_stdev = 1

discount_factor = 0.9

w_mean = np.random.normal(start_means_mean, start_means_stdev, size=(n_actions, n_state_features))
w_stdev = np.absolute(np.random.normal(start_stdevs_mean, start_stdevs_stdev, size=(n_actions, n_state_features)))

generation_size = 1000
episodes_per_eval = 1
proportion = 0.1
n_candidates = math.floor(proportion * generation_size)

k_noise = 0 # Noise when updating standard deviation

rounds = 10

delta_time = 1/60 # Time used for looking ahead to future states

xs = []
ys = []

print('Starting loop')
for k in range(rounds):
    # Sample agents from probability distribution
    agents_unicycles = []

    for _ in range(generation_size):
        unicycle = Unicycle(position=track.generate_starting_position(), track=track)
        w = np.random.normal(w_mean, w_stdev)
        agent = GreedyLinearAgent(w, unicycle, delta_time)
        agents_unicycles.append((agent, unicycle))

    # Evaluate each agent, tracking the best-performing ones
    candidates = []

    for agent, unicycle in agents_unicycles:
        # Play n episodes
        average_return = 0
        for i in range(episodes_per_eval):
            delta_position = 0
            iters_idle = 0
            stuck = False
            return_ = 0
            
            while True:
                if unicycle.failed:
                    break

                old_x, old_y = unicycle.position
                    
                state = get_state(unicycle)
                action = agent.policy(state)
                reward = unicycle.step(action, delta_time)
                
                delta_position = math.hypot(unicycle.position[0] - old_x, unicycle.position[1] - old_y)

                return_ = discount_factor * return_ + reward
                
                if delta_position < 1e-3:
                    iters_idle += 1
                else:
                    iters_idle = 0
                    
                if iters_idle > 30:
                    stuck = True
                    break
            
            # Try to disincentivize getting stuck. (Try changing this?)
            reward = -1 if stuck else score(state)
            average_return += reward/episodes_per_eval

        # If happened to do well, potentially add as candidate
        if len(candidates) < n_candidates:
            candidates.append((agent, reward))
        else:
            if reward > candidates[0][1]:
                candidates[0] = (agent, reward)
            candidates.sort(key=lambda ar: ar[1])

    # Update new mean and stdev matrices, based on candidates and noise.
    
    new_w_stdev = 1/n_candidates * np.sum([np.abs(agent.w - w_mean) for agent, _ in candidates], axis=0)
    new_w_stdev = np.abs(new_w_stdev + k_noise * np.random.randn(n_actions, n_state_features))
    delta_w_stdev = np.linalg.norm(new_w_stdev - w_stdev)
    w_stdev = new_w_stdev
    
    new_w_mean = 1/n_candidates * np.sum([agent.w for agent, _ in candidates], axis=0)
    delta_w_mean = np.linalg.norm(new_w_mean - w_mean)
    w_mean = new_w_mean
    
    best_agent_avg_reward = candidates[-1][1]
    
    xs.append(k)
    ys.append(best_agent_avg_reward)
    
    for _, return_ in candidates:
        print(return_)    

    print(f'({k+1}/{rounds}) Finished evaluating generation. Best avg. reward: ', best_agent_avg_reward)
    print('Updated distribution! Delta mean: ', delta_w_mean, '. Delta stdev: ', delta_w_stdev)

# Return graph over each iteration
plt.plot(xs, ys)

Starting loop


KeyboardInterrupt: 

In [None]:
canvas = Canvas(width=400, height=300)
display(canvas)

track = Loop(inner_radius, outer_radius)

agent = candidates[0][0]
unicycle = Unicycle(position=track.generate_starting_position(), track=track)

n = 0
while True:
    with hold_canvas(): # wrap in hold_canvas() for performant canvas updates
        if unicycle.failed: # .failed becomes true whenever unicycle gets off-track or tips over
            break

        state = get_state(unicycle)
        action = agent.policy(state)
        unicycle.step(action, delta_time)
        
        n += 1

        draw(canvas, [track, unicycle])
            
        # Optional
        sleep(delta_time)
print(n)

Canvas(height=300, width=400)

221


In [79]:
# Q-Value Learning (in this case, through just a linear function, so technically not DQN).
# Have one agent/policy. For each epoch, play some number of episodes,
# occasionally collecting (randomly sampling) state/action/return tuples.
# Then, at the end of the epoch, update the Q function through backups based on the tracked state-action pairs.

from time import sleep
from ipycanvas import Canvas, hold_canvas
import matplotlib.pyplot as plt

track = Loop(inner_radius, outer_radius)

# Shape of input/output
n_actions = 9
n_state_features = 10

# Hyperparameters for initial (meta-)weights generation
epsilon = 0.95
epsilon_decay = 0.995 # Get greedier over time
episodes_per_epoch = 1
memorize_p = 0.1
discount_factor = 0.99

rounds = 100

delta_time = 1/60 # Time used for looking ahead to future states

w = np.random.randn(n_actions, n_state_features)

unicycle = Unicycle(position=track.generate_starting_position(), track=track)
agent = EpsGreedyLinearAgent(w, unicycle, delta_time, eps=epsilon)

for k in range(rounds):
    memory = []

    # Play n episodes
    for i in range(episodes_per_epoch):
        delta_position = 0
        iters_idle = 0
        stuck = False
        
        state_actions_to_replay = []

        return_ = 0
        
        while True:
            if unicycle.failed:
                break

            old_x, old_y = unicycle.position
                
            state = get_state(unicycle)
            action = agent.policy(state)
            
            if np.random.randn() < memorize_p:
                state_actions_to_replay.append((state, action))

            reward = unicycle.step(action, delta_time)
            
            return_ = discount_factor * return_ + reward
            
            delta_position = math.hypot(unicycle.position[0] - old_x, unicycle.position[1] - old_y)
            
            if delta_position < 1e-3:
                iters_idle += 1
            else:
                iters_idle = 0
                
            if iters_idle > 30:
                stuck = True
                return_ += -10
                break
        
        for state, action in state_actions_to_replay:
            memory.append((state, action, return_))

    print(f'({k+1}/{rounds}) Finished epoch. Return: {return_}')

    for state, action, return_ in memory:
        agent.replay(state, action, return_) # Would normally pass subsequent state, but calculated from unicycle and action (deterministic).
            
    agent.eps *= epsilon_decay

(1/100) Finished epoch. Return: 6.795778835263244e-05
(2/100) Finished epoch. Return: 0
(3/100) Finished epoch. Return: 0
(4/100) Finished epoch. Return: 0
(5/100) Finished epoch. Return: 0
(6/100) Finished epoch. Return: 0
(7/100) Finished epoch. Return: 0
(8/100) Finished epoch. Return: 0
(9/100) Finished epoch. Return: 0
(10/100) Finished epoch. Return: 0
(11/100) Finished epoch. Return: 0
(12/100) Finished epoch. Return: 0
(13/100) Finished epoch. Return: 0
(14/100) Finished epoch. Return: 0
(15/100) Finished epoch. Return: 0
(16/100) Finished epoch. Return: 0
(17/100) Finished epoch. Return: 0
(18/100) Finished epoch. Return: 0
(19/100) Finished epoch. Return: 0
(20/100) Finished epoch. Return: 0
(21/100) Finished epoch. Return: 0
(22/100) Finished epoch. Return: 0
(23/100) Finished epoch. Return: 0
(24/100) Finished epoch. Return: 0
(25/100) Finished epoch. Return: 0
(26/100) Finished epoch. Return: 0
(27/100) Finished epoch. Return: 0
(28/100) Finished epoch. Return: 0
(29/100) 