<a href="https://colab.research.google.com/github/lowat/Pipelines/blob/main/Apache_Beam_Test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# First Beam Pipeline


In [None]:
# Ensure Apache Beam is installed (uncomment the next line if it is missing)
# !pip3 install apache-beam

In [3]:
import apache_beam as beam
from apache_beam import Create, Map

# Cube every integer in 0..9 and print each result.
p1 = beam.Pipeline()
findCube = (
    p1
    | "Create Element" >> beam.Create(range(10))
    | "Find Cube" >> beam.Map(lambda n: n ** 3)
    | "Print" >> beam.Map(print)
)
# Run the pipeline; the run result is the cell's displayed value.
p1.run()

0
1
8
27
64
125
216
343
512
729


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da85dcb8b0>

In [4]:
import apache_beam as beam
from apache_beam import Create, Map

# Same cube pipeline, fed from an explicit list instead of a range.
p1 = beam.Pipeline()
findCube = (
    p1
    | "Create Element" >> beam.Create([100, 200, 300])
    | "Find Cube" >> beam.Map(lambda n: n ** 3)
    | "Print" >> beam.Map(print)
)
p1.run()

1000000
8000000
27000000


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79dad021d7e0>

In [5]:
import apache_beam as beam
from apache_beam import Create, Map

# Upper-case each input string and print it.
p1 = beam.Pipeline()
findCube = (
    p1
    | "Create Element" >> Create(["cat", "dog"])
    # Label describes what the step does (it was a copy-pasted "Find Cube").
    | "To Upper" >> Map(lambda x: x.upper())
    | "Print" >> Map(print)
)
p1.run()

CAT
DOG


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da85f850f0>

In [9]:
import apache_beam as beam
from apache_beam import Create, Map

# Create a PCollection from a dict and print one formatted sentence per entry.
p1 = beam.Pipeline()
findCube = (
    p1
    | "Create Element" >> Create({"foo":"bar", "hello":"world"})
    # Each element is indexed as item[0] (key) and item[1] (value).
    # Label describes the step (it was a copy-pasted "Find Cube").
    | "Format Key Value" >> Map(lambda item: f"'{item[1]}' is the value for the key '{item[0]}'")
    | "Print" >> Map(print)
)
p1.run()

'bar' is the value for the key 'foo'
'world' is the value for the key 'hello'


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da7ff02d10>

In [13]:
import apache_beam as beam
from apache_beam import Create, Map

def findCube(element):
  """Return ``element`` raised to the third power."""
  cubed = pow(element, 3)
  return cubed

# Cube pipeline that maps with the named function findCube instead of a lambda.
p1 = beam.Pipeline()
result = (
    p1
    | "Create Element" >> beam.Create([100, 200, 300])
    | "Find Cube" >> beam.Map(findCube)
    | "Print" >> beam.Map(print)
)
p1.run()

1000000
8000000
27000000


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da85d90fa0>

## Flatten

In [12]:
import apache_beam as beam

# Build two PCollections on the same pipeline and merge them with Flatten.
p1 = beam.Pipeline()
even = {2, 4, 6, 8}
odd = {1, 3, 5, 7, 9}

# One source PCollection per input set.
even_p1 = p1 | "Create Even PCollection" >> beam.Create(even)
odd_p1 = p1 | "Create Odd PCollection" >> beam.Create(odd)

# Flatten takes the tuple of PCollections and yields a single merged one.
flat_out = (
    (even_p1, odd_p1)
    | beam.Flatten()
    | beam.Map(print)
)
p1.run()

1
3
5
7
9
8
2
4
6


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da7d876980>

In [15]:
import apache_beam as beam
from apache_beam import Create, Map, FlatMap

# Split each sentence on whitespace; FlatMap emits every word as its own element.
p1 = beam.Pipeline()
result = (
    p1
    | "Create Element" >> beam.Create(["foo bar", "hello world"])
    | "String Split" >> beam.FlatMap(str.split)
    | "Print" >> beam.Map(print)
)
p1.run()

foo
bar
hello
world


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da7da53160>

## Filter

In [16]:
import apache_beam as beam
from apache_beam import Create, Map, Filter

# Keep only the even integers from 0..19 and print them.
p1 = beam.Pipeline()
filter_even = (
    p1
    | "Create Element" >> beam.Create(range(20))
    | "Filter Even" >> beam.Filter(lambda n: n % 2 == 0)
    | "Print" >> beam.Map(print)
)
p1.run()

0
2
4
6
8
10
12
14
16
18


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da7d60da80>

In [19]:
import apache_beam as beam
from apache_beam import Create, Map, Filter

# Keep only the strings that start with an upper-case "H".
p1 = beam.Pipeline()
filter_even = (
    p1
    | "Create Element" >> beam.Create(["Hello", "World", "Hey"])
    | "Filter H" >> beam.Filter(lambda word: word.startswith("H"))
    | "Print" >> beam.Map(print)
)
p1.run()

Hello
Hey


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da7d80c940>

## ParDo

In [21]:
import apache_beam as beam
from apache_beam import Create, Map, ParDo, Filter

# Keep the sentences starting with "f", then split them into words with ParDo.
p1 = beam.Pipeline()
result = (p1
          | "Create Element" >> Create(["foo bar", "hello world"])
          # Label matches the predicate (it was "Filter H" while testing for "f").
          | "Filter f" >> Filter(lambda x: x.startswith("f"))
          # str.split returns a list; each word is printed as a separate element.
          | "String Split" >> ParDo(str.split)
          | "Print" >> Map(print))
p1.run()

foo
bar


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da7d652050>

## Keys, Values, String

In [22]:
import apache_beam as beam
from apache_beam import Create, Map

# Print only the keys of the entries created from the dict.
p1 = beam.Pipeline()
findCube = (
    p1
    | "Create Element" >> beam.Create({"foo": "bar", "hello": "world"})
    | "Find Keys" >> beam.Keys()
    | "Print" >> beam.Map(print)
)
p1.run()

foo
hello


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da7d6680a0>

In [23]:
import apache_beam as beam
from apache_beam import Create, Map

# Print only the values of the entries created from the dict.
p1 = beam.Pipeline()
findCube = (
    p1
    | "Create Element" >> beam.Create({"foo": "bar", "hello": "world"})
    | "Find Values" >> beam.Values()
    | "Print" >> beam.Map(print)
)
p1.run()

bar
world


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da7d82dcc0>

In [25]:
import apache_beam as beam
from apache_beam import Create, Map

# Convert each dict entry to its string form before printing it.
p1 = beam.Pipeline()
findCube = (
    p1
    | "Create Element" >> beam.Create({"foo": "bar", "hello": "world"})
    # Alternative left disabled by the author: beam.ToString.Kvs()
    | "String Display" >> beam.ToString.Element()
    | "Print" >> beam.Map(print)
)
p1.run()

('foo', 'bar')
('hello', 'world')


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da7d668e80>

## KvSwap

In [26]:
import apache_beam as beam
from apache_beam import Create, Map

# Swap key and value of every entry created from the dict, then print the pairs.
p1 = beam.Pipeline()
findCube = (
    p1
    | "Create Element" >> beam.Create({"foo": "bar", "hello": "world"})
    | "KV swap" >> beam.KvSwap()
    | "Print" >> beam.Map(print)
)
p1.run()

('bar', 'foo')
('world', 'hello')


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da7d547fa0>

## Partition

In [35]:
import apache_beam as beam
from apache_beam import Create,Map
# Pipeline object that the partition and print steps later in this cell attach to.
p1 = beam.Pipeline()

def isEven(num, no_par):
  """Return the partition index for ``num``: 1 when it is even, 0 otherwise.

  ``no_par`` is accepted but never read here; presumably it is the partition
  count supplied by ``beam.Partition`` -- confirm against the Beam docs.
  """
  if num % 2 == 0:
    return 1
  return 0

# Split 1..19 into two partitions with isEven, then print partition 1.
is_even = (
    p1
    | "Create Element" >> beam.Create(range(1, 20))
    | "Partition" >> beam.Partition(isEven, 2)
)

# Index 1 is the partition that isEven assigns to even numbers.
is_even[1] | "Print Even Numbers" >> beam.Map(print)
p1.run()

2
4
6
8
10
12
14
16
18


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da861e7010>

## Regular Expression

In [36]:
import apache_beam as beam
from apache_beam import Create, Map, ParDo, Filter

# Keep only the elements that match the digit pattern and print them.
p1 = beam.Pipeline()
result = (p1
          | "Create Element" >> Create(["1", "23","hello","world"])
          # Raw string: "\d" in a plain literal is an invalid escape sequence
          # (DeprecationWarning, SyntaxWarning from Python 3.12). The pattern
          # passed to Beam is unchanged.
          | "Regex" >> beam.Regex.matches(r"(\d+)")
          | "Print" >> Map(print))
p1.run()

1
23


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da7d80e2f0>

## Aggregation

In [37]:
import apache_beam as beam
from apache_beam import Create, Map, ParDo, Filter

# Count all elements of the PCollection and print the single total.
p1 = beam.Pipeline()
result = (
    p1
    | "Create Element" >> beam.Create(["1", "23", "hello", "world"])
    | "Count" >> beam.combiners.Count.Globally()
    | "Print" >> beam.Map(print)
)
p1.run()

4


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da7d2b5090>

In [39]:
import apache_beam as beam
from apache_beam import Create, Map

# Drop duplicate elements (the repeated "1") and print what remains.
p1 = beam.Pipeline()
result = (
    p1
    | "Create Element" >> beam.Create(["1", "1", "hello", "world"])
    | "Distinct" >> beam.Distinct()
    | "Print" >> beam.Map(print)
)
p1.run()

1
hello
world


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da7d1ee710>

In [42]:
import apache_beam as beam
from apache_beam import Create, Map

# Sample 5 of the integers 0..19; the sample is printed as one list.
p1 = beam.Pipeline()
result = (
    p1
    | "Create Element" >> beam.Create(range(20))
    | "Fixed Size" >> beam.combiners.Sample.FixedSizeGlobally(5)
    | "Print" >> beam.Map(print)
)
p1.run()

[13, 3, 5, 16, 1]


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da7cd18340>

In [43]:
import apache_beam as beam
from apache_beam import Create, Map

# Combine the integers 0..19 with the built-in sum and print the total.
p1 = beam.Pipeline()
result = (
    p1
    | "Create Element" >> beam.Create(range(20))
    | "Sum" >> beam.CombineGlobally(sum)
    | "Print" >> beam.Map(print)
)
p1.run()

190


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da85d904c0>

In [44]:
import apache_beam as beam
from apache_beam import Create, Map

# Print the two largest values from 0..19 as a single list.
p1 = beam.Pipeline()
result = (
    p1
    | "Create Element" >> beam.Create(range(20))
    | "Top 2" >> beam.combiners.Top.Largest(2)
    | "Print" >> beam.Map(print)
)
p1.run()

[19, 18]


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da7cceb8b0>

In [45]:
import apache_beam as beam
from apache_beam import Create, Map

# Average the values of the (key, value) pairs separately for each key.
p1 = beam.Pipeline()
result = (
    p1
    | "Create Element" >> beam.Create([("hello", 1), ("hello", 4), ("bye", 7)])
    | "Mean per key" >> beam.combiners.Mean.PerKey()
    | "Print" >> beam.Map(print)
)
p1.run()

('hello', 2.5)
('bye', 7.0)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da7d1d7ac0>

In [46]:
import apache_beam as beam
from apache_beam import Create, Map

# Sum the values of the (key, value) pairs separately for each key.
p1 = beam.Pipeline()
result = (
    p1
    | "Create Element" >> beam.Create([("hello", 1), ("hello", 4), ("bye", 7)])
    | "Sum per key" >> beam.CombinePerKey(sum)
    | "Print" >> beam.Map(print)
)
p1.run()

('hello', 5)
('bye', 7)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x79da7cdfe110>