<a href="https://colab.research.google.com/github/xuanhao44/beam_learn/blob/main/learn_beam_transforms_by_doing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the "License")

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Learn Beam PTransforms

After this notebook, you should be able to:
1. Use user-defined functions in your `PTransforms`
2. Use Beam SDK composite transforms
3. Create your own composite transforms to simplify your `Pipeline`

For basic Beam `PTransforms`, please check out [this Notebook](https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/learn_beam_basics_by_doing.ipynb).

Beam Python SDK also provides [a list of built-in transforms](https://beam.apache.org/documentation/transforms/python/overview/).


## How To Approach This Tutorial

This tutorial is designed for someone who likes to learn by doing. There will be code cells where you can write your own code to test your understanding.

As such, to get the most out of this tutorial, we strongly recommend typing code by hand as you’re working through the tutorial and not using copy/paste. This will help you develop muscle memory and a stronger understanding.

### Prerequisites

We'll assume you have familiarity with Python or Pandas. It is highly recommended to finish [this beginner tutorial](https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/get-started/learn_beam_basics_by_doing.ipynb) first. 

To begin, run the cell below to install and import Apache Beam.

In [2]:
# Run a shell command and import beam
%pip install --quiet apache-beam
import apache_beam as beam
beam.__version__

In [None]:
# Set the logging level to reduce verbose information
import logging

logging.root.setLevel(logging.ERROR)



---






## 1. Simple User-Defined Function (UDF)

Some `PTransforms` let you supply your own functions (user-defined code) to specify how the transform is applied.
For example, the [`CombineGlobally`](https://beam.apache.org/documentation/transforms/python/aggregation/combineglobally/) transform below
uses a custom `bounded_sum` function to aggregate the elements.

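Note that the `bound=5000` argument is written outside `bounded_sum`: it is passed to `CombineGlobally`, which forwards it to the function. This calling style can be surprising at first.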

In [None]:
pc = [1, 10, 100, 1000]

# User-defined function
def bounded_sum(values, bound=500):
  return min(sum(values), bound)

small_sum = pc | beam.CombineGlobally(bounded_sum)  # [500]
large_sum = pc | beam.CombineGlobally(bounded_sum, bound=5000)  # [1111]

print(small_sum, large_sum)
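
The argument forwarding can be pictured with a plain-Python sketch. The helper `combine_globally_sketch` below is hypothetical and is not part of Beam; it only mimics the idea that extra arguments given alongside the function are passed through to it.

```python
# Hypothetical stand-in for the forwarding behavior; not Beam's implementation.
def combine_globally_sketch(fn, *args, **kwargs):
  # Return a callable that applies fn to the whole collection,
  # passing along any extra arguments supplied up front.
  def apply(values):
    return [fn(values, *args, **kwargs)]
  return apply

def bounded_sum(values, bound=500):
  return min(sum(values), bound)

pc = [1, 10, 100, 1000]
small_sum = combine_globally_sketch(bounded_sum)(pc)
large_sum = combine_globally_sketch(bounded_sum, bound=5000)(pc)
print(small_sum, large_sum)  # [500] [1111]
```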

## 2. Transforms: ParDo and Combine

A `ParDo` transform considers each element in the input `PCollection`, runs your user code on it, and emits zero, one, or multiple elements to an output `PCollection`. `Combine` is another Beam transform, used for combining collections of elements or values in your data.
Both accept flexible UDFs that define how you process the data.
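
To make the "zero, one, or multiple elements" contract concrete, here is a minimal plain-Python sketch. `par_do_sketch` and `expand_number` are hypothetical names used only for illustration; the sketch runs on an ordinary list and does not use Beam.

```python
from itertools import chain

# Hypothetical helper mimicking the element-wise contract; not Beam itself.
def par_do_sketch(elements, process):
  # Call process once per element and flatten everything it yields.
  return list(chain.from_iterable(process(e) for e in elements))

def expand_number(n):
  if n < 0:
    return          # zero outputs: negative numbers are dropped
  if n == 0:
    yield 0         # exactly one output
    return
  for i in range(1, n + 1):
    yield i         # possibly several outputs: 1, 2, ..., n

outputs = par_do_sketch([-1, 0, 3], expand_number)
print(outputs)  # [0, 1, 2, 3]
```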

### 2.1 DoFn

DoFn - a Beam Python class that defines a distributed processing function (used in [ParDo](https://beam.apache.org/documentation/programming-guide/#pardo))

For this particular example, the code does not need to be this elaborate: replacing the `ParDo` line with `beam.Map(lambda num: num*5)` works as well.

In [None]:
data = [1, 2, 3, 4]

# create a DoFn to multiply each element by five
# you can define the processing code under `process`
# which is required for a DoFn
class MultiplyByFive(beam.DoFn):
  def process(self, element):
    yield element*5

with beam.Pipeline() as pipeline:
  outputs = (
      pipeline
      | 'Create values' >> beam.Create(data)
      | 'Multiply by 5' >> beam.ParDo(MultiplyByFive())
  )

  outputs | beam.Map(print)

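A brief introduction to `yield`.

`yield` is a Python keyword used to define generator functions. A generator function can be used like an iterator, but instead of storing all of its values in memory at once, it produces them on demand. A `yield` statement resembles a `return` statement, except that it does not terminate the function: it pauses execution and hands a value back to the caller. When the next value is requested, the function resumes from where it last yielded.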
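
The pause-and-resume behavior can be seen in a small standalone generator. The function name `count_up_to` is made up for this illustration.

```python
def count_up_to(limit):
  # Generator function: each yield hands one value to the caller and pauses here.
  n = 1
  while n <= limit:
    yield n
    n += 1

gen = count_up_to(3)
first = next(gen)    # runs until the first yield
second = next(gen)   # resumes right after the first yield
rest = list(gen)     # drains whatever is left
print(first, second, rest)  # 1 2 [3]
```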

### 2.2 CombineFn

CombineFn - define associative and commutative aggregations (used in [Combine](https://beam.apache.org/documentation/programming-guide/#combine))

The example below is a running-product combiner. Because `ProductFn` subclasses `CombineFn`, it needs to implement four methods:

- `create_accumulator`
- `add_input`
- `merge_accumulators`
- `extract_output`

In [None]:
data = [1, 2, 3, 4]

# create a CombineFn to get the product of all elements
# you need to provide four operations
class ProductFn(beam.CombineFn):
  def create_accumulator(self):
    # creates a new accumulator to store the initial value
    return 1

  def add_input(self, current_prod, input):
    # adds an input element to an accumulator
    return current_prod*input

  def merge_accumulators(self, accumulators):
    # merge several accumulators into a single accumulator
    prod = 1
    for accu in accumulators:
      prod *= accu
    return prod

  def extract_output(self, prod):
    # performs the final computation
    return prod

with beam.Pipeline() as pipeline:
  outputs = (
      pipeline
      | 'Create values' >> beam.Create(data)
      | 'Compute product' >> beam.CombineGlobally(ProductFn())
  )
  outputs | beam.LogElements()
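
To see why four separate methods are useful, here is a plain-Python sketch of how the work could be split across bundles and then merged. `ProductFnSketch` copies the methods of `ProductFn` without the Beam base class, and `combine_in_bundles` is a hypothetical driver; a real runner decides on its own how to split and merge the data.

```python
class ProductFnSketch:
  # Same four methods as ProductFn above, without the beam.CombineFn base class.
  def create_accumulator(self):
    return 1

  def add_input(self, current_prod, element):
    return current_prod * element

  def merge_accumulators(self, accumulators):
    prod = 1
    for accu in accumulators:
      prod *= accu
    return prod

  def extract_output(self, prod):
    return prod

def combine_in_bundles(fn, bundles):
  # Hypothetical driver: fold each bundle into its own accumulator,
  # then merge the partial accumulators and extract the final value.
  partials = []
  for bundle in bundles:
    acc = fn.create_accumulator()
    for element in bundle:
      acc = fn.add_input(acc, element)
    partials.append(acc)
  return fn.extract_output(fn.merge_accumulators(partials))

result = combine_in_bundles(ProductFnSketch(), [[1, 2], [3, 4]])
print(result)  # 24
```

Because multiplication is associative and commutative, the result does not depend on how the elements are grouped into bundles.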


Note: The above `DoFn` and `CombineFn` examples are for demonstration purposes. You could easily achieve the same functionality by using the simple function illustrated in section 1.

Indeed it is!



---



## 3. Composite Transforms

Now that you've learned the basic `PTransforms`, Beam allows you to simplify the process of processing and transforming your data through [Composite Transforms](https://beam.apache.org/documentation/programming-guide/#composite-transforms).

Composite transforms can nest multiple transforms into a single composite transform, making your code easier to understand.

To see an example of this, let's take a look at how we can improve the `Pipeline` we built to count each word in Shakespeare's *King Lear*.

In [None]:
!mkdir -p data
!gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/

In [None]:
import re

# Function used to run and display the result
def run(cmd):
  print('>> {}'.format(cmd))
  !{cmd}
  print('')

inputs_pattern = 'data/*'
outputs_prefix = 'outputs/part'

# Running locally in the DirectRunner.
with beam.Pipeline() as pipeline:
  word_count = (
      pipeline
        | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
        | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
        | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
        | 'Group and sum' >> beam.CombinePerKey(sum)
        | 'Write results' >> beam.io.WriteToText(outputs_prefix)
  )

# Sample the first 20 results, remember there are no ordering guarantees.
run('head -n 20 {}-00000-of-*'.format(outputs_prefix))

Although the code above is a viable way to design your `Pipeline`, you can see that we use multiple transforms to perform one process:
1. `FlatMap` is used to find words in each line
2. `Map` is used to create key-value pairs with each word where the value is 1
3. `CombinePerKey` is used so that we can then group by each word and count up the sums

All of these `PTransforms`, in combination, are meant to count each word in *King Lear*. You can simplify the process and combine these three transforms into one by using composite transforms.
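
The three steps can also be traced in plain Python on a tiny made-up input. This sketch does not use Beam and the sample lines are invented for illustration; it only mirrors what each transform contributes.

```python
import re
from collections import defaultdict

# Made-up sample input, standing in for lines read from the text file.
lines = ["the king and the fool", "the storm"]

# 1. FlatMap: each line produces zero or more words.
words = [w for line in lines for w in re.findall(r"[a-zA-Z']+", line)]

# 2. Map: pair each word with 1.
pairs = [(w, 1) for w in words]

# 3. CombinePerKey(sum): group by word and sum the ones.
counts = defaultdict(int)
for word, one in pairs:
  counts[word] += one

print(dict(counts))  # {'the': 3, 'king': 1, 'and': 1, 'fool': 1, 'storm': 1}
```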

There are two approaches you can follow:
1. Using Beam SDK's built-in composite transforms
2. Creating your own composite transforms

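Personal take: making active use of the built-in composite transforms first is clearly the smart approach. For example, `CombinePerKey(sum)` is itself a fairly involved process: it finds identical keys, groups their values, and finally sums them.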

### 3.1 Beam SDK Composite Transforms
Beam allows combining a sequence of transforms into a composite transform. 
Many of Beam's handy pre-written transforms are composite transforms under the hood. 
In this tutorial, we will cover one example of how to create a composite transform. 
However, to see other composite transforms you can use, 
check out the following API reference pages: [Beam Transforms Package](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.html), [Beam ML Package](https://beam.apache.org/releases/pydoc/current/apache_beam.ml).

By using a Beam SDK composite transform, you're able to easily combine multiple transforms into one line.

For this tutorial, we will use the SDK-provided [`Count` transform](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.combiners.html#apache_beam.transforms.combiners.Count), which counts each element in the `PCollection`.


```
beam.combiners.Count.PerElement()
```



This `Count` transform does the work of both the `Map` and `CombinePerKey` transforms from our Word Count `Pipeline`, but in one line.

Edit the Word Count `Pipeline` below to use a composite transform by applying Beam's `Count` transform (see above). Applying a composite transform is just like applying a `PTransform` to your `PCollection`.

Below the code cell you will edit is a hidden answer code cell to check your work. If you're stuck, try opening the hint first!

In [None]:
#@title Open code to show the hint

# Hint: Replace the `Map` and `CombinePerKey` transforms with Beam's `Count` transform (see above)

In [None]:
#@title EDIT THIS CODE CELL TO USE beam.combiners.Count.PerElement
# EDIT THIS CODE CELL

inputs_pattern = 'data/*'
outputs_prefix = 'outputs/userans'

with beam.Pipeline() as pipeline:
  word_count = (
      pipeline
        | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
        | 'Find words' >>
        beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
        | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
        | 'Group and sum' >> beam.CombinePerKey(sum)
        | 'Write results' >> beam.io.WriteToText(outputs_prefix)
  )

# After you're done, check to see if your code outputs
# the same PCollection by uncommenting the code below
'''
# Sample the first 20 results, remember there are no ordering guarantees.
run('head -n 20 {}-00000-of-*'.format(outputs_prefix))
'''

Below is our answer to check your work. It is the Word Count example from above, but it now combines `Map` and `CombinePerKey` into one line using the `Count` composite transform.

In [None]:
#@title Answer
inputs_pattern = 'data/*'
outputs_prefix = 'outputs/part2'

# Running locally in the DirectRunner.
with beam.Pipeline() as pipeline:
  word_count = (
      pipeline
        | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
        | 'Find words' >>
        beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
        # Count composite transform from Beam SDK
        | 'Count words' >> beam.combiners.Count.PerElement()
        | 'Write results' >> beam.io.WriteToText(outputs_prefix)
  )

# Sample the first 20 results, remember there are no ordering guarantees.
run('head -n 20 {}-00000-of-*'.format(outputs_prefix))

> Summary: Applying a composite transform is just like applying a `PTransform` to your `PCollection`, but it simplifies the process by combining multiple `PTransforms` in one line.

### 3.2 Creating Your Own Composite Transform

We simplified the original code using a Beam SDK composite transform, but we can simplify it further by creating our own composite transform function.

Below is an example of a composite transform that the Beam SDK does not provide but that you can create yourself. The function combines the `Count` composite transform you applied above with the `FlatMap` transform that converts lines of text into individual words.

Note that because `Count` is itself a composite transform, `CountWords` is also a nested composite transform.

In [None]:
# The CountWords Composite Transform inside the WordCount pipeline.
@beam.ptransform_fn
def CountWords(pcoll):
  return (
      pcoll
      # Convert lines of text into individual words.
      | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
      # Count the number of times each word occurs.
      | beam.combiners.Count.PerElement()
  )

You can then use this `CountWords` composite transform in your `Pipeline`, making the pipeline easier to read.

Try editing the Word Count `Pipeline` below to incorporate this transform into the pipeline.

Below the code cell you will edit is a hidden answer code cell to check your work. If you're stuck, try opening the hint first!

In [None]:
#@title Open code to show the hint

# Hint: The newly defined transform combines the `FlatMap` and `Count` transforms.
# Replace the `FlatMap` and `Count` transforms with CountWords() (see above)

In [None]:
#@title EDIT THIS CODE CELL TO USE YOUR `CountWords`
# EDIT THIS CODE CELL

inputs_pattern = 'data/*'
outputs_prefix = 'outputs/part3'

# Running locally in the DirectRunner.
with beam.Pipeline() as pipeline:
  word_count = (
      pipeline
        | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
        | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
        | 'Count words' >> beam.combiners.Count.PerElement()
        | 'Write results' >> beam.io.WriteToText(outputs_prefix)
  )

# The `with` block already runs the pipeline on exit, so no explicit run() call is needed.
# Sample the first 20 results, remember there are no ordering guarantees.
run('head -n 20 {}-00000-of-*'.format(outputs_prefix))

In [None]:
#@title Answer
inputs_pattern = 'data/*'
outputs_prefix = 'outputs/part3'

# Running locally in the DirectRunner.
with beam.Pipeline() as pipeline:
  word_count = (
      pipeline
        | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
        # The composite transform function you created above
        | 'Count Words' >> CountWords()
        | 'Write results' >> beam.io.WriteToText(outputs_prefix)
  )

# Sample the first 20 results, remember there are no ordering guarantees.
run('head -n 20 {}-00000-of-*'.format(outputs_prefix))

### 3.3 Creating Your Own Composite Transform With `PTransform` Directly

To create your own composite transform, create a subclass of the `PTransform` class and override the `expand` method to specify the actual processing logic
([more details](https://beam.apache.org/documentation/programming-guide/#composite-transform-creation)).

For example, suppose we want to create our own composite transform that computes the length of each word.

The following code sample shows how to declare a `PTransform` that accepts a `PCollection` of strings as input and outputs a `PCollection` of `(word, length)` pairs
that give each string's length.

In [None]:
class ComputeWordLengths(beam.PTransform):
  def expand(self, pcoll):
    # Transform logic goes here.
    return pcoll | beam.Map(lambda x: (x, len(x)))

Within the above `PTransform` subclass, you’ll need to override the `expand` method. The `expand` method is where you add the processing logic for the `PTransform`. 
Your override of `expand` must accept the appropriate type of input `PCollection` as a parameter, and specify the output `PCollection` as the return value.

As long as you override the `expand` method in your `PTransform` subclass to accept the appropriate input `PCollection`(s) and 
return the corresponding output `PCollection`(s), you can include as many transforms as you want. 
These transforms can include core transforms (`ParDo`), composite transforms, or the transforms included in the Beam SDK libraries.

Your composite transform’s parameters and return value must match the initial input type and final return type for the entire transform, even if the transform’s intermediate data changes type multiple times.

Note: The `expand` method of a `PTransform` is not meant to be invoked directly by the user of a transform. 
Instead, apply the transform to the `PCollection` itself, for example with the `|` operator as in `pcoll | ComputeWordLengths()`. 
This allows transforms to be nested within the structure of your pipeline.
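
As a rough mental model of why `expand` is never called directly, the toy classes below route the `|` operator to `expand`. All `Toy...` names are hypothetical and operate on plain lists; Beam's actual `PTransform` machinery goes through the pipeline and is considerably more involved.

```python
class ToyTransform:
  # Toy stand-in for a transform base class; not Beam's PTransform.
  def expand(self, values):
    raise NotImplementedError

  def __ror__(self, values):
    # `values | transform` lands here, which in turn calls expand.
    return self.expand(values)

class ToyMap(ToyTransform):
  def __init__(self, fn):
    self.fn = fn

  def expand(self, values):
    return [self.fn(v) for v in values]

class ToyComputeWordLengths(ToyTransform):
  def expand(self, values):
    # A composite: its expand simply applies another transform.
    return values | ToyMap(lambda x: (x, len(x)))

result = ["KING", "OF"] | ToyComputeWordLengths()
print(result)  # [('KING', 4), ('OF', 2)]
```

The caller only ever writes `values | transform`; nesting falls out naturally because a composite's `expand` uses the same operator on its inner transforms.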

In [None]:
# quickly test it works
["KING", "OF"] | ComputeWordLengths()

In [None]:
#@title Click to check how to use your composite transform to build the pipeline

# put this into the Beam pipeline to compute the length of each word
inputs_pattern = 'data/*'
outputs_prefix = 'outputs/part33'

# Running locally in the DirectRunner.
with beam.Pipeline() as pipeline:
  word_count = (
      pipeline
        | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
        | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
        | 'Count Word Lengths' >> ComputeWordLengths()
        | 'Write results' >> beam.io.WriteToText(outputs_prefix)
  )

# Sample the first 20 results, remember there are no ordering guarantees.
run('head -n 20 {}-00000-of-*'.format(outputs_prefix))

## Final Reading

The PTransform Style Guide contains additional information not included here, such as style guidelines, logging and testing guidance, and language-specific considerations. The guide is a useful starting point when you want to write new composite PTransforms.

https://beam.apache.org/contribute/ptransform-style-guide/