Skip to content

Commit

Permalink
Added Demo job with 2 levels of hierarchy, and fixed some bugs and added startup delay
Browse files Browse the repository at this point in the history
  • Loading branch information
mukherjeearnab committed Jan 19, 2024
1 parent 79cdd24 commit a1037b1
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 10 deletions.
7 changes: 6 additions & 1 deletion node/apps/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
'''
The Client Process Module
'''
from time import time
from time import time, sleep
from copy import deepcopy
from env import env
from helpers.argsparse import args
from helpers.logging import logger
from helpers import p2p_store, perflog
Expand Down Expand Up @@ -50,6 +51,10 @@ def client_process(job_name: str, cluster_id: str) -> None:
'global_extra_data': None
}

startup_delay = env['DELAY'] * 5
logger.info(f'[{node_type}] Sleeping for {startup_delay} seconds')
sleep(startup_delay)

# 0. Wait for JobsheetDownload Flag
listeners.wait_for_jobsheet_flag(job_name, cluster_id, node_type)

Expand Down
6 changes: 0 additions & 6 deletions node/apps/job/check_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,11 @@ def get_jobs_from_server(logicon_url: str, job_set: set, not_job_set: set) -> No
# add job ID to job set.
job_set.add(job_id)

if job_id not in not_job_set:
logger.info(f'Fetching Job Manifest for [{job_id}].')

# get the job manifest
is_my_job = get_job_manifest(job_id, logicon_url)

if not is_my_job:
job_set.remove(job_id)
not_job_set.add(job_id)
else:
not_job_set.remove(job_id)


def get_job_manifest(job_id: str, logicon_url: str) -> bool:
Expand Down
10 changes: 7 additions & 3 deletions node/apps/worker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
'''
The Worker Process Module
'''
from time import time
from copy import deepcopy
from time import time, sleep
from env import env
from helpers.argsparse import args
from helpers.logging import logger
from helpers import p2p_store, perflog
from helpers.converters import get_base64_state_dict, set_base64_state_dict
from helpers.converters import get_base64_state_dict
from apps.worker import handlers
from apps.common import getters, setters, listeners

Expand Down Expand Up @@ -52,6 +52,10 @@ def worker_process(job_name: str, cluster_id: str) -> None:
'global_extra_data': None
}

startup_delay = env['DELAY'] * 5
logger.info(f'[{node_type}] Sleeping for {startup_delay} seconds')
sleep(startup_delay)

# 0. Wait for JobsheetDownload Flag
listeners.wait_for_jobsheet_flag(job_name, cluster_id, node_type)

Expand Down
210 changes: 210 additions & 0 deletions templates/job/demo_8c_2tree.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
# Dataset presets. The `&def_dir_dist` anchor defined here is merged (`<<: *def_dir_dist`)
# into the `dataset_params` of the client, worker and cluster entries further down.
dataset:
cluster_configs:
- &def_dir_dist
prep: cifar_def
preprocessor: default
distribution:
# NOTE(review): `diritchlet_dist` / `diritchlet` are identifiers presumably resolved by name
# at runtime (likely a Dirichlet distributor) — do not "fix" the spelling here without
# renaming the matching implementation.
distributor: diritchlet_dist
# Explicit chunk sizes are left disabled.
# chunks:
# - 0.5
# - 0.5
extra_params:
diritchlet:
alpha: 0.5
seed: 10
# Train/test fractions; the two values sum to 1.0.
train_test_split:
train: 0.8
test: 0.2

# Consensus presets. `&default_2_3` is merged into the `consensus_params` of every cluster below.
consensus:
- &default_2_3
# NOTE(review): presumably names a 2/3-majority consensus routine — confirm against the runnable.
runnable: majority_2_3

# Training presets: `&cifar_train_def` is merged into the client template's `train_params`,
# `&fed_avg_def` into the worker template's `aggregator_params`.
training:
client_configs:
- &cifar_train_def
learning_rate: 0.0001
batch_size: 64
local_epochs: 5
extra_params:
fed_prox:
mu: 0.01
lr_decay: 1

worker_configs:
- &fed_avg_def
aggregator: fed_avg
batch_size: 256
extra_params:
fed_avg_momentum: 0.9

# Model preset shared by the client and worker templates through `*cifar_simple_cnn`.
# NOTE(review): the four values look like names of modules/files resolved elsewhere — verify.
model_params:
- &cifar_simple_cnn
model_file: cifar10CNN_wp
parameter_mixer: default
training_loop_file: cel_simple
test_file: multi_cel_simple

# Client template: `&def_client` composes the model, training and dataset presets defined above.
# Every entry under `clients:` merges this template.
client_configs:
- &def_client
model_params:
<<: *cifar_simple_cnn
train_params:
<<: *cifar_train_def
dataset_params:
<<: *def_dir_dist

# Worker template: `&def_worker` composes the model, aggregator and dataset presets defined above.
# Every entry under `workers:` merges this template.
worker_configs:
- &def_worker
model_params:
<<: *cifar_simple_cnn
aggregator_params:
<<: *fed_avg_def
dataset_params:
<<: *def_dir_dist

# Eight client nodes (node_1 .. node_8), all using the unmodified `def_client` template.
# These are the nodes listed as `clients` of the four leaf clusters under `clusters:`.
clients:
node_1:
<<: *def_client
node_2:
<<: *def_client
node_3:
<<: *def_client
node_4:
<<: *def_client
node_5:
<<: *def_client
node_6:
<<: *def_client
node_7:
<<: *def_client
node_8:
<<: *def_client

# Nine worker nodes (node_0 .. node_8), all using the unmodified `def_worker` template.
# node_1 .. node_8 are also declared under `clients:`; node_0 is declared as a worker only.
workers:
node_0:
<<: *def_worker
node_1:
<<: *def_worker
node_2:
<<: *def_worker
node_3:
<<: *def_worker
node_4:
<<: *def_worker
node_5:
<<: *def_worker
node_6:
<<: *def_worker
node_7:
<<: *def_worker
node_8:
<<: *def_worker

# Cluster tree (7 clusters), linked through `upstream_cluster` and `clients`:
#
#   cluster_0                       (root, upstream_cluster: null)
#     cluster_0_0
#       cluster_0_0_0  -> node_1, node_2
#       cluster_0_0_1  -> node_3, node_4
#     cluster_0_1
#       cluster_0_1_0  -> node_5, node_6
#       cluster_0_1_1  -> node_7, node_8
#
# A cluster's `clients` are either child cluster names (root and mid-level clusters) or
# node names (leaf clusters). Every cluster uses the `default_2_3` consensus preset, the
# `def_dir_dist` dataset preset and `cluster_epochs: 1`. node_0 is a worker in every cluster.
clusters:
# Root cluster: three workers, two child clusters.
cluster_0:
upstream_cluster: null
clients:
- cluster_0_0
- cluster_0_1
workers:
- node_0
- node_2
- node_8
consensus_params:
<<: *default_2_3
train_params:
cluster_epochs: 1
dataset_params:
<<: *def_dir_dist

# Mid-level cluster under cluster_0: three workers, two leaf child clusters.
cluster_0_0:
upstream_cluster: cluster_0
clients:
- cluster_0_0_0
- cluster_0_0_1
workers:
- node_0
- node_1
- node_3
consensus_params:
<<: *default_2_3
train_params:
cluster_epochs: 1
dataset_params:
<<: *def_dir_dist

# Mid-level cluster under cluster_0: three workers, two leaf child clusters.
cluster_0_1:
upstream_cluster: cluster_0
clients:
- cluster_0_1_0
- cluster_0_1_1
workers:
- node_0
- node_5
- node_7
consensus_params:
<<: *default_2_3
train_params:
cluster_epochs: 1
dataset_params:
<<: *def_dir_dist

# Leaf cluster: two client nodes, a single worker (node_0).
# NOTE(review): `default_2_3` is applied even though only one worker is listed — confirm
# the consensus runnable behaves as intended with a single worker.
cluster_0_0_0:
upstream_cluster: cluster_0_0
clients:
- node_1
- node_2
workers:
- node_0
consensus_params:
<<: *default_2_3
train_params:
cluster_epochs: 1
dataset_params:
<<: *def_dir_dist

# Leaf cluster: two client nodes, a single worker (node_0).
cluster_0_0_1:
upstream_cluster: cluster_0_0
clients:
- node_3
- node_4
workers:
- node_0
consensus_params:
<<: *default_2_3
train_params:
cluster_epochs: 1
dataset_params:
<<: *def_dir_dist

# Leaf cluster: two client nodes, a single worker (node_0).
cluster_0_1_0:
upstream_cluster: cluster_0_1
clients:
- node_5
- node_6
workers:
- node_0
consensus_params:
<<: *default_2_3
train_params:
cluster_epochs: 1
dataset_params:
<<: *def_dir_dist

# Leaf cluster: two client nodes, a single worker (node_0).
cluster_0_1_1:
upstream_cluster: cluster_0_1
clients:
- node_7
- node_8
workers:
- node_0
consensus_params:
<<: *default_2_3
train_params:
cluster_epochs: 1
dataset_params:
<<: *def_dir_dist

# Job-level settings.
job_params:
# NOTE(review): presumably the number of global training rounds for the whole job — confirm.
rounds: 3

0 comments on commit a1037b1

Please sign in to comment.