# Sample 

In [5]:
import apache_beam as beam
from apache_beam import Create, Map, Pipeline

In [6]:
p1 = apache_beam.Pipeline()

In [8]:
cubs = ( p1
           | "Create Element" >> Create(range(10))
           | "Find Cube" >> Map(lambda x: x*3)
           | "Print" >> Map(print)
       )

In [9]:
p1.run()

0
3
6
9
12
15
18
21
24
27


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0f521270>

In [12]:
p1 = apache_beam.Pipeline()
cubs = ( p1
           | "Create Element" >> Create(["Hello", "World"])
           | "Uppercase" >> Map(lambda x: x.upper())
           | "Print" >> Map(print)
       )
p1.run()

3
6
9
HELLO
WORLD
0
3
6
9
12
15
18
21
24
27


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0ed91270>

# Create

In [92]:
p1 = apache_beam.Pipeline()
cubs = ( p1
           | "Create Element" >> Create([1,2,3])
           | "Find Cube" >> Map(lambda x: x*3)
           | "Print" >> Map(print)
       )
p1.run()

3
6
9


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0a442ce0>

In [93]:
p1 = apache_beam.Pipeline()
cubs = ( p1
           | "Create Element" >> Create({"foo": "Bar", "hello": "world"})
           | "Key of the dict" >> Map(lambda x: x[1])
           | "Print" >> Map(print)
       )
p1.run()

Bar
world


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0af2fc40>

In [23]:
# multiple p collections to another p collection
import apache_beam as beam
from apache_beam import Create, Map, Pipeline

p1 = apache_beam.Pipeline()

even = {2, 4, 6, 8}
odd  = {1, 3, 5, 7}

even_p1  = p1 | "Even No" >> beam.Create(even)
odd_p1   = p1 | "Odd No" >> beam.Create(odd)

flat_out = (even_p1 , odd_p1) | beam.Flatten() | beam.Map(print)

p1.run()

8
2
4
6
1
3
5
7


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0f0d15a0>

# Flattern

In [30]:
import apache_beam as beam
from apache_beam import Create, Map, Pipeline

def find_cube(x):
    return x**3

p1 = apache_beam.Pipeline()

cubs = ( p1
           | "Create Element" >> Create([1,2,3])
           | "Find Cube" >> Map(find_cube)
           | "Print" >> Map(print)
       )
p1.run()

1
8
27


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0c923760>

# FlatMap

In [31]:
import apache_beam as beam
from apache_beam import Create, Map, Pipeline,FlatMap

def find_cube(x):
    return x**3

p1 = apache_beam.Pipeline()

cubs = ( p1
           | "Create Element" >> Create(["foo bar", "hello world"])
           | "string split" >> FlatMap(str.split)
           | "Print" >> Map(print)
       )
p1.run()

foo
bar
hello
world


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0cc80940>

# Filter

In [33]:
import apache_beam as beam
from apache_beam import Create, Map, Pipeline, Filter

p1 = apache_beam.Pipeline()

cubs = ( p1
           | "Create Element" >> Create(range(10))
           | "Even Filter" >> Filter(lambda x: x%2==0)
           | "Print" >> Map(print)
       )

p1.run()

0
2
4
6
8


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0cc2e950>

In [34]:
import apache_beam as beam
from apache_beam import Create, Map, Pipeline, Filter

p1 = apache_beam.Pipeline()

cubs = ( p1
           | "Create Element" >> Create(["hello", "world", "hey"])
           | "Even Filter" >> Filter(lambda x: x.startswith("h"))
           | "Print" >> Map(print)
       )

p1.run()

hello
hey


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0c91ebc0>

# ParDo

In [39]:
# Same like flatmap
import apache_beam as beam
from apache_beam import Create, Map, Pipeline, ParDo, Filter

p1 = apache_beam.Pipeline()

cubs = ( p1
           | "Create Element" >> Create(["foo bar", "hello world"])
           | " Filter" >> Filter(lambda x: x.startswith("f"))
           | "string split" >> ParDo(str.split)
           | "Print" >> Map(print)
       )
p1.run()

foo
bar


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0c681a80>

# Keys, Values, String

In [47]:
import apache_beam as beam
from apache_beam import Create, Map, Pipeline, ParDo, Filter

p1 = apache_beam.Pipeline()

cubs = ( p1
           | "Create Element" >> Create({"foo": "Bar", "hello": "world"})
           | "Filter Keys" >> beam.Keys()
           | "Print" >> Map(print)
       )
p1.run()

foo
hello


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0c5bd210>

In [44]:
import apache_beam as beam
from apache_beam import Create, Map, Pipeline, ParDo, Filter

p1 = apache_beam.Pipeline()

cubs = ( p1
           | "Create Element" >> Create({"foo": "Bar"})
           | "Filter Keys" >> beam.Values()
           | "Print" >> Map(print)
       )
p1.run()

Bar


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0c68cf10>

In [49]:
import apache_beam as beam
from apache_beam import Create, Map, Pipeline, ParDo, Filter

p1 = apache_beam.Pipeline()

cubs = ( p1
           | "Create Element" >> Create({"foo": "Bar", "hello": "world"})
           | "Strings Display" >> beam.ToString.Kvs()
           | "Print" >> Map(print)
       )
p1.run()

foo,Bar
hello,world


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0c4cf460>

In [53]:
import apache_beam as beam
from apache_beam import Create, Map, Pipeline, ParDo, Filter

p1 = apache_beam.Pipeline()

cubs = ( p1
           | "Create Element" >> Create({"foo": "Bar" , "hello": "world"})
           | "Strings Display" >> beam.ToString.Element()
           | "Print" >> Map(print)
       )
p1.run()

('foo', 'Bar')
('hello', 'world')


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0c522380>

# KwSwap

In [56]:
import apache_beam as beam
from apache_beam import Create, Map, Pipeline

p1 = apache_beam.Pipeline()

cubs = ( p1
           | "Create Element" >> Create({"foo": "Bar" , "hello": "world"})
           | "Kv Swap" >> beam.KvSwap()
           | "Print" >> Map(print)
       )

p1.run()

('Bar', 'foo')
('world', 'hello')


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0c368ac0>

# Partition

In [58]:
%pip install sympy

Note: you may need to restart the kernel to use updated packages.


In [67]:
import apache_beam as beam
import sympy
from apache_beam import Create, Map, Pipeline

p1 = apache_beam.Pipeline()

def is_prime_function(element, no_par):
    return  1 if sympy.isprime(element) else 0

is_prime = ( p1
              | "Create Element" >> Create(range(1,20))
              | "Partitioning" >> beam.Partition(is_prime_function, 2)
              #| "Print" >> Map(print)
           )

#is_prime[1] | "Print Prime Number" >> Map(print)

is_prime[0] | "Print non Prime Number" >> Map(print)

p1.run()

1
4
6
8
9
10
12
14
15
16
18


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0b2f7b20>

# Regular Expression

In [70]:
import apache_beam as beam
from apache_beam import Create, Map, Pipeline, ParDo, Filter

p1 = apache_beam.Pipeline()

cubs = ( p1
           | "Create Element" >> Create(["1", "23", "hello", "world"])
           | "Regex" >> beam.Regex.matches("\d{2}")
           | "Print" >> Map(print)
       )
p1.run()

23


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0b273e80>

# Aggregation

In [74]:
import apache_beam as beam
from apache_beam import Create, Pipeline

p1 = apache_beam.Pipeline()

cubs = ( p1
           | "Create Element" >> Create(["1", "23", "hello", "world"])
           | "Regex" >> beam.combiners.Count.Globally()
           | "Print" >> Map(print)
       )

p1.run()

4


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0b270a60>

In [77]:
import apache_beam as beam
from apache_beam import Create, Pipeline

p1 = apache_beam.Pipeline()

cubs = ( p1
           | "Create Element" >> Create({"one":[1,3], "two":[2,4]})
           | "Regex" >> beam.CombineValues(sum)
           | "Print" >> Map(print)
       )

p1.run()

('one', 4)
('two', 6)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0b1d53c0>

In [78]:
import apache_beam as beam
from apache_beam import Create, Pipeline

p1 = apache_beam.Pipeline()

cubs = ( p1
           | "Create Element" >> Create([1,2,3,2,3,4,1])
           | "Regex" >> beam.Distinct()
           | "Print" >> Map(print)
       )

p1.run()

1
2
3
4


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0b0ef1c0>

In [82]:
import apache_beam as beam
from apache_beam import Create, Pipeline

p1 = apache_beam.Pipeline()

cubs = ( p1
           | "Create Element" >> Create(range(1,20))
           | "Regex" >> beam.combiners.Sample.FixedSizeGlobally(5)
           | "Print" >> Map(print)
       )

p1.run()

[7, 2, 8, 13, 12]


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0b06b7c0>

In [85]:
import apache_beam as beam
from apache_beam import Create, Pipeline

p1 = apache_beam.Pipeline()

cubs = ( p1
           | "Create Element" >> Create(range(1,20))
           | "Sum" >> beam.CombineGlobally(sum)
           | "Print" >> Map(print)
       )

p1.run()

190


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0ae85600>

In [84]:
import apache_beam as beam
from apache_beam import Create, Pipeline

p1 = apache_beam.Pipeline()

cubs = ( p1
           | "Create Element" >> Create(range(1,20))
           | "Min" >> beam.CombineGlobally(min)
           | "Print" >> Map(print)
       )

p1.run()

1


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0b237160>

In [87]:
import apache_beam as beam
from apache_beam import Create, Pipeline

p1 = apache_beam.Pipeline()

cubs = ( p1
           | "Create Element" >> Create([("foo",3),("foo",4),("bar",5)])
           | "Mean PerKey" >> beam.combiners.Mean.PerKey()
           | "Print" >> Map(print)
       )

p1.run()

('foo', 3.5)
('bar', 5.0)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0af96200>

In [88]:
import apache_beam as beam
from apache_beam import Create, Pipeline

p1 = apache_beam.Pipeline()

cubs = ( p1
           | "Create Element" >> Create(range(1,20))
           | "Min" >> beam.combiners.Top.Largest(2)
           | "Print" >> Map(print)
       )

p1.run()

[19, 18]


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0af2f940>

In [90]:
import apache_beam as beam
from apache_beam import Create, Pipeline

p1 = apache_beam.Pipeline()

cubs = ( p1
           | "Create Element" >> Create([("foo",3),("foo",4),("bar",5)])
           | "Mean PerKey" >> beam.combiners.Top.LargestPerKey(1)
           | "Print" >> Map(print)
       )

p1.run()

('foo', [4])
('bar', [5])


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0a4abfd0>

In [91]:
import apache_beam as beam
from apache_beam import Create, Pipeline

p1 = apache_beam.Pipeline()

cubs = ( p1
           | "Create Element" >> Create([("foo",3),("foo",4),("bar",5)])
           | "Sum PerKey" >> beam.CombinePerKey(sum)
           | "Print" >> Map(print)
       )

p1.run()

('foo', 7)
('bar', 5)


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f4b0ae871f0>