# Distributed batch inference with Dask
- With Dask distributed, inference on 1000 rows finished in **1 min 40 s**.
- With non-distributed Pandas, inference on 5 rows took 2.69 s; extrapolated linearly to 1000 rows, that is roughly **8 min 58 s**.
- Dask achieves a speed-up of about **5.4×**, in line with the 6 worker processes in the Dask cluster.
- Adding more workers should speed this up further.
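
The extrapolation and the speed-up can be rechecked from the wall times recorded in the cell outputs further down. This is a quick sketch in plain Python, assuming the 5-row Pandas timing scales linearly with the number of rows:

```python
# Wall times taken from the notebook outputs below.
pandas_seconds_for_5_rows = 2.69
dask_seconds_for_1000_rows = 1 * 60 + 40  # "Wall time: 1min 40s"

# Assumption: Pandas inference time scales linearly with the row count.
pandas_seconds_for_1000_rows = pandas_seconds_for_5_rows / 5 * 1000
speedup = pandas_seconds_for_1000_rows / dask_seconds_for_1000_rows

minutes, seconds = divmod(round(pandas_seconds_for_1000_rows), 60)
print(f"Pandas estimate: {minutes} min {seconds} s")
print(f"Speed-up: {speedup:.1f}x")
```

The 5-row run also includes fixed costs such as creating the tokenizer, so a linear extrapolation probably overstates the Pandas time somewhat.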

In [None]:
import sys
!{sys.executable} -m pip install -r requirements.txt --quiet



In [1]:
import warnings
warnings.filterwarnings('ignore')
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # set before importing tensorflow to silence its C++ logs

import pandas as pd
pd.options.display.max_rows = 999

import gcsfs
import s3fs
import pickle
from transformers import DistilBertTokenizer, TFDistilBertForSequenceClassification
import tensorflow as tf
from tensorflow.keras import activations, optimizers, losses
from scheduler_setup import loaded_models
tf.get_logger().setLevel('ERROR')

Some layers from the model checkpoint at distilbert-base-uncased were not used when initializing TFDistilBertForSequenceClassification: ['vocab_transform', 'activation_13', 'vocab_projector', 'vocab_layer_norm']
- This IS expected if you are initializing TFDistilBertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFDistilBertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some layers of TFDistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['pre_classifier', 'classifier', 'dropout_19']
You should probably TRAIN this model on a down-stream task to be able to use i

## Start a Dask cluster for distributed inference

In [7]:
from hyperplane import notebook_common as nc

client, cluster = nc.initialize_cluster(
    nprocs = 3,
    nthreads = 5,
    ram_gb_per_proc = 4,
    cores_per_worker = 15,
    num_workers = 2
)

👉 Hyperplane: selecting worker node pool
👉 Hyperplane: selecting scheduler node pool
Creating scheduler pod on cluster. This may take some time.
👉 Hyperplane: spinning up a dask cluster with a scheduler as a standalone container.
👉 Hyperplane: In a few minutes you'll be able to access the dashboard at https://ds.hyperplane.dev/dask-cluster-c76da13e-b7fa-4d2f-a620-895721e9c986/status
👉 Hyperplane: to get logs from all workers, do `cluster.get_logs()`


### Install any necessary packages on the cluster workers with a Dask worker plugin


In [9]:
from dask.distributed import PipInstall
plugin = PipInstall(packages=["s3fs"], pip_options=["--upgrade"])
client.register_worker_plugin(plugin)

CPU times: user 9.14 ms, sys: 2.34 ms, total: 11.5 ms
Wall time: 8.55 s


{'tcp://10.1.113.16:35955': {'status': 'OK'},
 'tcp://10.1.113.16:37409': {'status': 'OK'},
 'tcp://10.1.113.16:41277': {'status': 'OK'},
 'tcp://10.1.114.16:35745': {'status': 'OK'},
 'tcp://10.1.114.16:41753': {'status': 'OK'},
 'tcp://10.1.114.16:45565': {'status': 'OK'}}
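
The returned mapping has one entry per worker process. The sketch below uses the addresses from the output above to check that every process reported `OK` and to count processes per host. Reading the two IPs as the two worker nodes (`num_workers = 2`) with three processes each (`nprocs = 3`) is an assumption about how Hyperplane lays out the cluster.

```python
from collections import Counter
from urllib.parse import urlparse

# Plugin registration result, copied from the output above.
result = {
    'tcp://10.1.113.16:35955': {'status': 'OK'},
    'tcp://10.1.113.16:37409': {'status': 'OK'},
    'tcp://10.1.113.16:41277': {'status': 'OK'},
    'tcp://10.1.114.16:35745': {'status': 'OK'},
    'tcp://10.1.114.16:41753': {'status': 'OK'},
    'tcp://10.1.114.16:45565': {'status': 'OK'},
}

# Collect any worker process that did not report OK.
failed = [addr for addr, info in result.items() if info['status'] != 'OK']

# Count worker processes per host IP.
procs_per_host = Counter(urlparse(addr).hostname for addr in result)

print(failed)
print(dict(procs_per_host))
```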

### Upload the scheduler_setup script, which loads the model on each Dask worker for inference
- This way, the inference process does not need to download the model again for every partition.

In [10]:
client.upload_file('scheduler_setup.py')

{'tcp://10.1.113.16:35955': {'status': 'OK'},
 'tcp://10.1.113.16:37409': {'status': 'OK'},
 'tcp://10.1.113.16:41277': {'status': 'OK'},
 'tcp://10.1.114.16:35745': {'status': 'OK'},
 'tcp://10.1.114.16:41753': {'status': 'OK'},
 'tcp://10.1.114.16:45565': {'status': 'OK'}}
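
The contents of `scheduler_setup.py` are not shown in this notebook. The sketch below illustrates only the general load-once pattern it relies on: module-level code fills a dictionary the first time the module is imported in a process, and later calls reuse the cached object. `load_model`, `predict_partition`, and the `max_len` value of 128 are hypothetical stand-ins. The real script would load the DistilBERT model rather than a placeholder object.

```python
# Hypothetical stand-in for scheduler_setup.py; not the author's actual script.
load_count = 0  # counts how often the expensive load runs


def load_model(model_name: str, max_len: int):
    """Placeholder for an expensive model download and initialisation."""
    global load_count
    load_count += 1
    model = object()  # the real script would build the TensorFlow model here
    return model, model_name, max_len


# Module-level code runs once per process, on first import.
loaded_models = {"model": load_model("distilbert-base-uncased", max_len=128)}


def predict_partition(rows):
    # Reuse the cached model instead of loading it for every partition.
    model, model_name, max_len = loaded_models["model"]
    return [model_name] * len(rows)


for partition in (["a", "b"], ["c"], ["d", "e", "f"]):
    predict_partition(partition)

print(load_count)
```

Three partitions are processed, but the loader runs only once.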

### Read a Parquet or CSV file into a Dask dataframe from your cloud bucket
1. Download the data from [Kaggle](https://www.kaggle.com/yelp-dataset/yelp-dataset).
2. Save it to a local or cloud location.
3. Change the file path to where your data is stored.

In [1]:
import dask.dataframe as dd
file_path = "gs://pipeline_data/ray_data/yelp_review_dask.parquet"
df = dd.read_parquet(file_path)

100


Unnamed: 0,review_id,user_id,business_id,stars,useful,funny,cool,text,date
0,lWC-xP3rd6obsecCYsGZRg,ak0TdVmGKo4pwqdJSTLwWw,buF9druCkbuXLX526sGELQ,4,3,1,1,Apparently Prides Osteria had a rough summer a...,1412998442000000000
1,8bFej1QE5LXp4O05qjGqXA,YoVfDbnISlW0f7abNQACIg,RA4V8pr014UyUbDvI-LW2A,4,1,0,0,This store is pretty good. Not as great as Wal...,1435955905000000000


In [12]:
def clean_text(df):
    df['text'] = df.text.replace(r'\r+|\n+|\t+','', regex=True)
    df['text'] = df.text.str.lower()
    return df

df = df.map_partitions(clean_text)
df['text'] = df.text.astype(str)
df.dtypes

review_id      object
user_id        object
business_id    object
stars           int64
useful          int64
funny           int64
cool            int64
text           object
date            int64
dtype: object

## Distributed inference with Dask
- The most convenient way to run inference in Dask is the `map_partitions` function.
- The `predict` function is an ordinary function that runs inference on a pandas dataframe.
- Dask's `map_partitions` applies `predict` to each partition of the Dask dataframe; each partition is itself a pandas dataframe (read more on [Dask dataframe](https://docs.dask.org/en/latest/dataframe.html)).
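
Conceptually, `map_partitions` splits the rows into chunks, calls the function on each chunk, and stitches the results back together. The plain pandas sketch below mimics that idea sequentially, whereas Dask runs the chunks in parallel on the workers. `map_partitions_sketch` and `add_length` are hypothetical names used only for illustration, and `add_length` is a toy stand-in for `predict`.

```python
import pandas as pd


def add_length(part: pd.DataFrame) -> pd.DataFrame:
    """Toy per-partition function standing in for `predict`."""
    part = part.copy()
    part["n_chars"] = part["text"].str.len()
    return part


def map_partitions_sketch(df: pd.DataFrame, func, npartitions: int) -> pd.DataFrame:
    """Apply `func` to consecutive row chunks and concatenate the results."""
    chunk = -(-len(df) // npartitions)  # ceiling division
    parts = [df.iloc[i:i + chunk] for i in range(0, len(df), chunk)]
    return pd.concat([func(p) for p in parts])


df_toy = pd.DataFrame({"text": ["good", "bad food", "ok", "great place", "meh"]})
out = map_partitions_sketch(df_toy, add_length, npartitions=2)
print(out["n_chars"].tolist())
```

Because the function only ever sees one pandas dataframe at a time, it can be developed and tested locally on a few rows before it is handed to Dask.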

In [None]:
import pandas as pd

def predict(df: pd.DataFrame, batch_size: int) -> pd.DataFrame:
    import numpy as np
    # Imported inside the function so it resolves on the worker,
    # where scheduler_setup.py was uploaded and the model is already loaded.
    from scheduler_setup import loaded_models
    model, model_name, max_len = loaded_models['model']

    tkzr = DistilBertTokenizer.from_pretrained(model_name)
    inputs = tkzr(df.text.tolist(),
                  padding='max_length',
                  truncation=True,
                  return_tensors='tf')

    dataset = tf.data.Dataset.from_tensor_slices((inputs['input_ids'], inputs['attention_mask']))
    dataset = dataset.batch(batch_size)

    predictions = []
    for token_ids, masks in dataset:
        pred = model(token_ids, attention_mask=masks)
        # Softmax over the class axis (last axis), then pick the most likely class per row.
        labels = np.argmax(tf.nn.softmax(pred.logits, axis=-1).numpy(), axis=1)
        predictions.append(labels)
    df['pred'] = np.hstack(predictions)
    return df
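
The predicted label in `predict` is the argmax over the class axis of the logits, so any softmax applied first has to normalise over that same axis. The small NumPy sketch below, with made-up logits, shows that normalising over axis 0 (the batch axis) can change the predicted labels. Softmax over the last axis leaves the argmax identical to that of the raw logits.

```python
import numpy as np


def softmax(x: np.ndarray, axis: int) -> np.ndarray:
    """Numerically stable softmax along the given axis."""
    shifted = x - x.max(axis=axis, keepdims=True)
    exp = np.exp(shifted)
    return exp / exp.sum(axis=axis, keepdims=True)


# Made-up logits with shape (batch=2, classes=2).
logits = np.array([[3.0, 2.0],
                   [1.0, 0.5]])

labels_class_axis = np.argmax(softmax(logits, axis=-1), axis=1)
labels_batch_axis = np.argmax(softmax(logits, axis=0), axis=1)
labels_raw_logits = np.argmax(logits, axis=1)

print(labels_class_axis.tolist())  # softmax over classes
print(labels_batch_axis.tolist())  # softmax over the batch
print(labels_raw_logits.tolist())  # no softmax at all
```

Since softmax is monotonic within each row, the softmax step can also be skipped entirely when only the label is needed.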

In [15]:
%%time
# Use Pandas
df_local = df.head(5)
print(df_local.shape)
predict(df_local, batch_size=8)

(5, 9)
CPU times: user 9.07 s, sys: 1.82 s, total: 10.9 s
Wall time: 2.69 s


Unnamed: 0,review_id,user_id,business_id,stars,useful,funny,cool,text,date,pred
0,lWC-xP3rd6obsecCYsGZRg,ak0TdVmGKo4pwqdJSTLwWw,buF9druCkbuXLX526sGELQ,4,3,1,1,apparently prides osteria had a rough summer a...,1412998442000000000,0
1,8bFej1QE5LXp4O05qjGqXA,YoVfDbnISlW0f7abNQACIg,RA4V8pr014UyUbDvI-LW2A,4,1,0,0,this store is pretty good. not as great as wal...,1435955905000000000,1
2,NDhkzczKjLshODbqDoNLSg,eC5evKn1TWDyHCyQAwguUw,_sS2LBIGNT5NQb6PD1Vtjw,5,0,0,0,i called wvm on the recommendation of a couple...,1369773486000000000,1
3,T5fAqjjFooT4V0OeZyuk1w,SFQ1jcnGguO0LYWnbbftAA,0AzLzHfOJgL7ROwhdww2ew,2,1,1,1,i've stayed at many marriott and renaissance m...,1262917755000000000,0
4,sjm_uUcQVxab_EeLCqsYLg,0kA0PAJ8QFMeveQWHFqz2A,8zehGz9jnxPqXtOc7KaJxA,4,0,0,0,the food is always great here. the service fro...,1311876301000000000,1


In [16]:
# For this demo, sample only about 1000 rows (0.1%) from the entire dataset
print(len(df))
df_sample = df.sample(frac=0.001)
len(df_sample)

1000000


1000

In [17]:
# Describe the output schema for Dask: the input columns plus the new integer `pred` column
meta = df.dtypes.to_dict()
meta['pred'] = 'int'

df_result = df_sample.map_partitions(predict, batch_size=8, meta=meta)

In [18]:
%%time
df_result_local = df_result.compute()
df_result_local.shape

CPU times: user 743 ms, sys: 19.1 ms, total: 762 ms
Wall time: 1min 40s


(1000, 10)

In [19]:
cluster.close()