##### Copyright 2022 Google Inc.

Licensed under the Apache License, Version 2.0 (the "License").
<!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
-->


# Interactively run pipelines at scale with FlinkRunner on notebook-managed clusters

This notebook demonstrates how to run Beam pipelines with a FlinkRunner hosted on a notebook-managed [Cloud Dataproc](https://cloud.google.com/dataproc) cluster ([Learn more](https://cloud.google.com/dataflow/docs/guides/interactive-pipeline-development#interactive_flinkrunner_on_notebook-managed_clusters)). This feature lets you work with production-sized data from the notebook using thousands of parallel processes, because the workers are distributed across a Google Cloud internal cluster instead of running on the single notebook Compute Engine (GCE) instance.

Compared with using a single notebook instance as the worker, this feature offers:
- Higher capacity: you are unlikely to run out of memory (OOM) or disk space.
- Higher parallelism: you can inspect results much faster and get a better interactive experience.

We'll go through three examples:
- A modified word count to ease into the required configuration
- Processing more than 17 million flight records (~1GB) to find out how many flights are delayed for each airline
- Classifying 50,000 images (~280GB)



## Prerequisites

- `FlinkRunner` on Cloud Dataproc has been supported since Beam v2.40.0.
- The `FlinkRunner` is implicitly hosted on an automatically started and managed Cloud Dataproc cluster based on `GoogleCloudOptions` and `WorkerOptions`. Thus:
  - The Cloud Dataproc API needs to be enabled.
  - The authenticated user or service account needs permissions (`roles/dataproc.admin` or `roles/dataproc.editor`) to manipulate clusters and jobs.

In [None]:
from apache_beam.version import __version__
print(f'Beam version is: {__version__}')
print()

print('Authenticated account is: ')
!gcloud config get account
print()

!gcloud services list | grep -q dataproc && echo "Cloud Dataproc Enabled" || echo "Cloud Dataproc Not enabled"
print()

print('Granted roles are: ')
!gcloud projects get-iam-policy "$(gcloud config get project)" --flatten="bindings[].members" --format="table(bindings.members, bindings.role)" --filter="bindings.members:$(gcloud config get account)"
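
The version prerequisite can also be checked programmatically. The following is a minimal sketch using only the standard library; `MIN_BEAM_VERSION`, `parse_version` and `meets_minimum` are hypothetical names introduced here for illustration and are not part of Beam.

```python
import re

# Minimum Beam release for FlinkRunner on notebook-managed Dataproc clusters,
# taken from the prerequisite stated above.
MIN_BEAM_VERSION = (2, 40, 0)


def parse_version(version: str) -> tuple:
    """Extracts the leading major.minor.patch numbers, ignoring suffixes such as '.dev'."""
    match = re.match(r'(\d+)\.(\d+)\.(\d+)', version)
    if match is None:
        raise ValueError(f'Unrecognized version string: {version!r}')
    return tuple(int(part) for part in match.groups())


def meets_minimum(version: str) -> bool:
    """Hypothetical helper: True if the version is at least MIN_BEAM_VERSION."""
    return parse_version(version) >= MIN_BEAM_VERSION


print(meets_minimum('2.40.0'))  # True
print(meets_minimum('2.39.0'))  # False
```

In the notebook, you could pass the `__version__` string imported in the cell above to `meets_minimum`.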

## Basic Imports

In [None]:
# `beam` and `ib` are used by the cells that follow, so import them up front.
import apache_beam as beam
from apache_beam.options.pipeline_options import FlinkRunnerOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import PortableOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners.portability.flink_runner import FlinkRunner

import logging
logging.getLogger().setLevel(logging.ERROR)

## Detect the current project
We will use the default project configured in the current environment.

In [None]:
import google.auth
project = google.auth.default()[1]

## Set a distributed cache directory
We can specify a Cloud Storage location in which to cache pipeline results as we compute on a distributed cluster.

In [None]:
# IMPORTANT! Adjust the following to choose a Cloud Storage location.
ib.options.cache_root = 'gs://<YOUR-GCS-BUCKET>/flink'

## Create an interactive FlinkRunner and configure pipeline options
Instead of a base `InteractiveRunner` with default `PipelineOptions`, this notebook sets the `underlying_runner` to `FlinkRunner` and configures Google Cloud and Worker specific pipeline options.

In [None]:
# Define an InteractiveRunner that uses the FlinkRunner under the hood.
interactive_flink_runner = InteractiveRunner(underlying_runner=FlinkRunner())

# Set up the Apache Beam pipeline options.
options = PipelineOptions()
options.view_as(GoogleCloudOptions).project = project
# Use cloudpickle to alleviate the burden of staging things in the main module.
options.view_as(SetupOptions).pickle_library = 'cloudpickle'
# As a rule of thumb, the Flink cluster has about vCPU * #TMs = 8 * 40 = 320 slots.
options.view_as(WorkerOptions).machine_type = 'n1-highmem-8'
options.view_as(WorkerOptions).num_workers = 40
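
The rule of thumb in the comment above (slots = vCPUs per worker × number of workers) can be written out as a small calculation. This is only a sketch: `estimate_slots` is a hypothetical helper, and it assumes the machine type name ends with its vCPU count, as `n1-highmem-8` does.

```python
def estimate_slots(machine_type: str, num_workers: int) -> int:
    """Rough upper bound on Flink task slots: vCPUs per worker times workers."""
    # Assumption: the trailing number of the machine type is its vCPU count.
    vcpus = int(machine_type.rsplit('-', 1)[-1])
    return vcpus * num_workers


print(estimate_slots('n1-highmem-8', 40))  # 320
```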

## Example 1 - Word Count

The cells below contain a modified version of the sample code provided in the [01-Word_Count](01-Word_Count.ipynb) example. 

In [None]:
import re


class ReadWordsFromText(beam.PTransform):
    def __init__(self, file_pattern):
        self._file_pattern = file_pattern
    
    def expand(self, pcoll):
        return (pcoll.pipeline
                | beam.io.ReadFromText(self._file_pattern)
                | beam.FlatMap(lambda line: re.findall(r'[\w\']+', line.strip(), re.UNICODE)))
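
To see what the tokenizing regular expression and the later per-element count produce, here is a local sketch that needs neither Beam nor a cluster. The sample lines are made up for illustration, and `collections.Counter` stands in for the counting step only as an analogy.

```python
import re
from collections import Counter


def extract_words(line: str) -> list:
    """Same regular expression as in ReadWordsFromText: words may contain apostrophes."""
    return re.findall(r'[\w\']+', line.strip(), re.UNICODE)


# Hypothetical sample input; the notebook reads King Lear from Cloud Storage.
sample_lines = ["The king's crown, the king!", "  Crown and king.  "]
words = [word for line in sample_lines for word in extract_words(line)]
local_counts = Counter(words)

print(extract_words(sample_lines[0]))  # ['The', "king's", 'crown', 'the', 'king']
print(local_counts['king'])            # 2
```

Note that the tokenizer is case-sensitive, so `The` and `the` are counted separately.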

### Run the Word Count example

The first pipeline run with the `FlinkRunner` takes longer than usual because it takes time to start and provision the underlying Cloud Dataproc cluster with workers. Later executions that reuse the same cluster are faster.

In [None]:
p_word_count = beam.Pipeline(interactive_flink_runner, options=options)

counts = (
    p_word_count
    | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')
    | 'count' >> beam.combiners.Count.PerElement())

In [None]:
ib.show(counts)

### Inspect the underlying cluster and Flink dashboard
To see more information about the pipeline run and the Flink cluster, we can describe the cluster used by a pipeline.

In [None]:
ib.clusters.describe(p_word_count)

## FYI: Reuse an existing cluster managed by Interactive Beam

- By default, Interactive Beam **always reuses** the most recently used cluster to run a pipeline with the `FlinkRunner` if no pipeline options are given.
    - To avoid this behavior, for example to run another pipeline in the same notebook session with a `FlinkRunner` that is not hosted by the notebook, run `ib.clusters.set_default_cluster(None)`.
- When a new pipeline is instantiated with a project, region, and provisioning configuration that map to an existing Dataproc cluster, that cluster is also reused (it might not be the most recently used one).
- However, whenever a provisioning change (e.g. resizing a cluster) is requested, a new cluster is created to apply the change. If you intend to resize a cluster, avoid exhausting cloud resources by cleaning up unnecessary clusters with `ib.clusters.cleanup(pipeline)`.
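
The reuse rules above can be pictured with a toy model. This sketch is not Interactive Beam's implementation: `ClusterKey`, `ToyClusterRegistry`, the cluster names, and the project and region values are all hypothetical, and the model only illustrates that identical configurations map to one cluster while a provisioning change maps to a new one.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class ClusterKey:
    """Hypothetical identity of a cluster: where it runs and how it is provisioned."""
    project: str
    region: str
    machine_type: str
    num_workers: int


class ToyClusterRegistry:
    """Toy model of the reuse rules; not Interactive Beam's actual code."""

    def __init__(self) -> None:
        self.clusters: Dict[ClusterKey, str] = {}
        self.default: Optional[str] = None  # Most recently used cluster.

    def resolve(self, key: Optional[ClusterKey]) -> str:
        if key is None:
            # No options given: fall back to the most recently used cluster.
            if self.default is None:
                raise LookupError('No cluster has been used yet.')
            return self.default
        if key not in self.clusters:
            # Unknown configuration (e.g. a resize): model it as a new cluster.
            self.clusters[key] = f'toy-cluster-{len(self.clusters) + 1}'
        self.default = self.clusters[key]
        return self.default


registry = ToyClusterRegistry()
small = ClusterKey('my-project', 'us-central1', 'n1-highmem-8', 40)
large = ClusterKey('my-project', 'us-central1', 'n1-highmem-8', 80)

first = registry.resolve(small)
print(registry.resolve(small) == first)  # True: same configuration, same cluster
print(registry.resolve(large) == first)  # False: a resize maps to a new cluster
print(len(registry.clusters))            # 2: clean up the one you no longer need
```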

## Example 2 - Find out how many flights are delayed

The example reads more than 17 million records from a public BigQuery dataset `bigquery-samples.airline_ontime_data.flights` and counts how many flights have been delayed since 2010 for all the airlines.

The data is considered "large" not because of its total size (~1GB) but because of the number of rows to read from BigQuery.

This usually takes more than 1 hour with a single worker. Here we explicitly set a higher parallelism (150) to speed up the process (~4 minutes) while reusing the existing cluster.

### Setup requirements

For the flights example, you need to enable the BigQuery service to read the data.

In [None]:
!gcloud services enable bigquery.googleapis.com

### Run the flights example

In [None]:
# The parallelism is applied to each step, so if your pipeline has 10 steps, you
# end up having 150 * 10 tasks scheduled that can be executed in parallel by
# the 320 (upper bound) slots/workers/threads theoretically.
options.view_as(FlinkRunnerOptions).parallelism = 150
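
The arithmetic in the comment above can be spelled out as follows. This is a back-of-the-envelope sketch; `scheduled_tasks` is a hypothetical helper, and the 10-step pipeline is the example from the comment, not a measurement.

```python
def scheduled_tasks(parallelism: int, num_steps: int) -> int:
    """Parallelism applies per step, so tasks = parallelism * number of steps."""
    return parallelism * num_steps


slot_upper_bound = 8 * 40             # vCPUs per worker * number of workers
task_count = scheduled_tasks(150, 10)

print(task_count)                     # 1500
print(slot_upper_bound)               # 320
print(task_count > slot_upper_bound)  # True: more tasks than slots
```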

In [None]:
# The BQ read needs a Cloud Storage bucket as a temporary location.
options.view_as(GoogleCloudOptions).temp_location = ib.options.cache_root
p_bq = beam.Pipeline(runner=interactive_flink_runner, options=options)

delays_by_airline = (
    p_bq
    | 'Read Dataset from BQ' >> beam.io.ReadFromBigQuery(
        project=project, use_standard_sql=True,
        query=('SELECT airline, arrival_delay '
               'FROM `bigquery-samples.airline_ontime_data.flights` '
               'WHERE date >= "2010-01-01"'))
    | 'Rebalance Data to TM Slots' >> beam.Reshuffle(num_buckets=1000)
    | 'Extract Delay Info' >> beam.Map(
        lambda e: (e['airline'], e['arrival_delay'] > 0))
    | 'Filter Delayed' >> beam.Filter(lambda e: e[1])
    | 'Count Delayed Flights Per Airline' >> beam.combiners.Count.PerKey())
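
To make the map, filter and count-per-key steps concrete, here is a plain-Python sketch of the same logic on a handful of made-up rows. The rows are hypothetical and only mimic the two selected columns; the real pipeline reads them from BigQuery.

```python
from collections import Counter

# Hypothetical rows shaped like the query result (airline, arrival_delay).
rows = [
    {'airline': 'AA', 'arrival_delay': 12.0},
    {'airline': 'AA', 'arrival_delay': -3.0},
    {'airline': 'WN', 'arrival_delay': 5.0},
    {'airline': 'WN', 'arrival_delay': 0.0},
    {'airline': 'WN', 'arrival_delay': 41.0},
]

# 'Extract Delay Info': pair each airline with a delayed flag.
flagged = [(row['airline'], row['arrival_delay'] > 0) for row in rows]
# 'Filter Delayed': keep only the delayed flights.
delayed = [pair for pair in flagged if pair[1]]
# 'Count Delayed Flights Per Airline': count the remaining pairs per key.
local_delays = Counter(airline for airline, _ in delayed)

print(dict(local_delays))  # {'AA': 1, 'WN': 2}
```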

In [None]:
# With visualize_data=True, in the rendered facets widget, you can bin airline by counts
# and find out that WN airline has the most delayed flights in the sampled records.
ib.show(delays_by_airline, visualize_data=True)

Similarly, we can display information about the cluster used by this new pipeline. Note that the `master_url` and `dashboard` values are the same as for the first pipeline (`p_word_count`), because the cluster configuration from the pipeline options is unchanged. If a different region, number of workers, or machine type were used, a new cluster would be created.

In [None]:
ib.clusters.describe(p_bq)

## Example 3 - Computer Vision: run inference to classify images
The RunInference example classifies 50,000 image files (~280GB) from within the notebook.

**Disclaimer**: The example uses the validation image set from ImageNet and a pre-trained PyTorch MobileNetV2 model.
You can download similar dependencies or use your own image dataset and pre-trained model.
Due to usage policy and copyright restrictions, we do not host these public datasets on Google Cloud.


This example usually takes half a day with a single worker. Here we further increase the parallelism to speed it up (~1 minute).

### Setup requirements

For the RunInference example, you need to use Cloud Build to build a container image and store it in Container Registry.

In [None]:
# Enable the cloud build service
!gcloud services enable cloudbuild.googleapis.com

# Enable the container registry service
!gcloud services enable containerregistry.googleapis.com

### Build a custom container
Normally, if your pipeline doesn't require additional Python dependencies or executables, Beam automatically uses its official container images. They come with many common Python modules, so you don't have to build or explicitly specify an image.

For this example, you are going to use a few extra Python dependencies and a pre-trained model, so you have to build a custom container and make it available to the Flink cluster for execution. The advantages of using a custom container are:
- Faster setup time for consecutive/interactive executions
- Stable configurations/dependencies
- More flexibility: you can set up more than Python dependencies

In [None]:
!mkdir -p /home/jupyter/.flink

# IMPORTANT! Adjust to download or copy your model to the directory. The example uses MobileNetV2.
!cp /path/to/your-pre-trained-model /home/jupyter/.flink/mobilenet_v2.pt

Install the extra Python dependencies.

In [None]:
%pip install torch
%pip install torchvision
%pip install pillow
%pip install transformers

Export your dependencies into a requirements file.

- Either explicitly create a requirements file with the `%%writefile` notebook magic,
- or freeze all local dependencies into a requirements file (this might introduce unintended dependencies).

In [None]:
%%writefile /home/jupyter/.flink/requirements.txt
torch
torchvision
pillow
transformers

Then create a Dockerfile with the `%%writefile` notebook magic.
The custom container uses the Beam v2.40.0 image for Python 3.7 as its base, adds a pre-trained MobileNetV2 PyTorch model, and installs the dependencies.

In [None]:
%%writefile /home/jupyter/.flink/Dockerfile
FROM apache/beam_python3.7_sdk:2.40.0

COPY  requirements.txt /tmp/requirements.txt
COPY  mobilenet_v2.pt /tmp/mobilenet_v2.pt
RUN python -m pip install -r /tmp/requirements.txt

Finally, use Cloud Build (do not build the container image on the notebook instance itself) to build the container image and save it to the Container Registry.

In [None]:
!cd /home/jupyter/.flink \
 && gcloud builds submit \
     --tag gcr.io/$(gcloud config get-value project)/flink:latest \
     --timeout=20m

# Use the custom container you just built.
options.view_as(PortableOptions).environment_config = f'gcr.io/{project}/flink'

### Build the pipeline and inspect results

In [None]:
import io
from typing import Iterable
from typing import Optional
from typing import Tuple

import torch
from PIL import Image
from torchvision import models
from torchvision import transforms
from torchvision.models.mobilenetv2 import MobileNetV2

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.ml.inference.base import KeyedModelHandler
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor

In [None]:
def filter_empty_text(text: str) -> Iterable[str]:
  if len(text.strip()) > 0:
    yield text

def preprocess_image(data: Image.Image) -> torch.Tensor:
  image_size = (224, 224)
  # Pre-trained PyTorch models expect input images normalized with the
  # below values (see: https://pytorch.org/vision/stable/models.html)
  normalize = transforms.Normalize(
      mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
  transform = transforms.Compose([
      transforms.Resize(image_size),
      transforms.ToTensor(),
      normalize,
  ])
  return transform(data)

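def read_image(image_file_name: str) -> Tuple[str, torch.Tensor]:
  # FileSystems.open is a static method that returns a binary file handle;
  # its second positional argument is a MIME type, not a file mode.
  with FileSystems.open(image_file_name) as file:
    data = Image.open(io.BytesIO(file.read())).convert('RGB')
    return image_file_name, preprocess_image(data)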

class PostProcessor(beam.DoFn):
  def process(self, element: Tuple[str, PredictionResult]) -> Iterable[str]:
    filename, prediction_result = element
    prediction = torch.argmax(prediction_result.inference, dim=0)
    yield str(prediction.item())
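
The two numeric steps in this cell, per-channel normalization and picking the highest-scoring class, can be illustrated without PyTorch. The sketch below reuses the mean and standard deviation values from `preprocess_image`; `normalize_pixel` and `argmax` are hypothetical stand-ins that operate on plain lists, not on tensors.

```python
# Per-channel statistics copied from preprocess_image above.
MEAN = [0.485, 0.456, 0.406]
STD = [0.229, 0.224, 0.225]


def normalize_pixel(rgb):
    """Applies (value - mean) / std to each channel of one RGB pixel in [0, 1]."""
    return [(value - m) / s for value, m, s in zip(rgb, MEAN, STD)]


def argmax(scores):
    """Returns the index of the highest score, i.e. the predicted class id."""
    return max(range(len(scores)), key=scores.__getitem__)


print(normalize_pixel(MEAN))     # [0.0, 0.0, 0.0]
print(argmax([0.1, 2.5, -0.3]))  # 1
```

A pixel equal to the channel means maps to zero, and `PostProcessor` turns the index of the largest score into the string class id that is counted later.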

In [None]:
# Replace this with a file containing paths to your image files.
image_file_names = 'gs://runinference/it_mobilenetv2_imagenet_validation_inputs.txt'
model_state_dict_path = '/tmp/mobilenet_v2.pt'
model_class = MobileNetV2
model_params = {'num_classes': 1000}

# In this example we pass keyed inputs to the RunInference transform.
# Therefore, we wrap PytorchModelHandlerTensor in a KeyedModelHandler.
model_handler = KeyedModelHandler(
  PytorchModelHandlerTensor(
      state_dict_path=model_state_dict_path,
      model_class=model_class,
      model_params=model_params))

In [None]:
p_computer_vision = beam.Pipeline(interactive_flink_runner, options=options)

counts = (
    p_computer_vision
    | 'Read Image File Names' >> beam.io.ReadFromText(
        image_file_names)
    | 'Filter Empty File Names' >> beam.ParDo(filter_empty_text)
    | 'Shuffle Files to Read' >> beam.Reshuffle(num_buckets=900)
    | 'Read Image Data' >> beam.Map(read_image)
    | 'PyTorch Run Inference' >> RunInference(model_handler)
    | 'Process Output' >> beam.ParDo(PostProcessor())
    | 'Count Per Classification' >> beam.combiners.Count.PerElement())

# Further increase the parallelism from the starter example.
options.view_as(FlinkRunnerOptions).parallelism = 300

In [None]:
ib.collect(counts)

### Enrich the data

We can enrich the data with some human-readable labels.

In [None]:
idx_to_label = p_computer_vision | 'A sample class idx to label' >> beam.Create(list({
    '242': 'boxer',
    '243': 'bull mastiff',
    '244': 'Tibetan mastiff',
    '245': 'French bulldog',
    '246': 'Great Dane',
    '247': 'Saint Bernard, St Bernard',
    '248': 'Eskimo dog, husky',
    '249': 'malamute, malemute, Alaskan malamute',
    '250': 'Siberian husky',
    '251': 'dalmatian, coach dog, carriage dog',
    '252': 'affenpinscher, monkey pinscher, monkey dog',
    '253': 'basenji',
    '254': 'pug, pug-dog',
}.items()))

def cross_join(idx_count, idx_labels):
  idx, count = idx_count
  if idx in idx_labels:
    return {'class': idx, 'label': idx_labels[idx], 'count': count}

label_counts = (
    counts
    | 'Enrich with human-readable labels' >> beam.Map(
        cross_join, idx_labels=beam.pvalue.AsDict(idx_to_label))
    | 'Keep only enriched data' >> beam.Filter(lambda x: x is not None))
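
The enrichment step is a dictionary lookup followed by a filter. Here is a local sketch of that logic on made-up counts; `cross_join_local`, the two-entry label table and the sample counts are hypothetical, and a plain `dict` stands in for the `AsDict` side input.

```python
# A small subset of the label table, used here as a plain dictionary.
sample_labels = {'242': 'boxer', '254': 'pug, pug-dog'}


def cross_join_local(idx_count, idx_labels):
    """Same logic as cross_join: returns None when the class id has no label."""
    idx, count = idx_count
    if idx in idx_labels:
        return {'class': idx, 'label': idx_labels[idx], 'count': count}


# Hypothetical (class id, count) pairs; '7' has no label and is dropped.
sample_counts = [('242', 50), ('7', 48), ('254', 51)]
joined = (cross_join_local(pair, sample_labels) for pair in sample_counts)
enriched = [row for row in joined if row is not None]

print(enriched)
```

Only the two labeled classes remain, which is why the downstream data is tiny compared with the input.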

After an aggregation, the output data size can be tiny compared with the input data.

High parallelism does not help with processing small data and could introduce unnecessary overhead.

We can interactively lower the parallelism to 1 to inspect the result of the newly added transform, which only processes a handful of elements.

In [None]:
options.view_as(FlinkRunnerOptions).parallelism = 1
ib.show(label_counts)

`p_computer_vision` reuses the same notebook-managed cluster as the first two pipelines.

In [None]:
ib.clusters.describe(p_computer_vision)

### Cleanup
Once we are done, we can clean up the clusters. This deletes the created Dataproc cluster, the staging files of the cluster, and all of the mappings pertaining to the cluster.

In [None]:
# Use force=True to clean up all notebook-managed clusters.
ib.clusters.cleanup(force=True)