The objective of this project is to solve the lunar landing game from OpenAI Gym using Deep Q-Learning.

For additional details and analysis of lunar_landing.ipynb, please refer to lunar_landing.pdf.

Steps to run the uploaded Jupyter Notebook, lunar_landing.ipynb, are shown below. This will create and run the Deep Q-learning algorithm to solve the Lunar Landing game from OpenAI Gym.

Run cell 1 to import all the necessary libraries
Run cell 2 to setup the neural network
Run cell 3 to setup the class and functions to train the model
Run cell 4 to train the model
Run cell 5 to create a graph to see the rewards per episode during training
Run cell 6 to save the model (if necessary)
Run cell 7 to load the model (if necessary)
Run cell 8 to create a graph of the reward per episode using the trained model

In [None]:
#pip3 install box2d-py
#!pip3 install gym[Box_2D]
import gym
import numpy as np
import torch
import torch.nn as nn
import random

# Create the LunarLander-v2 environment once at module level; the Train class
# defined in a later cell reads this global `env` for reset() and step().
env = gym.make("LunarLander-v2")

In [None]:
#HYPERPARAMETERS
# These are module-level globals: the network constructor calls and the Train
# class in later cells read them by name.
# number of transitions sampled per training step; also the warm-up step count before training starts
batch_size = 32
# in_features of the first Linear layer (size of one state vector fed to the network)
input_size = 8
# widths of the two hidden layers
layer1_size = 32
layer2_size = 32
# out_features of the last Linear layer (one value per action; act() takes the argmax)
output_size = 4
# learning rate handed to the Adam optimizer
lr = 0.001
# only the most recent replay_size transitions are eligible for sampling in Train.train
replay_size = 10000


# NOTE(review): `device` is only printed in the visible code; the models and
# tensors are never moved to it, so everything appears to run on the CPU - confirm.
device = "cuda" if torch.cuda.is_available() else "cpu"
print('Using {} device'.format(device))

#simple neural network 
class network(nn.Module):
    """Fully connected network: two hidden ReLU layers and a linear output.

    Args:
        input_size: number of features in one input vector.
        layer1_size: width of the first hidden layer.
        layer2_size: width of the second hidden layer.
        output_size: number of output values (one per action).
    """

    def __init__(self, input_size, layer1_size, layer2_size, output_size):
        super().__init__()

        widths = (input_size, layer1_size, layer2_size, output_size)
        stack = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack.append(nn.Linear(fan_in, fan_out))
            stack.append(nn.ReLU())
        #the output layer is linear, so drop the activation added after it
        stack.pop()

        #the attribute name fixes the state_dict keys ("cnn.0.weight", ...)
        self.cnn = nn.Sequential(*stack)

    def forward(self, x):
        """Return the network output for input tensor ``x``."""
        outputs = self.cnn(x)
        return outputs

    def act(self, results):
        """Return, as a Python int, the index of the largest entry of ``results``."""
        best = results.argmax()
        return best.item()

#training model that will be updated with each step
model = network(input_size, layer1_size, layer2_size, output_size)
#target (more stable) model used to compute the update target in each step; only gets updated occasionally
target_model = network(input_size, layer1_size, layer2_size, output_size)
#both models start with the same parameters: copy the training model's weights into the target model
target_model.load_state_dict(model.state_dict())

In [None]:
#use MSE for loss calculation; reduction='sum' adds up the squared errors instead of averaging them
loss_fn = nn.MSELoss(reduction='sum')
#use Adam optimizer over the training model's parameters only (the target model is not optimized directly)
optimizer = torch.optim.Adam(model.parameters(),lr=lr)

class Train:
    """Deep Q-learning trainer and evaluator for the module-level ``env``.

    The methods read these module-level names at call time: ``env`` (the Gym
    environment), ``target_model`` (the slowly updated network used for the
    bootstrap target), ``batch_size`` and ``replay_size``.
    """

    #initialization
    def __init__(self):
        #kept for backward compatibility; not used by any method of this class
        self.alpha = 1
        #discount factor applied to the best next-state value
        self.gamma = 0.6
        #probability of taking a random action; decayed during update()
        self.epsilon = 1
        self.epsilon_decay = 0.9
        self.min_epsilon = 0.01

    def _sample_batch(self, replay_memory):
        """Sample up to ``batch_size`` transitions from the last ``replay_size`` entries."""
        total = len(replay_memory)
        window_start = max(0, total - replay_size)
        count = min(batch_size, total - window_start)
        #sample indices rather than slicing, so the replay tail is not copied on every step
        picks = random.sample(range(window_start, total), count)
        return [replay_memory[i] for i in picks]

    #update model using replay_memory and Q learning
    def train(self, replay_memory, model, loss_fn, optimizer):
        """Run one optimization step of ``model`` on a sampled mini-batch.

        Args:
            replay_memory: sequence of ``[state, new_state, action, reward, done]``
                transitions, oldest first.
            model: network being trained.
            loss_fn: callable ``loss_fn(prediction, target)`` returning a scalar tensor.
            optimizer: optimizer over ``model``'s parameters.

        Does nothing when no transition is available.
        """
        batch = self._sample_batch(replay_memory)
        if not batch:
            return

        #stack into single arrays first: building tensors from lists of arrays is slow
        states = torch.as_tensor(np.array([item[0] for item in batch]))
        next_states = torch.as_tensor(np.array([item[1] for item in batch]))
        actions = torch.tensor([int(item[2]) for item in batch], dtype=torch.long)
        terminal = torch.tensor([bool(item[4]) for item in batch])

        #targets are constants for the loss, so build them without tracking gradients
        with torch.no_grad():
            #start from the current predictions so only the taken action contributes to the loss
            targets = model(states).clone()
            next_best = target_model(next_states).max(dim=1).values
            rewards = torch.tensor([float(item[3]) for item in batch], dtype=targets.dtype)
            #q(s,a) = reward                            when a terminal state was reached
            #q(s,a) = reward + gamma * max q(s',a')     otherwise
            returns = torch.where(terminal, rewards, rewards + self.gamma * next_best)
            targets[torch.arange(len(batch)), actions] = returns

        #update model with loss and backpropagation
        model.train()
        loss = loss_fn(model(states), targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    #use trained model to predict next steps
    def test(self, episodes, target_model, loss_fn, test_reward):
        """Play one greedy episode per item of ``episodes`` and record the scores.

        Args:
            episodes: iterable whose length sets the number of episodes to play.
            target_model: network used to choose actions (argmax of its output).
            loss_fn: unused; kept so existing callers keep working.
            test_reward: list that receives the total reward of each episode.
        """
        #put the network that is actually evaluated into eval mode
        target_model.eval()

        with torch.no_grad():
            for _ in episodes:
                done = False
                score = 0
                state = env.reset()
                while not done:
                    a = target_model.act(target_model(torch.as_tensor(state)))
                    state, r, done, _info = env.step(a)
                    score += r
                test_reward.append(score)

    #Determine next action for each step and update models
    def update(self, model, loss_fn, optimizer, train_rewards, *,
               max_episodes=1000, solved_score=200, target_sync_steps=90):
        """Train ``model`` with epsilon-greedy exploration and experience replay.

        Args:
            model: network being trained; also used to choose greedy actions.
            loss_fn: loss passed through to train().
            optimizer: optimizer passed through to train().
            train_rewards: list that receives the total reward of each episode.
            max_episodes: stop after this many episodes.
            solved_score: stop once the mean of the last 100 entries of
                ``train_rewards`` reaches this value.
            target_sync_steps: copy ``model``'s weights into the global
                ``target_model`` every this many environment steps.
        """
        replay_memory = []
        steps_update = 0
        episodes = 0
        n_actions = env.action_space.n

        #Continue until the stopping criterion at the bottom of the loop is met
        while True:
            episodes += 1
            state = env.reset()
            score = 0
            done = False

            while not done:
                steps_update += 1
                #uncomment below line to watch visual
                #env.render()

                #with probability epsilon take a random action, otherwise the model's best action
                if np.random.random() <= self.epsilon:
                    a = np.random.randint(n_actions)
                else:
                    with torch.no_grad():
                        a = model.act(model(torch.as_tensor(state)))

                #step the environment: new state, reward, and whether the episode ended
                new_state, r, done, _info = env.step(a)

                #update overall score with reward value
                score += r

                #add to the replay memory
                replay_memory.append([state, new_state, a, r, done])
                #train() only samples the newest replay_size transitions, so drop
                #older ones in bulk to keep memory bounded
                if len(replay_memory) > 2 * replay_size:
                    del replay_memory[:len(replay_memory) - replay_size]

                #update current state with new state
                state = new_state

                #start training once batch_size steps have been taken, so some data exists first
                if steps_update >= batch_size:
                    self.train(replay_memory, model, loss_fn, optimizer)

                #periodically sync the target model so it stays more stable than the training model
                if steps_update % target_sync_steps == 0:
                    target_model.load_state_dict(model.state_dict())

            train_rewards.append(score)
            print('score: {}'.format(score))
            #decay epsilon every 20 episodes to decrease the randomness in actions
            if episodes % 20 == 0 and episodes >= 21:
                self.epsilon = max(self.epsilon_decay*self.epsilon, self.min_epsilon)
            #achieved over 200 points which is the goal
            if score >= 200:
                print('score over 200')
            #printing progress
            if episodes % 100 == 0:
                print('EPISODE {}, Average score = {}, Epsilon: {}'.format(episodes,np.mean(train_rewards[-100:]),self.epsilon))
            #successful once the average over the last 100 episodes reaches the goal; otherwise stop at the cap
            if np.mean(train_rewards[-100:]) >= solved_score or episodes >= max_episodes:
                break
        #env.close()

In [None]:
#Run and train model
import time

#wall-clock timestamp (seconds since the epoch) taken before training starts
start = time.time()
print('start time: ',start)
#update() appends one total reward per training episode to this list
train_rewards = []
NN = Train()
#trains `model` in place; returns once update()'s stopping condition is met
NN.update(model,loss_fn,optimizer,train_rewards)
end = time.time()
print('end time: {} \n time elapsed: {}'.format(end, end-start))

In [None]:
import matplotlib.pyplot as plt

#plot to show reward per training episode
episodes = range(len(train_rewards))
#running average over all episodes up to AND including the current one;
#the cumulative sum keeps this linear in the number of episodes and avoids
#taking the mean of an empty slice for the first episode
avg = (np.cumsum(train_rewards) / np.arange(1, len(train_rewards) + 1)).tolist()


plt.figure(figsize=[8,6])
plt.plot(episodes,train_rewards, label='Episode Reward')
plt.plot(episodes,avg, label='Running Average')
plt.xlabel('Training Episode')
plt.ylabel('Rewards Per Episode')
plt.title('Rewards Per Training Episode')
#the labels above are only drawn when a legend is requested
plt.legend()

plt.savefig('rewards_per_episode.png')


In [None]:
#save model if needed
#only the target model's parameters (its state_dict) are written, not the training model's
torch.save(target_model.state_dict(), 'lunar_landing_model.pth')

In [None]:
#load model if needed
#rebuild a network with the same layer sizes, then restore the saved parameters into it
target_model = network(input_size, layer1_size, layer2_size, output_size)
target_model.load_state_dict(torch.load('lunar_landing_model.pth'))
#switch the restored network to evaluation mode
target_model.eval()

In [None]:
#plot the reward per episode for 100 consecutive episodes using the trained agent
episodes = range(100)

#test() appends one total reward per played episode to this list
test_rewards = []
NN = Train()
#loss_fn is passed only because test()'s signature requires it
NN.test(episodes,target_model,loss_fn,test_rewards)

plt.figure(figsize=[8,6])
plt.plot(episodes,test_rewards)
plt.xlabel('Episodes')
plt.ylabel('Rewards Per Episode')
plt.title('Rewards Per Episode With Trained Agent')

plt.savefig('test_rewards_per_episode.png')