In [1]:
# Install Apache Beam
!pip install apache-beam

Collecting apache-beam
  Downloading apache_beam-2.60.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.6 kB)
Collecting crcmod<2.0,>=1.7 (from apache-beam)
  Downloading crcmod-1.7.tar.gz (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting dill<0.3.2,>=0.3.1.1 (from apache-beam)
  Downloading dill-0.3.1.1.tar.gz (151 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting cloudpickle~=2.2.1 (from apache-beam)
  Downloading cloudpickle-2.2.1-py3-none-any.whl.metadata (6.9 kB)
Collecting fastavro<2,>=0.23.6 (from apache-beam)
  Downloading fastavro-1.9.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Collecting fasteners<1.0,>=0.3 (from apache-beam)
  Do

In [2]:
# Simple Apache Beam Pipeline Example
import apache_beam as beam

# Define a basic Apache Beam pipeline: create -> map -> write.
with beam.Pipeline() as pipeline:
    # Build an in-memory PCollection from a sample list
    input_data = pipeline | "Create Data" >> beam.Create([1, 2, 3, 4, 5])

    # Apply a transformation (multiply each element by 2)
    output_data = input_data | "Multiply by 2" >> beam.Map(lambda x: x * 2)

    # Write the output to a text file.
    # shard_name_template="" makes WriteToText produce a single file named
    # exactly "output.txt". With the default template the results would land in
    # shard files such as "output.txt-00000-of-00001", and the message printed
    # below would point at a file that does not exist.
    output_data | "Write to File" >> beam.io.WriteToText(
        "output.txt", shard_name_template=""
    )

print("Pipeline executed. Check 'output.txt' for the results.")



Pipeline executed. Check 'output.txt' for the results.


In [3]:
# Composite Transform Example
class MultiplyAndFilter(beam.PTransform):
    """Composite transform that triples each element and keeps the even results."""

    def expand(self, pcoll):
        """Apply the two steps to ``pcoll`` and return the resulting PCollection."""
        tripled = pcoll | "Multiply by 3" >> beam.Map(lambda value: value * 3)
        evens_only = tripled | "Filter Even Numbers" >> beam.Filter(
            lambda value: value % 2 == 0
        )
        return evens_only

# Run the composite transform over the integers 1..9 and write the results.
with beam.Pipeline() as pipeline:
    input_data = pipeline | "Create Data" >> beam.Create([1, 2, 3, 4, 5, 6, 7, 8, 9])
    processed_data = input_data | "Apply Composite Transform" >> MultiplyAndFilter()
    # shard_name_template="" writes one file named exactly
    # "composite_output.txt" (instead of "composite_output.txt-00000-of-00001"),
    # so the message printed below names the file that is actually created.
    processed_data | "Write Processed Data" >> beam.io.WriteToText(
        "composite_output.txt", shard_name_template=""
    )

print("Composite Transform executed. Check 'composite_output.txt' for the results.")


Composite Transform executed. Check 'composite_output.txt' for the results.


In [4]:
# Windowing Example (fixed windows; no custom trigger is configured in this cell)
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows

# Define a sample dataset; "timestamp" is the event time in seconds
sample_data = [
    {"event": "A", "timestamp": 1},
    {"event": "B", "timestamp": 2},
    {"event": "C", "timestamp": 3},
    {"event": "D", "timestamp": 6},
    {"event": "E", "timestamp": 8},
]

# Attach event timestamps, assign each element to a fixed window, and write
# the event names.
with beam.Pipeline() as pipeline:
    (
        pipeline
        | "Create Events" >> beam.Create(sample_data)
        # Use each record's own "timestamp" field as its event time
        | "Add Timestamps" >> beam.Map(lambda x: beam.window.TimestampedValue(x, x["timestamp"]))
        # 3-second fixed windows: A, B -> [0, 3); C -> [3, 6); D, E -> [6, 9)
        | "Fixed Windowing" >> beam.WindowInto(FixedWindows(3))
        # NOTE(review): no GroupByKey/Combine follows, so the window assignment
        # presumably does not change which lines are written -- confirm.
        | "Extract Events" >> beam.Map(lambda x: x["event"])
        # shard_name_template="" writes one file named exactly
        # "windowed_output.txt" (instead of "windowed_output.txt-00000-of-00001"),
        # so the message printed below names the file that is actually created.
        | "Write Windowed Output" >> beam.io.WriteToText(
            "windowed_output.txt", shard_name_template=""
        )
    )

print("Windowing example executed. Check 'windowed_output.txt' for results.")


Windowing example executed. Check 'windowed_output.txt' for results.


In [5]:
# ParDo Example
class SplitWords(beam.DoFn):
    """DoFn that breaks a string element into its whitespace-separated words."""

    def process(self, element):
        """Return the list of words in ``element``; each word becomes an output."""
        tokens = element.split()
        return tokens

# Split two sample sentences into words with ParDo and write one word per line.
with beam.Pipeline() as pipeline:
    input_data = pipeline | "Create Sentences" >> beam.Create(["Apache Beam is powerful", "Python is great"])
    words = input_data | "Split into Words" >> beam.ParDo(SplitWords())
    # shard_name_template="" writes one file named exactly "pardo_output.txt"
    # (instead of "pardo_output.txt-00000-of-00001"), so the message printed
    # below names the file that is actually created.
    words | "Write Words" >> beam.io.WriteToText(
        "pardo_output.txt", shard_name_template=""
    )

print("ParDo example executed. Check 'pardo_output.txt' for results.")


ParDo example executed. Check 'pardo_output.txt' for results.


In [7]:
# Simulated Streaming with ParDo
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import time

# Enable streaming mode.
# flags=[] keeps PipelineOptions from parsing sys.argv, which inside a notebook
# kernel holds the kernel launcher's arguments rather than pipeline flags.
pipeline_options = PipelineOptions(flags=[])
pipeline_options.view_as(StandardOptions).streaming = True

# DoFn that fabricates a short burst of events to imitate a stream
class GenerateEvents(beam.DoFn):
    """Emit five event dicts, pausing one second after each one."""

    def process(self, _):
        """Ignore the input element and yield ``{"event_id", "timestamp"}`` dicts."""
        event_id = 0
        while event_id < 5:  # Five simulated events in total
            event = {"event_id": event_id, "timestamp": time.time()}
            yield event
            time.sleep(1)  # One-second gap before the next event
            event_id += 1

# Run the simulated stream with the streaming-enabled options defined above.
with beam.Pipeline(options=pipeline_options) as pipeline:
    (
        pipeline
        | "Start Stream" >> beam.Create([None])  # Dummy element to start the stream
        | "Generate Events" >> beam.ParDo(GenerateEvents())
        | "Format Output" >> beam.Map(lambda x: f"Event ID: {x['event_id']} at {x['timestamp']}")
        # shard_name_template="" writes one file named exactly
        # "streaming_output.txt" (instead of "streaming_output.txt-00000-of-00001"),
        # so the message printed below names the file that is actually created.
        | "Write Streaming Output" >> beam.io.WriteToText(
            "streaming_output.txt", shard_name_template=""
        )
    )

print("Streaming example executed. Check 'streaming_output.txt' for results.")




Streaming example executed. Check 'streaming_output.txt' for results.


In [8]:
# Combined Features Example
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterProcessingTime, AccumulationMode

# DoFn that fabricates timestamped events with random payload values
class GenerateEventsWithTimestamps(beam.DoFn):
    """Emit ten timestamped event dicts, pausing half a second after each one."""

    def process(self, _):
        """Ignore the input element and yield ``TimestampedValue`` events."""
        import time
        import random

        event_id = 0
        while event_id < 10:  # Ten simulated events in total
            payload = {"event_id": event_id, "value": random.randint(1, 100)}
            # Stamp the event with the current wall-clock time
            stamped = beam.window.TimestampedValue(payload, time.time())
            yield stamped
            time.sleep(0.5)  # Half-second gap before the next event
            event_id += 1

# Combine event generation, fixed windowing with a trigger, and file output.
# `pipeline_options` is not defined in this cell; it comes from the earlier
# "Simulated Streaming with ParDo" cell, which must be run first.
with beam.Pipeline(options=pipeline_options) as pipeline:
    (
        pipeline
        | "Start Stream" >> beam.Create([None])  # Dummy element to kickstart the stream
        | "Generate Events" >> beam.ParDo(GenerateEventsWithTimestamps())
        | "Fixed Windowing (5s)" >> beam.WindowInto(
            FixedWindows(5),  # Group events into 5-second windows
            # Fire 2 seconds (processing time) after the first element of a pane
            trigger=AfterProcessingTime(2),
            accumulation_mode=AccumulationMode.DISCARDING
        )
        # NOTE(review): no GroupByKey/Combine follows in this pipeline, so the
        # window and trigger settings presumably do not change which lines are
        # written -- confirm.
        | "Process Events" >> beam.Map(lambda x: f"Event {x['event_id']} with value {x['value']}")
        # shard_name_template="" writes one file named exactly
        # "combined_output.txt" (instead of "combined_output.txt-00000-of-00001"),
        # so the message printed below names the file that is actually created.
        | "Write Results" >> beam.io.WriteToText(
            "combined_output.txt", shard_name_template=""
        )
    )

print("Combined example executed. Check 'combined_output.txt' for results.")




Combined example executed. Check 'combined_output.txt' for results.
