In [None]:
import gym
import time
import numpy as np
import matplotlib.pyplot as plt

env = gym.make("MountainCar-v0")

# print(env.observation_space.high)
# print(env.observation_space.low)
# print(env.action_space.n)

LEARNING_RATE = 0.1
DISCOUNT = 0.95
EPISODES = 2000
SHOW_EVERY = 500
EPSILON = 0.5
START_EPSILON_DECAYING = 1
END_EPISION_DECAYING = EPISODES // 2

epsilon_decay_value = EPSILON/(END_EPISION_DECAYING - START_EPSILON_DECAYING)

#Q-table logic
DISCRETE_OS_SIZE = [20]*len(env.observation_space.high)
discrete_os_win_size = (env.observation_space.high-env.observation_space.low)/DISCRETE_OS_SIZE
#print(discrete_os_win_size)

q_table = np.random.uniform(low=-2, high=0, size=(DISCRETE_OS_SIZE+[env.action_space.n]))
# print(q_table.shape)

ep_rewards = []
aggr_ep_reward = {"ep": [], "avg": [], "min":[], "max":[]}


def get_discrete_state(state):
    discrete_state = (state-env.observation_space.low)/discrete_os_win_size
    return tuple(discrete_state.astype(np.int))


for episode in range(EPISODES):
    episode_reward = 0
    if episode % SHOW_EVERY == 0:
        render = True
        print(episode)
    else:
        render = False
    
    discrete_state = get_discrete_state(env.reset())
    # print(discrete_state)
    # print(np.argmax(q_table[discrete_state]))

    done = False 
    while not done:
        if np.random.random() > EPSILON:
            action = np.argmax(q_table[discrete_state])
        else:
            action = np.random.randint(0, env.action_space.n)
        state_prime, reward, done, _ = env.step(action)
        episode_reward += reward
        #print(reward,state_prime)
        new_discrete_state = get_discrete_state(state_prime)
        if render:
            env.render()
        if not done:
            max_feature_q = np.max(q_table[new_discrete_state])
            current_q = q_table[discrete_state + (action, )]

            new_q = (1-LEARNING_RATE)*current_q+LEARNING_RATE*(reward+DISCOUNT*max_feature_q)
            q_table[discrete_state+(action, )] = new_q

        elif state_prime[0] >= env.goal_position:
            q_table[discrete_state+(action, )] = 0

        discrete_state = new_discrete_state
    if END_EPISION_DECAYING >= episode >= START_EPSILON_DECAYING:
        EPSILON -= epsilon_decay_value
    ep_rewards.append(episode_reward)
    
    if not episode % SHOW_EVERY:
        average_reward = sum(ep_rewards[-SHOW_EVERY:])/len(ep_rewards[-SHOW_EVERY:])
        aggr_ep_reward["ep"].append(episode)
        aggr_ep_reward["avg"].append(average_reward)
        aggr_ep_reward["min"].append(min(ep_rewards[-SHOW_EVERY:]))
        aggr_ep_reward["max"].append(max(ep_rewards[-SHOW_EVERY:]))
        
        print(f"Episode:{episode} avg: {average_reward} min: {min(ep_rewards[-SHOW_EVERY:])} max: {max(ep_rewards[-SHOW_EVERY:])} state: {state_prime}")
              
    env.close()
              
plt.plot(aggr_ep_reward["ep"], aggr_ep_reward['avg'], label="avg")             
plt.plot(aggr_ep_reward["ep"], aggr_ep_reward['min'], label="min")             
plt.plot(aggr_ep_reward["ep"], aggr_ep_reward['max'], label="max")
plt.legend(loc=4)
plt.show()

0
Episode:0 avg: -200.0 min: -200.0 max: -200.0 state: [-0.56780628 -0.01296347]


## CARTPOLE 

In [4]:
import numpy as np
import time, gym
import matplotlib.pyplot as plt
import tqdm 
import torch 
import torch.nn as nn


In [5]:
env = gym.make("CartPole-v0")
env.seed()
print(env.observation_space)
print(env.action_space.n)

class create_cartpole(nn.Module):
    def __init__(self):
        super(create_cartpole, self).__init__()
        model  = [nn.Linear(32),
                 nn.ReLU()]
        self.a = nn.Sequential(*model)
        
    def forward(self, x):
        return self.a
    


Box(-3.4028234663852886e+38, 3.4028234663852886e+38, (4,), float32)
2


In [1]:
import gym
env = gym.make("Pendulum-v0")
done = False
print(env.observation_space.high)
print(env.observation_space.low)
print(env.action_space)
number = 0
while not done:
    env.reset()
    number += 1
    action = (1,)
    state_prime, reward, done, _ = env.step(action)
    env.render()
    done = True
    print(state_prime, reward, number)

[1. 1. 8.]
[-1. -1. -8.]
Box(-2.0, 2.0, (1,), float32)
[-0.50689903 -0.86200544 -0.64531948] -4.288245488572728 1


In [4]:
import gym 
env = gym.make("CartPole-v1")
done = False
print(env.observation_space.high)
print(env.observation_space.low)
print(env.action_space.sample())

# while not done:
#     env.reset()
#     action = np.random.randint(0,2)
#     state_prime, reward, done, _ = env.step(action)
#     env.render()

[4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38]
[-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38]
0


In [None]:
import gym
from gym import wrappers

EPISODES = 1000
avg_time = 0
max_time = -1
env = gym.make('CartPole-v1')

for i_episode in range(EPISODES):
    # instansiating the environment
    observation = env.reset()
    for t in range(1000):
        # uncomment this is you want to see the rendering 
        #env.render()
        action = env.action_space.sample()
        observation, reward, done, info = env.step(action)
        env.render()
        if done:
            avg_time = avg_time + t
            if t >max_time:
                max_time = t
                print(max_time)
            #print("Episode finished after {} timesteps".format(t+1))
            break
    # resetting the enviroment
    env.reset()
        

# printing the avg time the game lasted
avg_time = avg_time/EPISODES
print('avg time network survives : ', avg_time)

53
65
67
85


In [2]:

import gym
import random
import numpy as np 

env = gym.make('CartPole-v1').env
bestLength = 0
episode_length =[]
best_weights = np.zeros(4)
flag = 0
max_life = 1000

for i in range(10):
    new_weights = np.random.uniform(-1, 1, 4)
    length = []
    for j in range(500):
        observation = env.reset()
        done = False
        count = 0
        while not done:
            count = count +1
            action = 1 if np.dot(observation,new_weights) >0 else 0
            observation,reward,done,_ = env.step(action)
            
            if done:
                break
            elif count > max_life:
                flag =1
                break
        length.append(count)
    avg_length = float(sum(length) / len(length))

    if avg_length >bestLength:
        bestLength = avg_length
        best_weights = new_weights 
    episode_length.append(avg_length)
    if flag ==1:
        break

print(best_weights)


## testing
done=  False
count = 0
observation = env.reset()

while not done:
    count = count +1
    action = 1 if np.dot(observation,best_weights) >0 else 0
    observation,reward,done,_ = env.step(action)
    env.render()

    if done:
        break
print('with best weights, game lasted ',count , ' moves')

[0.51774453 0.36259707 0.84740929 0.8989602 ]
with best weights, game lasted  453  moves


In [1]:
import gym
from gym import wrappers

import numpy as np 
import random
import keras
from keras import backend as k
from keras.models import Sequential
from keras.layers import Dense, Activation,Dropout
from sklearn.model_selection import train_test_split
from keras.callbacks import ModelCheckpoint
from keras.models import load_model

'''
NOTE
action:
0 for left 
1 for right
'''
checkpoint = ModelCheckpoint('model/model_dnn.h5', monitor='val_loss',verbose=1, save_best_only=True)
no_of_observations = 500
min_score = 100

# generate the training data 
def generate_training_data(no_of_episodes):
    print('generating training data')
    # initize the environment
    env = gym.make('CartPole-v1').env
    X = []
    y =[]
    left = 0
    right = 0

    for i_episode in range(no_of_episodes):
        prev_observation = env.reset()
        score = 0
        X_memory  = []
        y_memory = []
        for t in range(no_of_observations):
            action = random.randrange(0,2)
            
            ## debugging code
            '''
            if action == 0:
                left = left + 1
            else:
                right = right + 1
            '''
            new_observation,reward,done,info = env.step(action)
            score = score + reward
            X_memory.append(prev_observation)
            y_memory.append(action)
            prev_observation = new_observation
            if done:
                if score >min_score:
                    for data in X_memory:
                        X.append(data)
                    for data in y_memory:
                        y.append(data)
                    print('episode : ',i_episode, ' score : ',score)
                break
        env.reset()
    #debugging code
    '''
    print('left : ', left)
    print('right: ',right)
    '''
    # converting them into numpy array
    X = np.asarray(X)
    y =np.asarray(y) 

    # saving the numpy array
    np.save('data/X',X)
    np.save('data/y',y)
    
    # printing the size
    print('shape of X: ',X.shape)
    print('shape of target labels', y.shape)

# defines the model to be trained
def get_model():
    model = Sequential()
    model.add(Dense(128, input_dim=4))
    model.add(Activation('relu'))
    model.add(Dropout(0.5))

    model.add(Dense(256))
    model.add(Activation('relu'))
    model.add(Dropout(.5))
     
    model.add(Dense(512))
    model.add(Activation('relu'))
    model.add(Dropout(.5))

    model.add(Dense(256))
    model.add(Activation('relu'))
    model.add(Dropout(.5))

    model.add(Dense(128))
    model.add(Activation('relu'))
    model.add(Dropout(.5))

    model.add(Dense(1))
    model.add(Activation('sigmoid'))
    
    model.summary()
    model.compile(loss='mean_squared_error', optimizer='adam', metrics=['mse'])
    return model
    

# trains the model
def train_model(model):
    # loading the training data from the disk
    X= np.load('data/X.npy')
    y = np.load('data/y.npy')
    # making train test split 
    X_train,X_test,y_train,y_test = train_test_split(X,y,test_size = .2, random_state = 42)
    print('X_train: ',X_train.shape)
    print('y_train:', y_train.shape)
    print('X_test: ', X_test.shape)
    print('y_test: ', y_test.shape)
    # training the model
    model.fit(X_train,y_train,validation_data = [X_test,y_test],verbose = 1,
    callbacks=[checkpoint],
    epochs= 20, batch_size = 10000,shuffle =True)
    # returns the model
    return model

# testing the model 
def testing(model):
    #model = load_model('model/model.h5')
    env = gym.make('CartPole-v1').env
    env= wrappers.Monitor(env, 'nn_files', force = True)
    observation = env.reset()
    no_of_rounds = 10
    max_rounds = no_of_rounds
    min_score = 1000000
    max_score = -1
    avg_score = 0

    # playing a number of games
    while (no_of_rounds > 0):
        # initial score
        score =0
        action = 0
        prev_obs = []
        while (True):
            env.render()
            if len(prev_obs) == 0:
                action = random.randrange(0,2)
            else:
                data = np.asarray(prev_obs)
                data = np.reshape(data, (1,4))
                output = model.predict(data)
                # checking if the required action is left or right
                if output[0][0] >= .5:
                    action = 1
                elif output[0][0] < .5:
                    action = 0
            
            new_observation, reward, done, info = env.step(action)
            prev_obs = new_observation
            # calculating total reward
            score = score  + reward 
            
            if done:
                # if the game is over
                print('game over!! your score is :  ',score)
                if score > max_score:
                    max_score = score
                elif score < min_score:
                    min_score = score
                avg_score +=score 
                env.reset()
                break
        no_of_rounds = no_of_rounds - 1
        # stats about scores 
        if no_of_rounds == 0:
            print('avg score : ',avg_score/max_rounds)
            print('max score: ', max_score)
            print('min score: ',min_score)

# calling the functions
generate_training_data(50000)
model = get_model()
model = train_model(model)
testing(model)


generating training data
episode :  2013  score :  108.0
episode :  2666  score :  110.0
episode :  4412  score :  122.0
episode :  4508  score :  107.0
episode :  6881  score :  125.0
episode :  13546  score :  102.0
episode :  15413  score :  102.0
episode :  19679  score :  110.0
episode :  20834  score :  103.0
episode :  21832  score :  106.0
episode :  22813  score :  111.0
episode :  23568  score :  113.0
episode :  24381  score :  105.0
episode :  24520  score :  118.0
episode :  24850  score :  106.0
episode :  27188  score :  101.0
episode :  27620  score :  110.0
episode :  29683  score :  105.0
episode :  31020  score :  102.0
episode :  31637  score :  101.0
episode :  34338  score :  123.0
episode :  44814  score :  109.0
episode :  45890  score :  128.0
episode :  46103  score :  108.0
episode :  46608  score :  114.0
episode :  47261  score :  107.0
episode :  47731  score :  138.0
shape of X:  (2994, 4)
shape of target labels (2994,)
Model: "sequential"
_______________

ValueError: in user code:

    c:\users\acer\appdata\local\programs\python\python38\lib\site-packages\tensorflow\python\keras\engine\training.py:1224 test_function  *
        return step_function(self, iterator)
    c:\users\acer\appdata\local\programs\python\python38\lib\site-packages\tensorflow\python\keras\engine\training.py:1215 step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    c:\users\acer\appdata\local\programs\python\python38\lib\site-packages\tensorflow\python\distribute\distribute_lib.py:1211 run
        return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
    c:\users\acer\appdata\local\programs\python\python38\lib\site-packages\tensorflow\python\distribute\distribute_lib.py:2585 call_for_each_replica
        return self._call_for_each_replica(fn, args, kwargs)
    c:\users\acer\appdata\local\programs\python\python38\lib\site-packages\tensorflow\python\distribute\distribute_lib.py:2945 _call_for_each_replica
        return fn(*args, **kwargs)
    c:\users\acer\appdata\local\programs\python\python38\lib\site-packages\tensorflow\python\keras\engine\training.py:1208 run_step  **
        outputs = model.test_step(data)
    c:\users\acer\appdata\local\programs\python\python38\lib\site-packages\tensorflow\python\keras\engine\training.py:1174 test_step
        y_pred = self(x, training=False)
    c:\users\acer\appdata\local\programs\python\python38\lib\site-packages\tensorflow\python\keras\engine\base_layer.py:975 __call__
        input_spec.assert_input_compatibility(self.input_spec, inputs,
    c:\users\acer\appdata\local\programs\python\python38\lib\site-packages\tensorflow\python\keras\engine\input_spec.py:155 assert_input_compatibility
        raise ValueError('Layer ' + layer_name + ' expects ' +

    ValueError: Layer sequential expects 1 inputs, but it received 2 input tensors. Inputs received: [<tf.Tensor 'IteratorGetNext:0' shape=(None, 4) dtype=float32>, <tf.Tensor 'ExpandDims:0' shape=(None, 1) dtype=int32>]
