<a href="https://colab.research.google.com/github/thecodemancer/study-with-me/blob/main/apache-beam/side_inputs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Side Inputs

In addition to the main input PCollection, you can provide additional inputs to a ParDo transform in the form of side inputs. A side input is an additional input that your DoFn can access each time it processes an element in the input PCollection. When you specify a side input, you create a view of some other data that can be read from within the ParDo transform’s DoFn while processing each element.

Side inputs are useful if your ParDo needs to inject additional data when processing each element in the input PCollection, but the additional data needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline.

## Passing side inputs to ParDo


In [None]:
#!pip install apache-beam

In [None]:
import apache_beam as beam

## 1. beam.pvalue.AsIter

Example 1: This example iterates over the elements of a side input PCollection and, for each element of the main PCollection, emits its sum with every side input element.

In [13]:
def process_with_iter(element, side_input):
  # Iterate through elements in the side input
  for item in side_input:
    # Process the element and side input item
    yield element + item

with beam.Pipeline() as pipeline:
  # Create main PCollection
  main_data = pipeline | 'Create Main' >> beam.Create([1, 2, 3])

  # Create side input PCollection
  side_data = pipeline | 'Create Side' >> beam.Create([4, 5, 6, 7])

  # Apply ParDo with AsIter for side input
  result = main_data | 'Process' >> beam.ParDo(process_with_iter, beam.pvalue.AsIter(side_data))

  # Print the result (the order may vary, because Beam does not guarantee element order)
  output = result | 'Print' >> beam.Map(print)

5
6
7
8
6
7
8
9
7
8
9
10


Example 2: This example filters the main PCollection, keeping only the elements that also appear in the side input PCollection.

In [14]:
class MyDoFn(beam.DoFn):

    def process(self, element, my_side_input):
        my_list = list(my_side_input)  # access the contents of the side input
        if element in my_list:
            yield element

In [15]:
with beam.Pipeline() as p:
  my_list = p | 'Create side input' >> beam.Create([1, 2, 3, 4, 5])
  output = (p | 'Create PCollection' >> beam.Create([1, 3, 5, 7, 9])
           | 'Filter using side input' >> beam.ParDo(MyDoFn(), beam.pvalue.AsIter(my_list))
           | 'Print Output' >> beam.Map(print))


1
3
5


## 2. beam.pvalue.AsSingleton

Example 1: This example multiplies each element of the PCollection by the number in the side input.

In [22]:
def process_with_singleton(element, side_input):
  # Access the single element from the side input
  value = side_input
  # Process the element and side input value
  yield element * value

with beam.Pipeline() as pipeline:
  # Create main PCollection
  main_data = pipeline | 'Create Main' >> beam.Create([2, 4])

  # Create side input PCollection with a single element
  side_data = pipeline | 'Create Side' >> beam.Create([3])

  # Apply ParDo with AsSingleton for side input
  result = main_data | 'Process' >> beam.ParDo(process_with_singleton, beam.pvalue.AsSingleton(side_data))

  # Print the result (the order may vary, because Beam does not guarantee element order)
  output = result | 'Print' >> beam.Map(print)


6
12


## 3. beam.pvalue.AsDict

Example 1: This example adds the value stored under the key "key1" in the side input to each element of the main PCollection. The side input is created from a list of key-value tuples.

In [24]:
def process_with_dict(element, side_input):
  # Access the value associated with a specific key
  value = side_input["key1"]
  # Process the element and side input value
  yield element + value

with beam.Pipeline() as pipeline:
  # Create main PCollection
  main_data = pipeline | 'Create Main' >> beam.Create([10, 20])

  # Create side input PCollection as key-value pairs
  side_data = pipeline | 'Create Side' >> beam.Create([("key1", 5), ("key2", 3)])

  # Apply ParDo with AsDict for side input
  result = main_data | 'Process' >> beam.ParDo(process_with_dict, beam.pvalue.AsDict(side_data))

  # Print the result (the order may vary, because Beam does not guarantee element order)
  output = result | 'Print' >> beam.Map(print)

15
25


Example 2: This example performs the same calculation as Example 1, but the side input is created from a Python dictionary, which `beam.Create` expands into key-value pairs.

In [26]:
def process_with_dict(element, side_input):
  # Access the value associated with a specific key
  value = side_input["key1"]
  # Process the element and side input value
  yield element + value

with beam.Pipeline() as pipeline:
  # Create main PCollection
  main_data = pipeline | 'Create Main' >> beam.Create([10, 20])

  # Create side input PCollection as key-value pairs
  side_data = pipeline | 'Create Side' >> beam.Create({"key1": 5, "key2": 3})

  # Apply ParDo with AsDict for side input
  result = main_data | 'Process' >> beam.ParDo(process_with_dict, beam.pvalue.AsDict(side_data))

  # Print the result (the order may vary, because Beam does not guarantee element order)
  output = result | 'Print' >> beam.Map(print)

15
25


## 4. beam.pvalue.AsList

Example 1: This example replaces every occurrence of "Lorem" in a block of text with the string representation of a list that is passed as a side input.

In [98]:
import datetime

import apache_beam as beam

def process_with_list(element, side_input):
  example = "Lorem Ipsum is simply dummy text of the printing and typesetting industry. Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, when an unknown printer took a galley of type and scrambled it to make a type specimen book. It has survived not only five centuries, but also the leap into electronic typesetting, remaining essentially unchanged. It was popularised in the 1960s with the release of Letraset sheets containing Lorem Ipsum passages, and more recently with desktop publishing software like Aldus PageMaker including versions of Lorem Ipsum."
  # side_input is a Python list holding every element of the side PCollection.
  # Replace each "Lorem" with the list's string representation.
  # `element` is unused, so the same text is emitted once per main element.
  yield example.replace("Lorem", str(side_input))

# Sample data (modify with your actual data source)
data = [{
    'metric': 'tickets',
    'date': datetime.datetime(2024, 3, 1, 0, 0, tzinfo=datetime.timezone.utc),
    'last_day': 132.0,
    'last_30_days': 82456.0,
    'mtd': 3589.0,
    'mtd_ratio_with_last_year': 191.52,
    'ytd': 170517.0,
    'ytd_ratio_with_last_year': 113.78
  },
  {
    'metric': 'sales',
    'date': datetime.datetime(2024, 3, 2, 0, 0, tzinfo=datetime.timezone.utc),
    'last_day': 43300.99,
    'last_30_days': 33619927.1,
    'mtd': 1428140.41,
    'mtd_ratio_with_last_year': 246.57,
    'ytd': 66144943.69,
    'ytd_ratio_with_last_year': 114.68
  }]

with beam.Pipeline() as pipeline:
  # Create main PCollection
  main_data = pipeline | 'Create Main' >> beam.Create([1, 2, 3, 4, 5])

  # Create side input PCollection of dictionaries.
  # Every field is copied as-is except the date, which is formatted as a string.
  side_data = (pipeline | 'Create Side' >> beam.Create(data)
                    | 'Format' >> beam.Map(lambda x: {
                        **x,
                        'date': x['date'].strftime('%Y-%m-%d %H:%M:%S %Z'),
                        })
              )

  # Apply ParDo with AsList for side input
  result = main_data | 'Process' >> beam.ParDo(process_with_list, beam.pvalue.AsList(side_data))
  # Print the result (the same text is printed once per main element)
  output = result | 'Print 2' >> beam.Map(print)


[{'metric': 'tickets', 'date': '2024-03-01 00:00:00 UTC', 'last_day': 132.0, 'last_30_days': 82456.0, 'mtd': 3589.0, 'mtd_ratio_with_last_year': 191.52, 'ytd': 170517.0, 'ytd_ratio_with_last_year': 113.78}, {'metric': 'sales', 'date': '2024-03-02 00:00:00 UTC', 'last_day': 43300.99, 'last_30_days': 33619927.1, 'mtd': 1428140.41, 'mtd_ratio_with_last_year': 246.57, 'ytd': 66144943.69, 'ytd_ratio_with_last_year': 114.68}] Ipsum is simply dummy text of the printing and typesetting industry. [{'metric': 'tickets', 'date': '2024-03-01 00:00:00 UTC', 'last_day': 132.0, 'last_30_days': 82456.0, 'mtd': 3589.0, 'mtd_ratio_with_last_year': 191.52, 'ytd': 170517.0, 'ytd_ratio_with_last_year': 113.78}, {'metric': 'sales', 'date': '2024-03-02 00:00:00 UTC', 'last_day': 43300.99, 'last_30_days': 33619927.1, 'mtd': 1428140.41, 'mtd_ratio_with_last_year': 246.57, 'ytd': 66144943.69, 'ytd_ratio_with_last_year': 114.68}] Ipsum has been the industry's standard dummy text ever since the 1500s, when an unknown printer took a ga

---
If you made it this far, follow [David Regalado](https://beacons.ai/davidregalado) for more code!