In [None]:
######### DRAFT ##################
Implementation of GYM's Multi_ENV_Per_Actor parameter with MLxE architecture.

- Implementation with RAY works
- To finalize the work, vectorized GYM environments need a custom RESET method that allows a proper reset at the end of each episode

See StackOverflow issue:
    https://stackoverflow.com/questions/75551863/vectorized-gym-environments-how-to-block-automatic-environment-reset-on-done-t
        
Possible ways to fix it:
    - change the guts of the GYM package.
    - check out gym3

In [None]:
from IPython.display import clear_output
from IPython.core.display import display, HTML

# Inject CSS into the notebook page: stretch the main container to the full
# width and lift the max-width cap on output cells.
display(HTML("<style>.container { width:100% !important; }</style>"))
display(HTML("<style>.output_result { max-width:100% !important; }</style>"))

from warnings import simplefilter 
# Suppress every FutureWarning for the rest of the session.
simplefilter(action='ignore', category=FutureWarning)

import ray, gym, time, math
import os

# Disable Ray's memory monitor (and its out-of-memory warnings / worker
# killing). Ray reads this setting from the environment when it starts, so it
# must be exported before the first call that launches Ray; assigning an
# attribute on the `ray` module, as was done before, has no effect.
os.environ["RAY_memory_monitor_refresh_ms"] = "0"

from ray.util.queue import Queue
from ray._private.utils import get_num_cpus

# Number of CPUs as reported by Ray's private helper.
cores = get_num_cpus()
# Module-level Ray queues.
# NOTE(review): Learner and Memorizer keep their own list attributes with the
# same names; presumably these queues are consumed by code outside this
# section (Executor stores `collect_examples`) - confirm they are still needed.
collect_obs = Queue()
collect_examples = Queue()

import tensorflow as tf
import tensorflow_addons as tfa 
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras import Model

import numpy as np

# Configure how NumPy arrays are printed.
# NOTE(review): the second call does not pass `formatter`, so the float
# formatter installed by the first call presumably stays active and takes
# precedence over `precision`/`suppress` for float values - confirm which of
# the two output formats is actually intended.
np.set_printoptions(formatter={'float': lambda x: "{0:0.20f}".format(x)})
np.set_printoptions(precision=4, floatmode="maxprec",suppress=True, linewidth=200)

# Central configuration shared by the Learner, Memorizer and Executor actors.
args = dict(
    # ---- SYSTEM settings ----
    # How many Executors run in parallel (alternative: get_num_cpus()).
    executors_n=1,

    # ---- GYM settings ----
    # How many episodes to run.
    max_episodes=500,
    # How many environments to run per worker (tested up to 512).
    num_envs_per_worker=10,
    # Name of the GYM environment to run.
    env_name='CartPole-v1',

    # ---- MODEL definition ----
    # Size of env.observation_space (used twice: current and next state).
    state_n=4,
    # Task-specific adjustments (used twice: current and next state).
    state_n_adj=1,
    # Additional step descriptors (Reward, Done, Action, Discounted Reward).
    state_n_add=4,
    # Size of env.action_space.
    action_n=2,
    # Neurons in the common layers of the A3C model.
    common_layers_n=[128, 256, 128],
    # Neurons in the value layers of the A3C model.
    value_layers_n=[64, 128, 64],
    # Neurons in the policy layers of the A3C model.
    policy_layers_n=[64, 128, 64],

    # ---- LEARNER settings ----
    # Number of examples per model update.
    batch_size=128,
    # Synchronization frequency of the target model with the on-line model
    # (optional).
    model_alignment_frequency=128,
    # Minimum number of model updates to run in each iteration.
    minimum_model_update_frequency=64,
    # Run the epsilon decay policy? 0 = no, 1 = yes.
    epsilon_decay_policy=1,
    # e-greedy exploration rate (original value 0.1).
    epsilon=0.1,
    # Epsilon decay rate (original value 0.995).
    epsilon_decay=0.995,

    # ---- LEARNING RATE deceleration ----
    # Initial learning rate.
    lr_alpha=0.0001,
    # Controls the pace of learning-rate depreciation (0.998 best so far).
    lr_alpha_power=0.998,
    # Lower limit on the learning rate.
    lr_alpha_limit=0.000001,

    # ---- EXECUTOR settings ----
    # Limit of steps per episode.
    internal_step_counter_limit=50000,
    # How many steps to save into the memory buffer from each run.
    experience_batch_size=1024,
    # Maximum number of (most recent) cases saved to the memory buffer from
    # each experience (tested 1024-64; 256 was best).
    experience_max_batch_size=256,

    # ---- Adverse STATE probability ----
    # Probability of choosing an adverse case when starting a new episode.
    prob_advarse_state_initial=0.2,
    # Adjustment controlling the ratio of issue types promoted through
    # adverse initialization.
    prob_advarse_state_type_multiplier=0.0,

    # ---- REWARD incentives ----
    # Override for GYM's default negative return.
    reward_negative=-10.0,

    # ---- PRIORITY MEMORY BUFFER ----
    pmb_cols=5,
    # Priority parameter (not implemented), alpha=[0, 0.4, 0.5, 0.6, 0.7, 0.8].
    pmb_alpha=0.9,
    # Importance-sampling parameter, beta=[0, 0.4, 0.5, 0.6, 1]
    # (tested up to 0.8, best results with 1.0).
    pmb_beta=0.4,
    # Originally 0.001.
    pmb_beta_increment=0.001,
    # pi = |td_error| + margin
    pmb_td_error_margin=0.01,
    pmb_abs_td_error_upper=1,
)

@ray.remote
class Learner:
    """Ray actor holding the trainable network, its optimizer and shared counters.

    Two structurally identical Keras models are kept:

    * ``model_main`` - the weights the optimizer updates;
    * ``model_base`` - a copy used for the forward pass in ``train`` and
      re-synchronized with ``model_main`` after every optimizer step.

    Each model maps an observation of width ``state_n + state_n_adj`` to a
    pair ``[values, logits]`` (one value estimate, ``action_n`` logits).

    The actor also exposes plain counters (learning steps, episodes,
    executors, environment steps, best step count) through
    ``increase_*`` / ``reset_*`` / ``get_*`` methods.
    """

    # Keys of the module-level ``args`` dict mirrored onto the instance.
    _CONFIG_KEYS = (
        "executors_n", "max_episodes", "env_name", "state_n", "action_n",
        "common_layers_n", "value_layers_n", "policy_layers_n", "batch_size",
        "lr_alpha", "lr_alpha_power", "lr_alpha_limit",
        "prob_advarse_state_initial", "prob_advarse_state_type_multiplier",
        "internal_step_counter_limit", "experience_batch_size",
        "reward_negative", "model_alignment_frequency",
        "experience_max_batch_size", "state_n_adj", "state_n_add",
        "pmb_cols", "pmb_alpha", "pmb_beta", "pmb_beta_increment",
        "pmb_td_error_margin", "pmb_abs_td_error_upper",
        "minimum_model_update_frequency", "epsilon_decay_policy",
        "epsilon", "epsilon_decay",
    )

    def __init__(self):
        """Copy the configuration, build both models and zero the counters."""
        for key in self._CONFIG_KEYS:
            setattr(self, key, args[key])

        self.collect_examples = []

        # BASE model (layers named "1".."11") and MAIN model (auto-named).
        # They are built in this order, as before.
        self.model_base = self._build_model(named=True)
        self.model_main = self._build_model(named=False)

        self.optimizer = tfa.optimizers.RectifiedAdam(self.lr_alpha)

        # Snapshot of the weights handed out to executors.
        self.executor_model = self.model_main.get_weights()

        # Start with both models holding identical weights.
        self.model_base.set_weights(self.model_main.get_weights())

        # List of sampled mini-batches; replaced by update_memory_buffer().
        self.memory_buffer = []

        # GLOBAL COUNTERS -- possibly to be moved to a separate worker
        self.counter_learninig = 0
        self.episode_counter = 0
        self.executor_counter = 0
        self.steps_counter = 0
        self.internal_step_counter_best = 0

    def _build_model(self, named):
        """Build one network with a shared trunk, a policy head and a value head.

        Layers are created in the order trunk -> policy branch -> value
        branch -> value output -> logits output. Keep this order: weight
        lists are exchanged between models via get_weights()/set_weights()
        and must line up.

        Args:
            named: when true, layers get the explicit names "1".."11";
                otherwise Keras assigns names automatically.

        Returns:
            A ``Model`` with outputs ``[values, logits]``.
        """
        def dense(units, label, activation='relu'):
            return Dense(units, activation=activation, name=label if named else None)

        inputs = tf.keras.Input(shape=(self.state_n + self.state_n_adj,))

        trunk = inputs
        for units, label in zip(self.common_layers_n, ("1", "2", "3")):
            trunk = dense(units, label)(trunk)

        policy = trunk
        for units, label in zip(self.policy_layers_n, ("7", "8", "9")):
            policy = dense(units, label)(policy)

        value = trunk
        for units, label in zip(self.value_layers_n, ("4", "5", "6")):
            value = dense(units, label)(value)

        values = dense(1, "10", activation=None)(value)
        logits = dense(self.action_n, "11", activation=None)(policy)

        return Model(inputs=inputs, outputs=[values, logits])

    def get_executor_model(self):
        """Return the weight snapshot taken at the end of the last train()."""
        return self.executor_model

    def get_base_model_weights(self):
        """Return the current weights of the base model."""
        return self.model_base.get_weights()

    def get_main_model_weights(self):
        """Return the current weights of the main (trainable) model."""
        return self.model_main.get_weights()

    def increase_counter_learninig(self):
        """Add one to the learning-step counter."""
        self.counter_learninig += 1

    def increase_episode_counter(self):
        """Add one to the episode counter."""
        self.episode_counter += 1

    def increase_executor_counter(self):
        """Add one to the executor counter."""
        self.executor_counter += 1

    def increase_steps_counter(self, value):
        """Add ``value`` to the steps counter."""
        self.steps_counter += value

    def set_internal_step_counter_best(self, value):
        """Store ``value`` as the best internal step count."""
        self.internal_step_counter_best = value

    def reset_counter_learninig(self):
        """Reset the learning-step counter to zero."""
        self.counter_learninig = 0

    def reset_episode_counter(self):
        """Reset the episode counter to zero."""
        self.episode_counter = 0

    def reset_executor_counter(self):
        """Reset the executor counter to zero."""
        self.executor_counter = 0

    def reset_steps_counter(self):
        """Reset the steps counter to zero."""
        self.steps_counter = 0

    def get_counter_learninig(self):
        """Return the learning-step counter."""
        return self.counter_learninig

    def get_episode_counter(self):
        """Return the episode counter."""
        return self.episode_counter

    def get_executor_counter(self):
        """Return the executor counter."""
        return self.executor_counter

    def get_steps_counter(self):
        """Return the steps counter."""
        return self.steps_counter

    def get_internal_step_counter_best(self):
        """Return the best internal step count."""
        return self.internal_step_counter_best

    def train(self):
        """Run one optimizer step per mini-batch stored in ``memory_buffer``.

        The learning rate is ``lr_alpha * lr_alpha_power ** episode_counter``,
        floored at ``lr_alpha_limit``. Every mini-batch is a 2-D array whose
        first ``state_n + state_n_adj`` columns are the network input; the
        columns at ``-(pmb_cols + 2)`` and ``-(pmb_cols + 1)`` are read as the
        action index and the discounted reward respectively.

        After the loop the executor weight snapshot is refreshed and the
        executor counter is reset to zero.

        Returns:
            None.
        """
        # Monotonically decreasing learning rate with a lower bound.
        decayed_lr = self.lr_alpha * np.power(self.lr_alpha_power, self.episode_counter)
        self.next_lr_alpha = max(decayed_lr, self.lr_alpha_limit)
        self.optimizer.learning_rate = self.next_lr_alpha

        # Column layout is derived from the configuration instead of being
        # hard-coded, so a different observation width keeps working.
        state_cols = self.state_n + self.state_n_adj
        return_col = -(self.pmb_cols + 1)
        action_col = -(self.pmb_cols + 2)

        for batch in self.memory_buffer:
            states = tf.convert_to_tensor(batch[:, :state_cols], dtype=tf.float32)
            returns = tf.convert_to_tensor(
                np.expand_dims(batch[:, return_col], axis=1), dtype=tf.float32)
            actions = batch[:, action_col].astype(int)

            with tf.GradientTape() as tape:
                values, logits = self.model_base(states)
                advantage = returns - values
                value_loss = advantage ** 2  # term to be minimized in training
                policy = tf.nn.softmax(logits)
                entropy = tf.reshape(
                    tf.nn.softmax_cross_entropy_with_logits(labels=policy, logits=logits),
                    [-1, 1])
                policy_loss = tf.reshape(
                    tf.nn.sparse_softmax_cross_entropy_with_logits(labels=actions, logits=logits),
                    [-1, 1])
                # The advantage is treated as a constant in the policy term.
                policy_loss *= tf.stop_gradient(advantage)
                # Entropy adjustment for better exploration.
                policy_loss -= 0.01 * entropy
                total_loss = tf.reduce_mean(0.5 * value_loss + policy_loss)

            # Gradients are taken w.r.t. the base model and applied to the
            # main model; both hold identical weights at this point because
            # they are re-synchronized right after every step.
            grads = tape.gradient(total_loss, self.model_base.trainable_weights)
            self.optimizer.apply_gradients(zip(grads, self.model_main.trainable_weights))

            self.counter_learninig += 1

            self.model_base.set_weights(self.model_main.get_weights())

        self.executor_model = self.model_main.get_weights()
        self.model_base.set_weights(self.model_main.get_weights())

        print("LEARINING ITERATION:", self.counter_learninig,"\n")

        self.executor_counter = 0

        return None

    def update_memory_buffer(self, experience):
        """Replace the stored mini-batches with ``experience``."""
        self.memory_buffer = experience
        
@ray.remote
class Memorizer:
    """Ray actor that accumulates experience and feeds prioritized batches to the Learner.

    Experience rows are stacked into ``memory_buffer`` (a 2-D float array).
    As used by this class, and per the comments on ``state_n_add`` in
    ``args``, a row is laid out as::

        [ state | next state | reward | done | action | discounted reward | pmb columns ]

    where both state segments are ``state_n + state_n_adj`` wide and the last
    ``pmb_cols`` columns hold, from left to right: discounted-reward
    probability, age, age probability, TD-error probability and the final
    sampling probability.
    """

    # Keys of the module-level ``args`` dict mirrored onto the instance.
    _CONFIG_KEYS = (
        "executors_n", "max_episodes", "env_name", "state_n", "action_n",
        "common_layers_n", "value_layers_n", "policy_layers_n", "batch_size",
        "lr_alpha", "lr_alpha_power", "lr_alpha_limit",
        "prob_advarse_state_initial", "prob_advarse_state_type_multiplier",
        "internal_step_counter_limit", "experience_batch_size",
        "reward_negative", "model_alignment_frequency",
        "experience_max_batch_size", "state_n_adj", "state_n_add",
        "pmb_cols", "pmb_alpha", "pmb_beta", "pmb_beta_increment",
        "pmb_td_error_margin", "pmb_abs_td_error_upper",
        "minimum_model_update_frequency", "epsilon_decay_policy",
        "epsilon", "epsilon_decay",
    )

    # Value written into the last column of the placeholder row that seeds
    # the buffer; the row is dropped once real data has been stacked on it.
    _PLACEHOLDER_MARK = 99

    # Discount applied to the bootstrapped value in the TD target.
    _TD_DISCOUNT = 0.9

    def __init__(self, learner):
        """Copy the configuration and build local copies of both Learner models.

        Args:
            learner: handle of the Learner actor; its current base and main
                weights are fetched and loaded into the local models.
        """
        for key in self._CONFIG_KEYS:
            setattr(self, key, args[key])

        # One placeholder row, recognisable by its last column.
        row_width = (self.state_n + self.state_n_adj) * 2 + self.state_n_add + self.pmb_cols
        self.memory_buffer = np.full(row_width, 0.0)
        self.memory_buffer[-1] = self._PLACEHOLDER_MARK
        self.collect_obs = []
        self.worker = "Memorizer"

        # BASE model - target
        self.model_base = self._build_model(named=True)
        self.base_model_weights = ray.get(learner.get_base_model_weights.remote())
        self.model_base.set_weights(self.base_model_weights)

        # MAIN model - trainable model
        self.model_main = self._build_model(named=False)
        self.main_model_weights = ray.get(learner.get_main_model_weights.remote())
        self.model_main.set_weights(self.main_model_weights)

        self.experience_length_all = []
        self.experience_length = []

    def _build_model(self, named):
        """Build one network with a shared trunk, a policy head and a value head.

        Layers are created in the order trunk -> policy branch -> value
        branch -> value output -> logits output. Keep this order: weights
        received from the Learner are loaded with set_weights() and must
        line up.

        Args:
            named: when true, layers get the explicit names "1".."11";
                otherwise Keras assigns names automatically.

        Returns:
            A ``Model`` with outputs ``[values, logits]``.
        """
        def dense(units, label, activation='relu'):
            return Dense(units, activation=activation, name=label if named else None)

        inputs = tf.keras.Input(shape=(self.state_n + self.state_n_adj,))

        trunk = inputs
        for units, label in zip(self.common_layers_n, ("1", "2", "3")):
            trunk = dense(units, label)(trunk)

        policy = trunk
        for units, label in zip(self.policy_layers_n, ("7", "8", "9")):
            policy = dense(units, label)(policy)

        value = trunk
        for units, label in zip(self.value_layers_n, ("4", "5", "6")):
            value = dense(units, label)(value)

        values = dense(1, "10", activation=None)(value)
        logits = dense(self.action_n, "11", activation=None)(policy)

        return Model(inputs=inputs, outputs=[values, logits])

    def get_experience_length_all(self):
        """Return the recorded experience lengths, without the first entry.

        NOTE(review): the list starts empty, so ``[1:]`` drops the first real
        record; this looks like a leftover from an earlier array-based
        initialisation - confirm before relying on it.
        """
        return self.experience_length_all[1:]

    def get_collected_obs(self):
        """Return the experiences collected since the last memory update."""
        return self.collect_obs

    def collect(self, learner, experience):
        """Store ``experience`` and run a memory update once all executors reported.

        Args:
            learner: handle of the Learner actor (its executor counter
                decides whether the update is triggered).
            experience: iterable of row blocks produced by one executor.
        """
        self.collect_obs.append(experience)

        if ray.get(learner.get_executor_counter.remote()) >= self.executors_n:
            self.memory_update(learner, self.collect_obs)

    def collect_length(self, experience_length):
        """Record the length of one executor's experience."""
        self.experience_length.append(experience_length)

    def memory_update(self, learner, collect_obs):
        """Merge collected experience, re-prioritize the buffer and train the Learner.

        Steps:
          1. wait until the Learner's executor counter reaches ``executors_n``;
          2. refresh the local model copies from the Learner;
          3. stack every collected block onto the buffer and keep only the
             most recent ``experience_batch_size * executors_n`` rows;
          4. recompute the priority columns and draw the mini-batches;
          5. send the mini-batches to the Learner and call its ``train``.

        Args:
            learner: handle of the Learner actor.
            collect_obs: list with one entry per ``collect`` call, each an
                iterable of row blocks.
        """
        # Yield between polls instead of spinning on remote calls.
        while ray.get(learner.get_executor_counter.remote()) < self.executors_n:
            time.sleep(0.001)

        self.base_model_weights = ray.get(learner.get_base_model_weights.remote())
        self.model_base.set_weights(self.base_model_weights)

        self.main_model_weights = ray.get(learner.get_main_model_weights.remote())
        self.model_main.set_weights(self.main_model_weights)

        # Use the blocks of *every* collected experience; reading only the
        # first entry would discard the data of all other executors.
        blocks = [block for experience in collect_obs for block in experience]
        capacity = self.experience_batch_size * self.executors_n
        # Stacking once and keeping the tail is equivalent to trimming after
        # each block, without the repeated copies.
        buffer = np.vstack([self.memory_buffer, *blocks])[-capacity:, :]

        # Drop the placeholder row if it survived the trim.
        if buffer.shape[0] and buffer[0, -1] == self._PLACEHOLDER_MARK:
            buffer = buffer[1:, :]
        self.memory_buffer = buffer

        self.experience_length_all.append(self.experience_length)

        if self.memory_buffer.shape[0]:
            self._refresh_priorities()
            collect_samples = self._draw_batches()
        else:
            # Nothing to learn from yet: hand over an empty batch list so the
            # Learner still runs its end-of-iteration bookkeeping.
            collect_samples = []

        self.collect_obs = []
        self.experience_length = []

        ray.get(learner.update_memory_buffer.remote(collect_samples))
        ray.get(learner.train.remote())

    def _refresh_priorities(self):
        """Recompute, in place, the priority columns of ``memory_buffer``.

        The final sampling probability (last column) is the mean of the age
        and TD-error probabilities, raised to ``pmb_beta`` and normalized.
        The discounted-reward probability is computed and stored but does
        not enter the final probability.
        """
        buf = self.memory_buffer
        obs_width = self.state_n + self.state_n_adj
        reward_col = 2 * obs_width
        done_col = reward_col + 1
        disc_reward_col = -(self.pmb_cols + 1)
        dr_prob_col = -self.pmb_cols
        age_col = -(self.pmb_cols - 1)
        age_prob_col = -(self.pmb_cols - 2)
        td_prob_col = -(self.pmb_cols - 3)

        # Inverse discounted-reward probability.
        shifted = buf[:, disc_reward_col] - np.min(buf[:, disc_reward_col])
        dr_max = np.max(shifted)
        # All returns equal -> avoid 0/0 and fall back to a uniform weight.
        scaled = shifted / dr_max if dr_max > 0 else np.zeros_like(shifted)
        dr_weight = 1 - scaled + 0.01
        buf[:, dr_prob_col] = dr_weight / np.sum(dr_weight)

        # Inverse "age" probability: every row ages by one per update.
        buf[:, age_col] += 1
        age_weight = np.max(buf[:, age_col]) - buf[:, age_col] + 1.0
        buf[:, age_prob_col] = age_weight / np.sum(age_weight)

        # Proportional TD-error probability.
        target_q, _ = self.model_base(
            tf.convert_to_tensor(buf[:, obs_width:2 * obs_width], dtype=tf.float32))
        td_target = (np.expand_dims(buf[:, reward_col], axis=-1)
                     + self._TD_DISCOUNT * target_q
                     * np.expand_dims((1 - buf[:, done_col]), axis=-1))
        predict_q, _ = self.model_main(
            tf.convert_to_tensor(buf[:, :obs_width], dtype=tf.float32))
        abs_td_error = np.abs(td_target - predict_q) + self.pmb_td_error_margin
        clipped_td_error = np.where(abs_td_error < self.pmb_abs_td_error_upper,
                                    abs_td_error, self.pmb_abs_td_error_upper)
        buf[:, td_prob_col] = clipped_td_error[:, 0]
        buf[:, td_prob_col] = buf[:, td_prob_col] / np.sum(buf[:, td_prob_col])

        # Final sampling probability.
        buf[:, -1] = np.average(buf[:, [age_prob_col, td_prob_col]], axis=1)
        self.pmb_beta = min(1., self.pmb_beta + self.pmb_beta_increment * self.executors_n)
        buf[:, -1] = np.power(buf[:, -1], self.pmb_beta)
        buf[:, -1] = buf[:, -1] / np.sum(buf[:, -1])

    def _draw_batches(self):
        """Sample the mini-batches for one Learner iteration.

        Returns:
            A list of 2-D arrays. Each holds ``min(batch_size, rows)`` rows
            drawn without replacement according to the last buffer column.
            The list has at least ``minimum_model_update_frequency`` entries.
        """
        buf = self.memory_buffer
        n_rows = buf.shape[0]
        rows_per_batch = min(self.batch_size, n_rows)
        runs = max(self.minimum_model_update_frequency, n_rows // rows_per_batch + 1)

        batches = []
        for _ in range(runs):
            picked = np.random.choice(n_rows, rows_per_batch, p=buf[:, -1], replace=False)
            batches.append(buf[picked, :])
        return batches
    
@ray.remote
class Executor:
    """Ray actor that rolls out a batch of vectorised gym environments.

    Each executor owns ``num_envs_per_worker`` synchronous copies of
    ``env_name`` and a local copy of the value/policy network.  It plays the
    environments, shapes the rewards, turns the trajectory of every finished
    environment into a table of transitions with discounted returns and
    hands those tables to the memorizer.

    Layout of one transition row, with ``d = state_n + state_n_adj``::

        [state (d) | next_state (d) | reward | done | action |
         discounted return | pmb_cols zero-filled columns]

    The zero-filled columns are only allocated here; presumably the
    memorizer fills them in later (verify against ``Memorizer``).
    """

    def __init__(self, i, args):
        """Copy the hyper-parameters from ``args`` and build envs and model.

        Args:
            i: Index of this executor.  Accepted for symmetry with
                ``experience_generator``; not used by the constructor.
            args: Dict of settings.  Every key read below must be present,
                otherwise a ``KeyError`` is raised.
        """
        # Set Parameters
        self.executors_n = args["executors_n"]
        self.max_episodes = args["max_episodes"]
        self.env_name = args["env_name"]
        self.state_n = args["state_n"]
        self.action_n = args["action_n"]
        self.common_layers_n = args["common_layers_n"]
        self.value_layers_n = args["value_layers_n"]
        self.policy_layers_n = args["policy_layers_n"]
        self.batch_size = args["batch_size"]
        self.lr_alpha = args["lr_alpha"]
        self.lr_alpha_power = args["lr_alpha_power"]
        self.lr_alpha_limit = args["lr_alpha_limit"]
        self.prob_advarse_state_initial = args["prob_advarse_state_initial"]
        self.prob_advarse_state_type_multiplier = args["prob_advarse_state_type_multiplier"]
        self.internal_step_counter_limit = args["internal_step_counter_limit"]
        self.experience_batch_size = args["experience_batch_size"] / 16
        self.reward_negative = args["reward_negative"]
        self.model_alignment_frequency = args["model_alignment_frequency"]
        self.experience_max_batch_size = args["experience_max_batch_size"]
        self.state_n_adj = args["state_n_adj"]
        self.state_n_add = args["state_n_add"]
        self.pmb_cols = args["pmb_cols"]
        self.pmb_alpha = args["pmb_alpha"]
        self.pmb_beta = args["pmb_beta"]
        self.pmb_beta_increment = args["pmb_beta_increment"]
        self.pmb_td_error_margin = args["pmb_td_error_margin"]
        self.pmb_abs_td_error_upper = args["pmb_abs_td_error_upper"]
        self.minimum_model_update_frequency = args["minimum_model_update_frequency"]
        self.epsilon_decay_policy = args["epsilon_decay_policy"]
        self.epsilon = args["epsilon"]
        self.epsilon_decay = args["epsilon_decay"]
        self.num_envs_per_worker = args["num_envs_per_worker"]

        # Module-level Ray queue shared by the notebook.
        self.collect_examples = collect_examples

        # Establish Environment: one synchronous vector env. ``.env`` strips
        # the outermost wrapper returned by ``gym.make``.
        self.env = gym.vector.SyncVectorEnv([lambda: gym.make(self.env_name).env for _ in range(self.num_envs_per_worker)])

        # Define the executor's network: a shared trunk feeding a policy head
        # and a value head.  Layers are created in the order common, policy,
        # value, logits, values -- kept unchanged so that the weight list
        # received from the learner keeps lining up with this model.
        self.inputs_executor = tf.keras.Input(shape=(self.state_n + self.state_n_adj,))
        self.common_network_executor = self._relu_stack(self.inputs_executor, self.common_layers_n)
        self.policy_network_executor = self._relu_stack(self.common_network_executor, self.policy_layers_n)
        self.value_network_executor = self._relu_stack(self.common_network_executor, self.value_layers_n)

        self.logits_executor = Dense(self.action_n)(self.policy_network_executor)
        self.values_executor = Dense(1)(self.value_network_executor)

        self.model_executor = Model(inputs=self.inputs_executor, outputs=[self.values_executor, self.logits_executor])

        self.observations = []
        self.iter_counter = 0

        self.internal_step_counter_all = 0
        self.done_indeed = 0
        # One flag per sub-environment (was hard-coded to three entries).
        self.done_done_all = np.zeros(self.num_envs_per_worker)

    def _relu_stack(self, inputs, layer_sizes):
        """Chain three ReLU ``Dense`` layers sized by ``layer_sizes[0..2]``."""
        hidden = inputs
        for layer_index in range(3):
            hidden = Dense(layer_sizes[layer_index], activation='relu')(hidden)
        return hidden

    def _augment_state(self, state):
        """Append the square of column 0 to every row of a batch of states.

        Custom state representation adjustment to help the agent learn to
        stay closer to the centre.
        """
        return np.append(state, (state[:, 0] * state[:, 0]).reshape((-1, 1)), axis=1)

    def _sample_actions(self):
        """Draw one action per environment from softmax(``self.logits``)."""
        self.stochastic_action_probabilities = tf.nn.softmax(self.logits)
        probabilities = self.stochastic_action_probabilities.numpy()
        return np.array([
            int(np.random.choice(self.action_n, p=probabilities[env_idx]))
            for env_idx in range(self.num_envs_per_worker)
        ])

    def _choose_actions(self):
        """Pick one integer action per environment from ``self.logits``.

        With ``epsilon_decay_policy == 1`` epsilon is first multiplied by
        ``epsilon_decay``; then, with probability epsilon, the arg-max action
        is taken for all environments, otherwise actions are sampled from the
        softmax distribution.  With any other policy value actions are always
        sampled.
        """
        if self.epsilon_decay_policy == 1:
            self.epsilon *= self.epsilon_decay
            if self.epsilon >= np.random.rand():
                # Deterministic arg-max.  No ``squeeze()``: the result is
                # already one-dimensional and squeezing would collapse it to
                # a 0-d array when there is a single environment.
                actions = np.argmax(self.logits, axis=1)
            else:
                actions = self._sample_actions()
        else:
            actions = self._sample_actions()
        return actions.astype(int)

    def _finalise_trajectory(self, exp):
        """Build the transition table for sub-environment ``exp``.

        Called right after ``exp`` reported ``done``.  Patches the last
        next-state with the true terminal observation (the vector env has
        already replaced it by the observation of the automatic reset),
        computes discounted returns and appends the zero-filled
        ``pmb_cols`` columns.

        Returns:
            2-D array with one row per kept transition.
        """
        obs_dim = self.state_n + self.state_n_adj

        # View into ``self.observations``: the terminal-state patch below is
        # written through to the shared buffer, as in the original code.
        self.observations_current = self.observations[:, :, exp]
        self.terminal_state = self.info[exp]["terminal_observation"]
        self.terminal_state = np.append(self.terminal_state, self.terminal_state[0] * self.terminal_state[0])
        # Next-state columns sit right after the state columns (previously
        # hard-coded as 5:10).  NOTE(review): the single squared feature
        # appended above means this assumes state_n_adj == 1.
        self.observations_current[-1, obs_dim:2 * obs_dim] = self.terminal_state

        # NOTE: the rolling buffer is trimmed periodically, so this is the
        # buffered length, not necessarily the true episode length.
        self.collect_obs_length.append(self.observations_current.shape[0])

        self.observations_current = self.observations_current[-np.minimum(self.observations_current.shape[0], self.experience_max_batch_size):]

        self.exp_len = self.observations_current.shape[0]
        self.exp_indices = np.arange(self.exp_len) + 1
        # Rewards from the last transition back to the first.
        self.rewards = np.flip(self.observations_current[:, obs_dim * 2])
        self.discounted_rewards = np.empty(self.exp_len)
        self.reward_sum = 0

        if self.observations_current[-1, -2] == 0:
            # Episode has not terminated: mark the row with 2, flat discount.
            self.observations_current[-1, -2] = 2
            self.gamma = np.full(self.exp_len, 0.99)
        else:
            # Episode has terminated: discount grows with the distance from
            # the end, clipped to [0.5, 0.99].  log(0) for the first entry is
            # -inf and is clipped to 0.5; only the warning is silenced.
            with np.errstate(divide="ignore"):
                self.gamma = np.clip(0.0379 * np.log(self.exp_indices - 1) + 0.7983, 0.5, 0.99)
        if self.observations_current[-1, -2] == 1:
            self.gamma[0] = 1

        for step in range(self.exp_len):
            self.reward_sum = self.rewards[step] + self.gamma[step] * self.reward_sum
            self.discounted_rewards[step] = self.reward_sum

        self.discounted_rewards = np.flip(self.discounted_rewards)

        self.observations_current = np.hstack((self.observations_current, np.expand_dims(self.discounted_rewards, axis=1)))
        self.observations_current = np.hstack((self.observations_current, np.zeros((self.observations_current.shape[0], self.pmb_cols))))
        return self.observations_current

    def experience_generator(self, i, learner, memorizer):
        """Play rounds of episodes and ship the experience to the memorizer.

        A round resets all sub-environments, optionally moves them into an
        adverse start position, and steps them until every sub-environment
        has finished once (or the step limit is exceeded).  Sub-environments
        that finish early keep being stepped -- the vector env resets them
        automatically, see the note at the top of the file -- but only their
        first episode of the round is collected.

        Rounds repeat while the learner's episode counter is below
        ``max_episodes`` and fewer than ``internal_step_counter_limit``
        rounds have been played.

        Args:
            i: Index of this executor, used in log output.
            learner: Ray actor handle providing the model weights and the
                shared counters (``get_episode_counter``,
                ``get_executor_model``, ``increase_episode_counter``,
                ``increase_executor_counter``, ``get_executor_counter``,
                ``get_internal_step_counter_best``,
                ``set_internal_step_counter_best``).
            memorizer: Ray actor handle receiving the experience through
                ``collect`` and ``collect_length``.
        """
        self.worker = i
        self.time_start = time.time()

        obs_dim = self.state_n + self.state_n_adj
        reload_model_weights = 1

        while (ray.get(learner.get_episode_counter.remote()) < self.max_episodes) and (self.iter_counter < self.internal_step_counter_limit):

            self.iter_counter += 1

            if reload_model_weights == 1:
                self.model_executor.set_weights(ray.get(learner.get_executor_model.remote()))
                reload_model_weights = 0

            self.current_state = self.env.reset()

            # ENSURE EXPLORATION OF ADVERSE STATES: their probability shrinks
            # with the logarithm (base 5) of the episode count.
            episodes_so_far = ray.get(learner.get_episode_counter.remote())
            if episodes_so_far <= 1:
                self.prob_advarse_state = self.prob_advarse_state_initial
            else:
                self.prob_advarse_state = np.clip(self.prob_advarse_state_initial / math.log(episodes_so_far, 5), 0.05, 0.2)

            self.prob_random_state = 1 - self.prob_advarse_state * 4

            # CartPole position_start:
            # 0: Close to the Left Edge
            # 1: Close to the Right Edge
            # 2: Normal, random start (env.reset())
            # 3: Leaning Heavily to the Left
            # 4: Leaning Heavily to the Right
            adverse = self.prob_advarse_state
            skew = self.prob_advarse_state_type_multiplier * adverse
            self.pos_start = np.random.choice(
                5,
                size=self.num_envs_per_worker,
                p=(adverse + skew,
                   adverse + skew,
                   self.prob_random_state,
                   adverse - skew,
                   adverse - skew))

            self.current_state[self.pos_start == 0, 0] = -1.5  # -2.4 MIN
            self.current_state[self.pos_start == 1, 0] = 1.5  # 2.4 MAX
            self.current_state[self.pos_start == 3, 2] = -0.150  # -0.20943951023931953 MIN
            self.current_state[self.pos_start == 4, 2] = 0.150  # 0.20943951023931953 MAX

            # Push the edited start states into the simulators.  Writing via
            # ``unwrapped`` makes sure the assignment reaches the base env
            # even if ``gym.make(...).env`` is still a wrapper.
            for env_id, sub_env in enumerate(self.env.envs):
                sub_env.unwrapped.state = self.current_state[env_id, :]

            self.current_state = self._augment_state(self.current_state)

            # Rolling buffer of transitions: (steps, row_width, envs).
            self.observations = np.empty((0, obs_dim * 2 + 3, self.num_envs_per_worker))
            self.done = np.zeros(self.num_envs_per_worker, dtype=bool)
            self.internal_step_counter = 0
            self.collect_obs = []
            self.collect_obs_length = []
            self.done_indeed = 0
            self.done_ind = 0
            self.done_ind_new = 0
            # 1 for every sub-environment already collected in this round.
            self.done_done_all = np.zeros(self.num_envs_per_worker, dtype=int)
            self.internal_step_counter_joint = []

            while (self.done_indeed == 0) and (self.internal_step_counter <= self.internal_step_counter_limit):
                self.values, self.logits = self.model_executor(tf.convert_to_tensor(np.array(self.current_state), dtype=tf.float32))

                self.action = self._choose_actions()

                self.next_state, self.reward, self.done, self.info = self.env.step(self.action)
                self.next_state = self._augment_state(self.next_state)

                # Add desired-behaviour incentive to the reward function.
                self.R_pos = 1 * (1 - np.abs(self.next_state[:, 0]) / 2.4)  # 2.4 max value ### documentation says 4.8 but the episode fails beyond 2.4
                self.R_ang = 1 * (1 - np.abs(self.next_state[:, 2]) / 0.20943951023931953)  # documentation says 0.418 max value
                self.reward = self.reward + self.R_pos + self.R_ang

                # Custom fail reward to speed up learning the consequences of
                # being in an adverse position.
                self.reward[self.done] = self.reward_negative

                # One row per environment: state, next state, reward, done, action.
                self.current_observation = np.concatenate(
                    [self.current_state,
                     self.next_state,
                     self.reward.reshape([-1, 1]),
                     self.done.reshape([-1, 1]),
                     self.action.reshape([-1, 1])],
                    axis=1)

                self.observations = np.vstack((self.observations, np.expand_dims(np.swapaxes(self.current_observation, 0, 1), axis=0)))

                # Trim the buffer whenever its length hits a multiple of the cap.
                if (self.observations.shape[0] > 1) and ((self.observations.shape[0] % self.experience_max_batch_size) == 0):
                    self.observations = self.observations[-self.experience_max_batch_size:, :, :]

                self.current_state = self.next_state
                self.internal_step_counter += 1

                if not self.done.any():
                    continue

                # Sub-environments that finished now and were not collected yet.
                self.done_ind = self.done * 1
                self.done_ind_new = np.where((self.done_ind - self.done_done_all) == 1)
                if self.done_ind_new[0].shape[0] == 0:
                    continue

                for exp in self.done_ind_new[0]:
                    self.collect_obs.append(self._finalise_trajectory(exp))
                    self.internal_step_counter_all += self.internal_step_counter
                    self.internal_step_counter_joint.append(self.internal_step_counter)
                    ray.get(learner.increase_episode_counter.remote())

                self.done_done_all = np.maximum(self.done_done_all, self.done_ind)

                if self.done_done_all.sum() != self.num_envs_per_worker:
                    continue

                # Every sub-environment has finished once: close the round.
                self.done_indeed = 1
                ray.get(learner.increase_executor_counter.remote())

                # Update counters to track progress.
                # NOTE(review): the executor counter is bumped a second time
                # once enough steps have accumulated -- kept as in the
                # original; confirm against the Learner that this is intended.
                if self.internal_step_counter_all >= self.experience_batch_size:
                    self.internal_step_counter_all = 0
                    ray.get(learner.increase_executor_counter.remote())

                print("Ending Executor:", self.worker, "Episode", ray.get(learner.get_episode_counter.remote()), "Initial State", self.pos_start, "Steps:", self.internal_step_counter_joint)

                if self.internal_step_counter >= ray.get(learner.get_internal_step_counter_best.remote()):
                    learner.set_internal_step_counter_best.remote(self.internal_step_counter)

                    print("############################## BEST EPISODE LENGTH:", self.internal_step_counter, "Executor:", self.worker)

                if ray.get(learner.get_internal_step_counter_best.remote()) >= self.internal_step_counter_limit:

                    self.episod_counter_target = ray.get(learner.get_episode_counter.remote())

                    print("\nREACHED GOAL of", self.internal_step_counter_limit,  "Steps in", self.episod_counter_target, "episodes; Learning Iterations (Not Available); in",time.time()-self.time_start, "seconds \n")

                ray.get(memorizer.collect.remote(learner, self.collect_obs))
                ray.get(memorizer.collect_length.remote(self.collect_obs_length))

                self.collect_obs = []
                self.collect_obs_length = []

                # Wait for a new model: block until the learner has brought
                # the executor counter back below the threshold.  The short
                # sleep keeps this poll from flooding the learner actor.
                while ray.get(learner.get_executor_counter.remote()) >= self.num_envs_per_worker:
                    time.sleep(0.01)

                reload_model_weights = 1
                

@ray.remote
def train(i, memorizer, learner, args):
    """Ray task that runs one executor to completion.

    Creates an ``Executor`` actor for index ``i`` and blocks until its
    ``experience_generator`` call has returned.

    Args:
        i: Index of the executor to start.
        memorizer: Memorizer actor handle passed on to the executor.
        learner: Learner actor handle passed on to the executor.
        args: Settings dict used to construct the executor.

    Returns:
        The string ``"DONE! "`` followed by ``i``.
    """
    worker = Executor.remote(i, args)
    generation_finished = worker.experience_generator.remote(i, learner, memorizer)
    ray.get(generation_finished)

    return f"DONE! {i}"
    
# Central actors shared by all executors.
learner = Learner.remote()
memorizer = Memorizer.remote(learner)

# Launch one `train` task per configured executor and wait for all of them.
time_start = time.time()
results = ray.get(
    [
        train.remote(worker_id, memorizer, learner, args)
        for worker_id in range(int(args["executors_n"]))
    ]
)
print("RESULTS", results, time.time() - time_start)

# Design note: each `train` call initialises its own executor.  The executor
# calls the memorizer's collect function, which stores the experience in the
# memory buffer, checks whether enough experience has been collected and, if
# so, runs the memorization process (which could live in a separate function).

print(ray.get(learner.get_executor_counter.remote()))

ray.shutdown()

In [None]:
# Separate notebook cell that shuts the Ray runtime down; presumably kept so
# Ray can be stopped by hand when the run in the previous cell did not finish.
ray.shutdown()