In [1]:
import gym
import numpy as np
import torch, torch.nn as nn
import torch.nn.functional as F
from tqdm import trange

# Policy network: 4-feature observation in, 2 action logits out; lives on the GPU.
my_agent = nn.Sequential(
    nn.Linear(4, 1024),
    nn.ReLU(),
    nn.Linear(1024, 1024),
    nn.ReLU(),
    nn.Linear(1024, 2),
).cuda()

def play_a_game(predict_action_probs, steps=1000, env_name="CartPole-v0"):
    """Agent-environment interaction loop for a two-action stochastic policy.

    Treat it as a black box you don't want to change: the notebook speeds it up
    only by swapping in a different ``predict_action_probs`` callable.

    :param predict_action_probs: callable mapping one observation to action
        probabilities; only ``probs[0]`` (probability of action 0) is read.
    :param steps: total number of ``env.step`` calls. Finished episodes are
        restarted with ``env.reset()`` and play continues until ``steps`` is hit.
    :param env_name: environment id passed to ``gym.make``.
    :returns: sum of rewards over all ``steps`` steps (across episodes).
    """
    env = gym.make(env_name)
    try:
        obs = env.reset()
        total_reward = 0.0
        for _ in range(steps):
            probs = predict_action_probs(obs)
            # Pick action 1 with probability 1 - probs[0]. Cast to a plain int:
            # the comparison yields numpy.bool_, which gym's Discrete action
            # space does not reliably accept as a valid action.
            action = int(np.random.random() > probs[0])
            obs, r, done, _info = env.step(action)
            total_reward += r
            if done:
                obs = env.reset()
        return total_reward
    finally:
        # Release the environment even if the policy or the env raises.
        env.close()


env: CUDA_VISIBLE_DEVICES=3


### Naive approach

Since running several custom threads in torch is ~~for masoch...~~ problematic, we'll play one game at a time.

This uses 1 CPU thread, and GPU utilization on my old GTX __680M__ is __around 24%__.

In [2]:
def predict_action_probs(observation):
    """Action probabilities for a single observation (naive, one-at-a-time path).

    A leading batch axis of size 1 is added, ``my_agent`` is evaluated on the
    GPU without autograd, logits are softmax-normalized over the last axis, and
    the single resulting row is returned as a numpy array.
    """
    single_batch = torch.tensor(observation[None, :], dtype=torch.float32, device='cuda')
    with torch.no_grad():
        action_probs = F.softmax(my_agent(single_batch), dim=-1)
    return action_probs.cpu().numpy()[0]

In [3]:
%%time
# Sequential baseline: 128 games, each querying the model once per step.
rewards = [
    play_a_game(predict_action_probs)
    for _ in trange(128)
]

100%|██████████| 128/128 [00:44<00:00,  2.84it/s]

CPU times: user 44.8 s, sys: 220 ms, total: 45 s
Wall time: 45 s





### Batch-parallel approach

Group several calls together and process them as a batch.

__Note:__ despite using 64 threads, this code only uses 3.5–4 CPU cores, because most threads are waiting for the GPU. Meanwhile, GPU utilization is __around 95%__.

In [4]:
import joblib
from as_a_service import BatchedService
def predict_action_probs_batch(observations):
    """Vectorized counterpart of the single-observation predictor.

    :param observations: a batch of observations — presumably array-like of
        shape (batch, 4), assembled by BatchedService; TODO confirm.
    :returns: numpy array of softmax action probabilities, one row per input row.
    """
    batch = torch.tensor(observations, dtype=torch.float32, device='cuda')
    with torch.no_grad():
        batch_probs = F.softmax(my_agent(batch), dim=-1)
    return batch_probs.cpu().numpy()

# Wrap the batched function so callers can submit single observations.
# NOTE(review): batch_size presumably caps how many calls are merged and
# max_delay bounds how long to wait for a batch to fill — see as_a_service docs.
predict_action_probs_service = BatchedService(predict_action_probs_batch, batch_size=64, max_delay=1e-3)

In [5]:
# Works just like the naive function: one observation in, one probability vector out.
env = gym.make("CartPole-v0")
try:
    sample_action_probs = predict_action_probs_service(env.reset())
finally:
    # This env is only needed for one observation; release it right away.
    env.close()
sample_action_probs

array([0.49511108, 0.50488895], dtype=float32)

In [6]:
%%time
# 128 games spread over 64 threads; the threading backend shares the single
# service object between workers. `check_pickle` is not passed: it was removed
# from `joblib.delayed`, and threads never pickle tasks anyway.
tasks = [joblib.delayed(play_a_game)(predict_action_probs_service)
         for _ in range(128)]

rewards = joblib.Parallel(n_jobs=64, backend='threading')(tasks)

CPU times: user 12.8 s, sys: 1.46 s, total: 14.3 s
Wall time: 11.6 s


### Same approach with decorators

This code does exactly the same as above, but builds the service with a decorator.

In [7]:
from as_a_service import as_batched_service
@as_batched_service(batch_size=64, max_delay=1e-3)
def predict_action_probs_service(observations):
    """Batched policy: a batch of observations -> softmax action probabilities.

    Same computation as ``predict_action_probs_batch``; the decorator turns it
    into a service that (per the notebook) is called with single observations.
    """
    obs_tensor = torch.tensor(observations, dtype=torch.float32, device='cuda')
    with torch.no_grad():
        logits = my_agent(obs_tensor)
    return F.softmax(logits, dim=-1).cpu().numpy()

In [8]:
%%time
# Same benchmark as before, now using the decorator-built service. `check_pickle`
# is not passed: it was removed from `joblib.delayed`, and the threading backend
# never pickles tasks.
tasks = [joblib.delayed(play_a_game)(predict_action_probs_service)
         for _ in range(128)]

rewards = joblib.Parallel(n_jobs=64, backend='threading')(tasks)

CPU times: user 12.7 s, sys: 1.56 s, total: 14.3 s
Wall time: 11.7 s
