In [3]:
!pip install --quiet apache-beam

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/89.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━[0m [32m81.9/89.7 kB[0m [31m3.5 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.4/50.4 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m7.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m43.5/43.5 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Preparing metadata (setup.py) ... [?25l[?25hdone
  Preparing metadata (setup.py) ... 

# Apache Beam Pipeline for Transforming and Printing Data

In [4]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def process_word(word):
    return f"{word}: {len(word)} characters"

# Set up pipeline options
pipeline_options = PipelineOptions()

# Create the pipeline
with beam.Pipeline(options=pipeline_options) as pipeline:

    # Create initial PCollection
    words = pipeline | "Generate Words" >> beam.Create([
        "Data", "Processing", "with", "Apache", "Beam"
    ])

    # Apply transformations
    processed_words = (words
        | "Add Length Info" >> beam.Map(process_word)
        | "To Lowercase" >> beam.Map(str.lower)
    )

    # Output results
    processed_words | "Display Results" >> beam.Map(print)

# Pipeline runs automatically due to the 'with' statement





data: 4 characters
processing: 10 characters
with: 4 characters
apache: 6 characters
beam: 4 characters


# Composite Transform
I've implemented composite transform called **TextAnalyzer** using beam.PTransform.

*   It takes a PCollection of strings.
*   Calculates the length of each string using beam.Map.
*   Formats the output as "string: length letters" using beam.Map.

In [5]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class TextAnalyzer(beam.PTransform):
    def expand(self, pcoll):
        return (pcoll
                | 'Count Characters' >> beam.Map(lambda x: (x, len(x)))
                | 'Format Output' >> beam.Map(lambda x: f"{x[0]}: {x[1]} letters"))

# Set up pipeline options
options = PipelineOptions()

# Create and run the pipeline
with beam.Pipeline(options=options) as pipeline:
    result = (pipeline
              | 'Create Input' >> beam.Create(['python', 'apache', 'beam', 'dataflow'])
              | 'Analyze Text' >> TextAnalyzer()
              | 'Display Results' >> beam.Map(print))



python: 6 letters
apache: 6 letters
beam: 4 letters
dataflow: 8 letters


# Pipeline IO

In [6]:
with beam.Pipeline(options=options) as p:
    (p
     | 'Read File' >> beam.io.ReadFromText('/content/drive/MyDrive/ProjectsData/KDD_Detailed_description.txt')
     | 'Process Text' >> beam.Map(lambda x: x.upper())
     | 'Write Results' >> beam.io.WriteToText('/content/drive/MyDrive/ProjectsData/KDD_Detailed_description_uppercase.txt')
    )





# Triggers
Using triggers in Apache Beam pipelines to control when data is processed within a window.

*   It simulates a data stream using data\_generator which yields data points with a random integer and sleeps for 0.8 seconds between each yield to simulate real-time data arrival.

*   The pipeline uses a sliding window of 4 seconds with a 2-second period, meaning a new window starts every 2 seconds and considers the last 4 seconds of data.

In [8]:
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms import trigger
from apache_beam.options.pipeline_options import PipelineOptions
import random
import time

def data_generator():
    for i in range(15):
        yield f"data_point_{random.randint(1, 100)}"
        time.sleep(0.8)  # Simulates data arriving every 0.8 seconds

pipeline_config = PipelineOptions()

with beam.Pipeline(options=pipeline_config) as pipeline:
    (pipeline
     | 'Ingest Data' >> beam.Create(data_generator())
     | 'Apply Windowing' >> beam.WindowInto(
         window.SlidingWindows(4, 2),
         trigger=trigger.AfterWatermark(early=trigger.AfterCount(3)),
         accumulation_mode=trigger.AccumulationMode.ACCUMULATING
     )
     | 'Process and Log' >> beam.Map(lambda element: print(f"Processed: {element}"))
    )



Processed: data_point_57
Processed: data_point_71
Processed: data_point_36
Processed: data_point_1
Processed: data_point_52
Processed: data_point_26
Processed: data_point_90
Processed: data_point_33
Processed: data_point_83
Processed: data_point_34
Processed: data_point_56
Processed: data_point_97
Processed: data_point_73
Processed: data_point_99
Processed: data_point_87


# Windowing
*   The pipeline employs sliding windows
with a 4-second window duration and a 2-second sliding period.
*   This means a new window starts every 2 seconds and encompasses the last 4 seconds of data.

In [9]:
def data_stream_simulator():
    for _ in range(12):
        yield f"item_{random.randint(100, 999)}"
        time.sleep(0.8)  # Simulates data arriving every 0.8 seconds

pipeline_settings = PipelineOptions()

with beam.Pipeline(options=pipeline_settings) as data_pipeline:
    (data_pipeline
     | 'Ingest Streaming Data' >> beam.Create(data_stream_simulator())
     | 'Apply Sliding Windows' >> beam.WindowInto(window.SlidingWindows(4, 2))
     | 'Process and Display' >> beam.Map(lambda element: print(f"Processed in window: {element}"))
    )



Processed in window: item_349
Processed in window: item_170
Processed in window: item_360
Processed in window: item_955
Processed in window: item_777
Processed in window: item_442
Processed in window: item_210
Processed in window: item_182
Processed in window: item_199
Processed in window: item_124
Processed in window: item_552
Processed in window: item_346


# ParDo
*   It allows you to apply a user-defined function to each element in a PCollection.
*   The process method within EnhanceData takes an element, capitalizes it, adds "[enhanced]" to it, and yields the modified element.

In [10]:
class EnhanceData(beam.DoFn):
    def process(self, item):
        enhanced = f"{item.capitalize()} [enhanced]"
        yield enhanced

pipeline_config = PipelineOptions()

with beam.Pipeline(options=pipeline_config) as data_pipeline:
    (data_pipeline
     | 'Generate Input' >> beam.Create(['apple', 'banana', 'cherry', 'date'])
     | 'Enhance Items' >> beam.ParDo(EnhanceData())
     | 'Display Results' >> beam.Map(lambda x: print(f"Processed: {x}"))
    )



Processed: Apple [enhanced]
Processed: Banana [enhanced]
Processed: Cherry [enhanced]
Processed: Date [enhanced]
