In [None]:
# !pip install ray ray[tune]

In [1]:
import ray, json, math, re

In [2]:
def available_gpus():
    """Return the GPU count reported by `ray.available_resources()` as an int.

    Returns 0 when the resource mapping has no 'GPU' entry or the entry is falsy.
    """
    gpu_count = ray.available_resources().get('GPU')
    return int(gpu_count) if gpu_count else 0

In [3]:
def pprint(data):
    """Print `data` serialized as JSON with a four-space indent."""
    formatted = json.dumps(data, indent=4)
    print(formatted)

In [4]:
# CONNECT THIS NOTEBOOK TO THE REMOTE CLUSTER (`ray://` ADDRESS SCHEME)
# NOTE(review): THE CLUSTER ADDRESS IS HARD-CODED -- CONSIDER READING IT FROM CONFIGURATION
ray.init(address='ray://193.167.37.127:10001')

2024-11-11 06:38:26,695	INFO client_builder.py:244 -- Passing the following kwargs to ray.init() on the server: log_to_driver
    Ray: 2.38.0
    Python: 3.12.7
This process on Ray Client was started with:
    Ray: 2.38.0
    Python: 3.12.3



0,1
Python version:,3.12.7
Ray version:,2.38.0
Dashboard:,http://10.151.173.103:8265


### CLUSTER HARDWARE

In [5]:
# PRINT THE RESOURCE MAPPING RETURNED BY THE CLUSTER (CPU, GPU, MEMORY, NODES)
pprint(ray.available_resources())

{
    "node:10.130.233.69": 1.0,
    "accelerator_type:G": 4.0,
    "node:__internal_head__": 1.0,
    "node:10.84.60.63": 1.0,
    "node:10.84.60.3": 1.0,
    "node:10.151.173.101": 1.0,
    "node:10.151.173.103": 1.0,
    "CPU": 55.0,
    "object_store_memory": 34444972031.0,
    "memory": 115816633344.0,
    "GPU": 4.0
}


# NAIVE MASS-GPU COMPUTATION
- With `num_gpus=1`, each function call reserves one entire GPU.
- Generic GPU stress testing through PyTorch matrix multiplication.
    - Notice that torch is imported inside the function, so the import runs on the remote worker.
    - I have PyTorch installed on all machines, so we do not need to install it with `runtime_env`.

In [6]:
@ray.remote(num_gpus=1)
def naive_gpu_problem():
    """Stress a single GPU with repeated matrix multiplications and time it.

    Declared with `num_gpus=1`, so every invocation asks Ray for one whole GPU.
    The measured interval covers allocating the two input matrices and the
    multiplication loop.

    Returns:
        tuple: `(gpu_name, seconds)` where `gpu_name` is the name torch reports
        for device 0 and `seconds` is the elapsed time of the workload.
    """
    # IMPORTED INSIDE THE FUNCTION SO THE IMPORT HAPPENS WHERE THE TASK RUNS
    import torch, time
    torch.cuda.set_device(0)

    # FIND WHAT GPU WERE USING
    my_gpu = torch.cuda.get_device_name(0)

    MATRIX_SIZE, ITERATIONS = 4096, 100
    start_time = time.time()

    matrix_a = torch.randn((MATRIX_SIZE, MATRIX_SIZE), device='cuda')
    matrix_b = torch.randn((MATRIX_SIZE, MATRIX_SIZE), device='cuda')

    for _ in range(ITERATIONS):
        result = torch.matmul(matrix_a, matrix_b)

        # DROP THE RESULT AND RELEASE CACHED GPU MEMORY AFTER EVERY ITERATION
        del result
        torch.cuda.empty_cache()

    # CUDA KERNELS ARE LAUNCHED ASYNCHRONOUSLY -- WAIT FOR ALL QUEUED WORK
    # TO FINISH BEFORE READING THE CLOCK, OTHERWISE THE TIMING IS UNRELIABLE
    torch.cuda.synchronize()

    return (my_gpu, time.time() - start_time)

In [7]:
# MAKE SURE ALL FOUR GPUS ARE AVAILABLE
# (THE EXPECTED COUNT IS NAMED ONCE SO THE CHECK AND ITS MESSAGE CANNOT DISAGREE)
expected_gpus = 4
num_gpus = available_gpus()
assert num_gpus == expected_gpus, f'EXPECTED AMOUNT OF GPUS NOT AVAILABLE (FOUND {num_gpus}, EXPECTED {expected_gpus})'

# LAUNCH THE TASKS, THEN WAIT FOR EVERY TASK TO FINISH
num_tasks = 100
tasks = [naive_gpu_problem.remote() for _ in range(num_tasks)]
task_durations = ray.get(tasks)

# NOTE: THIS IS THE SUM OF THE PER-TASK DURATIONS, NOT THE WALL-CLOCK TIME
total_duration = round(sum(item[1] for item in task_durations), 3)
print(f'THE EXPERIMENT TOOK {total_duration} SECONDS')

THE EXPERIMENT TOOK 120.851 SECONDS


### GRAFANA OUTPUT

<center><img src="naive5.png" /></center>

### GPU DIFFERENCES

In [8]:
def avg_times(task_durations):
    """Summarize per-GPU task durations as averages and relative speed ratios.

    Args:
        task_durations: iterable of `(gpu_name, duration)` pairs.

    Returns:
        dict with two keys:
            'avg_durations': gpu_name -> mean duration, rounded to 3 decimals.
            'ratios': gpu_name -> slowest average divided by this GPU's
                average, rounded to 3 decimals (the slowest GPU gets 1.0).
        Both mappings are empty when `task_durations` is empty.
    """
    # AGGREGATE EACH SUBTASK DURATION PER GPU
    durations_per_gpu = {}
    for gpu_name, task_duration in task_durations:
        durations_per_gpu.setdefault(gpu_name, []).append(task_duration)

    # NOTHING TO SUMMARIZE -- AVOID CALLING max() ON AN EMPTY SEQUENCE
    if not durations_per_gpu:
        return {
            'avg_durations': {},
            'ratios': {}
        }

    # FIND THE AVERAGE TASK DURATION FOR EACH GPU
    averages = {
        gpu: round(sum(values) / len(values), 3)
        for gpu, values in durations_per_gpu.items()
    }

    # WHAT GPU IS SLOWEST
    slowest_duration = max(averages.values())

    # FIND POWER RATIOS (COMPUTED FROM THE ROUNDED AVERAGES)
    ratios = {
        gpu: round(slowest_duration / avg_value, 3)
        for gpu, avg_value in averages.items()
    }

    return {
        'avg_durations': averages,
        'ratios': ratios
    }

In [9]:
# AVERAGE DURATION AND RELATIVE SPEED RATIO PER GPU FOR THE NAIVE RUN
naive_analysis = avg_times(task_durations)
pprint(naive_analysis)

{
    "avg_durations": {
        "NVIDIA GeForce RTX 4090": 0.461,
        "NVIDIA GeForce RTX 3080": 0.847,
        "NVIDIA GeForce GTX TITAN X": 3.019
    },
    "ratios": {
        "NVIDIA GeForce RTX 4090": 6.549,
        "NVIDIA GeForce RTX 3080": 3.564,
        "NVIDIA GeForce GTX TITAN X": 1.0
    }
}


# OPTIMIZED GPU SOLUTION

### REGISTER ONE GPU_ACTOR ON EACH CLUSTER NODE WITH A GPU

In [10]:
# @ray.remote(num_gpus=1)
# def naive_gpu_problem():
#    DO STUFF...

In [11]:
@ray.remote(num_gpus=1)
class gpu_actor:
    """Ray actor declared with `num_gpus=1` that runs a matrix-multiplication stress test.

    The name of the GPU (as torch reports it for device 0) is looked up once
    at construction and returned alongside every timing.
    """

    def __init__(self):
        self.my_gpu = self.detect_gpu()

    # DETECT WHAT GPU ACTOR HAS RESERVED
    def detect_gpu(self):
        """Return the name torch reports for CUDA device 0."""
        import torch
        return torch.cuda.get_device_name(0)

    # DO MATRIX MULTIPLICATION TO STRESSTEST
    def perform_work(self):
        """Time the matrix-multiplication loop (input allocation is not timed).

        Returns:
            tuple: `(gpu_name, seconds)` for this run.
        """
        import torch, time

        MATRIX_SIZE, ITERATIONS = 4096, 100

        matrix_a = torch.randn((MATRIX_SIZE, MATRIX_SIZE), device='cuda')
        matrix_b = torch.randn((MATRIX_SIZE, MATRIX_SIZE), device='cuda')

        # CUDA WORK IS QUEUED ASYNCHRONOUSLY -- MAKE SURE THE INPUT MATRICES
        # ARE FULLY GENERATED BEFORE THE CLOCK STARTS
        torch.cuda.synchronize()

        start_time = time.time()
        for _ in range(ITERATIONS):
            result = torch.matmul(matrix_a, matrix_b)

            # DROP THE RESULT AND RELEASE CACHED GPU MEMORY AFTER EVERY ITERATION
            del result
            torch.cuda.empty_cache()

        # WAIT FOR ALL QUEUED KERNELS TO FINISH BEFORE READING THE CLOCK
        torch.cuda.synchronize()

        return (self.my_gpu, time.time() - start_time)

### BASED ON A RELATIVE POWER RATIO
### FIGURE OUT HOW TO SPLIT n TASKS EVENLY AMONG x GPUS

In [12]:
def task_distribution(total_num_tasks, reserved_gpus, power_ratios):
    """Split `total_num_tasks` across GPUs in proportion to their power ratios.

    Args:
        total_num_tasks: number of tasks to hand out.
        reserved_gpus: sequence of GPU names, one entry per reserved GPU
            (the same name may appear more than once).
        power_ratios: mapping of GPU name -> relative power weight.

    Returns:
        list of task counts, positionally aligned with `reserved_gpus`.
    """
    # COMBINED WEIGHT OF EVERY RESERVED GPU
    combined_power = sum(power_ratios[name] for name in reserved_gpus)

    # EACH GPU FIRST GETS ITS PROPORTIONAL SHARE, ROUNDED DOWN
    shares = [
        math.floor(total_num_tasks * (power_ratios[name] / combined_power))
        for name in reserved_gpus
    ]

    # ROUNDING DOWN CAN LEAVE TASKS UNASSIGNED -- HAND THEM OUT ONE AT A TIME,
    # CYCLING THROUGH THE GPUS IN THEIR GIVEN ORDER
    leftover = total_num_tasks - sum(shares)
    for offset in range(leftover):
        shares[offset % len(shares)] += 1

    return shares

### CREATE n UNIQUE TASKS
### DISTRIBUTE THEM TO x GPUs ON THE CLUSTER

In [13]:
def simulation(total_tasks, gpu_power_ratio, expected_gpus=4):
    """Run `total_tasks` stress-test jobs on GPU actors, split by power ratio.

    One `gpu_actor` is created per available GPU, the tasks are divided with
    `task_distribution`, and every actor is killed before returning -- on
    success and on failure -- so its GPU reservation is released.

    Args:
        total_tasks: total number of `perform_work` calls to schedule.
        gpu_power_ratio: mapping of GPU name -> relative power weight.
        expected_gpus: number of GPUs that must be available (default 4).

    Returns:
        list of `(gpu_name, seconds)` tuples, one per task, or `None` when
        distributing or running the tasks failed (the error is printed).

    Raises:
        AssertionError: if the available GPU count differs from `expected_gpus`.
    """
    # CHECK CLUSTER GPU STATUS
    num_gpus = available_gpus()
    assert num_gpus == expected_gpus, f'EXPECTED AMOUNT OF GPUS NOT AVAILABLE (GOT {num_gpus}, EXPECTED {expected_gpus})'

    actors = []

    try:
        # CREATE AN INSTANCE ON EACH NODE WITH A GPU
        # (APPENDED ONE BY ONE SO A PARTIAL FAILURE STILL GETS CLEANED UP)
        for _ in range(num_gpus):
            actors.append(gpu_actor.remote())

        # AND FIND WHICH GPU THEY WERE ASSIGNED
        actor_gpus = ray.get([actor.detect_gpu.remote() for actor in actors])

        try:
            # FIGURE OUT HOW MANY TASKS EACH GPU SHOULD BE ASSIGNED
            # BASED ON RELATIVE COMPUTATIONAL POWER RATIO
            actor_tasks = task_distribution(total_tasks, actor_gpus, gpu_power_ratio)
            tasks = []

            # DELEGATE & START THE TASKS
            for actor, gpu_name, num_tasks in zip(actors, actor_gpus, actor_tasks):
                print(f'ASSIGNING {num_tasks} TASKS TO GPU {gpu_name}')
                tasks += [actor.perform_work.remote() for _ in range(num_tasks)]

            # WAIT FOR EVERY TASK TO FINISH
            durations = ray.get(tasks)

            # NOTE: SUM OF THE PER-TASK DURATIONS, NOT THE WALL-CLOCK TIME
            total_duration = round(sum(item[1] for item in durations), 3)
            print(f'\nTHE EXPERIMENT TOOK {total_duration} SECONDS')

            return durations

        # IF THE PROCESS FAILS, REPORT THE ERROR AND SIGNAL FAILURE WITH None
        except Exception as error:
            print(error)
            return None

    # ALWAYS KILL THE ACTORS TO UNALLOCATE RESOURCES
    finally:
        for actor in actors:
            ray.kill(actor)

### SPLITTING WORK BASED ON POWER RATIOS

In [14]:
# FIRST WEIGHTED RUN: 100 TASKS SPLIT BY HAND-ENTERED POWER RATIOS
# NOTE(review): THESE ARE CLOSE TO, BUT NOT IDENTICAL TO, THE RATIOS PRINTED BY THE NAIVE ANALYSIS -- PRESUMABLY FROM AN EARLIER RUN
first_durations = simulation(100, {
    'NVIDIA GeForce GTX TITAN X': 1,
    "NVIDIA GeForce RTX 3080": 3.573,
    'NVIDIA GeForce RTX 4090': 6.51,
})

ASSIGNING 30 TASKS TO GPU NVIDIA GeForce RTX 3080
ASSIGNING 9 TASKS TO GPU NVIDIA GeForce GTX TITAN X
ASSIGNING 8 TASKS TO GPU NVIDIA GeForce GTX TITAN X
ASSIGNING 53 TASKS TO GPU NVIDIA GeForce RTX 4090

THE EXPERIMENT TOOK 90.891 SECONDS


In [None]:
# NAIVE EXPERIMENT DURATION
# 120.851 SECONDS

### GRAFANA OUTPUT

<center><img src="better.png" /></center>

In [15]:
# AVERAGE DURATION AND RELATIVE SPEED RATIO PER GPU FOR THE FIRST WEIGHTED RUN
pprint(avg_times(first_durations))

{
    "avg_durations": {
        "NVIDIA GeForce RTX 3080": 0.733,
        "NVIDIA GeForce GTX TITAN X": 2.959,
        "NVIDIA GeForce RTX 4090": 0.351
    },
    "ratios": {
        "NVIDIA GeForce RTX 3080": 4.037,
        "NVIDIA GeForce GTX TITAN X": 1.0,
        "NVIDIA GeForce RTX 4090": 8.43
    }
}


In [17]:
# OLD RATIOS
#    'NVIDIA GeForce GTX TITAN X': 1,
#    "NVIDIA GeForce RTX 3080": 3.573,
#    'NVIDIA GeForce RTX 4090': 6.51,

### OPTIMIZING RATIO FURTHER

In [18]:
# SECOND WEIGHTED RUN: REUSE THE RATIOS REPORTED BY avg_times(first_durations)
second_durations = simulation(100, {
    "NVIDIA GeForce GTX TITAN X": 1.0,
    "NVIDIA GeForce RTX 3080": 4.037,
    "NVIDIA GeForce RTX 4090": 8.43,
})

ASSIGNING 28 TASKS TO GPU NVIDIA GeForce RTX 3080
ASSIGNING 7 TASKS TO GPU NVIDIA GeForce GTX TITAN X
ASSIGNING 7 TASKS TO GPU NVIDIA GeForce GTX TITAN X
ASSIGNING 58 TASKS TO GPU NVIDIA GeForce RTX 4090

THE EXPERIMENT TOOK 82.394 SECONDS


In [None]:
# NAIVE EXPERIMENT DURATION
# 120.851 SECONDS

# SLIGHTLY OPTIMIZED DURATION
# 90.891 SECONDS

### GRAFANA OUTPUT

<center><img src="best.png" /></center>

### HOW TO INTEGRATE RAY WITH MLFLOW

In [None]:
# NOTE: ILLUSTRATIVE PSEUDOCODE -- THIS CELL IS NOT MEANT TO BE EXECUTED AS-IS
# (`mlflow` IS NOT IMPORTED IN THE CELLS SHOWN AND `model` IS NEVER DEFINED)
with mlflow.connect() and ray.connect():

    # DEFINE HOW THE MODEL SHOULD BE TRAINED
    # JUST LIKE YOU DID BEFORE
    @ray.remote(num_gpus=1)
    def train_model():
        # FETCH DATASETS
        # PERFORM FEATURE ENGINEERING
        # ...
        # TRAIN THE MODEL
        return model

    # TRAIN THE MODEL REMOTELY ON THE CLUSTER
    # NOTE(review): ELSEWHERE IN THIS NOTEBOOK THE RESULT OF `.remote()` IS PASSED
    # THROUGH `ray.get(...)` TO OBTAIN THE VALUE -- THE SAME IS PRESUMABLY NEEDED HERE
    my_model = train_model.remote()

    # LOG METRICS & SAVE ARTIFACTS
    mlflow.log_metrics(...)
    mlflow.save_model(my_model)

### RAY'S DOCUMENTATION IS REALLY GOOD

Data processing:
- Dask integration: https://docs.ray.io/en/latest/ray-more-libs/dask-on-ray.html
- Pandas Integration: https://docs.ray.io/en/latest/ray-more-libs/modin/index.html


Machine Learning:
- Distributed pytorch: https://docs.ray.io/en/latest/train/getting-started-pytorch.html
- Distributed sklearn: https://docs.ray.io/en/latest/ray-more-libs/joblib.html
- Distributed RL: https://docs.ray.io/en/latest/rllib/index.html
- Mass hyperparameter tuning: https://docs.ray.io/en/latest/tune/index.html