In [1]:
!pip install apache_beam



In [2]:
import apache_beam as beam

## **Side Inputs**:

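*   A side input is an additional input that your DoFn can access each time it processes an element in the input PCollection.

*   In addition to the main input PCollection, you can provide additional inputs to a ParDo transform in the form of side inputs. In this notebook the extra input is a plain Python list passed as an additional argument to `beam.ParDo`, which forwards it to the DoFn's `process` method.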


In [3]:
# Pipeline object used by the side-input example below.
p1 = beam.Pipeline()

# Ids to exclude: one entry per line of the file, trailing whitespace removed.
input_list = []
with open('students_exclude.txt', 'r') as exclude_file:
  input_list.extend(raw_line.rstrip() for raw_line in exclude_file)

print(input_list)

['1', '3', '7', '9']


In [4]:
class SplitRow(beam.DoFn):
  """DoFn that splits a comma-separated line and drops rows with excluded ids."""

  def process(self, element, input_list):
    """Split ``element`` on commas and emit it unless its first field is excluded.

    Args:
      element: One line of text from the input PCollection.
      input_list: Extra argument forwarded by ``beam.ParDo``; a row is dropped
        when its first field is contained in it.

    Returns:
      A one-item list holding the list of fields, or None when the row is
      excluded.
    """
    fields = element.split(',')
    if fields[0] in input_list:
      return None
    return [fields]

# Read the raw lines, drop excluded ids with SplitRow, and write the kept rows.
customers = (
    p1
    | beam.io.ReadFromText('Students_age.txt')
    | beam.ParDo(SplitRow(), input_list)  # any number of extra arguments can be passed to ParDo
    | beam.io.WriteToText('data/output')
)

# Block until the pipeline has finished so the output shard exists before the
# next cell reads it from the shell; run() alone does not guarantee that on
# every runner.
result = p1.run()
result.wait_until_finish()
result





<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f9d040a5d10>

In [5]:
!{('head -n 10 data/output-00000-of-00001')}

['2', 'farooqui', 'hyd', '26']
['4', 'neethu', 'mla', '27', '']
['5', 'joey', 'ny', '57']
['6', 'ross', 'la', '60']
['8', 'lois', 'us', '50']
['10', 'sai', 'ny', '29']


## **Side Outputs/Additional Outputs:**



*   Additional outputs are extra output PCollections produced by a ParDo transformation.

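*   While ParDo always produces a main output PCollection (the result of applying the transform), you can also have your ParDo produce any number of additional output PCollections. In the Python SDK, the DoFn yields `beam.pvalue.TaggedOutput(tag, value)` for each additional output, and the tags are declared with `.with_outputs(...)` on the ParDo.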



In [9]:
# New pipeline object for the additional-outputs example.
p1 = beam.Pipeline()

# Ids to exclude, read one per line with trailing whitespace stripped.
side_list = []
with open('students_exclude.txt', 'r') as exclude_file:
  side_list.extend(map(str.rstrip, exclude_file))

print(side_list)

class SplitRow(beam.DoFn):
  """DoFn that parses a comma-separated line and filters out excluded ids.

  Note: this redefines the SplitRow class declared earlier in the notebook
  with the same filtering logic.
  """

  def process(self, element, side_list):
    """Return the split row wrapped in a list, or None if its id is excluded.

    Args:
      element: One line of text from the input PCollection.
      side_list: Extra argument forwarded by ``beam.ParDo``; rows whose first
        field is contained in it are dropped.
    """
    row = element.split(',')
    keep = row[0] not in side_list
    return [row] if keep else None

class ProcessCustomers(beam.DoFn):
  """DoFn that routes each parsed row to the main output and/or tagged outputs."""

  def process(self, element, country, start_char):
    """Emit ``element`` to the main output or to tagged additional outputs.

    Args:
      element: Sequence of fields; index 1 and index 2 are read.
      country: Value compared against ``element[2]``.
      start_char: Prefix tested against ``element[1]``.

    Yields:
      ``element`` itself (main output) when ``element[2] == country``,
      otherwise a ``TaggedOutput`` tagged ``'Other_student'``. In addition, a
      ``TaggedOutput`` tagged ``'Names_r'`` when ``element[1]`` starts with
      ``start_char``.
    """
    if element[2] == country:
      yield element
    else:
      yield beam.pvalue.TaggedOutput('Other_student', element)
    # Honour the start_char argument; it was previously ignored in favour of a
    # hard-coded 'r'.
    if element[1].startswith(start_char):
      yield beam.pvalue.TaggedOutput('Names_r', element)
  


# Filter out excluded ids, then split the remaining rows into a main output
# ('Chennai_Cust') and two tagged additional outputs.
customers = (
    p1
    | beam.io.ReadFromText('Students_age.txt')
    | beam.ParDo(SplitRow(), side_list)
    | beam.ParDo(ProcessCustomers(), 'chn', 'r').with_outputs('Names_r', 'Other_student', main='Chennai_Cust')
)

# Each declared tag (and the main output name) is exposed as an attribute.
chennai_customers = customers.Chennai_Cust
other_cities_customers = customers.Other_student
customer_withname_r = customers.Names_r

chennai_customers | 'Write Chennai Students PCollection' >> beam.io.WriteToText("chennai")
other_cities_customers | 'Write Students PCollection that lives in other cities' >> beam.io.WriteToText("students_other_cities")
customer_withname_r | 'Write Students names with r PCollection' >> beam.io.WriteToText("customers_names_r")

# Block until the pipeline has finished so the output shards exist before the
# following cells read them from the shell; run() alone does not guarantee
# that on every runner.
result = p1.run()
result.wait_until_finish()
result


['1', '3', '7', '9']




<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7f9d03dfa9d0>

In [10]:
# Main output shard: rows whose third field equals 'chn'.
! cat chennai-00000-of-00001

['10', 'sai', 'chn', '29']


In [11]:
# 'Other_student' shard: rows whose third field is not 'chn'.
!cat students_other_cities-00000-of-00001

['2', 'farooqui', 'hyd', '26']
['4', 'neethu', 'mla', '27', '']
['5', 'joey', 'ny', '57']
['6', 'ross', 'la', '60']
['8', 'lois', 'us', '50']


In [12]:
# 'Names_r' shard: rows whose second field starts with 'r'.
!cat customers_names_r-00000-of-00001

['6', 'ross', 'la', '60']
