In [1]:
!pip install apache-beam[gcp]


Collecting apache-beam[gcp]
  Downloading apache_beam-2.59.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.5 kB)
Collecting crcmod<2.0,>=1.7 (from apache-beam[gcp])
  Downloading crcmod-1.7.tar.gz (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting orjson<4,>=3.9.7 (from apache-beam[gcp])
  Downloading orjson-3.10.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (50 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.4/50.4 kB[0m [31m687.1 kB/s[0m eta [36m0:00:00[0m
[?25hCollecting dill<0.3.2,>=0.3.1.1 (from apache-beam[gcp])
  Downloading dill-0.3.1.1.tar.gz (151 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting fastavro<2,>=0.2

In [2]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


Use Case: Sensor Data Analysis

We will simulate processing temperature readings from multiple sensors in a smart-home environment. The readings arrive in real time, and we process them with Apache Beam, demonstrating composite transforms, pipeline I/O, windowing and triggers, ParDo, and streaming.



1. Composite Transform

We will create a composite transform that converts sensor readings from °C to °F and then keeps only temperatures in the normal range of 60–80 °F.

In [3]:
class ScaleAndFilter(beam.PTransform):
    """Composite transform: convert readings from °C to °F, then keep normal ones.

    Each input element is indexed as ``reading[0]`` (sensor id) and
    ``reading[1]`` (temperature in °C). The output elements are
    ``(sensor_id, temperature_in_°F)`` pairs, restricted to readings between
    60 °F and 80 °F inclusive.
    """

    def expand(self, pcoll):
        def to_fahrenheit(reading):
            # °C -> °F; the result is always a 2-tuple (sensor id, °F).
            return reading[0], reading[1] * 1.8 + 32

        def is_normal(reading):
            # Inclusive bounds: exactly 60 °F or 80 °F counts as normal.
            return 60 <= reading[1] <= 80

        fahrenheit = pcoll | 'Scale Temperature' >> beam.Map(to_fahrenheit)
        return fahrenheit | 'Filter Normal' >> beam.Filter(is_normal)


2. Pipeline I/O

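Here, we simulate reading sensor data from a file and writing the processed output to another file. Each input line is expected to have the form `sensor_id,temperature_celsius`. Note that `WriteToText` produces sharded output, so results land in files such as `output_sensor_data.txt-00000-of-00001` rather than a single `output_sensor_data.txt`.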

In [4]:
def io_pipeline():
    with beam.Pipeline(options=PipelineOptions()) as pipeline:
        (
            pipeline
            | 'Read from file' >> beam.io.ReadFromText('sensor_data.txt')  # Simulate reading from a file
            | 'Parse sensor data' >> beam.Map(lambda x: (x.split(',')[0], float(x.split(',')[1])))
            | 'Scale and Filter' >> ScaleAndFilter()
            | 'Write to file' >> beam.io.WriteToText('output_sensor_data.txt')
        )


3. Triggers and Windowing

We'll use windowing and triggers to analyze data in fixed time windows and trigger outputs based on processing time.

In [5]:
def windowing_pipeline():
    """Group simulated readings per sensor inside 10-second event-time windows.

    Each reading carries an explicit event time (in seconds) and is wrapped
    in ``TimestampedValue``. Without this, ``beam.Create`` provides no
    meaningful event time, and ``FixedWindows`` cannot split the data into
    separate windows.

    The processing-time trigger is wrapped in ``Repeatedly``. A bare
    ``AfterProcessingTime`` fires at most once per window, and Beam's
    ``GroupByKey`` rejects such a trigger as unsafe (it may lose data)
    unless ``--allow_unsafe_triggers`` is set.
    """
    readings = [
        # (sensor_id, temperature, event_time_seconds)
        ('sensor1', 65, 1),
        ('sensor2', 75, 4),
        ('sensor1', 85, 12),  # falls into the second window [10, 20)
    ]

    def with_event_time(reading):
        sensor_id, temperature, event_time = reading
        return beam.window.TimestampedValue((sensor_id, temperature), event_time)

    with beam.Pipeline(options=PipelineOptions()) as pipeline:
        (
            pipeline
            | 'Create sensor data' >> beam.Create(readings)
            | 'Attach event time' >> beam.Map(with_event_time)
            | 'Window into fixed intervals' >> beam.WindowInto(
                beam.window.FixedWindows(10),  # 10-second event-time windows
                # Fire a pane 10 s (processing time) after its first element, and keep re-arming.
                trigger=beam.trigger.Repeatedly(beam.trigger.AfterProcessingTime(10)),
                accumulation_mode=beam.trigger.AccumulationMode.DISCARDING
            )
            | 'Group by sensor' >> beam.GroupByKey()
            | 'Print grouped data' >> beam.Map(print)
        )


4. ParDo

We use ParDo with a custom DoFn for a transformation that a one-to-one Map cannot express: emitting a warning only for readings below 60 or above 80, and nothing for normal readings.

In [6]:
class FlagAbnormalTemps(beam.DoFn):
    """DoFn that emits one warning string per out-of-range reading.

    Elements are unpacked as ``(sensor, temp)``. A reading below 60 or above 80
    produces a single warning message; any other reading produces no output.
    """

    def process(self, element):
        sensor_id, reading = element
        too_cold = reading < 60
        too_hot = reading > 80
        # Normal readings produce no output at all.
        if not (too_cold or too_hot):
            return
        yield f'Warning: {sensor_id} reported abnormal temperature {reading}'

def pardo_pipeline():
    """Run ``FlagAbnormalTemps`` over three sample readings and print each warning."""
    sample_readings = [('sensor1', 50), ('sensor2', 85), ('sensor3', 75)]
    pipeline = beam.Pipeline(options=PipelineOptions())
    # Leaving the `with` block runs the pipeline and waits for it to finish.
    with pipeline:
        readings = pipeline | 'Create sensor data' >> beam.Create(sample_readings)
        warnings = readings | 'Flag abnormal temps' >> beam.ParDo(FlagAbnormalTemps())
        _ = warnings | 'Print warnings' >> beam.Map(print)


5. Streaming

We simulate streaming sensor data with an in-memory source run in streaming mode. In a real-world deployment, the readings would come from an unbounded source such as Pub/Sub.

In [7]:
import time

def streaming_pipeline():
    with beam.Pipeline(options=PipelineOptions(streaming=True)) as pipeline:
        (
            pipeline
            | 'Simulate sensor data' >> beam.Create([('sensor1', 65), ('sensor2', 85), ('sensor3', 45)])
            | 'Simulate streaming' >> beam.ParDo(lambda x: (time.sleep(1), print(x)))
        )

# Run the streaming demo defined in this cell; each simulated reading is printed as it is processed.
streaming_pipeline()






('sensor1', 65)
('sensor2', 85)
('sensor3', 45)


Summary of Approach:

Composite Transform: Scale and filter sensor data.

Pipeline I/O: Read sensor data from a file and write the output.

Triggers and Windowing: Analyze data in fixed intervals with triggers.

ParDo: Detect and flag abnormal temperatures.

Streaming: Simulate real-time sensor data processing.