In [None]:
# on macOS Catalina
!pip install pyarrow==0.13.0 apache-beam

In [2]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

print("Apache Beam Version: {}".format(beam.__version__))

Apache Beam Version: 2.16.0


# A Simple Example: WordCount

In [21]:
with beam.Pipeline(options=PipelineOptions()) as p:
    lines = p | 'Create' >> beam.Create(['hello world', 'apache beam', '! Nice beam !'])
    counts = (
        lines
        | 'Split' >> beam.FlatMap(lambda x: x.split(' '))
        | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum)
    )
    counts | 'Print' >> beam.ParDo(lambda w: print("{}: {}".format(w[0], w[1])))  # w is a (word, count) tuple

hello: 1
world: 1
apache: 1
beam: 2
!: 2
Nice: 1
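To see what each labelled step computes, here is a plain-Python sketch (no Beam involved) that mimics the three transforms eagerly on the same input. It is only an illustration of the semantics: Beam builds the steps lazily, may run them distributed, and does not guarantee the order of its output, whereas this sketch runs in order.

```python
from collections import defaultdict

lines = ['hello world', 'apache beam', '! Nice beam !']

# 'Split' (FlatMap): one input line -> many words, flattened into one collection
words = [w for line in lines for w in line.split(' ')]

# 'PairWithOne' (Map): exactly one output element per input element
pairs = [(w, 1) for w in words]

# 'GroupAndSum' (CombinePerKey(sum)): group the values by key, then reduce each group with sum
groups = defaultdict(list)
for key, value in pairs:
    groups[key].append(value)
counts = {key: sum(values) for key, values in groups.items()}

print(counts)
# {'hello': 1, 'world': 1, 'apache': 1, 'beam': 2, '!': 2, 'Nice': 1}
```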


In Apache Beam, three components make up every job: `Pipeline`, `PCollection`, and `Transform`.
* `Pipeline` describes the whole data-processing flow as a DAG (directed acyclic graph) of transforms. The closest concept in MapReduce is a `Job`.
* `PCollection` is the dataset that flows through the pipeline. It is immutable: transforms never modify a PCollection in place, they produce new ones. The similar concept in Spark is the `RDD`.
* `Transform` (a `PTransform` in the Beam API) is a processing step that turns one or more input PCollections into one or more output PCollections.

They are combined with the following syntax:
```python
[Output PCollection] = [Input PCollection] | [Label] >> [Transform]
```

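`|` applies a transform to a PCollection (or to the pipeline itself, as in `p | beam.Create(...)`), and `>>` attaches a label to that step. Labels must be unique within a pipeline.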

# Other Connectors: Text Files

In [22]:
with beam.Pipeline(options=PipelineOptions()) as p:
    lines = p | 'ReadFromFile' >> beam.io.ReadFromText("/Users/jiankaiwang/Desktop/apache_beam/Dengue_Daily.csv")
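    # "output" is an illustrative file prefix; WriteToText adds a shard suffix, e.g. output-00000-of-00001.csv
    lines | "WriteToFile" >> beam.io.WriteToText("/Users/jiankaiwang/Desktop/apache_beam/output", file_name_suffix='.csv')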

# Callable, DoFn, ParDo

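`ParDo` applies a `DoFn` to every element of a PCollection. A plain callable is also accepted (Beam wraps it in a `DoFn`, as in the `'Print'` step above). Either way, the processing function must return an iterable of zero or more output elements, or be a generator that `yield`s them.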

## Example 1

In [39]:
class SplitFn(beam.DoFn):
    def process(self, element):
        """Called once per input line; returns an iterable of output elements."""
        
        # emit only the field at index 6 (the 7th comma-separated column)
        # process() must return an iterable, so the single value is wrapped in a list
        return [element.split(",")[6]]

In [40]:
with beam.Pipeline(options=PipelineOptions()) as p:
    lines = p | 'ReadFromFile' >> beam.io.ReadFromText("/Users/jiankaiwang/Desktop/apache_beam/Dengue_Daily_small.csv")
    extract = (
        lines 
        | "SplitCols" >> beam.ParDo(SplitFn())
        | "PairWithValue" >> beam.Map(lambda x: (x, 1))
        | "GroupAndSum" >> beam.CombinePerKey(sum)
    )
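    # "example1" is an illustrative file prefix so this output does not overwrite the other examples
    extract | "WriteToFile" >> beam.io.WriteToText("/Users/jiankaiwang/Desktop/apache_beam/example1", file_name_suffix='.csv')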



## Example 2

In [41]:
class SplitFnAndMap(beam.DoFn):
    def process(self, element):
        """Called once per input line; yields (key, value) pairs."""
        
        # emit the field at index 6 as the key, paired with a count of 1
        # process() is a generator here: each `yield` emits one (key, value) tuple
        yield (element.split(",")[6], 1)

In [43]:
with beam.Pipeline(options=PipelineOptions()) as p:
    lines = p | 'ReadFromFile' >> beam.io.ReadFromText("/Users/jiankaiwang/Desktop/apache_beam/Dengue_Daily_small.csv")
    extract = (
        lines 
        | "SplitColsAndMap" >> beam.ParDo(SplitFnAndMap())
        | "GroupAndSum" >> beam.CombinePerKey(sum)
    )
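    # "example2" is an illustrative file prefix so this output does not overwrite the other examples
    extract | "WriteToFile" >> beam.io.WriteToText("/Users/jiankaiwang/Desktop/apache_beam/example2", file_name_suffix='.csv')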

## TODO
http://shzhangji.com/cnblogs/2017/09/13/apache-beam-quick-start-with-python/