In [117]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as funct
import gymnasium as gym
import numpy as np
from matplotlib import pyplot as plt
import numpy as np
from time import sleep
from IPython import display
import copy
from itertools import count
import operator

In [118]:
# Silence noisy library warnings so the cell output stays readable.
import warnings

# NumPy 2.0 removed the top-level `np.VisibleDeprecationWarning` alias; the class
# lives in `numpy.exceptions` (NumPy >= 1.25). Fall back to the old attribute on
# older releases so this cell works on both.
NumpyVisibleDeprecationWarning = (
    getattr(getattr(np, "exceptions", None), "VisibleDeprecationWarning", None)
    or np.VisibleDeprecationWarning
)

for _category in (FutureWarning, NumpyVisibleDeprecationWarning, UserWarning):
    warnings.simplefilter(action='ignore', category=_category)

In [119]:
# Single shared CartPole-v1 environment; the NN and Agent cells below read it as a module-level global.
env = gym.make("CartPole-v1")

In [120]:
class NN(nn.Module):
    """Small feed-forward network that maps an observation to one score per action.

    Architecture: ``obs_dim -> 128 -> 128 -> n_actions`` with a ReLU after each of
    the two hidden layers and a linear output layer.

    Args:
        obs_dim: Observation shape as a 1-tuple such as ``(4,)``, or a plain int.
            ``None`` (default) reads ``env.observation_space.shape`` when the
            network is constructed.
        n_actions: Number of output units. ``None`` (default) reads
            ``env.action_space.n`` when the network is constructed.

    Attributes:
        optimizer, loss: An Adam optimizer (lr=1e-4) and an MSE loss. They are
            created here but no code visible in this notebook calls them.
        device: CUDA if available, otherwise CPU; the module is moved there.
    """
    def __init__(self, obs_dim=None, n_actions=None):
        super().__init__()
        # Resolve defaults lazily: binding them in the signature would freeze the
        # values of whatever `env` existed when this cell was first executed.
        if obs_dim is None:
            obs_dim = env.observation_space.shape
        if n_actions is None:
            n_actions = env.action_space.n
        if isinstance(obs_dim, int):
            obs_dim = (obs_dim,)

        # The input (an observation/a state) has shape (4,) [cart pos, cart vel, pole angle, pole angular vel]
        self.fc1 = nn.Linear(*obs_dim, 128)  # * unpacks the 1-tuple (4,) to 4
        self.fc21 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, n_actions)  # n_actions == 2 for CartPole

        self.optimizer = optim.Adam(self.parameters(), lr=1e-4)
        self.loss = nn.MSELoss()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.to(self.device)

    def forward(self, x):
        """Return the raw (un-normalised) per-action outputs for observation(s) ``x``."""
        hidden = funct.relu(self.fc1(x))
        hidden = funct.relu(self.fc21(hidden))
        return self.fc3(hidden)

In [121]:
class Agent():
    """Evolves a population of ``NN`` policies on the module-level ``env``.

    Each generation every network is deep-copied, the copies receive Gaussian
    noise on their layer weights, parents and children are scored by playing
    episodes, and the ``popSize`` highest-scoring networks survive.

    Args:
        obs_dim: Observation shape; ``None`` (default) reads
            ``env.observation_space.shape`` at construction time.
        n_actions: Number of discrete actions; ``None`` (default) reads
            ``env.action_space.n`` at construction time.
        eps: Probability of taking a uniformly random action instead of the
            network's arg-max action.
        popSize: Number of networks kept after each selection step.
    """
    def __init__(self, obs_dim=None, n_actions=None, eps=0.05, popSize=20):
        # Resolve defaults lazily so they follow the current `env`, not the one
        # that existed when this class was first defined.
        if obs_dim is None:
            obs_dim = env.observation_space.shape
        if n_actions is None:
            n_actions = env.action_space.n

        self.obs_dim = obs_dim
        self.n_actions = n_actions
        self.action_space = list(range(n_actions))
        self.eps = eps
        self.popSize = popSize
        # Networks are built with NN's own defaults; obs_dim / n_actions are
        # only stored on the agent and are not forwarded here.
        self.population = [NN() for _ in range(self.popSize)]

    def pick_action(self, state, nn):
        """Return an action index for ``state`` using network ``nn``.

        With probability ``1 - eps`` the arg-max of the network output is
        returned, otherwise a uniformly random entry of ``self.action_space``.
        """
        if np.random.random() > self.eps:
            # Inference only: no gradients are needed, so skip graph building.
            with torch.no_grad():
                scores = nn(state)
            return torch.argmax(scores).item()
        return int(np.random.choice(self.action_space))

    def train(self, max_iter, target=450):
        """Run the evolutionary loop.

        Stops after ``max_iter`` generations, or as soon as the best mean score
        of a generation reaches ``target``. The generation index is printed
        after each round.

        Args:
            max_iter: Maximum number of generations.
            target: Mean score at which training stops early.
        """
        generation = 0
        best = float("-inf")
        while generation < max_iter and best < target:
            children = self.reprod()
            self.mutation(children)
            self.population.extend(children)
            fitness = self.fitness(reps=3)
            # Use the best score of this generation. Index 0 of the fitness
            # dict is only the re-scored elite of the previous generation.
            best = max(fitness.values(), default=float("-inf"))
            self.selection(fitness)
            print(generation)
            generation += 1

    def fitness(self, reps):
        """Return ``{population index: mean score over reps episodes}``."""
        return {
            idx: sum(self.play(net) for _ in range(reps)) / reps
            for idx, net in enumerate(self.population)
        }

    def selection(self, fitness):
        """Elitist selection: keep the ``popSize`` best networks, best first.

        Args:
            fitness: Mapping from index in ``self.population`` to score, as
                returned by :meth:`fitness`.
        """
        # sorted() is stable, so equally scored networks keep their index order.
        ranked = sorted(fitness, key=fitness.get, reverse=True)
        self.population = [self.population[idx] for idx in ranked[:self.popSize]]

    def mutation(self, children, sigma=1.0):
        """Add independent Gaussian noise to every weight of each child, in place.

        Only the ``weight`` tensors of ``fc1``, ``fc21`` and ``fc3`` are
        perturbed; biases are left unchanged.

        Args:
            children: Networks to mutate (modified in place).
            sigma: Standard deviation of the noise; 1.0 is a standard normal.
        """
        with torch.no_grad():
            for child in children:
                for layer in (child.fc1, child.fc21, child.fc3):
                    # randn_like draws one sample per weight on the weight's own
                    # device and dtype, so this also works when the net is on CUDA.
                    layer.weight.add_(torch.randn_like(layer.weight) * sigma)

    def reprod(self):
        """Return one independent deep copy of every network in the population."""
        return [copy.deepcopy(parent) for parent in self.population]

    def play(self, nn: "NN"):
        """Play one episode on the module-level ``env`` with network ``nn``.

        Returns:
            The number of steps taken before the episode terminated or was
            truncated.
        """
        obs, _ = env.reset()
        obs = torch.tensor(obs, dtype=torch.float, device=nn.device)
        score = 0
        terminated = False
        truncated = False
        while not terminated and not truncated:
            action = self.pick_action(obs, nn)
            obs_, _, terminated, truncated, _ = env.step(action)
            obs = torch.tensor(obs_, dtype=torch.float, device=nn.device)
            score += 1
        return score
            

In [122]:
def execute(reps=5, popSize=20, iter=500):
    """Train an ``Agent`` and print the mean score of its top-ranked network.

    Args:
        reps: Number of evaluation episodes played after training.
        popSize: Population size passed to the agent.
        iter: Maximum number of training generations.
    """
    # Pass popSize by keyword: the first positional parameter of Agent is
    # obs_dim, so a positional argument would not set the population size.
    agent = Agent(popSize=popSize)
    agent.train(iter)
    scores = [agent.play(agent.population[0]) for _ in range(reps)]
    print(sum(scores) / reps)

In [123]:
# Short demo run: population of 20, at most 10 generations. Prints the generation
# index after each round, then the mean score of the top-ranked network.
execute(popSize=20, iter=10)

0
1
2
3
4
5
6
7
8
9
73.8
