In [1]:
import numpy as np
import pygame
import matplotlib.pyplot as plt

pygame 1.9.4
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
ROWS = 200
COLS = 100

In [3]:
class Generator:
    
    #HELPFUL FUNCTIONS
    def widen_hole_transformation(self,racetrack,start_cell,end_cell):
        
        δ = 1
        while(1):
            if ((start_cell[1] < δ) or (start_cell[0] < δ)):
                racetrack[0:end_cell[0],0:end_cell[1]] = -1
                break

            if ((end_cell[1]+δ > COLS) or (end_cell[0]+δ > ROWS)):
                racetrack[start_cell[0]:ROWS,start_cell[1]:COLS] = -1
                break
                
            δ += 1

        return racetrack
    
    def calculate_valid_fraction(self, racetrack):
        '''
        Returns the fraction of valid cells in the racetrack
        '''
        return (len(racetrack[racetrack==0])/(ROWS*COLS))

    def mark_finish_states(self, racetrack):
        '''
        Marks finish states in the racetrack
        Returns racetrack
        '''
        last_col = racetrack[0:ROWS,COLS-1]
        last_col[last_col==0] = 2
        return racetrack
    
    def mark_start_states(self, racetrack):
        '''
        Marks start states in the racetrack
        Returns racetrack
        '''
        last_row = racetrack[ROWS-1,0:COLS]
        last_row[last_row==0] = 1
        return racetrack
    
    
    #CONSTRUCTOR
    def __init__(self):
        pass
    
    def generate_racetrack(self):
        '''
        racetrack is a 2d numpy array
        codes for racetrack:
            0,1,2 : valid racetrack cells
            -1: invalid racetrack cell
            1: start line cells
            2: finish line cells
        returns randomly generated racetrack
        '''
        racetrack = np.zeros((ROWS,COLS),dtype='int')
        
        frac = 1
        while frac > 0.5:    
            
            #transformation
            random_cell = np.random.randint((ROWS,COLS))
            random_hole_dims = np.random.randint((ROWS//4,COLS//4))
            start_cell = np.array([max(0,x - y//2) for x,y in zip(random_cell,random_hole_dims)])
            end_cell = np.array([min(z,x+y) for x,y,z in zip(start_cell,random_hole_dims,[ROWS,COLS])])
        
            #apply_transformation
            racetrack = self.widen_hole_transformation(racetrack, start_cell, end_cell)
            frac = self.calculate_valid_fraction(racetrack)
        
        racetrack = self.mark_start_states(racetrack)
        racetrack = self.mark_finish_states(racetrack)
        
        return racetrack

In [4]:
class Data:
    
    #HELPFUL FUNCTIONS
    def get_start_line(self):
        '''
        Gets start line
        '''
        self.start_line = np.array([np.array([ROWS-1,j]) for j in range(COLS) if self.racetrack[ROWS-1,j] == 1])
        
    def get_finish_line(self):
        '''
        Gets finish line
        '''
        self.finish_line = np.array([np.array([i,COLS-1]) for i in range(ROWS) if self.racetrack[i,COLS-1] == 2])
    
    #CONSTRUCTOR
    def __init__(self):
        '''
            racetrack: 2 dimensional numpy array
            Q(s,a): 5 dimensional numpy array
            C(s,a): 5 dimensional numpy array
            π: target policy
            start_line: start_line is the set of start states
            finish_line: finish_line is the set of finish states
            hyperparameters like ε
            episode to be an empty list
        '''
        self.load_racetrack()
        self.get_start_line()
        self.get_finish_line()
        self.load_Q_vals()
        self.load_C_vals()
        self.load_π()
        self.load_rewards()
        self.ε = 0.1
        self.γ = 1
        self.episode = dict({'S':[],'A':[],'probs':[],'R':[None]})
        
    def save_rewards(self,filename = 'rewards'):
        '''
        saves self.rewards in rewards.npy file
        '''
        self.rewards = np.array(self.rewards)
        np.save(filename,self.rewards)
        self.rewards = list(self.rewards)
        
    def load_rewards(self):
        '''
        loads rewards from rewards.npy file
        '''
        self.rewards = list(np.load('rewards.npy'))
        
    def save_π(self,filename = 'π.npy'):
        '''
        saves self.π in π.npy file
        '''
        np.save(filename,self.π)
        
    def load_π(self):
        '''
        loads π from π.npy file
        '''
        self.π = np.load('π.npy')
        
    def save_C_vals(self,filename = 'C_vals.npy'):
        '''
        saves self.C_vals in C_vals.npy file
        '''
        np.save(filename,self.C_vals)
        
    def load_C_vals(self):
        '''
        loads C_vals from C_vals.npy file
        '''
        self.C_vals = np.load('C_vals.npy')
        
    def save_Q_vals(self,filename = 'Q_vals.npy'):
        '''
        saves self.Q_vals in Q_vals.npy file
        '''
        np.save(filename,self.Q_vals)
        
    def load_Q_vals(self):
        '''
        loads Q_vals from Q_vals.npy file
        '''
        self.Q_vals = np.load('Q_vals.npy')
        
    def save_racetrack(self,filename = 'racetrack.npy'):
        '''
        saves self.racetrack in racetrack.npy file
        '''
        np.save(filename,self.racetrack)
        
    def load_racetrack(self):
        '''
        loads racetrack from racetrack.npy file
        '''
        self.racetrack = np.load('racetrack.npy')

In [5]:
class Environment:
    
    #HELPFUL FUNCTIONS
    
    def get_new_state(self, state, action):
        '''
        Get new state after applying action on this state
        Assumption: The car keeps on moving with the current velocity and then action is applied to 
        change the velocity
        '''
        new_state = state.copy()
        new_state[0] = state[0] - state[2]
        new_state[1] = state[1] + state[3]
        new_state[2] = state[2] + action[0]
        new_state[3] = state[3] + action[1]
        return new_state
    
    def select_randomly(self,NUMPY_ARR):
        '''
        Returns a value uniform randomly from NUMPY_ARR
        Here NUMPY_ARR should be 1 dimensional
        '''
        return np.random.choice(NUMPY_ARR)
    
    def set_zero(NUMPY_ARR):
        '''
        Returns NUMPY_ARR after making zero all the elements in it
        '''
        NUMPY_ARR[:] = 0
        return NUMPY_ARR
    
    def is_finish_line_crossed(self, state, action):
        '''
        Returns True if the car crosses the finish line
                False otherwise
        '''
        new_state = self.get_new_state(state, action)
        old_cell, new_cell = state[0:2], new_state[0:2]
        
        '''
        new_cell's row index will be less
        '''
        rows = np.array(range(new_cell[0],old_cell[0]+1))
        cols = np.array(range(old_cell[1],new_cell[1]+1))
        fin = set([tuple(x) for x in self.data.finish_line])
        row_col_matrix = [(x,y) for x in rows for y in cols]
        intersect = [x for x in row_col_matrix if x in fin]
        
        return len(intersect) > 0
    
    def is_out_of_track(self, state, action):
        '''
        Returns True if the car goes out of track if action is taken on state
                False otherwise
        '''
        new_state = self.get_new_state(state, action)
        old_cell, new_cell = state[0:2], new_state[0:2]
        
        if new_cell[0] < 0 or new_cell[0] >= ROWS or new_cell[1] < 0 or new_cell[1] >= COLS:
            return True
        
        else:
            return self.data.racetrack[tuple(new_cell)] == -1
    
    #CONSTRUCTOR
    def __init__(self, data, gen):
        '''
        initialize step_count to be 0
        '''
        self.data = data
        self.gen = gen
        self.step_count = 0
    
    #MEMBER FUNCTIONS
    
    def reset(self):
        self.data.episode = dict({'S':[],'A':[],'probs':[],'R':[None]})
        self.step_count = 0
    
    def start(self):
        '''
        Makes the velocity of the car to be zero
        Returns the randomly selected start state.
        '''
        state = np.zeros(4,dtype='int')
        state[0] = ROWS-1
        state[1] = self.select_randomly(self.data.start_line[:,1])
        '''
        state[2] and state[3] are already zero
        '''
        return state
    
    def step(self, state, action):
        '''
        Returns the reward and new state when action is taken on state
        Checks the following 2 cases maintaining the order:
            1. car finishes race by crossing the finish line
            2. car goes out of track
        Ends the episode by returning reward as None and state as usual (which will be terminating)
        '''
        self.data.episode['A'].append(action)
        reward = -1
        
        if (self.is_finish_line_crossed(state, action)):
            new_state = self.get_new_state(state, action)
            
            self.data.episode['R'].append(reward)
            self.data.episode['S'].append(new_state)
            self.step_count += 1
            
            return None, new_state
            
        elif (self.is_out_of_track(state, action)):
            new_state = self.start()
        else:
            new_state = self.get_new_state(state, action)
        
        self.data.episode['R'].append(reward)
        self.data.episode['S'].append(new_state)
        self.step_count += 1
        
        return reward, new_state

In [6]:
class Agent:
    
    #HELPFUL FUNCTIONS
    def possible_actions(self, velocity):
        '''
        *** Performs two tasks, can be split up ***
        Universe of actions:  α = [(-1,-1),(-1,0),(0,-1),(-1,1),(0,0),(1,-1),(0,1),(1,0),(1,1)]
                            
        Uses constraints to filter out invalid actions given the velocity
        
        0 <= v_x < 5
        0 <= v_y < 5
        v_x and v_y cannot be made both zero (you can't take an action which would make them zero simultaneously)
        Returns list of possible actions given the velocity
        '''
        α = [(-1,-1),(-1,0),(0,-1),(-1,1),(0,0),(1,-1),(0,1),(1,0),(1,1)]
        α = [np.array(x) for x in α]

        β = []
        for i,x in enumerate(α):
            new_vel = np.add(velocity,x)
            if (new_vel[0] < 5) and (new_vel[0] >= 0) and (new_vel[1] < 5) and (new_vel[1] >= 0) and ~(new_vel[0] == 0 and new_vel[1] == 0):
                β.append(i)
        β = np.array(β)
        
        return β
    
    def map_to_1D(self,action):
        α = [(-1,-1),(-1,0),(0,-1),(-1,1),(0,0),(1,-1),(0,1),(1,0),(1,1)]
        for i,x in enumerate(α):
            if list(action) == list(x):
                return i
    
    def map_to_2D(self,action):
        α = [(-1,-1),(-1,0),(0,-1),(-1,1),(0,0),(1,-1),(0,1),(1,0),(1,1)]
        return α[action]
    
    #CONSTRUCTOR
    def __init__(self):
        pass
    
    def get_action(self, state, policy):
        '''
        Returns action given state using policy
        '''
        return self.map_to_2D(policy(state, self.possible_actions(state[2:4])))

In [7]:
class Monte_Carlo_Control:
    
    #HELPFUL FUNCTIONS
    
    def evaluate_target_policy(self):
        env.reset()
        state = env.start()
        self.data.episode['S'].append(state)
        rew = -1
        while rew!=None:
            action = agent.get_action(state,self.generate_target_policy_action)
            rew, state = env.step(state,action)
            
        self.data.rewards.append(sum(self.data.episode['R'][1:]))
        
    
    def plot_rewards(self):
        ax, fig = plt.subplots(figsize=(30,15))
        x = np.arange(1,len(self.data.rewards)+1)
        plt.plot(x*10, self.data.rewards, linewidth=0.5, color = '#BB8FCE')
        plt.xlabel('Episode number', size = 20)
        plt.ylabel('Reward',size = 20)
        plt.title('Plot of Reward vs Episode Number',size=20)
        plt.xticks(size=20)
        plt.yticks(size=20)
        plt.savefig('RewardGraph.png')
        plt.close()
    
    def save_your_work(self):
        self.data.save_Q_vals()
        self.data.save_C_vals()
        self.data.save_π()
        self.data.save_rewards()
    
    def determine_probability_behaviour(self, state, action, possible_actions):
        best_action = self.data.π[tuple(state)]
        num_actions = len(possible_actions)
        
        if best_action in possible_actions:
            if action == best_action:
                prob = 1 - self.data.ε + self.data.ε/num_actions
            else:
                prob = self.data.ε/num_actions
        else:
            prob = 1/num_actions
        
        self.data.episode['probs'].append(prob)
    
    def generate_target_policy_action(self, state, possible_actions):
        '''
        Returns target policy action, takes state and
        returns an action using this policy
        '''
        if self.data.π[tuple(state)] in possible_actions:
            action = self.data.π[tuple(state)]
        else:
            action = np.random.choice(possible_actions)
            
        return action
    
    def generate_behavioural_policy_action(self, state, possible_actions):
        '''
        Returns behavioural policy action
        which would be ε-greedy π policy, takes state and
        returns an action using this ε-greedy π policy
        '''
        if np.random.rand() > self.data.ε and self.data.π[tuple(state)] in possible_actions:
            action = self.data.π[tuple(state)]
        else:
            action = np.random.choice(possible_actions)
        
        self.determine_probability_behaviour(state, action, possible_actions)
    
        return action
    
    #CONSTRUCTOR
    def __init__(self, data):
        '''
        Initialize, for all s ∈ S, a ∈ A(s):
            data.Q(s, a) ← arbitrary (done in Data)
            data.C(s, a) ← 0 (done in Data)
            π(s) ← argmax_a Q(s,a) 
            (with ties broken consistently) 
            (some consistent approach needs to be followed))
        '''
        self.data = data
        for i in range(ROWS):
            for j in range(COLS):
                if self.data.racetrack[i,j]!=-1:
                    for k in range(5):
                        for l in range(5):
                            self.data.π[i,j,k,l] = np.argmax(self.data.Q_vals[i,j,k,l])
    
    def control(self,env,agent):
        '''
        Performs MC control using episode list [ S0 , A0 , R1, . . . , ST −1 , AT −1, RT , ST ]
        G ← 0
        W ← 1
        For t = T − 1, T − 2, . . . down to 0:
            G ← γ*G + R_t+1
            C(St, At ) ← C(St,At ) + W
            Q(St, At ) ← Q(St,At) + (W/C(St,At))*[G − Q(St,At )]
            π(St) ← argmax_a Q(St,a) (with ties broken consistently)
            If At != π(St) then exit For loop
            W ← W * (1/b(At|St))        
        '''
        env.reset()
        state = env.start()
        self.data.episode['S'].append(state)
        rew = -1
        while rew!=None:
            action = agent.get_action(state,self.generate_behavioural_policy_action)
            rew, state = env.step(state,action)
        
        G = 0
        W = 1
        T = env.step_count
    
        for t in range(T-1,-1,-1):
            G = data.γ * G + self.data.episode['R'][t+1]
            S_t = tuple(self.data.episode['S'][t])
            A_t = agent.map_to_1D(self.data.episode['A'][t])
            
            S_list = list(S_t)
            S_list.append(A_t)
            SA = tuple(S_list)
            
            self.data.C_vals[SA] += W
            self.data.Q_vals[SA] += (W*(G-self.data.Q_vals[SA]))/(self.data.C_vals[SA])           
            self.data.π[S_t] = np.argmax(self.data.Q_vals[S_t])
            if A_t!=self.data.π[S_t]:
                break
            W /= self.data.episode['probs'][t]

In [8]:
class Visualizer:
    
    #HELPFUL FUNCTIONS
    
    def visualize_episode():
        for i in range(self.data.episode['S']):
            vis.visualize_racetrack(i)
    
    def create_window(self):
        '''
        Creates window and assigns self.display variable
        '''
        self.display = pygame.display.set_mode((self.width, self.height))
        pygame.display.set_caption("Racetrack")
    
    def setup(self):
        '''
        Does things which occur only at the beginning
        '''
        self.cell_edge = 5
        self.width = COLS*self.cell_edge
        self.height = ROWS*self.cell_edge
        self.create_window()
        self.window = True

    def close_window(self):
        self.window = False
        pygame.quit()

    def draw(self, state = np.array([])):
        self.display.fill(0)
        for i in range(ROWS):
            for j in range(COLS):
                if self.data.racetrack[i,j]!=-1:
                    if self.data.racetrack[i,j] == 0:
                        color = (255,0,0)
                    elif self.data.racetrack[i,j] == 1:
                        color = (255,255,0)
                    elif self.data.racetrack[i,j] == 2:
                        color = (0,255,0)
                    pygame.draw.rect(self.display,color,((j*self.cell_edge,i*self.cell_edge),(self.cell_edge,self.cell_edge)),1)
        
        if len(state)>0:
            pygame.draw.rect(self.display,(255,255,255),((state[1]*self.cell_edge,state[0]*self.cell_edge),(self.cell_edge,self.cell_edge)),0)
        
        pygame.display.update()
        
        global count
        
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                self.loop = False
                self.close_window()
                return 'stop'
            elif event.type == pygame.KEYDOWN and event.key == pygame.K_SPACE:
                pygame.image.save(vis.display, str(count)+'.png')
                count += 1
                self.loop = False
                
        return None
        
    def visualize_racetrack(self, state = np.array([])):
        '''
        Draws Racetrack in a pygame window
        '''
        if not self.window:
            self.setup()
        self.loop = True
        while(self.loop):
            ret = self.draw(state)
            if ret is not None:
                return ret
    
    #CONSTRUCTOR
    def __init__(self,data):
        self.data = data
        self.window = False

In [9]:
data = Data()
gen = Generator()
env = Environment(data,gen)
mcc = Monte_Carlo_Control(data)
vis = Visualizer(data)
agent = Agent()

In [None]:
# Uncomment the below code blocks suitably to carry out tasks you need

In [10]:
# vis.visualize_racetrack()

In [11]:
# for i in range(50000):
#     mcc.control(env,agent)
    
#     if i%10 == 9:
#         mcc.evaluate_target_policy()
    
#     if i%100 == 99:
#         mcc.save_your_work()
#         mcc.plot_rewards()

In [12]:
# ch = 50
# S = sum(data.rewards[:ch])/ch

# R = []

# for i in range(ch,len(data.rewards)):
#     R.append(S)
#     S *= ch
#     S += data.rewards[i]
#     S -= data.rewards[i-ch]
#     S /= ch

# ax, fig = plt.subplots(figsize=(60,30))
# x = np.arange(1,len(R)+1)
# plt.plot(x*10, R, linewidth=1, color = '#BB8FCE')
# plt.xlabel('Episode number', size = 40)
# plt.ylabel('Reward',size = 40)
# plt.title('Plot of Reward vs Episode Number',size=40)
# plt.xticks(size=40)
# plt.yticks(size=40)
# plt.savefig('RewardGraph2.png')
# plt.close()

In [13]:
# gen = Generator()

# data.racetrack = gen.generate_racetrack()

# vis.visualize_racetrack()

# data.save_racetrack()

In [14]:
# data.Q_vals = np.random.rand(ROWS,COLS,5,5,9)*400 - 500

# data.rewards = []

# data.C_vals = np.zeros((ROWS,COLS,5,5,9))

# data.π = np.zeros((ROWS,COLS,5,5),dtype='int')


# data.save_Q_vals()
# data.save_C_vals()
# data.save_rewards()
# data.save_π()

In [15]:
# count = 0

In [16]:
# env.reset()
# state = env.start()
# mcc.data.episode['S'].append(state)
# rew = -1
# while rew!=None:
#     action = agent.get_action(state,mcc.generate_target_policy_action)
#     rew, state = env.step(state,action)

In [17]:
# data.episode['S']

[array([199,   4,   0,   0]),
 array([199,   4,   1,   1]),
 array([198,   5,   2,   0]),
 array([196,   5,   3,   1]),
 array([193,   6,   3,   1]),
 array([190,   7,   4,   1]),
 array([186,   8,   4,   2]),
 array([182,  10,   4,   3]),
 array([178,  13,   4,   3]),
 array([174,  16,   4,   2]),
 array([170,  18,   4,   2]),
 array([166,  20,   4,   2]),
 array([162,  22,   3,   2]),
 array([159,  24,   3,   3]),
 array([156,  27,   4,   2]),
 array([152,  29,   4,   1]),
 array([148,  30,   4,   2]),
 array([144,  32,   4,   3]),
 array([140,  35,   4,   3]),
 array([136,  38,   3,   2]),
 array([133,  40,   4,   3]),
 array([129,  43,   4,   4]),
 array([125,  47,   3,   3]),
 array([122,  50,   3,   3]),
 array([119,  53,   4,   4]),
 array([115,  57,   4,   4]),
 array([111,  61,   4,   4]),
 array([107,  65,   3,   3]),
 array([104,  68,   4,   4]),
 array([100,  72,   4,   4]),
 array([96, 76,  4,  4]),
 array([92, 80,  4,  4]),
 array([88, 84,  4,  4]),
 array([84, 88,  4,  3

In [18]:
# for i in data.episode['S']:
#     if vis.visualize_racetrack(i) == 'stop':
#         break
# vis.close_window()

In [20]:
# import imageio
# filenames = []

# for i in range(37):
#     filenames.append(str(i)+'.png')

# images = []
# for filename in filenames:
#     images.append(imageio.imread(filename))
# imageio.mimsave('movie.gif', images, duration = 0.25)