In [1]:
import time
import argparse
import sys
import torch
from os.path import join, exists
from os import mkdir
import gym
import import_ipynb
from gym.spaces import Discrete, Box
from typing import List, Any
import numpy as np
import torch.nn as nn
import multiprocess as mp

torch.set_num_threads(1)
gym.logger.set_level(40)

In [2]:
def hebbian_update_ABCD_lr_D_in(heb_coeffs, weights1_2, weights2_3, weights3_4, o0, o1, o2, o3):
       
        heb_offset = 0
        ## Layer 1         
        for i in range(weights1_2.shape[1]): 
            for j in range(weights1_2.shape[0]): 
                idx = (weights1_2.shape[0]-1)*i + i + j
                weights1_2[:,i][j] += heb_coeffs[idx][3] * ( heb_coeffs[idx][0] * o0[i] * o1[j]
                                                           + heb_coeffs[idx][1] * o0[i] 
                                                           + heb_coeffs[idx][2]         * o1[j]  + heb_coeffs[idx][4])

        heb_offset += weights1_2.shape[1] * weights1_2.shape[0]
        # Layer 2
        for i in range(weights2_3.shape[1]): 
            for j in range(weights2_3.shape[0]):
                idx = heb_offset + (weights2_3.shape[0]-1)*i + i+j
                weights2_3[:,i][j] += heb_coeffs[idx][3] * ( heb_coeffs[idx][0] * o1[i] * o2[j]
                                                           + heb_coeffs[idx][1] * o1[i] 
                                                           + heb_coeffs[idx][2]         * o2[j]  + heb_coeffs[idx][4])

        heb_offset += weights2_3.shape[1] * weights2_3.shape[0]
        # Layer 3
        for i in range(weights3_4.shape[1]): 
            for j in range(weights3_4.shape[0]): 
                idx = heb_offset + (weights3_4.shape[0]-1)*i + i+j 
                weights3_4[:,i][j] += heb_coeffs[idx][3] * ( heb_coeffs[idx][0] * o2[i] * o3[j]
                                                           + heb_coeffs[idx][1] * o2[i] 
                                                           + heb_coeffs[idx][2]         * o3[j]  + heb_coeffs[idx][4])
                
        return weights1_2, weights2_3, weights3_4

In [3]:
class MLP_heb(nn.Module):
    "MLP, no bias"
    def __init__(self, input_space, action_space):
        super(MLP_heb, self).__init__()

        self.fc1 = nn.Linear(input_space, 128, bias=False)
        self.fc2 = nn.Linear(128, 64, bias=False)
        self.fc3 = nn.Linear(64, action_space, bias=False)

    def forward(self, ob):
        import pickle
        with open('ob.pkl', 'wb') as f:
            pickle.dump(ob, f)
        new_ob = ob[0][0]

        if(type(new_ob) == np.float32):
            new_ob = ob[0]
        state = torch.as_tensor(new_ob).float().detach()
        # print(state)
        x1 = torch.tanh(self.fc1(state))   
        x2 = torch.tanh(self.fc2(x1))
        o = self.fc3(x2)  
         
        return state, x1, x2, o

In [4]:
def fitness_hebb(hebb_rule : str, environment : str, init_weights = 'uni' , *evolved_parameters: List[np.array]) -> float:
    """
    Evaluate an agent 'evolved_parameters' controlled by a Hebbian network in an environment 'environment' during a lifetime.
    The initial weights are either co-evolved (if 'init_weights' == 'coevolve') along with the Hebbian coefficients or randomly sampled at each episode from the 'init_weights' distribution. 
    Subsequently the weights are updated following the hebbian update mechanism 'hebb_rule'.
    Returns the episodic fitness of the agent.
    """
    
    def weights_init(m):
        if isinstance(m, torch.nn.Linear):
            if init_weights == 'xa_uni':  
                torch.nn.init.xavier_uniform(m.weight.data, 0.3)
            elif init_weights == 'sparse':  
                torch.nn.init.sparse_(m.weight.data, 0.8)
            elif init_weights == 'uni':  
                torch.nn.init.uniform_(m.weight.data, -0.1, 0.1)
            elif init_weights == 'normal':  
                torch.nn.init.normal_(m.weight.data, 0, 0.024)
            elif init_weights == 'ka_uni':  
                torch.nn.init.kaiming_uniform_(m.weight.data, 3)
            elif init_weights == 'uni_big':
                torch.nn.init.uniform_(m.weight.data, -1, 1)
            elif init_weights == 'xa_uni_big':
                torch.nn.init.xavier_uniform(m.weight.data)
            elif init_weights == 'ones':
                torch.nn.init.ones_(m.weight.data)
            elif init_weights == 'zeros':
                torch.nn.init.zeros_(m.weight.data)
            elif init_weights == 'default':
                pass
            
    # Unpack evolved parameters
    try: 
        hebb_coeffs, initial_weights_co = evolved_parameters
    except: 
        hebb_coeffs = evolved_parameters[0]

    
    with torch.no_grad():
                    
        # Load environment
        try:
            env = gym.make(environment, verbose = 0)
        except:
            env = gym.make(environment)
            
        # env.render()  # bullet envs
        print(len(env.observation_space.shape))
        print(isinstance(env.action_space, Box))
        print(isinstance(env.action_space, Discrete))
        
        if len(env.observation_space.shape) == 1:   
            pixel_env = False
            input_dim = env.observation_space.shape[0]
        elif len(env.observation_space.shape) == 0:   
            pixel_env = False
            input_dim = env.observation_space.n
            
        # Determine action space dimension

        if isinstance(env.action_space, Box):
            action_dim = env.action_space.shape[0]
        elif isinstance(env.action_space, Discrete):
            action_dim = env.action_space.n
        else:
            raise ValueError('Only Box and Discrete action spaces supported')
        
        # Initialise policy network
        p = MLP_heb(input_dim, action_dim)          
        
        
        # Initialise weights of the policy network with an specific distribution

        # Randomly sample initial weights from chosen distribution
        p.apply(weights_init)

        p = p.float()
        
        weights1_2, weights2_3, weights3_4 = list(p.parameters())
            
        
        # Convert weights to numpy so we can JIT them with Numba
        weights1_2 = weights1_2.detach().numpy()
        weights2_3 = weights2_3.detach().numpy()
        weights3_4 = weights3_4.detach().numpy()
        
        observation = env.reset() 
              
        # Normalize weights flag for non-bullet envs
        normalised_weights = True

        # Inner loop
        neg_count = 0
        rew_ep = 0
        t = 0
        while True:
            
            # For obaservation ∈ gym.spaces.Discrete, we one-hot encode the observation
            # print(f'length:{env.length},gravity:{env.gravity}')
            if isinstance(env.observation_space, Discrete): 
                observation = (observation == torch.arange(env.observation_space.n)).float()
            
            o0, o1, o2, o3 = p([observation])
            o0 = o0.numpy()
            o1 = o1.numpy()
            o2 = o2.numpy()
            
            # Bounding the action space
            if isinstance(env.action_space, Box):
                action = o3.numpy()                        
                action = np.clip(action, env.action_space.low, env.action_space.high)  
            elif isinstance(env.action_space, Discrete):
                action = np.argmax(o3).numpy()
            o3 = o3.numpy()

            
            # Environment simulation step
            observation, reward, done, dict = env.step(action)  
            rew_ep += reward   
                                       
            if done:
                break
            
            t += 1
            
            #### Episodic/Intra-life hebbian update of the weights
            if hebb_rule == 'ABCD_lr':
                weights1_2, weights2_3, weights3_4 = hebbian_update_ABCD_lr_D_in(hebb_coeffs, weights1_2, weights2_3, weights3_4, o0, o1, o2, o3)
            else:
                raise ValueError('The provided Hebbian rule is not valid')
                

            # Normalise weights per layer
            (a, b, c) = (0, 1, 2) if not pixel_env else (2, 3, 4)
            list(p.parameters())[a].data /= list(p.parameters())[a].__abs__().max()
            list(p.parameters())[b].data /= list(p.parameters())[b].__abs__().max()
            list(p.parameters())[c].data /= list(p.parameters())[c].__abs__().max()
        
            
        env.close()

    return rew_ep

In [5]:

def compute_ranks(x):
  """
  Returns rank as a vector of len(x) with integers from 0 to len(x)
  """
  assert x.ndim == 1
  ranks = np.empty(len(x), dtype=int)
  ranks[x.argsort()] = np.arange(len(x))
  return ranks

def compute_centered_ranks(x):
  """
  Maps x to [-0.5, 0.5] and returns the rank
  """
  y = compute_ranks(x.ravel()).reshape(x.shape).astype(np.float32)
  y /= (x.size - 1)
  y -= .5
  return y

def worker_process_hebb(arg):
    get_reward_func, hebb_rule,  eng,  init_weights, coeffs = arg
    
    wp = np.array(coeffs)
    decay = - 0.01 * np.mean(wp**2)
    r = get_reward_func( hebb_rule,  eng,  init_weights, coeffs) + decay
    
    return r 

def worker_process_hebb_coevo(arg): 
    get_reward_func,  hebb_rule,  eng,  init_weights, coeffs, coevolved_parameters = arg
    
    wp = np.array(coeffs)
    decay = - 0.01 * np.mean(wp**2)
    r = get_reward_func( hebb_rule,  eng,  init_weights, coeffs, coevolved_parameters) + decay

    return r 

            


def worker_process_hebb_coevo(arg): 
    get_reward_func,  hebb_rule,  eng,  init_weights, coeffs, coevolved_parameters = arg
    
    wp = np.array(coeffs)
    decay = - 0.01 * np.mean(wp**2)
    r = get_reward_func( hebb_rule,  eng,  init_weights, coeffs, coevolved_parameters) + decay

    return r 
class EvolutionStrategyHebb(object):
    def __init__(self, hebb_rule,  environment, init_weights = 'uni', population_size=100, sigma=0.1, learning_rate=0.2, decay=0.995, num_threads=1, distribution = 'normal'):
        
        self.hebb_rule = hebb_rule                     
        self.environment = environment                         
        self.init_weights = init_weights               
        self.POPULATION_SIZE = population_size
        self.SIGMA = sigma
        self.learning_rate = learning_rate            
        self.decay = decay
        self.num_threads = mp.cpu_count() if num_threads == -1 else num_threads
        self.update_factor = self.learning_rate / (self.POPULATION_SIZE * self.SIGMA)
        self.distribution = distribution                      

        # The number of hebbian coefficients per synapse
        if hebb_rule == 'ABCD_lr':                                           
            self.coefficients_per_synapse = 5
        else:
            raise ValueError('The provided Hebbian rule is not valid')
            
       
        # Look up observation and action space dimension
        env = gym.make(environment)    

        if len(env.observation_space.shape) == 1:   # State-based environment 
            self.pixel_env = False
            input_dim = env.observation_space.shape[0]
        elif isinstance(env.observation_space, Discrete):
            self.pixel_env = False
            input_dim = env.observation_space.n
        else:
            raise ValueError('Observation space not supported')

        if isinstance(env.action_space, Box):
            action_dim = env.action_space.shape[0]
        elif isinstance(env.action_space, Discrete):
            action_dim = env.action_space.n
        else:
            raise ValueError('Action space not supported')
                    
        # Initialize the values of hebbian coefficients and CNN parameters or initial weights of co-evolving initial weights   
                  
        # State-vector environments (MLP)            
        plastic_weights = (128*input_dim) + (64*128) + (action_dim*64)                                            #  Hebbian coefficients:  MLP x coefficients_per_synapse :plastic_weights x coefficients_per_synapse
        
        # Co-evolution of initial weights                
        
        # Random initial weights
        if self.distribution == 'uniform': 
            self.coeffs = np.random.uniform(-1,1,(plastic_weights, self.coefficients_per_synapse)) 
        elif self.distribution == 'normal':
            self.coeffs = torch.randn(plastic_weights, self.coefficients_per_synapse).detach().numpy().squeeze() 
                    
                    
                    
        # Load fitness function for the selected environment          
        self.get_reward = fitness_hebb
            
            
    def _get_params_try(self, w, p):

        param_try = []
        for index, i in enumerate(p):
            jittered = self.SIGMA * i
            param_try.append(w[index] + jittered)
        param_try = np.array(param_try).astype(np.float32)
        
        return param_try
        # return w + p*self.SIGMA

    def get_coeffs(self):
        return self.coeffs.astype(np.float32)

    def _get_population(self, coevolved_param = False): 
        
        # x_ = np.random.randn(int(self.POPULATION_SIZE/2), self.coeffs.shape[0], self.coeffs[0].shape[0])
        # population = np.concatenate((x_,-1*x_)).astype(np.float32)
        
        population = []
            
        if coevolved_param == False:
            for i in range( int(self.POPULATION_SIZE/2) ):
                x = []
                x2 = []
                for w in self.coeffs:
                    j = np.random.randn(*w.shape)             # j: (coefficients_per_synapse, 1) eg. (5,1)
                    x.append(j)                                                   # x: (coefficients_per_synapse, number of synapses) eg. (92690, 5)
                    x2.append(-j) 
                population.append(x)                                              # population : (population size, coefficients_per_synapse, number of synapses), eg. (10, 92690, 5)
                population.append(x2)
                
        elif coevolved_param == True:
            for i in range( int(self.POPULATION_SIZE/2) ):
                x = []
                x2 = []
                for w in self.initial_weights_co:
                    j = np.random.randn(*w.shape)
                    x.append(j)                    
                    x2.append(-j) 

                population.append(x)               
                population.append(x2)
                
        return np.array(population).astype(np.float32)


    def _get_rewards(self, pool, population):
        if pool is not None:

            worker_args = []
            for p in population:

                heb_coeffs_try1 = []
                for index, i in enumerate(p):
                    jittered = self.SIGMA * i
                    heb_coeffs_try1.append(self.coeffs[index] + jittered) 
                heb_coeffs_try = np.array(heb_coeffs_try1).astype(np.float32)

                worker_args.append( (self.get_reward, self.hebb_rule, self.environment,  self.init_weights,  heb_coeffs_try) )
                
            rewards  = pool.map(worker_process_hebb, worker_args)
            
        else:
            rewards = []
            for p in population:
                heb_coeffs_try = np.array(self._get_params_try(self.coeffs, p))
                rewards.append(self.get_reward( self.hebb_rule, self.environment,  self.init_weights, heb_coeffs_try))
        
        rewards = np.array(rewards).astype(np.float32)
        return rewards
    


    def _update_coeffs(self, rewards, population):
        rewards = compute_centered_ranks(rewards)

        std = rewards.std()
        if std == 0:
            raise ValueError('Variance should not be zero')
                
        rewards = (rewards - rewards.mean()) / std
                
        for index, c in enumerate(self.coeffs):
            layer_population = np.array([p[index] for p in population])
                      
            self.update_factor = self.learning_rate / (self.POPULATION_SIZE * self.SIGMA)                
            self.coeffs[index] = c + self.update_factor * np.dot(layer_population.T, rewards).T 

        if self.learning_rate > 0.001:
            self.learning_rate *= self.decay

        #Decay sigma
        if self.SIGMA>0.01:
            self.SIGMA *= 0.999        
            



    def run(self, iterations, print_step=10, path='heb_coeffs'):                                                    
        
        id_ = str(int(time.time()))
        if not exists(path + '/' + id_):
            mkdir(path + '/' + id_)
            
        print('Run: ' + id_ + '\n\n........................................................................\n')
            
        pool = mp.Pool(self.num_threads) if self.num_threads > 1 else None
        
        generations_rewards = []

        for iteration in range(iterations):                                                                         # Algorithm 2. Salimans, 2017: https://arxiv.org/abs/1703.03864

            population = self._get_population()                                                                 # Sample normal noise:         Step 5
            rewards = self._get_rewards(pool, population)                                                       # Compute population fitness:  Step 6
            self._update_coeffs(rewards, population)                                                            # Update coefficients:         Steps 8->12
                
                
            # Print fitness and save Hebbian coefficients and/or Coevolved / CNNs parameters
            if (iteration + 1) % print_step == 0:
                rew_ = rewards.mean()
                print('iter %4i | reward: %3i |  update_factor: %f  lr: %f | sum_coeffs: %i sum_abs_coeffs: %4i' % (iteration + 1, rew_ , self.update_factor, self.learning_rate, int(np.sum(self.coeffs)), int(np.sum(abs(self.coeffs)))), flush=True)
                
                if rew_ > 100:
                    torch.save(self.get_coeffs(),  path + "/"+ id_ + '/HEBcoeffs__' + self.environment + "__rew_" + str(int(rew_)) + '__' + self.hebb_rule + "__init_" + str(self.init_weights) + "__pop_" + str(self.POPULATION_SIZE) + '__coeffs' + "__{}.dat".format(iteration))
                        
                generations_rewards.append(rew_)
                np.save(path + "/"+ id_ + '/Fitness_values_' + id_ + '_' + self.environment + '.npy', np.array(generations_rewards))
       
        if pool is not None:
            pool.close()
            pool.join()

In [None]:



def main(argv):

    environment = 'CartPole-v1'
    hebb_rule = 'ABCD_lr' # A, AD, AD_lr, ABC, ABC_lr, ABCD, ABCD_lr, ABCD_lr_D_out, ABCD_lr_D_in_and_out
    popsize = 200 # Population size # 200
    lr = 0.2 # Learning rate
    decay = 0.995 # Learning rate decay
    sigma = 0.1 # Modulates the amount of noise used to populate each new generation
    init_weights = 'uni' # The distribution used to sample random weights from at each episode: uni, normal, default, xa_uni, sparse, ka_uni or coevolve to co-evolve the intial weights
    print_every = 1 # Print and save every N steps
    generations = 50 # Number of generations that the ES will run # 50
    threads = -1 # Number of threads used to run evolution in parallel: -1 uses all threads available
    folder = 'heb_coeffs' # Folder to store the evolved Hebbian coefficients
    distribution = 'normal' # Sampling distribution for initialize the Hebbian coefficients: normal, uniform


    if not exists(folder):
        mkdir(folder)
        
    # Initialise the EvolutionStrategy class
    print('\n\n........................................................................')
    print('\nInitilisating Hebbian ES for ' + environment + ' with ' + hebb_rule + ' Hebbian rule\n')
    es = EvolutionStrategyHebb(hebb_rule, environment, init_weights, population_size=popsize, sigma=sigma, learning_rate=lr, decay=decay, num_threads=threads, distribution=distribution)
    
    # Start the evolution
    print('\n........................................................................')
    print('\n ♪┏(°.°)┛┗(°.°)┓ Starting Evolution ┗(°.°)┛┏(°.°)┓ ♪ \n')
    tic = time.time()
    es.run(generations, print_step=print_every, path=folder)
    toc = time.time()
    print('\nEvolution took: ', int(toc-tic), ' seconds\n')
    
    
if __name__ == '__main__':
    main(sys.argv)




........................................................................

Initilisating Hebbian ES for CartPole-v1 with ABCD_lr Hebbian rule


........................................................................

 ♪┏(°.°)┛┗(°.°)┓ Starting Evolution ┗(°.°)┛┏(°.°)┓ ♪ 

Run: 1682844467

........................................................................

111

1
FalseFalse
FalseFalse



TrueTrueTrue1True




1False

FalseTrue
1

True
1False

TrueFalse

True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
1True
False
True

1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False1

FalseTrue

True


KeyboardInterrupt: 

1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
1
False
True
