In [19]:
"""Task: give 3 outputs
Out 1 - live in NY
Out 2 - not live in NY
Out 3 - name starts with M
"""
import apache_beam as beam

p1 = beam.Pipeline()

customers = [
    [1, 'John', 'NY', 22],
    [2, 'Mary', 'LA', 25],
    [3, 'Maria', 'NY', 27],
    [4, 'Shreya', 'SF', 15],
]

customer_pc = beam.Create(customers)

class ProcessCustomers(beam.DoFn):
    def process(self, element, city, start_char):
        if element[2] == city:
            yield element
        else:
            yield beam.pvalue.TaggedOutput('non_ny_customers', element)
        if element[1].startswith(start_char):
            yield beam.pvalue.TaggedOutput('name_start_m', element)

transformed_customers = (
    p1 
    | customer_pc
    | beam.ParDo(
        ProcessCustomers(), 
        city='NY', 
        start_char='M'
    ).with_outputs(
        'name_start_m', 
        'non_ny_customers', 
        main='ny'
    )
)


ny_customers = transformed_customers.ny
non_ny_customers = transformed_customers.non_ny_customers
customer_startname_m = transformed_customers.name_start_m

ny_customers | 'Write NY customers' >> beam.io.WriteToText('results/side_out_ny')
non_ny_customers | 'Write non NY customers' >> beam.io.WriteToText('results/side_out_non_ny')
customer_startname_m | 'Write customers start name with M' >> beam.io.WriteToText('results/side_out_startname_m')



p1.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x1334650d0>

In [20]:
!cat results/side_out_ny-00000-of-00001

[1, 'John', 'NY', 22]
[3, 'Maria', 'NY', 27]


In [21]:
!cat results/side_out_non_ny-00000-of-00001

[2, 'Mary', 'LA', 25]
[4, 'Shreya', 'SF', 15]


In [22]:
!cat results/side_out_startname_m-00000-of-00001

[2, 'Mary', 'LA', 25]
[3, 'Maria', 'NY', 27]
