In [2]:
import apache_beam as beam 

In [3]:
with beam.Pipeline() as pipeline: 
    pass 

In [5]:
# PIPELINE OPTIONS => USE THIS TO CONFIGURE DIFFERENT ASPECTS OF YOUR PIPELINE 
# IE as the pipeline runner that will execute your pipeline and any runner-specific configuration required by the chosen runner.
from apache_beam.options.pipeline_options import PipelineOptions 
beam_options = PipelineOptions() 

In [9]:
# CREATING CUSTOM OPTIONS 
# FIRST NEED TO UNDERSTAND WHAT PYTHON DECORATORS ARE 
# STATIC VS DYNAMIC METHODS 

# STATIC METHODS => CAN BE EXECUTED THROUGH THE CLASS INSTEAD OF WHEN THE CLASS HAS BEEN INSTANTIATED OR OBJECT CAN BE CREATED FROM THE CLASS 

# WITHOUT STATIC METHOD => no attributes so method run directly without the class being instantiated 
class Formula:
    def pow(self, x, y):
        return x ** y 

m = Formula()
print(m.pow(3, 3))

27


In [10]:
# WITH A STATIC METHOD 
class Formula: 
    @staticmethod 
    def pow(x, y):
        return x ** y 

print(Formula.pow(3, 3))

27


In [15]:
# ANOTHER EXAMPLE 
class Greeting: 
    def display(self, message):
        print(message) 
message = Greeting() 
print(message.display('HelloWorld')) 

HelloWorld
None


In [18]:
class Greeting: 
    @staticmethod
    def display(message):
        print(message) 
print(Greeting.display('Hello World')) 

Hello World
None


In [41]:
# WORKING WITH CLASS METHODS 
# CLASS METHODS => STATIC METHODS THAT ACCESS THE INSTANCE ATTRIBUTES & ALSO CAN BE CALLED WITHOUT INSTANTIATING THE CLASS 
class Greeting: 
    message = "Hello"
    @classmethod
    def display(mes, finalMessage):
        print(mes.message + finalMessage) 

print(Greeting.display(' World'))




Hello World
None


In [42]:
# CREATING CUSTOM OPTIONS IN ADDITION TO THE STANDARD PIPELINE OPTIONS 
from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
    @classmethod 
    def _add_argparse_args(cls, parser):
        parser.add_argument('--input', required=True)
        parser.add_argument('--output', required=True)

In [44]:
# SAME AS ABOVE BUT HAS DEFAULT DATAFLOW LINK AND HELP TEXT 
# this allows pipeline to accept input and output as command line arguments 
from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
    @classmethod 
    def _add_argparse_args(cls, parser):
        parser.add_argument(
        '--input',
        default= 'gs://dataflow-samples/repairs/repairsfeedback.txt',
        help='file path for the input text to process')
        parser.add_argument(
            '--output', 
            required=True, 
            help='The path prefix for the output files'
        )

In [45]:
# PCOLLECTION => DISRTIBUTED COLLECTION OF DATA 
# BEAM TRANSFORMS USE PCOLLECTIONS AS INPUTS AND OUTPUTS 
# IF YOU WANT TO WORK WITH DATA IN YOUR PIPELINE 
# IT MUST COME FROM A PCOLLECTION 

In [None]:
# READING FROM AN EXTERNAL SOURCE 
# HAVE TO PROVIDE THE LINK 
import apache_beam as beam 

lines = pipeline | 'Read the file' >> beam.io.ReadFromText('gs://dataflow-samples/repairs/repairsfeedback.txt') 

# DO NOT RUN THIS FILE
# JUST READ THE CODE 

In [None]:
# CREATE A PCOLLECTION FROM AN IN MEMORY DATA 
import apache_beam as beam 

with beam.Pipeline() as pipeline:
    lines = (
        pipeline 
        | beam.Create([
            'This is true', 
            'To be, or not to be: that is the question: ',
          "Whether 'tis nobler in the mind to suffer ",
          'The slings and arrows of outrageous fortune, ',
          'Or to take arms against a sea of troubles, ',
        ]))

# NOTE: DO NOT RUN THIS AS IT REQUIRES AN OUTPUT TO WORK 

In [52]:
# PCOLLECTION FEATURES 
# PCOLLECTION BE ALWAYS OWNED BY A SPECIFIC PIPELINE 
# MULTIPLE PIPELINES NO FIT SHARE PCOLLECTIONS 

In [53]:
# ELEMENT TYPE 
# PCOLLECTIONS FIT BE OF ANY TYPE 
# BUT IF WE DEY DO DISTRIBUTED PROCESSING, BEAM FOR ENCODE THE ELEMENT TYPE AS A BYTE STRING 
# SO SAY ELEMENTS FIT BE PASSED AROUND TO DISTRIBUTED WORKERS 

In [54]:
# ELEMENT SCHEMA => JSON, AVRO, DB RECORDS 
# SCHEMA DEY HELP MAKE WE GET MORE EXPRESSIVE AGGREGATES 
# WHAT BE AGGREGATES ?? 

In [55]:
# PCOLLECTIONS BE IMMUTABLE 
# YOU NOT FIT CHANGE, ADD OR REMOVE ELEMENTS IF YOU CREATE AM 
# BEAM TRANSFORM JUST DEY MODIFY EACH ELEMENT DN GENERATE NEW PIPELINE 
# NO DEY MEAN SAY E GO MODIFY THE ORIGINAL PCOLLECTION 

In [56]:
# PCOLLECTION FIT BE ANY SIZE 
# FIT BE BOUNDED OR UNBOUNDED => EITHER THE SIZE WE KNOW OR WE DONT KNOW 

In [57]:
# ELEMENT TIME STAMPS 
# EACH ELEMENT FOR PCOLLECTION GET SOME ASSOCIATED TIME STAMP 
# THIS BE ASSIGNED BY THE SOURCE WEY CREATE THE PCOLLECTION 
# SAY IF PIPELINE DEY READ TWEETS, EACH ELEMENT FIT USE THE TIME THE PERSON TWEET 

In [59]:
# TRANSFORMS 
# WHAT WE WANT TURN THE DATA INTO 
# WE GO GO OVER THIS FOR THE EXPERIMENT SECTION 
# PIPELINE PROCESS DEY LOOK LIKE THIS 
# DATABASE TABLE -> READ TRANSFORM -> (PCOLLECTION) -> TRANSFORM -> (PCOLLECTION) -> WRITE TRANSFORM -> DATABASE TABLE

In [60]:
# PCOLLECTION IS IMMUTABLE BY DEFINITION BUT 
# U FIT APPLY DIFFERENT TRANSFORMS FOR THE SAME PCOLLECTION TO CREATE A BRANCING PIPELINE LIKE SO 
# DATABASE TABLE -> READ DATABASE OF NAMES -> PARDO(EXTRACT STRINGS STARTING WITH A)-> A NAMES 
# DATABASE TABLE -> READ DATABASE OF NAMES -> PARDO(EXTRACT STRINGS STARTING WITH B)-> B NAMES

In [61]:
# MAIN CORE BEAM TRANSFORMS 
# PARDO => PROCESS ELEMENTS IN PARALLEL 
# GROUP BY KEY => GROUP ELEMENTS BY KEY 
# CO GROUP BY KEY => GROUP ELEMENTS BY KEY AND JOIN THEM 
# COMBINE => AGGREGATE ELEMENTS
# FLATTEN => EXPAND A COLLECTION OF COLLECTIONS INTO A SINGLE COLLECTION
# PARTITION => DIVIDE A COLLECTION INTO A NUMBER OF PARTITIONS 

In [None]:
# PARDO 
# FILTERING DATASET 
# TYPE CONVERTING EACH ELEMENT IN THE DATASET 
# EXTRACTING PARTS OF EACH ELEMENT IN THE DATASET 
# PERFORMING COMPUTATIONS ON EACH ELEMENT IN THE DATASET 

words = ... # input pcollection of strings 

# do function to perform on each element in the input pcollection
class ComputeLength(beam.DoFn):
    def process(self, element):
        return [len(element)] 

# apply the do function to the input pcollection 
word_length = words | beam.ParDo(ComputeLength()) 

In [62]:
# WHAT BE DO FUNCTION 
# THEN DEY DEFINE THE PIPLELINE EXTRACT PROCESSING TASKS 
# THE CODE FOR FULFIL THESE TWO REQUIREMENTS 
# MAKE SURE WE DEY FULFUL THIS BEFORE WE USE THE DO FUNCTIONS 
# FUNCTION OBJECT FOR BE SERLIALISABLE 
# FUNCTION OBJECT FOR THE THREAD-COMPATIBLE AND FOR KNOW SAY BEAM SDKs NO BE THREAD SAFE 
# ALSO MAKE THE FUNCTIONS IDEMPOTENT

In [None]:
# IF THE FUNCTIONS BE STRAIGHFOWARD 
# WE FIT PROVIDE LIGHT WEIGHT DO FUNCTIONS ONE LINERS 
word_length = words | beam.ParDo(lambda x: [len(x)]) 

In [63]:
# DO FUNCTION LIFECYCLE 
# DO DEEP INTO SERLLIALISATION AND DESERIALISATION

In [64]:
# GROUP BY KEY AND UNBOUNDED PCOLLECTIONS 


# U FOR USE NON-GLOBAL KEY WINDOWING OR AGGREGATION TRIGGER TO PERFORM GROUPBYKEY OR COGROUPBYKEY 
# UNBOUNDED COLLECTIONS 
# WINDOWING OR TRIGGERS FOR ALLOW GROUPING TO OPERATE ON LOGICAL FINITE BUNDLES OF DATA WITHIN THE 
# UNBOUNDED DATA STREAMS 

# IF WE NO USE AM E GO THROW THIS ERROR =>  IllegalStateException error 

In [65]:
# COGROUPBY KEY => JOIN KEY VALUE PAIRS WEY BE THE SAME TYPE 


In [66]:
# COMBINE => COMBINING ELEMENTS OR VALUES IN PCOLLECTIOONS 
pc = [1, 10, 100, 1000] 

def boundedsum(values, bounds=500):
    return min(sum(values), bounds) 

small_sum = pc | beam.CombineGlobally(boundedsum)
large_sum = pc | beam.CombineGlobally(boundedsum, bounds=10000)

usage: ipykernel_launcher.py [-h] [--runner RUNNER] [--streaming]
                             [--resource_hint RESOURCE_HINTS]
                             [--beam_services BEAM_SERVICES]
                             [--type_check_strictness {ALL_REQUIRED,DEFAULT_TO_ANY}]
                             [--type_check_additional TYPE_CHECK_ADDITIONAL]
                             [--no_pipeline_type_check] [--runtime_type_check]
                             [--performance_runtime_type_check]
                             [--allow_non_deterministic_key_coders]
                             [--allow_unsafe_triggers]
                             [--no_direct_runner_use_stacked_bundle]
                             [--direct_runner_bundle_repeat DIRECT_RUNNER_BUNDLE_REPEAT]
                             [--direct_num_workers DIRECT_NUM_WORKERS]
                             [--direct_running_mode {in_memory,multi_threading,multi_processing}]
                             [--direct_embed_docker_pyth

SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [None]:
# ADVANCED COMBINATIONS USING COMBINEFN 
# NB: NEED TO UNDERSTAND COMBINERS WELL 
# NOTE: ALL COMBINERS SHOULD HAVE A MORE SOPHISTICATED ACCUMULATOR 


# FOR MORE COMPLEX FUNCTIONS, DEFINE A SUBCLASS OF COMBINEFN 
# YOU SHOULD USE A COMBINE FN IF FUNCTION REQUIRES A MORE SOPHISTICATED ACCUMULATOR 

# GENERAL COMBINING OPERATION CONSISTS OF FOUR STEPS 
# 1. CREATE AN ACCUMULATOR 2. ADD INPUT 3. MERGE ACCUMULATORS 4. EXTRACT OUTPUT 

pc = ... 

class AverageFn(beam.CombineFn):
    def create_accumulator(self):
        return (0.0, 0) 

    def add_input(self, sum_count, input):
        (sum, count) = sum_count 
        return sum + input, count + 1 

    def merge_accumulators(self, accumulators):
        sums, counts = zip(*accumulators) 
        return sum(sums), sum(counts) 

    def extract_output(self, sum_count):
        (sum, count) = sum_count 
        return sum / count

In [None]:
# COMBINING ALL PCOLLCTIONS INTO A SINGLE VALUE 

pc = ... 

average = pc | beam.CombineGlobally(AverageFn()) 

In [68]:
# COMBINE AND GLOBAL WINDOWING ?? COME BACK TO THIS PART 
# what is global windowing in the first place ?? 
# => If your input PCollection uses the default global windowing, the default behavior is to return a
#  PCollection containing one item.

In [None]:
# COMBINE VALUES in a keyed PCOLLECTION ?? COME BACK TO THIS PART TOO  
player_accuracies = ...

averageaccuracyperplayer = (
    player_accuracies 
    | beam.CombinePerKey(beam.combiners.MeanCombineFn())
)


In [None]:
# FLATTEN => MERGE MULTIPLE PCOLLECTIONS INTO A SINGLE PCOLLECTION 
merged = (
    (pcoll1, pcoll2, pcoll3) 
    | beam.Flatten() 
)

In [None]:
# DATA ENCODING IN MERGED COLLECTIONS 
#  the coder for the output PCollection is the same as the coder for the first PCollection 
# in the input PCollectionList.

# MERGING WINDOWED COLLECTIONS
# When using Flatten to merge PCollection objects that have a windowing strategy applied, all of the 
# PCollection objects you want to merge must use a compatible windowing strategy and window sizing.
#  For example, all the collections you’re merging must all use (hypothetically) identical 5-minute 
# fixed windows or 4-minute sliding windows starting every 30 seconds.
# If your pipeline attempts to use Flatten to merge PCollection objects with incompatible windows, 
# Beam generates an IllegalStateException error when your pipeline is constructed.

# PARTITION 
# Partition is a Beam transform for PCollection objects that store the same data type. 
# Partition splits a single PCollection into a fixed number of smaller collections.
# Partition divides the elements of a PCollection according to a partitioning function that you provide.
# The partitioning function contains the logic that determines how to split up the elements of the 
# input PCollection into each resulting partition PCollection.

def partition_fn(student, num_partitions):
    return int(get_percenitile(student) * num_partitions / 100) 
by_decile = students | beam.Partition(partition_fn, 10)
fortieth_percentile = by_decile[4] 

In [None]:
# REQUIREMENTS FOR WRITING USER CODE FOR BEAM TRANSFORMS 

# Your function object must be able to be serialized, so it can be sent over the network to different machines.
# Your function object must be thread-compatible, meaning it should be able to run 
# in parallel on multiple threads without any issues.
# It is recommended that your function object be idempotent, meaning that it produces 
# the same result no matter how many times it is run. Non-idempotent functions can still be used with Beam, 
# but they require additional consideration to ensure correct behavior when there are external side effects.

# These requirements apply to subclasses of DoFn (a function used with the ParDo transform), 
# CombineFn (a function used with the Combine transform), 
# and WindowFn (a function used with the Window transform).

In [None]:
# SERIALIZABILITY 
# In order to use a function object with a Beam transform, it must be fully serializable. 
# This means that it can be converted into a format that can be transmitted over 
# the network to a remote worker. 

# The base classes for user code, such as DoFn, CombineFn, and WindowFn, are already set up to be serializable, 
# but your subclass must not contain any non-serializable members.

# FEW OTHER THINGS TO KEEP IN MIND 
# Transient fields in your function object will not be transmitted to worker instances, 
# because they are not automatically serialized.

# You should avoid loading a large amount of data into a field before serialization, as this can impact performance.

# Individual instances of your function object cannot share data.

# Mutating a function object after it has been applied will not have any effect, 
# as the worker instances operate independently.

# It is important to consider these factors when designing your function object, 
# as non-serializable functions will not be able to be transmitted to the worker instances and 
# will cause your pipeline to fail.


In [None]:
# THREAD COMPATIBILITY 
# Your function object should be thread-compatible. Each instance of your function object is accessed by a 
# single thread at a time on a worker instance, unless you explicitly create your own threads. Note, however, 
# that the Beam SDKs are not thread-safe. If you create your own threads in your user code, you must 
# provide your own synchronization.


In [None]:
# IDEMPOTENCY 
# It’s recommended that you make your function object idempotent–that is, that it can be repeated 
# or retried as often as necessary without causing unintended side effects.
# Non-idempotent functions are supported, however the Beam model provides no guarantees as to the 
# number of times your user code might be invoked or retried;



In [None]:
# SIDE INPUTS 
# Side inputs are an additional source of data that can be accessed by a ParDo transform while
# processing elements in the main input PCollection. 
# They are useful when your ParDo needs to use additional data that is determined at runtime and 
# not hard-coded, such as values that depend on the input data or another branch of the pipeline. 
# Side inputs allow you to provide this additional data to your ParDo transform in the form of 
# a view that can be read while processing each element.

In [None]:
# PASSING SIDE INPUTS TO PARDO 
def filter_using_length(word, lower_bound, upper_bound=float('inf')):
    if lower_bound <= len(word) <= upper_bound:
        yield word 

# construct a deferred side input 
avg_word_len = (
    words 
    | beam.Map(len) 
    | beam.CombinedGlobally(beam.combiners.MeanCombineFn()) 

    # call with explicit side input 
    small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3) 

    # A single deferred side input 
    larger_than_average = (
        words | 'large' >> beam.FlatMap(filter_using_length, lower_bound = pvalue.AsSingleton(avg_word_len)) 
    )

    # Mix and Match 
    small_but_nontrvial = words | beam.FlatMap(
        filter_using_length, 
        lower_bound = 2, 
        upper_bound = pvalue.AsSingleton(avg_word_len) 
    )

    # We can also pass side inputs to a Pardo transform, which will get passed to its process method 
    # The first two arguments for the process method will be self amd element 
    class FilterUsingLength(beam.DoFn):
        def process(self, element, lower_bound, upper_bound=float('inf')):
            if lower_bound <= len(element) <= upper_bound:
                yield element 
    small_words = words | beam.ParDo(filterUsingLength(), 0, 3)
)



In [None]:
# SIDE INPUTS AND WINDOWING 
# A window is a way to divide up a stream of data into chunks, based on some criteria. For example, 
# you could create windows of data based on fixed time intervals (e.g. every 10 minutes), or based 
# on the data itself (e.g. all data belonging to the same user).

# SIDE INPUT, is a way to provide additional data to a Beam pipeline that is needed for processing the main input data.
#  This additional data is typically needed for a limited time, and is not part of the main input stream.

# When you use a side input in a Beam pipeline, the main input and side input may have different windowing. 
# In this case, Beam projects the main input window onto the side input window set, and selects the most 
# appropriate side input value to use.
# For example, if the main input is windowed by fixed-time intervals of one minute, and the side input is windowed by 
# fixed-time intervals of one hour, Beam will select the side input value from the appropriate hour-long window.

# If the main input element exists in more than one window, then the processElement function 
# will be called multiple times, once for each window. In this case, each call to processElement 
# will project the current window for the main input element, and may provide a different 
# view of the side input each time.

# If the side input has multiple trigger firings (i.e. it has been updated multiple times), 
# Beam will use the value from the latest trigger firing. This is useful if you are using 
# a side input with a single global window and a trigger, as it ensures that you are always
#  using the most up-to-date side input value.


In [None]:
# ADDITIONAL OUTPUTS 
# While ParDo always produces a main output PCollection (as the return value from apply),
# you can also have your ParDo produce any number of additional output PCollections. 
# If you choose to have multiple outputs, your ParDo returns all of the output PCollections (including the main output) 
# bundled together

# To emit elements to multiple output PCollections, invoke with_outputs() on the ParDo, and specify the
# expected tags for the outputs. with_outputs() returns a DoOutputsTuple object. Tags specified in
# with_outputs are attributes on the returned DoOutputsTuple object. The tags give access to the
# corresponding output PCollections.

results = (
    words 
    | beam.ParDo(ProcessWords(), cutoff_length=2, marker='X').with_outputs(
        'above cutoff lenghts', 
        'marked strings', 
        main='below_cutoff_strings'
    )
)

below = results.below_cutoff_strings 
above = results.above_cutoff_lengths 
marked = results['marked strings']

below, above, marked = (
    words 
    | beam.ParDo(ProcessWords(), cutoff_length=2, marker='X').with_outputs('above_cutoff_lengths',
                                      'marked strings',
                                      main='below_cutoff_strings')
)

In [None]:
# EMITTING MULTIPLE OUTPUTS IN YOUR DOFN 
# Inside your ParDo's DoFn, you can emit an element to a specific output by wrapping the value and the output tag (str).
# using the pvalue.OutputValue wrapper class.
# Based on the previous example, this shows the DoFn emitting to the main output and two additional outputs
class ProcessWords(beam.DoFn):
    def process(self, element, cutoff_length, marker): 
        if len(element) <= cutoff_length: 
            yield element 
        else: 
            yield pvalue.TaggedOutput('above_cutoff_lengths', len(element))
        if element.startswith(marker):
            yield pvalue.TaggedOutput('marked strings', element)

# Producing multiple products is also available in Map and FlatMap 
# Here is an example that uses FlatMap and shows that the tags do not need to be specified ahead of time.
def even_odd(x):
    yield pvalue.TaggedOutput('odd' if x % 2 else 'even', x)
    if x % 10 == 0:
        yield x 

results = numbers | beam.FlatMap(even_odd).with_outputs() 
evens = result.even 
odds = result.odd 
tens = results[None] 

In [None]:
# ACCESSING ADDITIONAL PARAMETERS IN YOUR DOFNs 
# In addition to the element, Beam will populate other parameters to your DoFn’s process method.
# Timestamp: To access the timestamp of an input element, add a keyword parameter default to DoFn.TimestampParam
class ProcessRecord(beam.FoFn):
    def process(self, element, timestamp=beam.DoFn, TimestampParam):
        # access timestamp of element 
        pass 

In [None]:
# Window => To access the window an input element falls into, add a keyword parameter default to DoFn.WindowParam.
# => If an element falls in multiple windows (for example, this will happen when using SlidingWindows), 
# then the process method will be invoked multiple time for the element, once for each window.
class ProcessRecord(beam.DoFn):
    def process(self, element, window=beam.DoFn.windowParam):
        # access window 
        pass 

In [None]:
# PaneInfo 
# => When triggers are used, Beam provides a DoFn.PaneInfoParam object that contains information about the current firing.
# => Using DoFn.PaneInfoParam you can determine whether this is an early or a late firing, 
# and how many times this window has already fired for this key.

class ProcessRecord(beam.DoFn):
  def process(self, element, pane_info=beam.DoFn.PaneInfoParam):
     # access pane info, e.g. pane_info.is_first, pane_info.is_last, pane_info.timing
     pass

In [None]:
# TIMER AND STATE 
# => In addition to aforementioned parameters, user defined Timer and State parameters can be used in a stateful DoFn.
class StatefulDoFn(beam.DoFn):
  """An example stateful DoFn with state and timer"""

  BUFFER_STATE_1 = BagStateSpec('buffer1', beam.BytesCoder())
  BUFFER_STATE_2 = BagStateSpec('buffer2', beam.VarIntCoder())
  WATERMARK_TIMER = TimerSpec('watermark_timer', TimeDomain.WATERMARK)

  def process(self,
              element,
              timestamp=beam.DoFn.TimestampParam,
              window=beam.DoFn.WindowParam,
              buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
              buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2),
              watermark_timer=beam.DoFn.TimerParam(WATERMARK_TIMER)):

    # Do your processing here
    key, value = element
    # Read all the data from buffer1
    all_values_in_buffer_1 = [x for x in buffer_1.read()]

    if StatefulDoFn._is_clear_buffer_1_required(all_values_in_buffer_1):
        # clear the buffer data if required conditions are met.
        buffer_1.clear()

    # add the value to buffer 2
    buffer_2.add(value)

    if StatefulDoFn._all_condition_met():
      # Clear the timer if certain condition met and you don't want to trigger
      # the callback method.
      watermark_timer.clear()

    yield element

  @on_timer(WATERMARK_TIMER)
  def on_expiry_1(self,
                  timestamp=beam.DoFn.TimestampParam,
                  window=beam.DoFn.WindowParam,
                  key=beam.DoFn.KeyParam,
                  buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
                  buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2)):
    # Window and key parameters are really useful especially for debugging issues.
    yield 'expired1'

  @staticmethod
  def _all_condition_met():
      # some logic
      return True

  @staticmethod
  def _is_clear_buffer_1_required(buffer_1_data):
      # Some business logic
      return True


In [None]:
# COMPOSITE TRANSFORMS 
# => Transforms can have a nested structure, where a complex transform performs multiple simpler transforms 
# (such as more than one ParDo, Combine, GroupByKey, or even other composite transforms).
# =>  These transforms are called composite transforms. Nesting multiple transforms inside a single 
# composite transform can make your code more modular and easier to understand.
# => The Beam SDK comes packed with many useful composite transforms.

In [None]:
# EXAMPLE OF COMPOSITE TRANSFORMS 
# The CountWords transform in the WordCount example program is an example of a composite transform. 
# CountWords is a PTransform subclass that consists of multiple nested transforms.

# In its expand method, the CountWords transform applies the following transform operations:
# 1. It applies a ParDo on the input PCollection of text lines, producing an output PCollection of individual words.
# 2. It applies the Beam SDK library transform Count on the PCollection of words, producing a PCollection of key/value pairs.
# Each key represents a word in the text, and each value represents the number of times that word appeared in the original data.
# Note: Because Count is itself a composite transform, CountWords is also a nested composite transform.
@beam.ptransform_fn 
def CountWords(pcoll):
    return (
        pcoll
      # Convert lines of text into individual words.
      | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
      # Count the number of times each word occurs.
      | beam.combiners.Count.PerElement()
      # Format each word and count into a printable string.
      | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))
    )

In [None]:
# Creating a composite transform 
# To create your own composite transform, create a subclass of the PTransform class and override the
# expand method to specify the actual processing logic 
# You can then use this transform just as you would a built-in transform from the Beam SDK
# The following code sample shows how to declare a PTransform that accepts a PCollection of Strings for input, 
# and outputs a PCollection of Integers:

class ComputeWordLengths(beam.PTransform):
    def expand(self, pcoll):
        return pcoll | beam.Map(lambda x: len(x)) 

In [None]:
# Within your PTransform subclass, you’ll need to override the expand method. 
# The expand method is where you add the processing logic for the PTransform. 
# Your override of expand must accept the appropriate type of input PCollection 
# as a parameter, and specify the output PCollection as the return value.

# The following code sample shows how to override expand for the ComputeWordLengths class declared 
class ComputeWordlengths(beam.Ptransform):
    def expand(self, pcoll):
        return pcoll | beam.Map(lambda x: len(x)) 


# As long as you override the expand method in your PTransform subclass to accept the 
# appropriate input PCollection(s) and return the corresponding output PCollection(s), 
# you can include as many transforms as you want
# These transforms can include core transforms, composite transforms, or the transforms included in the Beam SDK libraries.

# Your composite transform’s parameters and return value must match the initial input type and 
# final return type for the entire transform, even if the transform’s intermediate data changes type multiple times.

# Note: The expand method of a PTransform is not meant to be invoked directly by the user of a transform. 
#  Instead, you should call the apply method on the PCollection itself, with the transform as an argument. 
#  This allows transforms to be nested within the structure of your pipeline.




In [2]:
# PTRANSFORM STYLE GUIDE 
# The PTransform Style Guide contains additional information not included here, such as 
# style guidelines, logging and testing guidance, 
# and language-specific considerations.
# The guide is a useful starting point when you want to write new composite PTransforms.



In [None]:
# PIPELINE I/O 
# When you create a pipeline, you often need to read data from some external source, 
# such as a file or a database. Likewise, you may want your pipeline to output 
# its result data to an external storage system. Beam provides read and write transforms 
# for a number of common data storage types.


In [None]:
# READING INPUT FROM DATA 
lines = pipeline | beam.io.ReadFromText('gs://some/inputData.txt')

In [None]:
# WRITING OUTPUT TO DATA 
output | beam.io.WriteToText('gs://some/outputData')

In [None]:
# LINK TO ALL I/O CONNECTORS 
https://beam.apache.org/documentation/io/connectors/

In [None]:
# WRITING MULTIPLE OUTPUT FILES 
filtered_words | 'WriteToText' >> beam.io.WriteToText(
    '/path/to/numbers', file_name_suffix='.csv')

In [None]:
# SCHEMAS 
# Often, the types of the records being processed have an obvious structure.
#  Common Beam sources produce JSON, Avro, Protocol Buffer, or database row objects; 
# all of these types have well defined structures, structures
#  that can often be determined by examining the type.

In [None]:
# WHAT IS SCHEMA 
# Purchase

# Field Name	Field Type
# userId	STRING
# itemId	INT64
# shippingAddress	ROW(ShippingAddress)
# cost	INT64
# transactions	ARRAY[ROW(Transaction)]

In [None]:
# SCHEMAS FOR PYTHON 
import typing 

class Purchase(typing.NamedTuple):
    userId: str
    itemId: int
    shippingAddress: 'ShippingAddress'
    cost: int
    transactions: typing.List['Transaction']

In [None]:
# CONTINUE DOCUMENTATION 
# BEGINNER UDEMY 
# APACHE BEAM DOCS => WORD COUNT, MOBILE GAMING, 
# APACHE BEAM IN JAVA TUTORIAL 