In [1]:
import subprocess

In [2]:
# ssh host names of the remote worker machines, one per line of
# private/flowers.txt. Blank lines are skipped so they never become an empty
# host name; only the first MAX_REMOTE_MACHINES hosts are used.
MAX_REMOTE_MACHINES = 4
with open("private/flowers.txt") as f:
    machine_names = [line.strip() for line in f if line.strip()][:MAX_REMOTE_MACHINES]

In [3]:
def has_conda(machine_name):
    """Return True if `conda` can be run over ssh on `machine_name`.

    Runs `conda --version` remotely and relies on the exit status rather than
    on stderr being empty: ssh and login scripts may print harmless warnings
    on stderr. A false negative here makes the provisioning loop call
    install_conda on a machine that is already set up.
    """
    process = subprocess.Popen(['ssh', machine_name, 'conda --version'],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    process.communicate()
    return process.returncode == 0

# Shell script run over ssh by install_conda(): installs Miniconda under
# /Data/leo/miniconda3 and creates the `tf2` environment used for training.
conda_installation = " && ".join([
    "mkdir -p /Data/leo",
    "chmod 700 /Data/leo",
    "cd /Data/leo",
    # Only replace a previous Miniconda install; the dataset and the git
    # checkout that also live in /Data/leo are kept.
    "/usr/bin/rm -fr /Data/leo/miniconda3 Miniconda3-latest-Linux-x86_64.sh",
    "wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh",
    "chmod +x Miniconda3-latest-Linux-x86_64.sh",
    "./Miniconda3-latest-Linux-x86_64.sh -b -p /Data/leo/miniconda3",
    "source ~/.bashrc",
    # Batch mode (-b) does not edit ~/.bashrc, so load conda's shell hook
    # explicitly to make `conda create` / `conda activate` work in this shell.
    'eval "$(/Data/leo/miniconda3/bin/conda shell.bash hook)"',
    "conda create --name tf2 -y matplotlib pillow tqdm scikit-learn pandas jupyter tensorflow-gpu",
    "conda activate tf2",
    "pip install tensorflow-gpu==2.3",
])


def install_conda(machine_name):
    """Install Miniconda and the `tf2` environment on `machine_name` over ssh.

    Runs the `conda_installation` shell script remotely. Raises AssertionError
    with the tail of the remote output if the script exits non-zero, or if
    conda is still not found afterwards.
    """
    print(f"installing conda on {machine_name}")
    process = subprocess.Popen(['ssh', machine_name, conda_installation],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    # wget/conda/pip may write progress to stderr, so only the exit status is a
    # reliable failure signal; keep the output tail to diagnose failures.
    assert process.returncode == 0, (
        f"conda installation failed on {machine_name} (exit {process.returncode}):\n"
        f"{stdout.decode(errors='replace')[-2000:]}\n"
        f"{stderr.decode(errors='replace')[-2000:]}")
    assert has_conda(machine_name), f"conda still not found on {machine_name}"

In [4]:
def has_data(machine_name):
    """Return True if the extracted dogs-face-2015 directory exists on `machine_name`.

    Tests for the directory itself instead of searching `ls /Data/leo` output:
    the substring "dogs-face-2015" also matches the copied archive
    `dogs-face-2015.zip`, which made an interrupted extraction look complete.
    """
    process = subprocess.Popen(['ssh', machine_name, "test -d /Data/leo/dogs-face-2015"],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    process.communicate()
    return process.returncode == 0

def copy_data(machine_name):
    """Copy the dogs-face-2015 archive to `machine_name` and extract it in /Data/leo.

    Raises AssertionError (carrying the command's stderr) if scp or the remote
    unzip exits non-zero, or if has_data() still reports the data missing.
    """
    print(f"Copying data to {machine_name}")
    process = subprocess.Popen(['scp', "/Data/dataset/dogs-face-2015.zip", f"{machine_name}:/Data/leo"],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    _, stderr = process.communicate()
    # Judge success by exit status: scp/ssh may print harmless warnings on stderr.
    assert process.returncode == 0, stderr
    # -o overwrites files left by a previous partial extraction instead of
    # stopping to ask for confirmation.
    process = subprocess.Popen(['ssh', machine_name, "cd /Data/leo && unzip -o -q dogs-face-2015.zip"],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    _, stderr = process.communicate()
    assert process.returncode == 0, stderr
    assert has_data(machine_name)

In [5]:
def has_git(machine_name):
    """Return True if /Data/leo/stylegan2-keras on `machine_name` is a git checkout.

    Checks for the `.git` directory rather than searching for a substring of
    `ls /Data/leo` output, which would also match unrelated entries such as
    `stylegan2-keras-old`.
    """
    process = subprocess.Popen(['ssh', machine_name, "test -d /Data/leo/stylegan2-keras/.git"],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    process.communicate()
    return process.returncode == 0

def setup_git(machine_name):
    """Clone the stylegan2-keras repository into /Data/leo on `machine_name`.

    Success is judged by git's exit status. Without -q, `git clone` prints
    "Cloning into '...'" on stderr, so requiring an empty stderr rejected
    every successful clone.
    """
    print(f"Downloading git of project to {machine_name}")
    process = subprocess.Popen(['ssh', machine_name,
                                "cd /Data/leo && git clone https://github.com/leoHeidel/stylegan2-keras.git"],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    _, stderr = process.communicate()
    assert process.returncode == 0, stderr
    assert has_git(machine_name)
    
    
def update_git(machine_name, branch="multi-worker"):
    """Hard-reset the remote checkout on `machine_name` to `origin/<branch>`.

    Runs `git fetch --all` followed by `git reset --hard origin/<branch>`
    inside /Data/leo/stylegan2-keras. Failure is judged by the exit status:
    `git fetch` reports updated refs on stderr, so an empty stderr is not a
    success criterion.

    Raises AssertionError (carrying stderr) if the remote command fails.
    """
    import shlex

    print(f"Updating git of project on {machine_name}")
    remote_cmd = ("cd /Data/leo/stylegan2-keras"
                  " && git fetch --all"
                  f" && git reset --hard {shlex.quote('origin/' + branch)}")
    process = subprocess.Popen(['ssh', machine_name, remote_cmd],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    _, stderr = process.communicate()
    assert process.returncode == 0, stderr

In [6]:
# Install Miniconda + the tf2 env only where has_conda() does not find conda.
for machine_name in machine_names:
    if has_conda(machine_name):
        continue
    install_conda(machine_name)
        

In [7]:
# Ship and extract the dataset wherever has_data() reports it missing.
for machine_name in machine_names:
    if has_data(machine_name):
        continue
    copy_data(machine_name)

In [8]:
# Clone the project on every machine where has_git() finds no checkout.
for machine_name in machine_names:
    if has_git(machine_name):
        continue
    setup_git(machine_name)

In [9]:
# Bring every remote checkout in line with the remote branch used for training
# (update_git fetches and then runs `git reset --hard`).
for machine_name in machine_names:
    update_git(machine_name)

Updating git of project on aerides
Updating git of project on barlia
Updating git of project on calanthe
Updating git of project on diuris


In [18]:
%%writefile multi-worker-training.py
import json
import os
import tqdm
#print(os.environ['TF_CONFIG'])

import numpy as np
import tensorflow as tf

import matplotlib.pyplot as plt

import lib_stylegan

def is_chief(task_type, task_id):
    """Return True if this process plays the chief role in the cluster.

    If `task_type` is None, this may be operating as a single worker, which
    works effectively as chief. In a cluster made only of `worker` tasks,
    worker 0 is the chief.
    """
    return task_type is None or task_type == 'chief' or (
        task_type == 'worker' and task_id == 0)


# TF_CONFIG (cluster spec + this process's task) is provided by the launcher.
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

im_size = 128
latent_size = 256
channels = 24  # Should be at least 32 for good results
# Choosing the number of layers this way means we start with 4x4.
nb_layer = int(np.log2(im_size) - 1)

path = "/Data/leo/dogs-face-2015/*jpg"

# Each worker builds its own input pipeline with `per_worker_batch_size`
# images per batch; the model is given the batch size summed over all workers.
per_worker_batch_size = 16
global_batch_size = per_worker_batch_size * num_workers

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)

# Build and compile inside the strategy scope so the model's variables are
# created as distributed variables.
with strategy.scope():
    model = lib_stylegan.style_gan.StyleGan(im_size=im_size,
                                            latent_size=latent_size,
                                            channels=channels,
                                            nb_layer=nb_layer,
                                            global_batch_size=global_batch_size)
    model.compile(run_eagerly=True)

dataset = lib_stylegan.dataset.train_dataset(path,
                                             n_layers=nb_layer,
                                             im_size=im_size,
                                             batch_size=per_worker_batch_size,
                                             latent_size=latent_size)
# NOTE(review): the dataset is not wrapped with
# strategy.experimental_distribute_dataset, so each worker feeds its local
# batches straight to strategy.run. Confirm workers do not all train on the
# same images (depends on how lib_stylegan shuffles).
for args in tqdm.tqdm(dataset.take(100), total=100):
    per_replica_losses = strategy.run(model.train_step, args=(args,))
for args in tqdm.tqdm(dataset.take(1), total=1):
    per_replica_losses = strategy.run(model.train_step, args=(args,))

Overwriting multi-worker-training.py


In [19]:
import json
import os

# Cluster layout for MultiWorkerMirroredStrategy: this machine ('ferrari') is
# worker 0, followed by every remote machine, each on its own consecutive port.
port_start = 60110
tf_config = {
    'cluster': {
        'worker': [
            f'{host}:{port}'
            for port, host in enumerate(['ferrari'] + machine_names, start=port_start)
        ],
    },
    # This kernel runs worker 0; run_job() sets the index for remote workers.
    'task': {'type': 'worker', 'index': 0},
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)

In [36]:
def run_job(machine_name, idx, log_dir="logs"):
    """Start worker `idx` of the multi-worker training job on `machine_name`.

    Copies the local multi-worker-training.py into the remote checkout, then
    launches it over ssh inside the `tf2` conda env. TF_CONFIG describes the
    cluster from the global `tf_config`, with this machine as task `idx`.

    Args:
        machine_name: ssh host name of the worker.
        idx: this worker's index in tf_config['cluster']['worker'].
        log_dir: local directory that receives `<machine_name>.log` with the
            job's combined stdout/stderr. Pass None to pipe the output back
            instead; the caller must then drain the pipes continuously, or the
            remote job blocks once the OS pipe buffer is full.

    Returns:
        The running `subprocess.Popen` of the ssh session (not waited on).
    """
    import copy
    import json
    import os
    import shlex

    process = subprocess.Popen(['scp', "multi-worker-training.py",
                                f"{machine_name}:/Data/leo/stylegan2-keras/multi-worker-training.py"],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    _, stderr = process.communicate()
    # Exit status, not stderr: scp may print harmless warnings.
    assert process.returncode == 0, stderr

    # Per-worker copy, so the shared cluster description is never mutated.
    worker_config = copy.deepcopy(tf_config)
    worker_config['task']['index'] = idx
    remote_cmd = " && ".join([
        "conda activate tf2",
        # shlex.quote keeps the JSON a single shell word whatever it contains.
        f"export TF_CONFIG={shlex.quote(json.dumps(worker_config))}",
        "cd /Data/leo/stylegan2-keras/",
        "python multi-worker-training.py",
    ])
    ssh_cmd = ['ssh', machine_name, remote_cmd]

    if log_dir is None:
        return subprocess.Popen(ssh_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # The training job logs continuously (TF messages, progress bars); pipes
    # that nobody reads would fill up and stall it, so write to a file instead.
    os.makedirs(log_dir, exist_ok=True)
    with open(os.path.join(log_dir, f"{machine_name}.log"), "wb") as log_file:
        # The child inherits its own copy of the descriptor; closing ours is safe.
        return subprocess.Popen(ssh_cmd, stdout=log_file, stderr=subprocess.STDOUT)

In [37]:
# Launch one remote worker per machine. Index 0 is this machine (first entry
# of the TF_CONFIG cluster), so remote workers are numbered from 1.
processes = []
for i, machine_name in enumerate(machine_names):
    print("Running on", machine_name)
    processes += [run_job(machine_name, i + 1)]

Running on aerides
Running on barlia
Running on calanthe
Running on diuris


In [39]:
#stdout, stderr = processes[0].communicate()
#print(stdout)
#print(stderr)

b'{"cluster": {"worker": ["ferrari:60100", "aerides:60101", "barlia:60102", "calanthe:60103", "diuris:60104"]}, "task": {"type": "worker", "index": 1}}\n'
b'2020-12-06 22:01:42.511659: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1\n2020-12-06 22:01:43.293177: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcuda.so.1\n2020-12-06 22:01:43.297263: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:982] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero\n2020-12-06 22:01:43.297625: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1716] Found device 0 with properties: \npciBusID: 0000:01:00.0 name: Quadro P2000 computeCapability: 6.1\ncoreClock: 1.4805GHz coreCount: 8 deviceMemorySize: 4.92GiB deviceMemoryBandwidth: 130.53GiB/s\n2020-12-06 22:01:43.297655: I tensorflow/stream_exec

In [38]:
import json
import os
import tqdm

import numpy as np
import tensorflow as tf

import matplotlib.pyplot as plt

import lib_stylegan

# Run worker 0 (this machine) from the same multi-worker-training.py that
# run_job() copies to the remote workers. A pasted copy of the script could
# silently drift out of sync with it. TF_CONFIG for task index 0 was exported
# to os.environ when the cluster config was built.
import runpy

_worker0_globals = runpy.run_path("multi-worker-training.py")
# Expose what the script built (model, strategy, dataset, ...) to later cells.
# If the script is interrupted, nothing is exported.
globals().update(
    {name: value for name, value in _worker0_globals.items() if not name.startswith("__")}
)

INFO:tensorflow:Enabled multi-worker collective ops with available devices: ['/job:worker/replica:0/task:0/device:CPU:0', '/job:worker/replica:0/task:0/device:XLA_CPU:0', '/job:worker/replica:0/task:0/device:XLA_GPU:0', '/job:worker/replica:0/task:0/device:GPU:0']
INFO:tensorflow:Using MirroredStrategy with devices ('/job:worker/task:0/device:GPU:0',)
INFO:tensorflow:MultiWorkerMirroredStrategy with cluster_spec = {'worker': ['ferrari:60100', 'aerides:60101', 'barlia:60102', 'calanthe:60103', 'diuris:60104']}, task_type = 'worker', task_id = 0, num_workers = 5, local_devices = ('/job:worker/task:0/device:GPU:0',), communication = CollectiveCommunication.AUTO


  0%|          | 0/100 [00:00<?, ?it/s]

Number of train images found: 52597
INFO:tensorflow:Error reported to Coordinator: 
Traceback (most recent call last):
  File "/Data/leo/miniconda3/envs/tf2/lib/python3.8/site-packages/tensorflow/python/training/coordinator.py", line 297, in stop_on_exception
    yield
  File "/Data/leo/miniconda3/envs/tf2/lib/python3.8/site-packages/tensorflow/python/distribute/mirrored_run.py", line 199, in _call_for_each_replica
    t.has_paused.wait()
  File "/Data/leo/miniconda3/envs/tf2/lib/python3.8/threading.py", line 558, in wait
    signaled = self._cond.wait(timeout)
  File "/Data/leo/miniconda3/envs/tf2/lib/python3.8/threading.py", line 302, in wait
    waiter.acquire()
KeyboardInterrupt


  0%|          | 0/100 [00:57<?, ?it/s]


KeyboardInterrupt: 