Unfortunately, you cannot use Modal in a notebook in the same way that you might from the command line. In short, this breaks:

In [2]:
import modal
import time 

app = modal.App()


@app.function(concurrency_limit=10)
def func(a, b):
    """Pause for 0.3 seconds, then describe both arguments as a string.

    Returns:
        A string of the form ``"a=<a>, b=<b>"``.
    """
    time.sleep(0.3)
    summary = f"a={a}, b={b}"
    return summary

# Run the app with Modal's output enabled and try three different call styles.
with modal.enable_output(), app.run():
    # A single remote invocation.
    print("single call")
    print(func.remote(1, 2))

    # Twenty remote invocations issued one after another.
    print("sequential calls")
    for i in range(20): 
        print(func.remote(i, i + 1))

    # Fan out over 20 argument tuples in one call.
    # NOTE(review): this map-style call appears to be what raises the
    # InvalidError shown in the cell output when run from a notebook.
    print("parallel call")
    out = func.starmap([(i, i + 1) for i in range(20)])
    for item in out:
        print(item)


Output()

Output()

Output()

InvalidError: You can't run Function.map() or Function.for_each() from an async function. Use Function.map.aio()/Function.for_each.aio() instead.

This is particularly painful when we consider that it's the parallel stuff that we are usually super interested in!

## Enter joblib

In [3]:
import random
import time
from joblib import Parallel, delayed

def expensive(i):
    """Simulate a slow task: sleep for a random sub-second duration, then echo ``i``.

    Args:
        i: Any value; it is returned unchanged.

    Returns:
        The same ``i`` that was passed in.
    """
    delay = random.random()  # uniform in [0.0, 1.0) seconds
    time.sleep(delay)
    return i

p = Parallel(n_jobs=8)(delayed(expensive)(i) for i in range(10))
p[:10]

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [5]:
p = Parallel(n_jobs=8, return_as="generator")(delayed(expensive)(i) for i in range(10))

for item in p:
    print(item)

0
1
2
3
4
5
6
7
8
9


In [6]:
p = Parallel(n_jobs=8, return_as="generator_unordered")(delayed(expensive)(i) for i in range(10))

for item in p:
    print(item)

0
1
4
8
7
2
5
9
6
3


The whole point here is that joblib figures out the "parallel stuff". This is quite a bit of stuff too: 

- figuring out how to spin up workers
- make sure workers have dependencies
- make sure we serialize to the workers
- make sure that we can serialize stuff back
- try and be fault tolerant about it

But what if we want to use huge cloud resources and many machines? Backends!

- [Dask](https://joblib.readthedocs.io/en/latest/auto_examples/parallel/distributed_backend_simple.html)
- [Ray](https://docs.ray.io/en/latest/ray-more-libs/joblib.html)

But now also Modal!

In [7]:
# This is needed to register the modal backend
import joblib_modal
from joblib import parallel_config, Parallel, delayed

In [8]:
with parallel_config(backend="modal", name="my-test-job"):
    out = Parallel(n_jobs=-1)(delayed(expensive)(i) for i in range(100))



In [9]:
list(out)

[0,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 39,
 40,
 41,
 42,
 43,
 44,
 45,
 46,
 47,
 48,
 49,
 50,
 51,
 52,
 53,
 54,
 55,
 56,
 57,
 58,
 59,
 60,
 61,
 62,
 63,
 64,
 65,
 66,
 67,
 68,
 69,
 70,
 71,
 72,
 73,
 74,
 75,
 76,
 77,
 78,
 79,
 80,
 81,
 82,
 83,
 84,
 85,
 86,
 87,
 88,
 89,
 90,
 91,
 92,
 93,
 94,
 95,
 96,
 97,
 98,
 99]

When is this super duper useful? 

In [14]:
import modal
import numpy as np
import joblib
import scipy
import sklearn
from sklearn.model_selection import RandomizedSearchCV
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.datasets import make_classification

image = (
  modal.Image.debian_slim()
  .pip_install(f"scikit-learn=={sklearn.__version__}")
  .pip_install(f"numpy=={np.__version__}")
  .pip_install(f"joblib=={joblib.__version__}")
  .pip_install(f"scipy=={scipy.__version__}")
)

param_grid = {'learning_rate': np.logspace(-3, -1, 100), 'max_depth': [3, 5, 7, 9, 11], "random_state": [42, 43, 44]}

clf = HistGradientBoostingClassifier()
grid_search = RandomizedSearchCV(clf, param_grid, cv=5, n_jobs=-1, n_iter=200)
X, y = make_classification()

This benchmark is silly. But notice the speed/scale. 

In [15]:
%%time

with parallel_config(
    backend="modal",
    n_jobs=-1,
    name="test-joblib",
    image=image,
    modal_output=False,
):
    grid_search.fit(X, y)

CPU times: user 2.59 s, sys: 5.82 s, total: 8.42 s
Wall time: 13.7 s


In [17]:
import pandas as pd

pd.DataFrame(grid_search.cv_results_).shape

(200, 16)

The focus is gridsearch for now, but I figured I might try to use it for something else as well!

In [20]:
import pandas as pd

texts = pd.read_csv("sentences.csv", nrows=50_000)["sentence"].tolist()

In [21]:
def batch_items(items, batch_size=1000):
    """Yield consecutive lists of at most ``batch_size`` elements from ``items``.

    Args:
        items: Any iterable. It is consumed lazily, one batch at a time.
        batch_size: Maximum number of elements per batch. Must be at least 1.

    Yields:
        Lists of elements in input order. Every batch is full except possibly
        the last one, which holds whatever remains.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1. Because this is a
            generator, the error surfaces when iteration starts, not when
            ``batch_items`` is called.
    """
    # A non-positive size would never trigger a flush, so every item would
    # silently end up in one giant batch. Fail loudly instead.
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size!r}")

    batch = []
    for item in items:
        batch.append(item)
        # ``>=`` rather than ``==`` so a non-integer size still flushes.
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:  # Flush the trailing partial batch.
        yield batch

batches = batch_items(texts, batch_size=1000)

In [18]:
from joblib import parallel_config, Parallel, delayed
import modal
import joblib_modal

EMBEDDING_MODEL = "all-MiniLM-L6-v2"

image = (
    modal.Image.debian_slim()
    .pip_install("sentence-transformers")
    .run_commands(
        f"python -c 'from sentence_transformers import SentenceTransformer; tfm = SentenceTransformer(\"{EMBEDDING_MODEL}\")'"
    )
)


def embed(texts):
    """Encode ``texts`` with the sentence-transformer named by ``EMBEDDING_MODEL``.

    Args:
        texts: The batch of inputs handed to ``SentenceTransformer.encode``.

    Returns:
        Whatever ``SentenceTransformer.encode`` returns for ``texts``.
    """
    # Imported inside the function — presumably so the package only has to be
    # installed in the remote image, not locally (TODO confirm).
    from sentence_transformers import SentenceTransformer

    encoder = SentenceTransformer(EMBEDDING_MODEL)
    return encoder.encode(texts)

In [21]:
%%time

batches = batch_items(texts, batch_size=1000)

with parallel_config(
    backend="modal", name="my-emb-job", image=image, modal_output=False
):
    out = Parallel(n_jobs=-1)(delayed(embed)(t) for t in batches)



CPU times: user 2.09 s, sys: 964 ms, total: 3.06 s
Wall time: 41.6 s


In [22]:
print(np.concatenate(out).shape)

(50000, 384)


In this case though, we might also try to run a single GPU function. 

In [24]:
%%time

batches = batch_items(texts, batch_size=500)

with parallel_config(
    backend="modal", name="my-emb-job", image=image, modal_output=False,
):
    out = Parallel(n_jobs=-1)(delayed(embed)(t) for t in batches)

CPU times: user 2.8 s, sys: 1.33 s, total: 4.13 s
Wall time: 46.7 s


In [28]:
%%time

batches = batch_items(texts, batch_size=250)

with parallel_config(
    backend="modal", name="my-emb-job", image=image, modal_output=False
):
    out = Parallel(n_jobs=-1)(delayed(embed)(t) for t in batches)

CPU times: user 2.58 s, sys: 1.54 s, total: 4.11 s
Wall time: 35.8 s


In [25]:
import modal
from sentence_transformers import SentenceTransformer

EMBEDDING_MODEL = 'all-MiniLM-L6-v2'

# Container image with sentence-transformers installed. The run_commands step
# instantiates the model while the image is built — presumably so the weights
# are baked into the image instead of being fetched on every call (confirm
# against the Modal docs).
image = (
    modal.Image.debian_slim()
    .pip_install("sentence-transformers")  # plain string: nothing to interpolate
    .run_commands(
        f"python -c 'from sentence_transformers import SentenceTransformer; tfm = SentenceTransformer(\"{EMBEDDING_MODEL}\")'"
    )
)

app = modal.App(image=image)

@app.function(gpu="A100")
def func(texts):
    """Encode ``texts`` with the ``EMBEDDING_MODEL`` sentence-transformer.

    The decorator registers this function on ``app`` with ``gpu="A100"``.

    Returns:
        Whatever ``SentenceTransformer.encode`` returns for ``texts``.
    """
    encoder = SentenceTransformer(EMBEDDING_MODEL)
    return encoder.encode(texts)

In [26]:
%%time

with app.run():
    out = func.remote(texts)

CPU times: user 504 ms, sys: 349 ms, total: 852 ms
Wall time: 40 s


In [16]:
out.shape

(50000, 384)

In [24]:
%%time

SentenceTransformer(EMBEDDING_MODEL).encode(texts)

CPU times: user 1min 39s, sys: 15.8 s, total: 1min 55s
Wall time: 3min 9s


array([[ 0.00275143,  0.05215697,  0.02795227, ..., -0.00358941,
         0.04294354,  0.03718437],
       [-0.01325271, -0.03367144, -0.07323536, ...,  0.01747229,
         0.02711525,  0.016857  ],
       [-0.03211771,  0.04568438,  0.04654278, ...,  0.02424458,
         0.01672448, -0.02681892],
       ...,
       [-0.03979724, -0.10982806,  0.01440763, ..., -0.03324683,
        -0.03124548,  0.02747727],
       [-0.13533212,  0.00873163,  0.01361843, ..., -0.0047535 ,
        -0.00705758, -0.00325262],
       [-0.07847907,  0.01026217, -0.03207597, ..., -0.03375116,
        -0.08578945,  0.02612685]], dtype=float32)

In [29]:
len(texts)

50000

In [27]:
gpu_per_s = 0.000772
40 * gpu_per_s

0.03088

In [30]:
# This job used 200 workers in total, and the logs suggest that each worker spent about 2s.

cpu_per_s = 0.000038
200 * 2 * cpu_per_s

0.015200000000000002

Take these numbers with a grain of salt because it is unclear how compute for cold/warm state is calculated. Memory is also not taken into account. But it remains an interesting result. There might be some jobs that are faster on a whole lot of CPUs instead of a GPU.

Things not to forget, joblib integration right now:

- configure timeout?
- configure GPU?
- configure datasets?
- warm start memory? 