GPU-based batch and streaming predictions on Dataflow


Dataflow allows you to run a process in a custom Docker container, optionally with a GPU.

These two features open up new possibilities for running batch and streaming predictions on Dataflow. With a Dockerized environment you can define any prediction environment you need, and a GPU can make predictions more efficient.

BigFlow 1.3 comes with utilities that make your life easier when it comes to writing Dockerized processes and writing prediction logic.

This recipe shows how to set up a PyTorch prediction process, but Dockerization lets you define an environment for any framework. We assume that you are already familiar with the BigFlow API.

We are going to create a batch prediction process for the Fashion MNIST model, using Dataflow. You can download the model here. We won't focus on the utility or performance side of the project, but more on the API that BigFlow provides, which makes creating prediction processes easier.

Docker image and packages

To run a prediction process on a GPU, you need a Dockerfile that installs the CUDA drivers. The Dockerfile below has all the dependencies for running a PyTorch prediction process and is compatible with BigFlow. Use it as the image definition for your BigFlow project.

# Docker base image, includes os-level tools (shell, utilities, services), python interpreter, C-libraries, etc.

# Use it if you want to use a custom Docker image on Dataflow workers.
FROM apache/beam_python3.7_sdk:2.27.0

# Cherry-pick Apache Beam binaries from the official image into our custom one.
# Use this only if you are unable to use the original image (FROM apache/beam...)
# COPY --from=apache/beam_python3.6_sdk:2.26.0 /opt/apache/beam /opt/apache/beam

# Install CUDA drivers in your Docker image.
# Note: this dramatically increases the size of your image, which also slows down
# building and deployment. Do not install CUDA unless you are using a GPU.
RUN echo "Installing CUDA ..." \
  && wget \
  && sh cuda_10.0.130_410.48_linux --toolkit --silent --override \
  && rm cuda_10.0.130_410.48_linux \
  && echo "CUDA was installed!"
ENV LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/nvidia/lib64:/usr/local/cuda/lib64

# Working directory .

# Install apt packages.  Add custom libraries, dev-packages, c++ compiler etc.
RUN apt-get update && apt-get install -y \
    # libc-dev \
    && rm -rf /var/lib/apt/lists/*

# Preinstall python packages (docker layer caching).
# Manually install a package here if you can't add it to your requirements file for some reason.
COPY ./resources/requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
# RUN pip install my-awesome-package==1.2.3

# Install bigflow project 'whl' package.
COPY ./dist dist
RUN pip install dist/*.whl

Writing prediction process

First, create a new BigFlow project called fashionmnist using the scaffolding tool, and remove the automatically generated workflows.

Next, define the model in the fashionmist.model module:

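# torch, DataLoader and datasets are imported here because the pipeline code
# accesses them as fmnist.torch, fmnist.DataLoader and fmnist.datasets.
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets

# Fashion MNIST class names, indexed by label.
classes = [
    "T-shirt/top",
    "Trouser",
    "Pullover",
    "Dress",
    "Coat",
    "Sandal",
    "Shirt",
    "Sneaker",
    "Bag",
    "Ankle boot",
]

device = 'cuda'

class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28*28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits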

Now, let us define the processing logic and workflow:

import logging
import uuid

import pandas as pd
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import bigflow.dataflow
import bigflow.dataflow.io as bf_io
import bigflow.dataflow.ml as bf_ml

import fashionmist.model as fmnist

logger = logging.getLogger(__name__)

config = bigflow.Config(
        project='<PUT YOUR PROJECT ID HERE>',
        staging_location='gs://<PUT YOUR PROJECT ID HERE>/beam_runner/staging',
        temp_location='gs://<PUT YOUR PROJECT ID HERE>/beam_runner/temp',
        model_path="gs://<PUT YOUR PROJECT ID HERE>/fashion.pth",

class PytorchClassifierModel(bf_ml.BaseModel):

    def load_model(self, path, **kwargs):
        model = fmnist.NeuralNetwork()
        with bf_io.download_file_to_localfs(self.model_path) as m_path:
            model =
        return model

    def predict(self, x: pd.DataFrame):
        X = x['value'].iloc[0]
        X =
        model = self.ensure_model()
        with fmnist.torch.no_grad():
            pred = model(X)
            predicted = fmnist.classes[pred[0].argmax(0)]
            logger.info(f'Prediction sample: "{predicted}"')

class LoadBatchesDoFn(beam.DoFn):
    def process(self, element, *args, **kwargs):
        test_dataset = fmnist.datasets.FashionMNIST(
        test_dataloader = fmnist.DataLoader(test_dataset, batch_size=100)
        for batch, (X, y) in enumerate(test_dataloader):
            yield pd.DataFrame([
                {
                    'batch_id': str(uuid.uuid1()),
                    'value': X
                }
            ])

def run_predictions(
        pipeline: beam.Pipeline,
        context: bigflow.JobContext,
        _entry_point_arguments,  # unused
):
    model = PytorchClassifierModel(model_path=config['model_path'])

    return (pipeline
            | "InitialImpulse" >> beam.Create(['impulse'])
            | "LoadBatches" >> beam.ParDo(LoadBatchesDoFn())
            | "ReshuffleWorkload" >> beam.Reshuffle()
            | "Predict" >> bf_ml.ApplyModel(model=model, key_column='batch_id')
    )

workflow = bigflow.Workflow(

Take a closer look at how the BeamJob class is used. Because the use_docker_image flag is set to True, the job object uses the Docker image from your project as the execution environment for the job. You can also specify a different Docker image by setting use_docker_image to a string with the image address.

The second thing to note is the bf_ml.BaseModel class, paired with the bf_ml.ApplyModel transform. These two classes form the BigFlow API for predictions. The BaseModel class has two methods you should override.

The load_model method should load and prepare the model before prediction. It takes a single path argument, which can be pretty much anything, for example, the path to a GCS object. The method should return a ready-to-use model.

The predict method takes a single x: pd.DataFrame argument, which represents a batch of records for prediction. To get the model returned by the load_model method, use the ensure_model method. You can return the prediction result in any form.
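To make the shape of a batch concrete, here is a minimal, framework-free sketch of the idea: records are grouped into fixed-size batches tagged with a batch_id, and a predict-style function maps each row of scores to a class name by taking the index of the largest score. The helper names make_batches and predict_labels, the three-class list, and the scores are made up for illustration. None of this is BigFlow or PyTorch API.

```python
import uuid
from typing import Dict, Iterator, List, Sequence

# Hypothetical label list; the real model uses the ten Fashion MNIST classes.
CLASSES = ["Sneaker", "Bag", "Ankle boot"]


def make_batches(records: Sequence, batch_size: int) -> Iterator[Dict]:
    """Group records into batches, each tagged with a unique batch_id."""
    for start in range(0, len(records), batch_size):
        yield {
            "batch_id": str(uuid.uuid1()),
            "value": list(records[start:start + batch_size]),
        }


def predict_labels(batch: Dict) -> List[str]:
    """Pick, for every row of scores, the class with the highest score."""
    labels = []
    for scores in batch["value"]:
        best_index = max(range(len(scores)), key=scores.__getitem__)
        labels.append(CLASSES[best_index])
    return labels


# Five made-up score rows, split into batches of two.
scores = [
    [0.1, 0.7, 0.2],
    [0.9, 0.05, 0.05],
    [0.2, 0.2, 0.6],
    [0.3, 0.4, 0.3],
    [0.0, 0.1, 0.9],
]
batches = list(make_batches(scores, batch_size=2))
predictions = [predict_labels(batch) for batch in batches]
```

In the recipe itself the batch is a pd.DataFrame row holding a tensor, and the arg-max step is done by the tensor's argmax method; the sketch only mirrors the data flow.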

BaseModel loads the model lazily, on the first call to the ensure_model method.

There is also a handy function, bf_io.download_file_to_localfs, for downloading models from GCS to the local file system.
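The lazy-loading behaviour can be pictured with a small stand-in class. This is only a sketch of the general pattern, assuming the model is cached after the first load; LazyModel, load_count, and the bucket path are hypothetical, and this is not how BigFlow's BaseModel is actually implemented.

```python
from typing import Any, Optional


class LazyModel:
    """Stand-in that loads its model on the first ensure_model() call."""

    def __init__(self, model_path: str):
        self.model_path = model_path
        self._model: Optional[Any] = None
        self.load_count = 0  # only here to make the behaviour observable

    def load_model(self, path: str) -> Any:
        # A real implementation would read model weights from `path`.
        self.load_count += 1
        return {"loaded_from": path}

    def ensure_model(self) -> Any:
        # Load on first use, then reuse the cached object.
        if self._model is None:
            self._model = self.load_model(self.model_path)
        return self._model


model = LazyModel("gs://hypothetical-bucket/fashion.pth")
loads_before = model.load_count   # nothing is loaded at construction time
first = model.ensure_model()
second = model.ensure_model()     # served from the cache, no second load
```

Deferring the load this way means constructing the model object stays cheap, and the expensive load happens only where predictions actually run.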
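In the pipeline code the download helper is used in a with statement that yields a local path. The sketch below imitates that usage pattern with the standard library only: it copies a local file instead of downloading from GCS, and it assumes the temporary copy is removed when the block exits. copy_to_localfs is a hypothetical stand-in, not the BigFlow function, whose cleanup behaviour is not described here.

```python
import contextlib
import os
import shutil
import tempfile
from typing import Iterator


@contextlib.contextmanager
def copy_to_localfs(source_path: str) -> Iterator[str]:
    """Copy a file into a temporary directory and yield the local path."""
    tmp_dir = tempfile.mkdtemp()
    try:
        local_path = os.path.join(tmp_dir, os.path.basename(source_path))
        shutil.copyfile(source_path, local_path)
        yield local_path
    finally:
        # Assumption of this sketch: the local copy is deleted on exit.
        shutil.rmtree(tmp_dir, ignore_errors=True)


# Create a fake "remote" model file, then read it through the helper.
with tempfile.NamedTemporaryFile("w", suffix=".pth", delete=False) as fake_remote:
    fake_remote.write("weights")

with copy_to_localfs(fake_remote.name) as m_path:
    existed_inside = os.path.exists(m_path)
    with open(m_path) as handle:
        content = handle.read()

exists_after = os.path.exists(m_path)
os.remove(fake_remote.name)
```

With this pattern, anything that needs the file (such as loading model weights) has to happen inside the with block.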

Running the prediction process

To run the process, you first need to build the image and deploy it to Google Artifact Registry, so Dataflow can use it to run a job.

bf build-image
bf deploy-image

Then, you can run the job:

bf run --workflow fmnist

If you change the code, the BigFlow artifact version changes, so you need to rebuild and redeploy the image (when use_docker_image=True, the BeamJob class infers the image version from the BigFlow artifact version). To avoid rebuilding and redeploying the image, you can set the use_docker_image parameter to a specific image version, for example, use_docker_image='eu.gcr/fashionmnist/python3-ubuntu:3.7-bionic'.
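The difference between the two settings can be summarised in a few lines. This is a hedged sketch of the decision described above, not BigFlow's real resolution code: resolve_image, its parameters, the registry addresses, and the version strings are all hypothetical.

```python
from typing import Optional, Union


def resolve_image(
    use_docker_image: Union[bool, str],
    project_image: str,
    artifact_version: str,
) -> Optional[str]:
    """Return the image address a job would run in, or None for no custom image."""
    if isinstance(use_docker_image, str):
        # Explicit address: used as-is, independent of the artifact version.
        return use_docker_image
    if use_docker_image:
        # True: pair the project's image with the current artifact version.
        return f"{project_image}:{artifact_version}"
    return None


# Hypothetical values, for illustration only.
inferred = resolve_image(True, "example.registry/fashionmnist", "0.1.0")
pinned = resolve_image(
    "example.registry/fashionmnist:0.0.9",
    "example.registry/fashionmnist",
    "0.1.0",
)
disabled = resolve_image(False, "example.registry/fashionmnist", "0.1.0")
```

A pinned address keeps working across code changes because it no longer follows the artifact version, at the cost of running an image that may not contain your latest code.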