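# Multi-Client Training with Reward-Weighted Parameter Averaging

Several PPO client models are trained in parallel threads and each one is evaluated. The global policy is then set to the average of the client policies, weighted by each client's min-max-normalised evaluation reward. The averaged parameters are copied back to every client before the next round.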

## Importing Libraries

In [1]:
from typing import Dict
import threading

import gym
import numpy as np
import torch

from stable_baselines3 import PPO as ALGO
from stable_baselines3.common.evaluation import evaluate_policy

  for external in metadata.entry_points().get(self.group, []):


In [2]:
# Hyper-Parameters
ENV_NAME = 'LunarLander-v2'  # Gym environment id used for training and evaluation
NUM_CLIENT_MODELS = 8        # client models trained in parallel threads each round
NUM_TRAINING_STEPS = 1000    # total_timesteps passed to each client's learn() per round
NUM_ITERATIONS = 10          # number of train -> evaluate -> aggregate rounds
# NOTE(review): PPO may still collect a full rollout (its n_steps) per learn() call,
# so the effective steps per round can exceed NUM_TRAINING_STEPS -- confirm.


## Initialize the Environment and Models

In [3]:
env = gym.make(ENV_NAME)
global_model = ALGO("MlpPolicy", env)

# One client model per slot, each owning a separate environment instance.
client_models = []
for _ in range(NUM_CLIENT_MODELS):
    client_models.append(ALGO("MlpPolicy", gym.make(ENV_NAME)))

## Functions to Evaluate and Train Models in Threads

In [4]:
def _eval(model, env, index, fitnesses):
    """Thread target: evaluate ``model`` on ``env`` and record the mean reward.

    The first value returned by ``evaluate_policy`` (the mean reward) is
    written to ``fitnesses[index]``; the second value is discarded.
    """
    mean_reward, _discarded = evaluate_policy(model, env)
    fitnesses[index] = mean_reward


def evaluate(models, env, index, eval_results, message='', verbose=0, n_eval_runs=10):
    """Score ``models[index]`` with several concurrent ``evaluate_policy`` runs.

    Starts ``n_eval_runs`` threads. Each one runs ``_eval`` on the same model
    against its own freshly created ``ENV_NAME`` environment. The mean of the
    per-run rewards is stored in ``eval_results[index]``, printed and returned.

    :param models: sequence of models; only ``models[index]`` is evaluated.
    :param env: not used -- every run creates its own environment so that no
        environment instance is shared between threads. Kept so existing call
        sites keep working.
    :param index: which model to evaluate and which ``eval_results`` slot to fill.
    :param eval_results: mutable sequence that receives the mean reward.
    :param message: label inserted into the printed summary line.
    :param verbose: when > 0, also print the individual per-run rewards.
    :param n_eval_runs: number of concurrent evaluation runs (must be >= 1).
    :return: the mean reward over all runs.
    :raises ValueError: if ``n_eval_runs`` is smaller than 1.
    :raises RuntimeError: if any evaluation run failed to record a reward
        (the failing thread's traceback is printed by ``threading``).
    """
    if n_eval_runs < 1:
        raise ValueError(f'n_eval_runs must be >= 1, got {n_eval_runs}')

    # None marks "no result": an exception inside a thread is only printed, so
    # without this marker a failed run would silently count as a reward of 0.
    fitnesses = [None] * n_eval_runs
    eval_envs = [gym.make(ENV_NAME) for _ in range(n_eval_runs)]
    eval_threads = [
        threading.Thread(target=_eval, args=(models[index], eval_env, run, fitnesses))
        for run, eval_env in enumerate(eval_envs)
    ]

    for thread in eval_threads:
        thread.start()
    # Wait until every run has finished before reading the results.
    for thread in eval_threads:
        thread.join()

    # The per-run environments are no longer needed once all threads are done.
    for eval_env in eval_envs:
        eval_env.close()

    failed_runs = [run for run, fitness in enumerate(fitnesses) if fitness is None]
    if failed_runs:
        raise RuntimeError(
            f'Evaluation of model {index} failed in runs {failed_runs}; '
            'see the thread tracebacks above.'
        )

    mean_fitness = np.mean(fitnesses)
    eval_results[index] = mean_fitness
    if verbose > 0:
        print(f'Type {message} per-run rewards: {fitnesses}')
    print(f'Type {message} Mean reward: {mean_fitness}')
    return mean_fitness


In [5]:
def train(models, index, timesteps):
    """Thread target: continue training ``models[index]`` for ``timesteps`` timesteps.

    ``reset_num_timesteps=False`` keeps the model's existing timestep counter,
    so repeated calls continue from where the previous one stopped. Whatever
    ``learn`` returns is stored back into ``models[index]``.
    """
    learner = models[index]
    trained = learner.learn(total_timesteps=timesteps, reset_num_timesteps=False)
    models[index] = trained
    # print('Completed Training')


In [6]:
def multithread_eval(client_models):
    """Evaluate every model in ``client_models`` concurrently.

    Runs ``evaluate`` for each model in its own thread and waits for all of them.

    :param client_models: list of models to evaluate.
    :return: list of mean rewards; entry ``i`` belongs to ``client_models[i]``.
    :raises RuntimeError: if any model's evaluation did not record a result
        (the failing thread's traceback is printed by ``threading``).
    """
    # Size everything by the list actually passed in, not a global constant,
    # so result slots and threads always line up with the models.
    n_models = len(client_models)
    # None marks "no result yet", so a crashed evaluation thread cannot be
    # mistaken for a genuine reward of 0.
    eval_results = [None] * n_models
    # One environment per model is handed to `evaluate`; all are closed afterwards.
    eval_envs = [gym.make(ENV_NAME) for _ in range(n_models)]
    client_threads = [
        threading.Thread(
            target=evaluate,
            args=(client_models, eval_envs[ci], ci, eval_results, f'Trained Model {ci}'),
        )
        for ci in range(n_models)
    ]

    for thread in client_threads:
        thread.start()
    # Wait until every model has been evaluated.
    for thread in client_threads:
        thread.join()

    for eval_env in eval_envs:
        eval_env.close()

    missing = [ci for ci, result in enumerate(eval_results) if result is None]
    if missing:
        raise RuntimeError(
            f'Evaluation failed for client models {missing}; see the thread tracebacks above.'
        )

    return eval_results


## Initial Evaluation

In [7]:
# Copy the global model's parameters (policy and optimizer state) into every client
# so all clients start from the same point.
for model in client_models:
    model.set_parameters(global_model.get_parameters())

global_model.save('initial')

# Baseline rewards before any training: the global model first, then each client.
evaluate([global_model], env, 0, [0], 'Global Initial Model')

multithread_eval(client_models)



Type  Mean reward: -561.3184710899275
Type Trained Model 5 Mean reward: -609.9235053293553
Type Trained Model 0 Mean reward: -600.6323700556369
Type Trained Model 7 Mean reward: -530.6762372242749
Type Trained Model 2 Mean reward: -655.4094415738248
Type Trained Model 1 Mean reward: -717.0546008876153
Type Trained Model 3 Mean reward: -597.0404691006283
Type Trained Model 6 Mean reward: -602.9492719170544
Type Trained Model 4 Mean reward: -620.7561053429929


[-600.6323700556369,
 -717.0546008876153,
 -655.4094415738248,
 -597.0404691006283,
 -620.7561053429929,
 -609.9235053293553,
 -602.9492719170544,
 -530.6762372242749]

## Train and Evaluate

In [8]:
# Evaluation Before Iterated Training
evaluate([global_model], env, 0, [0], "Global Initial Model")


def reward_weights(rewards):
    """Map client mean rewards to non-negative aggregation weights via min-max scaling.

    The best client gets weight 1.0 and the worst gets 0.0. When every reward
    is identical (including the single-client case), min-max scaling would
    divide by zero, so equal weights are returned instead.
    """
    lowest, highest = min(rewards), max(rewards)
    if highest == lowest:
        return [1.0] * len(rewards)
    span = highest - lowest
    return [(reward - lowest) / span for reward in rewards]


def weighted_average_policy(models, weights):
    """Return a policy state dict holding the weighted mean of the models' policy tensors.

    The weights are divided by their sum, so they need not add up to 1, but
    the sum must be positive.
    """
    total = sum(weights)
    # Build each client's state dict once instead of once per parameter key.
    state_dicts = [model.policy.state_dict() for model in models]
    return {
        key: torch.stack(
            [sd[key].float() * (weight / total) for sd, weight in zip(state_dicts, weights)], 0
        ).sum(0)
        for key in state_dicts[0]
    }


for iteration in range(NUM_ITERATIONS):
    print('Train Iter: ', iteration)

    # Train every client concurrently, one thread per client model.
    client_threads = [
        threading.Thread(target=train, args=(client_models, ci, NUM_TRAINING_STEPS))
        for ci in range(len(client_models))
    ]
    for thread in client_threads:
        thread.start()
    # Wait until every client has finished its training call.
    for thread in client_threads:
        thread.join()

    # Optimization Steps Check: the optimizer's recorded step count for client 0's first parameter.
    print('Optim Steps: ', client_models[0].get_parameters()['policy.optimizer']['state'][0]['step'])

    # Evaluate the trained clients and turn their rewards into aggregation weights.
    client_rewards = multithread_eval(client_models)
    weights = reward_weights(client_rewards)
    print(weights)

    # Global policy <- reward-weighted average of the client policies.
    global_model.policy.load_state_dict(weighted_average_policy(client_models, weights))

    # Broadcast the aggregated parameters back to every client.
    global_state = global_model.policy.state_dict()
    for model in client_models:
        model.policy.load_state_dict(global_state)

    # Evaluate Updated Global Model
    evaluate([global_model], env, 0, [0], 'Global Updated Model', verbose=0)


Type Global Initial Model Mean reward: -595.9729889058241
Train Iter:  0
Optim Steps:  320
Type Trained Model 1 Mean reward: -208.0062700217869
Type Trained Model 4 Mean reward: -581.243488788585
Type Trained Model 3 Mean reward: -261.78460338942705
Type Trained Model 5 Mean reward: -298.7087366369495
Type Trained Model 0 Mean reward: -88.45417259461152
Type Trained Model 2 Mean reward: -679.6370829298401
Type Trained Model 6 Mean reward: -745.9259931386108
Type Trained Model 7 Mean reward: -99.06964652752148
[1.0, 0.8181639217202394, 0.10082395646086031, 0.7363682740784236, 0.25047842235088613, 0.68020748954933, 0.0, 0.9838541005086323]


In [None]:
# Show the global policy's parameter names and shapes rather than dumping every
# weight tensor into the notebook output.
{name: tuple(tensor.shape) for name, tensor in global_model.get_parameters()['policy'].items()}

In [None]:
# Persist the aggregated global model.
# NOTE: the file name says "a2c" but ALGO is PPO; the reload cell uses the same
# name, so change both together if renaming.
global_model.save(path='a2c_lunar_multiproc')

In [None]:
# Exporting Params as JSON
## Function to Convert Tensor-Valued Params to JSON-Friendly Lists
def flatten_list(params):
    """Convert a dict of tensors/arrays into plain Python lists for JSON export.

    Every value exposing ``.tolist()`` (e.g. torch tensors, numpy arrays) is
    converted with it. Nested dicts, lists and tuples are converted
    recursively. Any other value (int, float, str, bool, None, ...) is returned
    unchanged. Despite the name, nothing is flattened: a 2-D tensor becomes a
    list of lists.

    :param params: (dict) mapping of names to tensors/arrays, possibly nested.
    :return: (dict) same keys, with values converted to Python lists/scalars.
    """
    return {key: _to_builtin(params[key]) for key in params.keys()}


def _to_builtin(value):
    """Recursively replace tensors/arrays inside ``value`` with Python lists/scalars."""
    if hasattr(value, 'tolist'):
        return value.tolist()
    if isinstance(value, dict):
        return {key: _to_builtin(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_to_builtin(item) for item in value]
    return value
## Write Parameters to JSON File
import json


def _tensor_default(obj):
    """``json.dump`` fallback: encode tensors/arrays (anything with ``.tolist()``) as lists."""
    if hasattr(obj, 'tolist'):
        return obj.tolist()
    raise TypeError(f'Object of type {type(obj).__name__} is not JSON serializable')


all_params = global_model.get_parameters()
all_params['policy'] = flatten_list(all_params['policy'])

# 'policy.optimizer' is left as returned by get_parameters; torch optimizer state
# typically holds tensors, which json cannot encode natively -- `default` converts them.
with open('a2c_lunar_multiproc.json', 'w') as f:
    json.dump(all_params, f, indent='\t', default=_tensor_default)

In [None]:
# Reload the model saved by `global_model.save('a2c_lunar_multiproc')`,
# attaching `env` to it.
model_loaded = ALGO.load('a2c_lunar_multiproc', env)

# Alternative (not run): rebuild a model from the JSON export instead.
# Open the file read-only -- mode 'w' would truncate the export before reading it.
# Only the policy weights are restored: JSON turns the optimizer state's integer
# keys into strings, so that part is not passed back to set_parameters.
#
# import json
# with open('a2c_lunar_multiproc.json', 'r') as f:
#     new_params = json.load(f)
#
# model_from_json = ALGO("MlpPolicy", env)
# loaded_pol_params = {key: torch.tensor(value) for key, value in new_params['policy'].items()}
# model_from_json.set_parameters({'policy': loaded_pol_params}, exact_match=False)
# evaluate([model_from_json], env, 0, [0], 'JSON-loaded Model', verbose=1)

In [None]:
# Evaluate the reloaded model. `evaluate` expects a list of models, the index of
# the one to score, and a results list to write the mean reward into.
evaluate([model_loaded], env, 0, [0], 'Loaded Model', verbose=1)