In [72]:
# One-time setup switch: later cells use it to gate the uv install and the
# SSH key generation / registration on the controller.
# NOTE: the name is spelled "FIRTS_RUN" (sic); later cells reference it under
# this exact spelling, so it is intentionally left unchanged here.
FIRTS_RUN = False

# Address of the controller server; it is also used as the SSH jump host.
ctrl_svr_ip = "129.114.108.5"

# Addresses of the worker servers. They are dialed through the controller,
# so each entry must be reachable from it. Commented-out entries are hosts
# that were excluded (see the inline reason).
svr_ips = [
    "10.52.3.84",
    "10.52.3.34",
    "10.52.3.164",
    "10.52.0.30",
    "10.52.3.162",
    "10.52.0.170",
    "10.52.0.58",
    # "10.52.2.100", # bad routing
    "10.52.2.115",
    "10.52.3.4",
    "10.52.2.188",
    "10.52.2.75",
    "10.52.3.26",
    "10.52.0.34",
    "10.52.2.47",
    "10.52.3.118",
    "10.52.2.198",
    "10.52.3.58",
    "10.52.2.86",
    "10.52.2.151",
    "10.52.3.46",
    "10.52.3.129",
    "10.52.1.106",
    "10.52.0.217",
    "10.52.3.141",
    "10.52.2.123",
    "10.52.3.142",
    "10.52.2.245",
    "10.52.2.58",
    "10.52.3.196",
    "10.52.2.210",
    "10.52.3.3",
    "10.52.3.171",
    "10.52.0.185",
    # "10.52.1.100", # bad routing
    # NOTE(review): presumably resolves to the controller itself, because
    # workers are connected to from the jump host -- confirm.
    "localhost"
]

## Prepare Connections

- `ctrl` is the SSH client connection to the controller (jump) server.
- `workers` is the list of SSH client connections to the worker servers, each tunnelled through `ctrl`.

In [73]:
import shlex
from concurrent.futures import ThreadPoolExecutor

import paramiko

# Private key used for both the controller and the worker logins.
# NOTE(review): absolute, user-specific path -- adjust it for your machine.
pkey = paramiko.RSAKey.from_private_key_file("/Users/ybyan/.ssh/ddb-access.pem")

# Jump host details
jump_host = ctrl_svr_ip
jump_user = "cc"

# Worker node details (blank entries in svr_ips are skipped)
worker_ips = [ip for ip in svr_ips if ip]
worker_user = "cc"

# Filled in by the worker-connection step further down in this cell.
workers = []
# Stays None unless the controller connection below succeeds, so later code
# can tell "not connected" apart from a usable client.
ctrl = None

ctrl_client = None
try:
    # Connect to the jump server
    ctrl_client = paramiko.SSHClient()
    # AutoAddPolicy accepts unknown host keys without verification.
    ctrl_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ctrl_client.connect(jump_host, username=jump_user, pkey=pkey)
    # Publish the client only once it is actually connected.
    ctrl = ctrl_client
except Exception as e:
    # Release the half-initialised client instead of leaving it bound to ctrl.
    if ctrl_client is not None:
        ctrl_client.close()
    print(f"Controller server {jump_host} failed with error: {e}")

# for w_ip in worker_ips:
def connect_worker(w_ip):
    """Open an SSH connection to one worker, tunnelled through the controller.

    Args:
        w_ip: Address of the worker as reachable from the controller.

    Returns:
        A connected ``paramiko.SSHClient``, or ``None`` if the connection
        could not be established. Failures are printed, never raised.
    """
    worker_channel = None
    worker_client = None
    try:
        # get_transport() is None when the controller client is not connected.
        jump_transport = ctrl.get_transport() if ctrl is not None else None
        if jump_transport is None:
            raise RuntimeError("controller connection is not established")

        # Open a channel to the worker node
        dest_addr = (w_ip, 22)
        local_addr = (jump_host, 22)
        worker_channel = jump_transport.open_channel("direct-tcpip", dest_addr, local_addr)

        # Connect to the worker node via the channel
        worker_client = paramiko.SSHClient()
        worker_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        worker_client.connect(
            w_ip,
            username=worker_user,
            pkey=pkey,
            sock=worker_channel
        )

        return worker_client
    except Exception as e:
        # Do not leak the tunnel channel or a half-open client on failure.
        for resource in (worker_client, worker_channel):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception:
                # Cleanup is best-effort; the original error is reported below.
                pass
        print(f"Worker {w_ip} failed with error: {e}")
        return None

# Connect to all workers concurrently, one thread per worker.
# max(1, ...) because ThreadPoolExecutor raises ValueError for max_workers=0,
# which would happen with an empty worker list.
with ThreadPoolExecutor(max_workers=max(1, len(worker_ips))) as executor:
    # executor.map yields results in the same order as worker_ips.
    for worker_client in executor.map(connect_worker, worker_ips):
        # connect_worker returns None for workers it could not reach.
        if worker_client is not None:
            workers.append(worker_client)
print(f"Expected {len(worker_ips)} workers, got {len(workers)}")

Expected 34 workers, got 34


Define helper functions for running shell commands on the workers.

In [68]:
def execute_command_on_worker(worker: "paramiko.SSHClient", command: str = "hostname") -> str:
    """Run a shell command on an already-connected worker and return its output.

    Args:
        worker: Connected SSH client for the worker node.
        command: Shell command to execute.

    Returns:
        The command's stdout followed by its stderr, as one string. If running
        the command raises, a "Worker ... failed with error: ..." message is
        returned instead of propagating the exception.
    """
    try:
        # Execute the command on the worker node
        _stdin, stdout, stderr = worker.exec_command(command)
        # errors="replace": a stray non-UTF-8 byte must not turn an otherwise
        # successful command into a reported failure with its output lost.
        output = stdout.read().decode(errors="replace")
        error = stderr.read().decode(errors="replace")

        return f"{output}{error}"
    except Exception as e:
        return f"Worker {worker} failed with error: {e}"

# Run the workflow on all workers in parallel
def exec_cmd(command: str, silent: bool = False, targets=None):
    """Run a shell command on many workers in parallel.

    Args:
        command: Shell command to execute on each target.
        silent: When True, the per-worker output is discarded instead of printed.
        targets: Optional iterable of connected SSH clients to run on.
            Defaults to every client in the global ``workers`` list.

    Returns:
        None. Output is printed in target order unless ``silent`` is set.
    """
    clients = workers if targets is None else list(targets)
    # Size the pool from the clients actually used; max(1, ...) because
    # ThreadPoolExecutor raises ValueError for max_workers=0.
    with ThreadPoolExecutor(max_workers=max(1, len(clients))) as executor:
        futures = [
            executor.submit(execute_command_on_worker, w, command)
            for w in clients
        ]

        # Gather and print results
        for future in futures:
            result = future.result()
            if not silent:
                print(result)

## Setup

1. Prepare a Python environment on the coordinator server.
2. Prepare an NFS share (mounted at `/shared`) for sharing some DDB configs.
3. Set up SSH access from the coordinator to each worker.
4. Install the runtime dependencies on each server.

In [None]:
# One-time SSH key steps; both are enabled only on the first run.
GEN_KEY = bool(FIRTS_RUN)          # generate a new key pair on the coordinator
ADD_KEY_TO_CTRL = bool(FIRTS_RUN)  # also authorize that key on the coordinator

In [None]:
# First run only: install uv on the coordinator to manage its Python
# environment, and show what the installer wrote to stdout.
if FIRTS_RUN:
    uv_install_cmd = "curl -LsSf https://astral.sh/uv/install.sh | sh"
    output = ctrl.exec_command(uv_install_cmd)[1].read().decode()
    print(output)

no checksums to verify
installing to /home/cc/.local/bin
  uv
  uvx
everything's installed!

To add $HOME/.local/bin to your PATH, either restart your shell or run:

    source $HOME/.local/bin/env (sh, bash, zsh)
    source $HOME/.local/bin/env.fish (fish)



In [None]:
# Create the shared ddb config directory via the coordinator, then replace
# /tmp/ddb on every worker with a symlink to it.
make_shared_dir_cmd = "mkdir -p /shared/tmp/ddb"
link_tmp_dir_cmd = "rm -rf /tmp/ddb; ln -sf /shared/tmp/ddb /tmp/"

# Reading stdout to EOF waits for mkdir before the workers are touched.
ctrl.exec_command(make_shared_dir_cmd)[1].read().decode()
exec_cmd(command=link_tmp_dir_cmd, silent=True)

In [70]:
# Set up the coordinator's SSH key on all servers so the coordinator can log
# in to every worker.
def _run_on_ctrl(cmd):
    """Run `cmd` on the coordinator and return its decoded (stdout, stderr)."""
    _stdin, stdout, stderr = ctrl.exec_command(cmd)
    return stdout.read().decode(), stderr.read().decode()


error = None
if GEN_KEY:
    # `yes y` answers ssh-keygen's prompts, e.g. when ~/.ssh/id_rsa already exists.
    output, error = _run_on_ctrl('yes y | ssh-keygen -t rsa -b 2048 -f ~/.ssh/id_rsa -q -N ""')

if error:
    print(f"Error: failed to gen ssh key: {error}")
else:
    output, error = _run_on_ctrl("cat ~/.ssh/id_rsa.pub")
    public_key = output.strip()
    if not error and not public_key:
        # Never append a blank line to authorized_keys.
        error = "public key is empty"

    if not error:
        # Quote the key so it cannot break out of the shell command, and only
        # append it when that exact line is not present yet: re-running this
        # cell (or listing the same host twice) must not create duplicates.
        quoted_key = shlex.quote(public_key)
        append_key_cmd = (
            f"grep -qxF -e {quoted_key} ~/.ssh/authorized_keys 2>/dev/null"
            f" || printf '%s\\n' {quoted_key} >> ~/.ssh/authorized_keys"
        )
        if ADD_KEY_TO_CTRL:
            _out, error = _run_on_ctrl(append_key_cmd)

    if error:
        print(f"Error: failed to add ssh key to authorized_keys: {error}")
    else:
        print(f"Adding ssh key: {output}")
        exec_cmd(append_key_cmd, silent=True)

Adding ssh key: ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCMSfpnB0E997gqQNh6UV7uPXGBLpAh/Hp5PsSPYCpbs1r5RxoyGTrw1SYl8r+w4TMvTFsOPxnbDF1DYDrhp/G9aZzNttT+6GoQVKK42QDn1/Omdd9ujdijBWydguRgjOfLrSleG04CmLK5XE3mvMxLZuHGjE8US2lDXzWztaHheegXd1Gc140AFbP0QtgU5DRMnpRCGY78doZKCvcoX7PFZfMp3MoMrrFR2TGGxYtdrB31dmS9WYU3NcMyV2RnD+jHwZPXkooRZ9DQ9MH7c/dSK586VkaF/+nwJnjJu1bsag0U2igXRuwdiTI+aKoSYXFA4fNPlTH6pjOsxAMKzwY1 cc@node



In [71]:
# Install the debugger/build toolchain on every worker, then run the
# project's own machine setup target from the shared checkout.
for setup_cmd in (
    "sudo apt-get update; sudo apt-get install -y gdb build-essential cmake",
    "cd /shared/distributed-debugger && make setup-machine",
):
    exec_cmd(setup_cmd, silent=True)