**Advanced_Windows_and_Triggers**
- reference link: https://github.com/griscz/beam-college/blob/main/day2/B1_Beam_College_Advanced_Windows_and_Triggers_a_practical_guide_v0_9_0.ipynb
- content: window and trigger concept in practice

In [1]:

# Lets set up our imports 
import argparse
import datetime
import time
from typing import Dict, Text, Any, Tuple, List, Iterable


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.typehints.typehints import Optional
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream

In [2]:
# In this cell we are going to run a simple pipeline that computes the Count of
# a list of values, which have the key 'A'. 

our_values = [('A', 1.05), ('A', 1.02), ('A', 1.03), ('A', 1.04), ('A', 1.06), ('A', 1.07)]

with beam.Pipeline() as p:
  _ = (p | beam.Create(our_values) 
         | beam.transforms.combiners.Count.PerKey()
         | beam.Map(lambda x : f'Count is {x[1]}')
         | beam.Map(print)
  )



Count is 6


**Accessing Beam's time metadata**

In [3]:
our_values = [('A', 1.05), ('A', 1.02), ('A', 1.03), ('A', 1.04), ('A', 1.06), ('A', 1.07)]


# This DoFn makes use of TimestampParam to access the elements timestamp
# We also make use of WindowParam to access the window that the element belongs to 
class GetElementTimestamp(beam.DoFn):
  def __init__(self, print_pane_info: bool = False):
    self.print_pane_info = print_pane_info
  
  def process(self, element: Any, 
              timestamp=beam.DoFn.TimestampParam,
              window=beam.DoFn.WindowParam,
              pane_info=beam.DoFn.PaneInfoParam
                )-> Iterable[Tuple[str,str]]:

    timestamp_str = ""
    try:
        timestamp_str = timestamp.to_utc_datetime()
    except:
        timestamp_str = timestamp
    
    window_str = ""

    if window == beam.window.GlobalWindow():
      window_str = "The Global Window"
    else: 
      window_str = f'Window ends at {window.max_timestamp().to_utc_datetime()}'

    if self.print_pane_info:
      yield (window_str , f'The value : {element} has timestamp {timestamp_str} with Pane {pane_info}')
    else: 
      yield (str({window_str}), f'The value : {element} has timestamp {timestamp_str}')

class PrettyPrint(beam.PTransform):
  """ Pretty print all values in a pcollection sorted on the first value in the tuple """
  def expand(self, pcoll):
    return ( pcoll | beam.combiners.ToList().without_defaults() 
                   | beam.Map(lambda x : sorted(x, key=lambda tup: tup[0]))
                   | beam.Map(lambda x : print(*x, sep = "\n") )
    )

In [4]:

our_values = [('A', 1.05), ('A', 1.02), ('A', 1.03), ('A', 1.04), ('A', 1.06), ('A', 1.07)]

with beam.Pipeline() as p:
  _ = (p | beam.Create(our_values) 
         | beam.ParDo(GetElementTimestamp())
         | PrettyPrint()
  )

("{'The Global Window'}", "The value : ('A', 1.05) has timestamp Timestamp(-9223372036854.775000)")
("{'The Global Window'}", "The value : ('A', 1.02) has timestamp Timestamp(-9223372036854.775000)")
("{'The Global Window'}", "The value : ('A', 1.03) has timestamp Timestamp(-9223372036854.775000)")
("{'The Global Window'}", "The value : ('A', 1.04) has timestamp Timestamp(-9223372036854.775000)")
("{'The Global Window'}", "The value : ('A', 1.06) has timestamp Timestamp(-9223372036854.775000)")
("{'The Global Window'}", "The value : ('A', 1.07) has timestamp Timestamp(-9223372036854.775000)")


**Source attached timestamps**

In [6]:
# Lets look at the GlobalWindow's start and end attributes
global_window = beam.transforms.window.GlobalWindow()
print(f'Global window start {global_window.start} end {global_window.end}')

Global window start Timestamp(-9223372036854.775000) end Timestamp(9223371950454.775000)


In [8]:
import time
format = '%Y/%m/%d %H:%M:%S'
start = time.mktime(time.strptime('2000/01/01 10:00:00', format))

our_values = [('A', 1.05), ('A', 1.02), ('A', 1.04), ('A', 1.06)]

# This will create a Tuple of key, value, timestamp_as_date_str
our_values_with_timestamps = [beam.window.TimestampedValue(k, ( int(start + i))) for k, i in zip( our_values, range(0,len(our_values)))]
# Next we will add a value which is several seconds ahead of the last value shown above, this will be used later
our_values_with_timestamps.append(beam.window.TimestampedValue(('A',1.03), int(start+1)))
our_values_with_timestamps.append(beam.window.TimestampedValue(('A',1.07), int(start+8)))

our_values_with_timestamps.sort(key=lambda x:x.timestamp)

for i in our_values_with_timestamps:
  print(f' Value {i.value} has event timestamp {i.timestamp.to_utc_datetime()}')

 Value ('A', 1.05) has event timestamp 2000-01-01 03:00:00
 Value ('A', 1.02) has event timestamp 2000-01-01 03:00:01
 Value ('A', 1.03) has event timestamp 2000-01-01 03:00:01
 Value ('A', 1.04) has event timestamp 2000-01-01 03:00:02
 Value ('A', 1.06) has event timestamp 2000-01-01 03:00:03
 Value ('A', 1.07) has event timestamp 2000-01-01 03:00:08


In [9]:

with beam.Pipeline() as p:
  _ = (p| beam.Create(our_values_with_timestamps)
        # | beam.Map(lambda x : x) # Work around for typing issue
        | beam.WindowInto(beam.window.FixedWindows(1))
        | beam.ParDo(GetElementTimestamp())
        | PrettyPrint()
  )

("{'Window ends at 2000-01-01 03:00:00.999999'}", "The value : ('A', 1.05) has timestamp 2000-01-01 03:00:00")
("{'Window ends at 2000-01-01 03:00:01.999999'}", "The value : ('A', 1.02) has timestamp 2000-01-01 03:00:01")
("{'Window ends at 2000-01-01 03:00:01.999999'}", "The value : ('A', 1.03) has timestamp 2000-01-01 03:00:01")
("{'Window ends at 2000-01-01 03:00:02.999999'}", "The value : ('A', 1.04) has timestamp 2000-01-01 03:00:02")
("{'Window ends at 2000-01-01 03:00:03.999999'}", "The value : ('A', 1.06) has timestamp 2000-01-01 03:00:03")
("{'Window ends at 2000-01-01 03:00:08.999999'}", "The value : ('A', 1.07) has timestamp 2000-01-01 03:00:08")


In [10]:
#Compare the previous code snipe with this we will see the affact of FixedWindows
with beam.Pipeline() as p:
  _ = (p| beam.Create(our_values_with_timestamps)
         | beam.Map(lambda x : x) # Work around for typing issue
         | beam.WindowInto(beam.window.FixedWindows(1))
         | beam.combiners.Count.PerKey()
         | beam.Map(lambda x : f'Count is {x[1]}')
         | beam.ParDo(GetElementTimestamp())
         | PrettyPrint()
  )

("{'Window ends at 2000-01-01 03:00:00.999999'}", 'The value : Count is 1 has timestamp 2000-01-01 03:00:00.999000')
("{'Window ends at 2000-01-01 03:00:01.999999'}", 'The value : Count is 2 has timestamp 2000-01-01 03:00:01.999000')
("{'Window ends at 2000-01-01 03:00:02.999999'}", 'The value : Count is 1 has timestamp 2000-01-01 03:00:02.999000')
("{'Window ends at 2000-01-01 03:00:03.999999'}", 'The value : Count is 1 has timestamp 2000-01-01 03:00:03.999000')
("{'Window ends at 2000-01-01 03:00:08.999999'}", 'The value : Count is 1 has timestamp 2000-01-01 03:00:08.999000')


In [11]:

#compare this code snip with above, we will see the different between when the result give out
    #this change default timestamp_combiner which will give ouput at earliest as possible instead of when window time closed
with beam.Pipeline() as p:
  _ = (p| beam.Create(our_values_with_timestamps)
         | beam.Map(lambda x : x) # Work around for typing issue
         | beam.WindowInto(beam.window.FixedWindows(1),timestamp_combiner=beam.window.TimestampCombiner.OUTPUT_AT_EARLIEST)
         | beam.combiners.Count.PerKey()
         | beam.Map(lambda x : f'Count is {x[1]}')
         | beam.ParDo(GetElementTimestamp())
         | PrettyPrint()
  )

("{'Window ends at 2000-01-01 03:00:00.999999'}", 'The value : Count is 1 has timestamp 2000-01-01 03:00:00')
("{'Window ends at 2000-01-01 03:00:01.999999'}", 'The value : Count is 2 has timestamp 2000-01-01 03:00:01')
("{'Window ends at 2000-01-01 03:00:02.999999'}", 'The value : Count is 1 has timestamp 2000-01-01 03:00:02')
("{'Window ends at 2000-01-01 03:00:03.999999'}", 'The value : Count is 1 has timestamp 2000-01-01 03:00:03')
("{'Window ends at 2000-01-01 03:00:08.999999'}", 'The value : Count is 1 has timestamp 2000-01-01 03:00:08')
