In [1]:
!pip install --quiet 'apache-beam[interactive]'

In [2]:
import apache_beam as beam
import re

In [3]:
from google.colab import drive
drive.mount('/content/drive')
%cd /content/drive/MyDrive/Colab Notebooks/InputFiles/
%ls

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/MyDrive/Colab Notebooks/InputFiles
input1.txt  input2.txt  input3.txt


**Pipeline IO**

This pipeline reads `input1.txt`, lowercases every line, and writes the result to sharded text files whose names start with `output-pipeline` and end in `.txt`.

In [4]:
p = beam.Pipeline()

(p | 'Read from Text' >> beam.io.ReadFromText('input1.txt')
   | 'Lowercase each Line' >> beam.Map(str.lower)
   | 'Write to Text' >> beam.io.WriteToText('output-pipeline', file_name_suffix='.txt'))

p.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7d0f04806ce0>

**ParDo**

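`ParDo` is Beam's general-purpose element-wise transform: it applies a `DoFn` to every element of a `PCollection`, possibly in parallel, and each call to `process` can emit zero, one, or many outputs. Here it tokenizes each line of the input file into individual words.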


In [5]:
class SplitIntoWords(beam.DoFn):
    def process(self, element):
        # Returning a list emits each whitespace-separated token as its own output element.
        return element.split()

p = beam.Pipeline()

(p | 'Read from Text' >> beam.io.ReadFromText('input1.txt')
   | 'Split into Words' >> beam.ParDo(SplitIntoWords())
   | 'Write to Text' >> beam.io.WriteToText('output-pardo', file_name_suffix='.txt'))

p.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7d0f032a27d0>

**Composite Transform**

A composite transform is a powerful concept in Apache Beam that allows you to encapsulate a sequence of transforms into a single, reusable transform: you subclass `beam.PTransform` and build the sequence in its `expand` method.

In this example, we use it to tokenize lines into individual words, convert each word to lowercase, strip non-alphanumeric characters, and count the occurrences of each word.



In [6]:
class LowerTokenizeCountAndStrip(beam.PTransform):
    def expand(self, pcoll):
        return (pcoll
                | 'Split into Words' >> beam.ParDo(SplitIntoWords())
                | 'Lowercase each Word' >> beam.Map(lambda x: x.lower())
                | 'Strip each Word into AlphaNumeric' >> beam.Map(lambda x: re.sub(r'\W+', '', x))
                # Tokens made only of punctuation (e.g. '--') are now '', so drop them.
                | 'Drop Empty Words' >> beam.Filter(lambda x: x)
                | 'Count Words' >> beam.combiners.Count.PerElement())

p = beam.Pipeline()

(p | 'Read from Text' >> beam.io.ReadFromText('input1.txt')
   | 'Tokenize and Count' >> LowerTokenizeCountAndStrip()
   | 'Write to Text' >> beam.io.WriteToText('output-composite-transform', file_name_suffix='.txt'))

p.run()

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x7d0f02990eb0>

**Windowing and Triggers**

In stream processing (and sometimes in batch processing with large datasets), data is often processed as it arrives, rather than waiting for a complete dataset. To handle this continuous flow of data effectively, two primary concepts are introduced in many stream processing systems, including Apache Beam: windowing and triggering.

- **Windowing**:
  - **Purpose**: To divide the unbounded, continuous stream of data into finite chunks or "windows" so that it can be processed.
  - **Types**:
    - Fixed Windows: Divide the data into fixed-size, non-overlapping chunks based on the event time. E.g., every 5 minutes.
    - Sliding Windows: Divide the data into fixed-size chunks, but the start of each window "slides" over time, so windows can overlap. E.g., a window of size 10 minutes that starts every 5 minutes.
    - Session Windows: Data is windowed based on periods of activity. A session window closes when it detects a gap in incoming data that's larger than a specified duration.
    - Global Windows: The default windowing strategy in Beam. It places all data into a single, unbounded window.
- **Triggering**:
  - **Purpose**: To determine when the results of a windowed computation are emitted. By default, Beam emits a window's result once the watermark passes the end of the window, that is, when the window is considered complete; triggers let you emit results earlier, more often, or for late-arriving data.
  - **Types**:
    - Event Time Triggers: Fire based on the event time (the timestamp on the data itself), e.g. `AfterWatermark`.
    - Processing Time Triggers: Fire based on the processing time (the time the data is processed by the system), e.g. `AfterProcessingTime`.
    - Count Triggers: Fire after a certain number of data elements have been collected, e.g. `AfterCount`.
    - Composite Triggers: Combine multiple triggers to form more complex conditions, e.g. `Repeatedly`, `AfterAny`, `AfterAll`, `AfterEach`.
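  - **Accumulation Modes**:
    - Discarding Mode: Each time the trigger fires, the emitted data is discarded, so the next firing contains only elements that arrived since the previous one.
    - Accumulating Mode: Each time the trigger fires, the accumulated data is retained, so every firing contains all elements seen so far in the window, until the window closes.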

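In this example, we simulate a continuous flow of data by reading multiple input files and flattening them into a single PCollection. `FixedWindows` takes its size in seconds, so `window_size = 1` gives 1-second windows. The example also sets an `AfterCount(100)` trigger: for each key in each window, the trigger fires once 100 elements have arrived; if fewer arrive, the window produces no output until it ends. Because `AfterCount` is not wrapped in `Repeatedly`, it fires at most once per key and window. Two caveats apply to the code as written. First, lines read with `ReadFromText` carry no event time of their own, so they all share the same default timestamp and land in the same window. Second, `WindowInto` is applied after `Count.PerElement`, so the word counts are computed over the whole input in the global window; the window and trigger only affect aggregations that come after them.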

In [7]:
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterCount, AccumulationMode

# Regex tokenizer: keeps only runs of word characters, so punctuation never reaches the count.
class SplitIntoWords(beam.DoFn):
    def process(self, element):
        return re.findall(r'\w+', element)

class LowerTokenizeCountAndStrip(beam.PTransform):
    def expand(self, pcoll):
        return (pcoll
                | 'Split into Words' >> beam.ParDo(SplitIntoWords())
                | 'Lowercase each Word' >> beam.Map(lambda x: x.lower())
                | 'Strip each Word into AlphaNumeric' >> beam.Map(lambda x: re.sub(r'\W+', '', x))
                | 'Count Words' >> beam.combiners.Count.PerElement())

file_paths = ['input1.txt', 'input2.txt', 'input3.txt']

window_size = 1  # FixedWindows takes its size in seconds, so this is a 1-second window

p = beam.Pipeline(options=PipelineOptions())

counts = (
    # One read per file; Flatten merges them into a single PCollection.
    [p | f'Read file {i}' >> beam.io.ReadFromText(file_path) for i, file_path in enumerate(file_paths)]
    | 'Flatten PCollections' >> beam.Flatten()
    | 'Tokenize and Count' >> LowerTokenizeCountAndStrip()
    # Note: this WindowInto comes after Count.PerElement, so the counts above are computed
    # in the global window; the window and trigger only affect later aggregations.
    | 'Window' >> beam.WindowInto(FixedWindows(window_size), trigger=AfterCount(100), accumulation_mode=AccumulationMode.ACCUMULATING)
    | 'Write results' >> beam.io.WriteToText('output-window-trigger', file_name_suffix='.txt')
)

p.run().wait_until_finish()



'DONE'