In [None]:
import boto3
# the code below assumes that you configure boto3 with your AWS account
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html
# EC2 handles shared by every cell below: the high-level resource API (instances and
# instance collections) and the low-level client it wraps (spot-request calls).
ec2 = boto3.resource('ec2')
client = ec2.meta.client

In [None]:
# ---- experiment identity ----
# Must be unique per experiment: it is the `experiment` tag used to find, count and
# tear down this run's instances.
experiment_name = "New_reference"
# True: workers reach the coordinator through its private IP (same network);
# False: through its public IP.
use_internal_routing = True

# ---- instance layout ----
coordinator_type = "c5.large"
worker_type = "g4dn.xlarge"
num_workers = 16
dht_port = 31337  # port the coordinator's DHT listens on

# ---- AWS account specifics: update these for your account ----
image_id = "ami-0db67995cd75f5a9f"
aws_key_name = "aws"              # name of your EC2 key pair
subnet = "subnet-fcd1ca86"        # your subnet id (or drop SubnetId from the launch calls)
security_group = "sg-a75591d4"    # your security group id

# ---- code & data ----
data_path = "https://hivemind-data.s3.us-east-2.amazonaws.com/wikitext103.tar.gz"
hivemind_version = "master"  # branch, commit or tag for git checkout

In [None]:
# Warn if instances tagged with this experiment are already running.
# This cell only prints a warning; it does not stop later cells. Pick a new
# `experiment_name` unless you really mean to add instances to an existing experiment.
experiment_filters = [
    {'Name': 'instance-state-name', 'Values': ['running']},
    {'Name': 'tag:experiment', 'Values': [experiment_name]},
]
existing_instances = ec2.instances.filter(Filters=experiment_filters)
already_running = [*existing_instances]
if len(already_running) > 0:
    print(f"Already running {experiment_name}: {already_running}")
    print(len(already_running))
    print(*(instance.public_ip_address for instance in already_running), sep="\n")

In [None]:
# To remove all instances and spot requests of this experiment, run this cell.
# WARNING: destructive and unguarded -- skip it during "Run All" if you want to keep an
# existing experiment alive.
#
# Spot requests are cancelled first: an 'open' request has not launched an instance yet
# but still could, so it must be cancelled before the instances are terminated.
# Filtering happens server-side, so untagged requests (which have no 'Tags' key) are
# never inspected here.
requests_to_shutdown = []
spot_paginator = client.get_paginator('describe_spot_instance_requests')
for page in spot_paginator.paginate(Filters=[
    {'Name': 'state', 'Values': ['open', 'active']},
    {'Name': 'tag:experiment', 'Values': [experiment_name]},
]):
    requests_to_shutdown.extend(
        request['SpotInstanceRequestId'] for request in page['SpotInstanceRequests'])
if requests_to_shutdown:
    client.cancel_spot_instance_requests(
        SpotInstanceRequestIds=requests_to_shutdown)

# Build the instance filter here instead of reusing a collection from another cell, and
# include non-running states so instances that are still starting up are terminated too.
instances_to_terminate = ec2.instances.filter(Filters=[
    {'Name': 'instance-state-name', 'Values': ['pending', 'running', 'stopping', 'stopped']},
    {'Name': 'tag:experiment', 'Values': [experiment_name]},
])
instances_to_terminate.terminate()
print(f"Cancelled {len(requests_to_shutdown)} spot request(s); terminating instances of {experiment_name}")

### Stage 1: run coordinator

The coordinator is an instance that welcomes new peers into a decentralized training run. If the coordinator is down, new peers can still join by using any of the existing peers as their initial peer.

In [None]:
import os
from getpass import getpass

# Weights & Biases API key, read from the WANDB_API_KEY environment variable or prompted for
# interactively. It must never be hardcoded: notebooks are shared and committed. The key that
# was previously hardcoded in this cell has been exposed and should be revoked in your W&B
# account settings.
# NOTE(review): the key is still embedded in the coordinator's EC2 user data, where anyone
# allowed to view that instance's attributes can read it.
WandB_API_key = os.environ.get("WANDB_API_KEY") or getpass("Weights & Biases API key: ")

In [None]:
# User-data script for the coordinator instance; it is passed as UserData when the
# coordinator is launched. Its output is teed to /var/log/user-command.log on the instance.
# The script:
#   1. appends an rsyslog config that accepts syslog over UDP and TCP port 514 and writes
#      each remote host's messages to /var/log/rsyslog/<HOSTNAME>.log, then restarts rsyslog;
#   2. runs the mrbn/hivemind docker image with host networking, installs hivemind at
#      `hivemind_version`, writes W&B credentials to ~/.netrc inside the container, and
#      starts run_first_peer.py with --listen_on [::]:`dht_port`.
#
# This is a Python f-string. {hivemind_version}, {WandB_API_key}, {dht_port} and
# {experiment_name} are substituted when this cell runs, so re-run the cell after changing
# any of them. Any literal brace added to the script must be doubled.
# The `bash -c """ ... """` part is bash syntax, not Python: an empty "" followed by an
# opening ", so the container command is a single double-quoted string evaluated by the
# outer shell. Any `"`, `$` or backtick inside it is interpreted by that outer shell. For
# example, the quotes around EOF in the .netrc heredoc end and restart that string, so the
# container receives an unquoted EOF.
# NOTE(review): the W&B key ends up in the instance user data; the W&B project name
# (Demo-run-2) is hardcoded in the command.
coordinator_script = f'''#!/bin/bash -ex
exec > >(tee /var/log/user-command.log|logger -t user-data -s 2>/dev/console) 2>&1



# note: we configure rsyslog to forward logs from all trainers
sudo sh -c 'cat <<"EOF" >> /etc/rsyslog.conf
$ModLoad imudp
$UDPServerRun 514

$ModLoad imtcp
$InputTCPServerRun 514

$FileCreateMode 0644
$DirCreateMode 0755

$template RemoteLogs,"/var/log/rsyslog/%HOSTNAME%.log"
*.*  ?RemoteLogs
& ~
EOF'
sudo systemctl restart rsyslog


# NOTE: docker run must be called without --it as there is no tty
# check machine's /var/log/user-command.log for details

docker run --name trainer_run --ipc=host --net=host mrbn/hivemind bash -c """
set -euxo pipefail

pip install whatsmyip
pip install torch-optimizer
git clone https://github.com/learning-at-home/hivemind
cd hivemind
git checkout {hivemind_version}
pip install -e .

pip install wandb

sh -c 'cat <<"EOF" >> ~/.netrc
machine api.wandb.ai
  login user
  password {WandB_API_key}
EOF'

cd examples/albert
python ./run_first_peer.py --listen_on [::]:{dht_port}  --experiment_prefix {experiment_name} --wandb_project Demo-run-2
"""
'''

In [None]:
# Launch the coordinator (first DHT peer) as an on-demand instance running
# `coordinator_script`, tagged with the experiment name and the role `first_peer`.
coordinator, = ec2.create_instances(
    ImageId=image_id, InstanceType=coordinator_type,
    MinCount=1, MaxCount=1,
    SecurityGroupIds=[security_group], SubnetId=subnet,
    KeyName=aws_key_name, UserData=coordinator_script,
    TagSpecifications=[{'ResourceType': 'instance', 'Tags': [
        {'Key': 'experiment', 'Value': experiment_name},
        {'Key': 'role', 'Value': 'first_peer'},
    ]}],
)
coordinator.wait_until_running()
# Refresh the cached attributes: the object returned by create_instances describes the
# instance as it was at launch, so its IP addresses may not be populated yet.
coordinator.reload()

print(coordinator.private_ip_address, coordinator.public_ip_address)

# Workers dial the private IP when they share the coordinator's network
# (use_internal_routing); otherwise they dial its public IP.
ip_kind = "private" if use_internal_routing else "public"
coordinator_ip = coordinator.private_ip_address if use_internal_routing else coordinator.public_ip_address
if coordinator_ip is None:
    # Fail here instead of baking "None:<port>" into the worker script.
    raise RuntimeError(
        f"Coordinator {coordinator.id} has no {ip_kind} IP address; "
        f"check that subnet {subnet} assigns one")

coordinator_endpoint = f"{coordinator_ip}:{dht_port}"
print(coordinator_endpoint)

In [None]:
import time
import hivemind as src

probe = await src.DHTNode.create(listen=False)
for i in range(20):
    ping_response = await probe.protocol.call_ping(f"{coordinator.public_ip_address}:{dht_port}")
    if ping_response is not None:
        print("Coordinator is now accessible to workers!")
        print(f"Use public ip: {coordinator_endpoint}")
        break
    else:
        print("Coordinator is not accessible yet, will retry in 30s...")
        time.sleep(30)
else:
    print("Coordinator failed to launch for some reason.")
    print(f"Check /var/log/user-command.log at ec2-user@{coordinator.public_ip_address}")
    
# this should normally take 3-6 minutes depending on the will of Bezos

### Stage 2: run workers

Workers are preemptible (spot) GPU instances that compute gradients and perform Moshpit averaging. With the default `worker_type` (`g4dn.xlarge`), each worker has a single NVIDIA Tesla T4 GPU.

In [None]:
# User-data script for each GPU worker; it is passed as UserData by the launch loop below.
# Its output is teed to /var/log/user-command.log on the worker. It runs the
# mrbn/hivemind image with all GPUs and host networking, installs hivemind at
# `hivemind_version`, downloads and unpacks `data_path`, and starts run_trainer.py with the
# coordinator as its initial peer.
#
# This is a Python f-string. {hivemind_version}, {data_path}, {coordinator_endpoint} and
# {experiment_name} are substituted when this cell runs, so re-run it whenever the
# coordinator is relaunched; otherwise new workers get the old endpoint.
# Python removes each trailing backslash-newline in this (non-raw) string, so the
# run_trainer.py command reaches bash already joined onto one line.
# Unlike the coordinator script, the command inside `bash -c """ ... """` does not run
# `set -euxo pipefail`, so a failing step inside the container does not stop it; the outer
# `set -e` only sees docker's exit status.
# NOTE(review): `mkdir -p ~/data` creates ~/data, but `tar xzf -` extracts into the current
# directory (examples/albert). Confirm where run_trainer.py expects the data.
worker_script = f'''#!/bin/bash -ex
exec > >(tee /var/log/user-command.log|logger -t user-data -s 2>/dev/console) 2>&1

set -euxo pipefail
cd ~

docker run --name hivemind_run --gpus all --ipc=host --net=host mrbn/hivemind bash -c """

pip install torch-optimizer==0.1.0 scipy==1.5.2
git clone https://github.com/learning-at-home/hivemind
cd hivemind
git checkout {hivemind_version}
pip install -e .
cd examples/albert

ulimit -n 4096

mkdir -p ~/data
wget -qO- {data_path} | tar xzf -

HIVEMIND_THREADS=256 python run_trainer.py \
  --output_dir ./outputs --overwrite_output_dir \
  --logging_dir ./logs --logging_first_step --logging_steps 100 \
  --initial_peers {coordinator_endpoint} --experiment_prefix {experiment_name} --seed 42
"""
'''

In [None]:
# Instances launched by the worker loop below; it appends to this list on every launch.
workers = list()

In [None]:
# Babysitter loop: keep `num_workers` GPU spot workers alive for this experiment, launching
# at most one replacement per minute. It runs forever; interrupt the kernel to stop it.
import time  # the probe cell also imports this; repeated so this cell stands alone

while True:
    # Count only this experiment's GPU workers, so the coordinator's state cannot change
    # the worker count. Include 'pending' so a worker launched on a previous iteration that
    # has not reached 'running' yet is not launched a second time.
    live_workers = list(ec2.instances.filter(Filters=[
        {'Name': 'instance-state-name', 'Values': ['pending', 'running']},
        {'Name': 'tag:experiment', 'Values': [experiment_name]},
        {'Name': 'tag:role', 'Values': ['gpu_worker']},
    ]))

    count_needed = num_workers - len(live_workers)
    if count_needed > 0:  # a negative value means too many workers, not "launch more"
        try:
            print("CREATING ONE WORKER!")
            new_workers = ec2.create_instances(
                ImageId=image_id, InstanceType=worker_type,
                MinCount=1, MaxCount=1,
                UserData=worker_script,
                SecurityGroupIds=[security_group], SubnetId=subnet,
                KeyName=aws_key_name,
                InstanceMarketOptions={
                    "MarketType": "spot",
                    "SpotOptions": {
                        "SpotInstanceType": "one-time",
                        "InstanceInterruptionBehavior": "terminate"
                    }
                },
                TagSpecifications=[{'ResourceType': 'instance', 'Tags': [
                    {'Key': 'experiment', 'Value': experiment_name},
                    {'Key': 'role', 'Value': 'gpu_worker'}
                ]}, {'ResourceType': 'spot-instances-request', 'Tags': [
                    {'Key': 'experiment', 'Value': experiment_name},
                    {'Key': 'role', 'Value': 'gpu_worker'}
                ]}],
            )
        except Exception as e:  # not BaseException: KeyboardInterrupt must still stop the loop
            print("FAILED", e)
        else:
            workers += new_workers
            # A freshly launched instance is still pending and usually has no public IP yet,
            # so report its id instead.
            print("Added new peer", new_workers[-1].id)
    else:
        print("Enough workers already, check back in 60s...")
    time.sleep(60)