From: https://www.tensorflow.org/tutorials/structured_data/preprocessing_layers

In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf

from tensorflow.keras import layers

2024-09-19 17:05:08.453140: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-09-19 17:05:08.453178: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-09-19 17:05:08.454693: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-09-19 17:05:08.462086: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
print(tf.__version__)

# Enable GPU memory growth
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
    except RuntimeError as e:
        print(e)

2.15.0


In [3]:
dataset_url = 'http://storage.googleapis.com/download.tensorflow.org/data/petfinder-mini.zip'
csv_file = 'datasets/petfinder-mini/petfinder-mini.csv'

tf.keras.utils.get_file('petfinder_mini.zip', dataset_url,
                        extract=True, cache_dir='.')
dataframe = pd.read_csv(csv_file)

In [4]:
dataframe.head()

Unnamed: 0,Type,Age,Breed1,Gender,Color1,Color2,MaturitySize,FurLength,Vaccinated,Sterilized,Health,Fee,Description,PhotoAmt,AdoptionSpeed
0,Cat,3,Tabby,Male,Black,White,Small,Short,No,No,Healthy,100,Nibble is a 3+ month old ball of cuteness. He ...,1,2
1,Cat,1,Domestic Medium Hair,Male,Black,Brown,Medium,Medium,Not Sure,Not Sure,Healthy,0,I just found it alone yesterday near my apartm...,2,0
2,Dog,1,Mixed Breed,Male,Brown,White,Medium,Medium,Yes,No,Healthy,0,Their pregnant mother was dumped by her irresp...,7,3
3,Dog,4,Mixed Breed,Female,Black,Brown,Medium,Short,Yes,No,Healthy,150,"Good guard dog, very alert, active, obedience ...",8,2
4,Dog,1,Mixed Breed,Male,Black,No Color,Medium,Short,No,No,Healthy,0,This handsome yet cute boy is up for adoption....,3,2


In [5]:
# In the original dataset, `'AdoptionSpeed'` of `4` indicates
# a pet was not adopted.
dataframe['target'] = np.where(dataframe['AdoptionSpeed']==4, 0, 1)

# Drop unused features.
dataframe = dataframe.drop(columns=['AdoptionSpeed', 'Description'])

In [6]:
train, val, test = np.split(dataframe.sample(frac=1), [int(0.8*len(dataframe)), int(0.9*len(dataframe))])

  return bound(*args, **kwds)


In [7]:
print(len(train), 'training examples')
print(len(val), 'validation examples')
print(len(test), 'test examples')

9229 training examples
1154 validation examples
1154 test examples


In [8]:
def df_to_dataset(dataframe, shuffle=True, batch_size=32):
    df = dataframe.copy()
    labels = df.pop('target')
    df = {key: np.array(value)[:, tf.newaxis] for key, value in dataframe.items()}
    ds = tf.data.Dataset.from_tensor_slices((dict(df), labels))
    if shuffle:
        ds = ds.shuffle(buffer_size=len(dataframe))
    ds = ds.batch(batch_size)
    ds = ds.prefetch(batch_size)
    return ds

In [9]:
batch_size = 5
train_ds = df_to_dataset(train, batch_size=batch_size)

2024-09-19 17:05:17.651434: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1929] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 31135 MB memory:  -> device: 0, name: Tesla V100-SXM3-32GB-H, pci bus id: 0000:34:00.0, compute capability: 7.0
2024-09-19 17:05:17.652892: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1929] Created device /job:localhost/replica:0/task:0/device:GPU:1 with 31135 MB memory:  -> device: 1, name: Tesla V100-SXM3-32GB-H, pci bus id: 0000:36:00.0, compute capability: 7.0
2024-09-19 17:05:17.654147: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1929] Created device /job:localhost/replica:0/task:0/device:GPU:2 with 31135 MB memory:  -> device: 2, name: Tesla V100-SXM3-32GB-H, pci bus id: 0000:39:00.0, compute capability: 7.0
2024-09-19 17:05:17.655393: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1929] Created device /job:localhost/replica:0/task:0/device:GPU:3 with 31135 MB memory:  -> device: 3, name: Tesla V100-SXM3-32GB-H, pc

In [10]:
[(train_features, label_batch)] = train_ds.take(1)
print('Every feature:', list(train_features.keys()))
print('A batch of ages:', train_features['Age'])
print('A batch of targets:', label_batch )

Every feature: ['Type', 'Age', 'Breed1', 'Gender', 'Color1', 'Color2', 'MaturitySize', 'FurLength', 'Vaccinated', 'Sterilized', 'Health', 'Fee', 'PhotoAmt', 'target']
A batch of ages: tf.Tensor(
[[ 2]
 [ 6]
 [ 5]
 [ 2]
 [50]], shape=(5, 1), dtype=int64)
A batch of targets: tf.Tensor([1 0 1 1 0], shape=(5,), dtype=int64)


In [11]:
def get_normalization_layer(name, dataset):
    # Create a Normalization layer for the feature.
    normalizer = layers.Normalization(axis=None)

    # Prepare a Dataset that only yields the feature.
    feature_ds = dataset.map(lambda x, y: x[name])

    # Learn the statistics of the data.
    normalizer.adapt(feature_ds)

    return normalizer

In [12]:
#import os
#os.environ["XLA_FLAGS"] = "--xla_gpu_cuda_data_dir=/usr/lib/cuda" # set to location of cuda/nvvm/libdevice

photo_count_col = train_features['PhotoAmt']
layer = get_normalization_layer('PhotoAmt', train_ds)
layer(photo_count_col)

<tf.Tensor: shape=(5, 1), dtype=float32, numpy=
array([[ 0.43462536],
       [ 0.43462536],
       [ 1.3863401 ],
       [-0.51708937],
       [ 0.11738712]], dtype=float32)>

In [13]:
def get_category_encoding_layer(name, dataset, dtype, max_tokens=None):
    # Create a layer that turns strings into integer indices.
    if dtype == 'string':
        index = layers.StringLookup(max_tokens=max_tokens)
    # Otherwise, create a layer that turns integer values into integer indices.
    else:
        index = layers.IntegerLookup(max_tokens=max_tokens)

    # Prepare a `tf.data.Dataset` that only yields the feature.
    feature_ds = dataset.map(lambda x, y: x[name])

    # Learn the set of possible values and assign them a fixed integer index.
    index.adapt(feature_ds)

    # Encode the integer indices.
    encoder = layers.CategoryEncoding(num_tokens=index.vocabulary_size())

    # Apply multi-hot encoding to the indices. The lambda function captures the
    # layer, so you can use them, or include them in the Keras Functional model later.
    return lambda feature: encoder(index(feature))

In [14]:
test_type_col = train_features['Type']
test_type_layer = get_category_encoding_layer(name='Type',
                                              dataset=train_ds,
                                              dtype='string')
test_type_layer(test_type_col)

<tf.Tensor: shape=(5, 3), dtype=float32, numpy=
array([[0., 0., 1.],
       [0., 0., 1.],
       [0., 1., 0.],
       [0., 0., 1.],
       [0., 1., 0.]], dtype=float32)>

In [15]:
test_age_col = train_features['Age']
test_age_layer = get_category_encoding_layer(name='Age',
                                             dataset=train_ds,
                                             dtype='int64',
                                             max_tokens=5)
test_age_layer(test_age_col)

<tf.Tensor: shape=(5, 5), dtype=float32, numpy=
array([[0., 1., 0., 0., 0.],
       [1., 0., 0., 0., 0.],
       [1., 0., 0., 0., 0.],
       [0., 1., 0., 0., 0.],
       [1., 0., 0., 0., 0.]], dtype=float32)>

In [16]:
batch_size = 256
train_ds = df_to_dataset(train, batch_size=batch_size)
val_ds = df_to_dataset(val, shuffle=False, batch_size=batch_size)
test_ds = df_to_dataset(test, shuffle=False, batch_size=batch_size)

In [17]:
all_inputs = []
encoded_features = []

# Numerical features.
for header in ['PhotoAmt', 'Fee']:
    numeric_col = tf.keras.Input(shape=(1,), name=header)
    normalization_layer = get_normalization_layer(header, train_ds)
    encoded_numeric_col = normalization_layer(numeric_col)
    all_inputs.append(numeric_col)
    encoded_features.append(encoded_numeric_col)

In [18]:
age_col = tf.keras.Input(shape=(1,), name='Age', dtype='int64')

encoding_layer = get_category_encoding_layer(name='Age',
                                             dataset=train_ds,
                                             dtype='int64',
                                             max_tokens=5)
encoded_age_col = encoding_layer(age_col)
all_inputs.append(age_col)
encoded_features.append(encoded_age_col)

In [19]:
categorical_cols = ['Type', 'Color1', 'Color2', 'Gender', 'MaturitySize',
                    'FurLength', 'Vaccinated', 'Sterilized', 'Health', 'Breed1']

for header in categorical_cols:
    categorical_col = tf.keras.Input(shape=(1,), name=header, dtype='string')
    encoding_layer = get_category_encoding_layer(name=header,
                                                 dataset=train_ds,
                                                 dtype='string',
                                                 max_tokens=5)

    encoded_categorical_col = encoding_layer(categorical_col)
    all_inputs.append(categorical_col)
    encoded_features.append(encoded_categorical_col)

In [20]:
all_features = tf.keras.layers.concatenate(encoded_features)
x = tf.keras.layers.Dense(32, activation="relu")(all_features)
x = tf.keras.layers.Dropout(0.5)(x)
output = tf.keras.layers.Dense(1)(x)

model = tf.keras.Model(all_inputs, output)

In [21]:
model.compile(optimizer='adam',
              loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
              metrics=["accuracy"])

In [22]:
# Use `rankdir='LR'` to make the graph horizontal.
tf.keras.utils.plot_model(model, show_shapes=True, rankdir="LR")

You must install pydot (`pip install pydot`) and install graphviz (see instructions at https://graphviz.gitlab.io/download/) for plot_model to work.


In [23]:
model.fit(train_ds, epochs=10, validation_data=val_ds)

Epoch 1/10


  inputs = self._flatten_to_reference_inputs(inputs)
2024-09-19 17:05:29.231853: I external/local_xla/xla/service/service.cc:168] XLA service 0x7fb8506901f0 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
2024-09-19 17:05:29.231879: I external/local_xla/xla/service/service.cc:176]   StreamExecutor device (0): Tesla V100-SXM3-32GB-H, Compute Capability 7.0
2024-09-19 17:05:29.231887: I external/local_xla/xla/service/service.cc:176]   StreamExecutor device (1): Tesla V100-SXM3-32GB-H, Compute Capability 7.0
2024-09-19 17:05:29.231894: I external/local_xla/xla/service/service.cc:176]   StreamExecutor device (2): Tesla V100-SXM3-32GB-H, Compute Capability 7.0
2024-09-19 17:05:29.231902: I external/local_xla/xla/service/service.cc:176]   StreamExecutor device (3): Tesla V100-SXM3-32GB-H, Compute Capability 7.0
2024-09-19 17:05:29.231908: I external/local_xla/xla/service/service.cc:176]   StreamExecutor device (4): Tesla V100-SXM3-32GB-H, Compute Capab

Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x7fbe29522a90>

In [24]:
loss, accuracy = model.evaluate(test_ds)
print("Accuracy", accuracy)

Accuracy 0.7192374467849731


## Save and Reload Model

In [25]:
import tensorflow as tf

In [26]:
model.save('my_pet_classifier')

INFO:tensorflow:Assets written to: my_pet_classifier/assets


INFO:tensorflow:Assets written to: my_pet_classifier/assets


In [27]:
reloaded_model = tf.keras.models.load_model('my_pet_classifier')

In [28]:
reloaded_model.inputs

[<KerasTensor: shape=(None, 1) dtype=float32 (created by layer 'PhotoAmt')>,
 <KerasTensor: shape=(None, 1) dtype=float32 (created by layer 'Fee')>,
 <KerasTensor: shape=(None, 1) dtype=int64 (created by layer 'Age')>,
 <KerasTensor: shape=(None, 1) dtype=string (created by layer 'Type')>,
 <KerasTensor: shape=(None, 1) dtype=string (created by layer 'Color1')>,
 <KerasTensor: shape=(None, 1) dtype=string (created by layer 'Color2')>,
 <KerasTensor: shape=(None, 1) dtype=string (created by layer 'Gender')>,
 <KerasTensor: shape=(None, 1) dtype=string (created by layer 'MaturitySize')>,
 <KerasTensor: shape=(None, 1) dtype=string (created by layer 'FurLength')>,
 <KerasTensor: shape=(None, 1) dtype=string (created by layer 'Vaccinated')>,
 <KerasTensor: shape=(None, 1) dtype=string (created by layer 'Sterilized')>,
 <KerasTensor: shape=(None, 1) dtype=string (created by layer 'Health')>,
 <KerasTensor: shape=(None, 1) dtype=string (created by layer 'Breed1')>]

In [29]:
sample = {
    'Type': 'Cat',
    'Age': 3,
    'Breed1': 'Tabby',
    'Gender': 'Male',
    'Color1': 'Black',
    'Color2': 'White',
    'MaturitySize': 'Small',
    'FurLength': 'Short',
    'Vaccinated': 'No',
    'Sterilized': 'No',
    'Health': 'Healthy',
    'Fee': 100,
    'PhotoAmt': 2,
}

input_dict = {name: tf.convert_to_tensor([value]) for name, value in sample.items()}
predictions = reloaded_model.predict(input_dict)
prob = tf.nn.sigmoid(predictions[0])

print(
    "This particular pet had a %.1f percent probability "
    "of getting adopted." % (100 * prob)
)

This particular pet had a 79.9 percent probability of getting adopted.


## PySpark

In [30]:
from pyspark.sql import SparkSession

num_threads = 6

# Running Spark locally for demonstration.

_config = {
    "spark.master": f"local[{num_threads}]",
    "spark.driver.host": "127.0.0.1",
    "spark.task.maxFailures": "1",
    "spark.driver.memory": "8g",
    "spark.executor.memory": "20g",
    "spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled": "false",
    "spark.sql.pyspark.jvmStacktrace.enabled": "true",
    "spark.sql.execution.arrow.pyspark.enabled": "true",
    "spark.python.worker.reuse": "true",
}
spark = SparkSession.builder.appName("spark-dl-example")
for key, value in _config.items():
    spark = spark.config(key, value)
spark = spark.getOrCreate()

sc = spark.sparkContext

24/09/19 17:05:37 WARN Utils: Your hostname, dgx2h0194.spark.sjc4.nvmetal.net resolves to a loopback address: 127.0.1.1; using 10.150.30.2 instead (on interface enp134s0f0np0)
24/09/19 17:05:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/19 17:05:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [31]:
df = spark.createDataFrame(dataframe)

In [32]:
df.write.mode("overwrite").parquet("datasets/petfinder-mini")

                                                                                

In [33]:
df.show()

+----+---+--------------------+------+------+--------+------------+---------+----------+----------+-------+---+--------+------+
|Type|Age|              Breed1|Gender|Color1|  Color2|MaturitySize|FurLength|Vaccinated|Sterilized| Health|Fee|PhotoAmt|target|
+----+---+--------------------+------+------+--------+------------+---------+----------+----------+-------+---+--------+------+
| Cat|  3|               Tabby|  Male| Black|   White|       Small|    Short|        No|        No|Healthy|100|       1|     1|
| Cat|  1|Domestic Medium Hair|  Male| Black|   Brown|      Medium|   Medium|  Not Sure|  Not Sure|Healthy|  0|       2|     1|
| Dog|  1|         Mixed Breed|  Male| Brown|   White|      Medium|   Medium|       Yes|        No|Healthy|  0|       7|     1|
| Dog|  4|         Mixed Breed|Female| Black|   Brown|      Medium|    Short|       Yes|        No|Healthy|150|       8|     1|
| Dog|  1|         Mixed Breed|  Male| Black|No Color|      Medium|    Short|        No|        No|Healt

## Inference using Spark DL API

In [34]:
import numpy as np
import os

from pyspark.ml.functions import predict_batch_udf
from pyspark.sql.functions import struct, col
from pyspark.sql.types import ArrayType, FloatType

In [35]:
df = spark.read.parquet("datasets/petfinder-mini").cache()
df.show(5)

+----+---+------------------+------+------+--------+------------+---------+----------+----------+------------+---+--------+------+
|Type|Age|            Breed1|Gender|Color1|  Color2|MaturitySize|FurLength|Vaccinated|Sterilized|      Health|Fee|PhotoAmt|target|
+----+---+------------------+------+------+--------+------------+---------+----------+----------+------------+---+--------+------+
| Dog|  7|       Mixed Breed|Female| Black|   Brown|      Medium|    Short|        No|       Yes|     Healthy|  0|       2|     1|
| Dog| 24|       Mixed Breed|Female| Brown|No Color|       Large|   Medium|       Yes|       Yes|     Healthy|  0|       3|     0|
| Dog| 61|      Irish Setter|Female| Brown|  Golden|       Large|     Long|       Yes|       Yes|     Healthy|  0|       2|     1|
| Dog|  3|       Mixed Breed|  Male| Brown|  Golden|      Medium|    Short|       Yes|  Not Sure|Minor Injury|  0|       9|     0|
| Dog|  5|Miniature Pinscher|Female| Black|   Brown|       Small|    Short|       Y

In [36]:
columns = df.columns
print(columns)

['Type', 'Age', 'Breed1', 'Gender', 'Color1', 'Color2', 'MaturitySize', 'FurLength', 'Vaccinated', 'Sterilized', 'Health', 'Fee', 'PhotoAmt', 'target']


In [37]:
# remove label column
columns.remove("target")
print(columns)

['Type', 'Age', 'Breed1', 'Gender', 'Color1', 'Color2', 'MaturitySize', 'FurLength', 'Vaccinated', 'Sterilized', 'Health', 'Fee', 'PhotoAmt']


In [38]:
# get absolute path to model
model_dir = "{}/my_pet_classifier".format(os.getcwd())

In [39]:
def predict_batch_fn():
    import tensorflow as tf
    import pandas as pd
    
    # Enable GPU memory growth
    gpus = tf.config.experimental.list_physical_devices('GPU')
    if gpus:
        try:
            for gpu in gpus:
                tf.config.experimental.set_memory_growth(gpu, True)
        except RuntimeError as e:
            print(e)

    model = tf.keras.models.load_model(model_dir)

    def predict(t, a, b, g, c1, c2, m, f, v, s, h, fee, p):
        inputs = {
            "Type": t,
            "Age": a,
            "Breed1": b,
            "Gender": g,
            "Color1": c1,
            "Color2": c2,
            "MaturitySize": m,
            "FurLength": f,
            "Vaccinated": v,
            "Sterilized": s,
            "Health": h,
            "Fee": fee,
            "PhotoAmt": p
        }
        # return model.predict(inputs)
        return pd.Series(np.squeeze(model.predict(inputs)))

    return predict

In [40]:
# need to pass the list of columns into the model_udf
classify = predict_batch_udf(predict_batch_fn,
                             return_type=FloatType(),
                             batch_size=100)

In [41]:
%%time
preds = df.withColumn("preds", classify(struct(*columns)))
results = preds.collect()

24/09/19 17:05:44 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
2024-09-19 17:05:46.298305: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-09-19 17:05:46.298342: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-09-19 17:05:46.299830: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-09-19 17:05:46.306514: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical o

CPU times: user 137 ms, sys: 63.3 ms, total: 201 ms
Wall time: 12.1 s


In [42]:
%%time
preds = df.withColumn("preds", classify(*columns))
results = preds.collect()



CPU times: user 81.3 ms, sys: 25.8 ms, total: 107 ms
Wall time: 4.39 s


                                                                                

In [43]:
%%time
preds = df.withColumn("preds", classify(*[col(c) for c in columns]))
results = preds.collect()

                                                                                

CPU times: user 77.9 ms, sys: 24.6 ms, total: 102 ms
Wall time: 4.43 s


In [44]:
preds.show()

+----+---+--------------------+------+------+--------+------------+---------+----------+----------+------------+---+--------+------+------------+
|Type|Age|              Breed1|Gender|Color1|  Color2|MaturitySize|FurLength|Vaccinated|Sterilized|      Health|Fee|PhotoAmt|target|       preds|
+----+---+--------------------+------+------+--------+------------+---------+----------+----------+------------+---+--------+------+------------+
| Dog|  7|         Mixed Breed|Female| Black|   Brown|      Medium|    Short|        No|       Yes|     Healthy|  0|       2|     1| -0.10501782|
| Dog| 24|         Mixed Breed|Female| Brown|No Color|       Large|   Medium|       Yes|       Yes|     Healthy|  0|       3|     0| -0.25644353|
| Dog| 61|        Irish Setter|Female| Brown|  Golden|       Large|     Long|       Yes|       Yes|     Healthy|  0|       2|     1|   0.9123644|
| Dog|  3|         Mixed Breed|  Male| Brown|  Golden|      Medium|    Short|       Yes|  Not Sure|Minor Injury|  0|       9



### Using Triton Inference Server

Note: you can restart the kernel and run from this point to simulate running in a different node or environment.

This notebook uses the [Python backend with a custom execution environment](https://github.com/triton-inference-server/python_backend#creating-custom-execution-environments), using a conda-pack environment created as follows:
```
conda create -n tf-gpu -c conda-forge python=3.8
conda activate tf-gpu
export PYTHONNOUSERSITE=True
pip install tensorflow
pip install conda-pack
conda pack  # tf-gpu.tar.gz
```

In [45]:
import numpy as np
import os
from pyspark.ml.functions import predict_batch_udf
from pyspark.sql.functions import col, struct
from pyspark.sql.types import ArrayType, FloatType

In [47]:
%%bash
# copy custom model to expected layout for Triton
rm -rf models
mkdir -p models
cp -r models_config/feature_columns models

# add custom execution environment
cp tf-gpu.tar.gz models

#### Start Triton Server on each executor

In [48]:
num_executors = 1
triton_models_dir = "{}/models".format(os.getcwd())
my_pet_classifier_dir = "{}/my_pet_classifier".format(os.getcwd())
nodeRDD = sc.parallelize(list(range(num_executors)), num_executors)

def start_triton(it):
    import docker
    import time
    import tritonclient.grpc as grpcclient
    
    client=docker.from_env()
    containers=client.containers.list(filters={"name": "spark-triton"})
    if containers:
        print(">>>> containers: {}".format([c.short_id for c in containers]))
    else:
        container=client.containers.run(
            "nvcr.io/nvidia/tritonserver:23.04-py3", "tritonserver --model-repository=/models",
            detach=True,
            device_requests=[docker.types.DeviceRequest(device_ids=["0"], capabilities=[['gpu']])],
            name="spark-triton",
            network_mode="host",
            remove=True,
            shm_size="128M",
            volumes={
                triton_models_dir: {"bind": "/models", "mode": "ro"},
                my_pet_classifier_dir: {"bind": "/my_pet_classifier", "mode": "ro"}
            }
        )
        print(">>>> starting triton: {}".format(container.short_id))

        # wait for triton to be running
        time.sleep(15)
        client = grpcclient.InferenceServerClient("localhost:8001")
        ready = False
        while not ready:
            try:
                ready = client.is_server_ready()
            except Exception as e:
                time.sleep(5)
            
    return [True]

nodeRDD.barrier().mapPartitions(start_triton).collect()

>>>> starting triton: 1e0d8d7b4599                                  (0 + 1) / 1]
                                                                                

[True]

#### Run inference

In [49]:
df = spark.read.parquet("datasets/petfinder-mini")

In [50]:
df.show(5)

+----+---+------------------+------+------+--------+------------+---------+----------+----------+------------+---+--------+------+
|Type|Age|            Breed1|Gender|Color1|  Color2|MaturitySize|FurLength|Vaccinated|Sterilized|      Health|Fee|PhotoAmt|target|
+----+---+------------------+------+------+--------+------------+---------+----------+----------+------------+---+--------+------+
| Dog|  7|       Mixed Breed|Female| Black|   Brown|      Medium|    Short|        No|       Yes|     Healthy|  0|       2|     1|
| Dog| 24|       Mixed Breed|Female| Brown|No Color|       Large|   Medium|       Yes|       Yes|     Healthy|  0|       3|     0|
| Dog| 61|      Irish Setter|Female| Brown|  Golden|       Large|     Long|       Yes|       Yes|     Healthy|  0|       2|     1|
| Dog|  3|       Mixed Breed|  Male| Brown|  Golden|      Medium|    Short|       Yes|  Not Sure|Minor Injury|  0|       9|     0|
| Dog|  5|Miniature Pinscher|Female| Black|   Brown|       Small|    Short|       Y

In [51]:
columns = df.columns
print(columns)

['Type', 'Age', 'Breed1', 'Gender', 'Color1', 'Color2', 'MaturitySize', 'FurLength', 'Vaccinated', 'Sterilized', 'Health', 'Fee', 'PhotoAmt', 'target']


In [52]:
# remove label column
columns.remove("target")
print(columns)

['Type', 'Age', 'Breed1', 'Gender', 'Color1', 'Color2', 'MaturitySize', 'FurLength', 'Vaccinated', 'Sterilized', 'Health', 'Fee', 'PhotoAmt']


In [53]:
def triton_fn(triton_uri, model_name):
    import numpy as np
    import tritonclient.grpc as grpcclient
    
    np_types = {
      "BOOL": np.dtype(np.bool8),
      "INT8": np.dtype(np.int8),
      "INT16": np.dtype(np.int16),
      "INT32": np.dtype(np.int32),
      "INT64": np.dtype(np.int64),
      "FP16": np.dtype(np.float16),
      "FP32": np.dtype(np.float32),
      "FP64": np.dtype(np.float64),
      "FP64": np.dtype(np.double),
      "BYTES": np.dtype(object)
    }

    client = grpcclient.InferenceServerClient(triton_uri)
    model_meta = client.get_model_metadata(model_name)
    
    def predict(t, a, b, g, c1, c2, m, f, v, s, h, fee, p):
        # convert input ndarrays into a dictionary of ndarrays
        inputs = {
            "Type": t, 
            "Age": a, 
            "Breed1": b, 
            "Gender": g,
            "Color1": c1,
            "Color2": c2,
            "MaturitySize": m,
            "FurLength": f,
            "Vaccinated": v, 
            "Sterilized": s,
            "Health": h,
            "Fee": fee,
            "PhotoAmt": p
        }
        return _predict(inputs)
        
    def _predict(inputs):
        if isinstance(inputs, np.ndarray):
            # single ndarray input
            request = [grpcclient.InferInput(model_meta.inputs[0].name, inputs.shape, model_meta.inputs[0].datatype)]
            request[0].set_data_from_numpy(inputs.astype(np_types[model_meta.inputs[0].datatype]))
        else:
            # dict of multiple ndarray inputs
            request = [grpcclient.InferInput(i.name, inputs[i.name].shape, i.datatype) for i in model_meta.inputs]
            for i in request:
                i.set_data_from_numpy(inputs[i.name()].astype(np_types[i.datatype()]))
        
        response = client.infer(model_name, inputs=request)
        
        if len(model_meta.outputs) > 1:
            # return dictionary of numpy arrays
            return {o.name: response.as_numpy(o.name) for o in model_meta.outputs}
        else:
            # return single numpy array
            return response.as_numpy(model_meta.outputs[0].name)
        
        
    return predict

In [54]:
from functools import partial

# need to pass the list of columns into the model_udf
classify = predict_batch_udf(partial(triton_fn, triton_uri="localhost:8001", model_name="feature_columns"),
                             input_tensor_shapes=[[1]] * len(columns),
                             return_type=FloatType(),
                             batch_size=1024)

In [55]:
# FAILS: Op type not registered 'DenseBincount' WITHOUT custom python backend
df.withColumn("preds", classify(struct(*columns))).show(truncate=10)



+----+---+----------+------+------+--------+------------+---------+----------+----------+----------+---+--------+------+----------+
|Type|Age|    Breed1|Gender|Color1|  Color2|MaturitySize|FurLength|Vaccinated|Sterilized|    Health|Fee|PhotoAmt|target|     preds|
+----+---+----------+------+------+--------+------------+---------+----------+----------+----------+---+--------+------+----------+
| Dog|  7|Mixed B...|Female| Black|   Brown|      Medium|    Short|        No|       Yes|   Healthy|  0|       2|     1|-0.1050...|
| Dog| 24|Mixed B...|Female| Brown|No Color|       Large|   Medium|       Yes|       Yes|   Healthy|  0|       3|     0|-0.2564...|
| Dog| 61|Irish S...|Female| Brown|  Golden|       Large|     Long|       Yes|       Yes|   Healthy|  0|       2|     1| 0.9123644|
| Dog|  3|Mixed B...|  Male| Brown|  Golden|      Medium|    Short|       Yes|  Not Sure|Minor I...|  0|       9|     0|0.46646258|
| Dog|  5|Miniatu...|Female| Black|   Brown|       Small|    Short|       Ye

                                                                                

In [56]:
%%time
preds = df.withColumn("preds", classify(struct(*columns)))
results = preds.collect()



CPU times: user 51.4 ms, sys: 21.4 ms, total: 72.8 ms
Wall time: 3.18 s


                                                                                

In [57]:
%%time
preds = df.withColumn("preds", classify(*columns))
results = preds.collect()



CPU times: user 53.8 ms, sys: 2.7 ms, total: 56.5 ms
Wall time: 2.47 s


                                                                                

In [58]:
%%time
preds = df.withColumn("preds", classify(*[col(c) for c in columns]))
results = preds.collect()



CPU times: user 49.3 ms, sys: 9.69 ms, total: 59 ms
Wall time: 2.48 s


                                                                                

In [59]:
preds.show()

+----+---+--------------------+------+------+--------+------------+---------+----------+----------+------------+---+--------+------+-----------+
|Type|Age|              Breed1|Gender|Color1|  Color2|MaturitySize|FurLength|Vaccinated|Sterilized|      Health|Fee|PhotoAmt|target|      preds|
+----+---+--------------------+------+------+--------+------------+---------+----------+----------+------------+---+--------+------+-----------+
| Dog|  7|         Mixed Breed|Female| Black|   Brown|      Medium|    Short|        No|       Yes|     Healthy|  0|       2|     1|-0.10501783|
| Dog| 24|         Mixed Breed|Female| Brown|No Color|       Large|   Medium|       Yes|       Yes|     Healthy|  0|       3|     0|-0.25644356|
| Dog| 61|        Irish Setter|Female| Brown|  Golden|       Large|     Long|       Yes|       Yes|     Healthy|  0|       2|     1|  0.9123644|
| Dog|  3|         Mixed Breed|  Male| Brown|  Golden|      Medium|    Short|       Yes|  Not Sure|Minor Injury|  0|       9|     

#### Stop Triton Server on each executor

In [60]:
def stop_triton(it):
    import docker
    import time
    
    client=docker.from_env()
    containers=client.containers.list(filters={"name": "spark-triton"})
    print(">>>> stopping containers: {}".format([c.short_id for c in containers]))
    if containers:
        container=containers[0]
        container.stop(timeout=120)

    return [True]

nodeRDD.barrier().mapPartitions(stop_triton).collect()

>>>> stopping containers: ['1e0d8d7b4599']
                                                                                

[True]

In [61]:
spark.stop()