## **Apache Beam Basics**

Description: This notebook will teach you the basics of the Apache Beam Programming Model:

- Apache Beam Basics: Pipeline, PCollection, Transforms & PTransforms.
   - GroupByKey
   - CoGroupByKey
   - Flatten
   - Partition
- Introducing complexity: DoFn.
   - DoFn vs Map
   - DoFn Lifecycle
   - DoFn Stateful Processing
- Advanced: Streaming.
   - Fixed Windows
   - Sliding Windows
   - PubSub to BigQuery


EDEM. Master Big Data & Cloud 2024/2025<br>
Professor: Javi Briones

In [None]:
# Import Libraries
import apache_beam as beam

from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

In [None]:
# Set Logs

import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format="serverless-edem-%(message)s")

# Suppress Apache Beam logs
logging.getLogger("apache_beam").setLevel(logging.WARNING)

## Apache Beam Basics

Apache Beam is a unified programming model for parallel data processing. It provides a rich set of transformations for processing both batch and streaming data, and the same pipeline can run on multiple execution environments (runners).

### **Apache Beam Core Concepts** 

##### **Pipeline**
A **Pipeline** represents the entire workflow for processing data in Apache Beam. It defines the sequence of transformations applied to input data to produce outputs.

##### **PCollection**
A **PCollection** is an immutable, distributed dataset that serves as the input and output for transformations in a pipeline. It can handle bounded (batch) or unbounded (streaming) data.

##### **Transformations**
A **Transformation** is an operation applied to a PCollection to produce one or more output PCollections. Common transformations include Map, FlatMap, GroupByKey, and Combine.

##### **PTransform**
A **PTransform** is a reusable and composable abstraction that encapsulates one or more transformations. It simplifies pipelines by modularizing complex workflows.


In [None]:
"""
Exercise 01: What is a PCollection?

PCollection: A distributed dataset in Apache Beam
"""

#  Create and explore a simple PCollection

with beam.Pipeline(InteractiveRunner()) as pipeline:
    
    # Create a PCollection
    numbers = pipeline | "Create Numbers" >> beam.Create([1, 2, 3, 4, 5])

    # Log output
    numbers | "Log Elements" >> beam.Map(logging.info)

In [None]:
"""
Exercise 02: Declaring a Pipeline

Pipeline: Defines the workflow of data transformations.
"""

# Interactive pipeline
with beam.Pipeline(InteractiveRunner()) as pipeline:
    
    # Create a PCollection and perform a transformation
    squared_numbers = (
        pipeline
        | "Create Numbers" >> beam.Create([1, 2, 3, 4, 5])
        | "Square Numbers" >> beam.Map(lambda x: x * x)
    )

    # Log output
    squared_numbers | "Log Squared Numbers" >> beam.Map(logging.info)

In [None]:
"""
Exercise 03: Transformations

Transforms one or more input PCollections into one or more output PCollections.
"""

# Interactive pipeline
with beam.Pipeline(InteractiveRunner()) as pipeline:

    (
        pipeline
            # Input
            | "Input PCollection: Read Text From File" >> beam.io.ReadFromText('./input_text.txt')
            # Transformations
            | "FlatMap" >> beam.FlatMap(lambda z: z.split())
            | "Map" >> beam.Map(lambda x: (x,1))
            | "Combine" >> beam.CombinePerKey(sum)
            # Output
            | "Output PCollection: Print" >> beam.Map(logging.info)
    )

In [None]:
"""
Exercise 04: Reuse Transformations (PTransforms)

PTransform: A collection of transformations packaged as a reusable unit for complex workflows.
"""

class MyCustomTransform(beam.PTransform):
    
    def expand(self, pcoll):
        return (
            pcoll
            | "FlatMap" >> beam.FlatMap(lambda z: z.split())
            | "Map" >> beam.Map(lambda x: (x,1))
            | "Combine" >> beam.CombinePerKey(sum)
        )

with beam.Pipeline(InteractiveRunner()) as pipeline:

    (
        pipeline
            | "Input PCollection: Read Text From File" >> beam.io.ReadFromText('./input_text.txt')
            | "Apply Custom Transform" >> MyCustomTransform()
            | "Output PCollection: Print" >> beam.Map(logging.info)
    )

#### **Apache Beam Transformations** 

**- GroupByKey**

It groups the elements of a PCollection of key-value pairs by key, producing a collection where each key appears once and is associated with an iterable of all the values that share that key.


In [None]:
""" GroupByKey """

with beam.Pipeline(InteractiveRunner()) as p:

    data = (p | "PCollection" >> beam.Create([('Spain', 'Valencia'), ('Spain','Barcelona'), ('France', 'Paris')]))

    (data 
        | "Combined" >> beam.GroupByKey()
        | "Print" >> beam.Map(logging.info))

**- CoGroupByKey** 

It joins two or more keyed PCollections by key, producing one element per key that holds the grouped values from each input collection. This is useful for operations that combine data from different sources.


In [None]:
with beam.Pipeline(InteractiveRunner()) as p:

    p1 = p | "PCollection 01" >> beam.Create([('Spain', 'Valencia'), ('Spain','Barcelona'), ('France', 'Paris')])
    p2 = p | "PCollection 02" >> beam.Create([('Spain', 'Madrid'), ('Spain','Alicante'), ('France', 'Lyon')])

    data = ((p1,p2) | beam.CoGroupByKey())

    data | "Print" >> beam.Map(logging.info)

**- Flatten**  

It merges multiple PCollections into a single PCollection, combining their elements into one flat structure.

In [None]:
with beam.Pipeline(InteractiveRunner()) as p:

    p1 = p | "PCollection 01" >> beam.Create(['New York', 'Los Angeles', 'Miami', 'Chicago'])
    p2 = p | "PCollection 02" >> beam.Create(['Madrid', 'Barcelona', 'Valencia', 'Malaga'])
    p3 = p | "PCollection 03" >> beam.Create(['London','Manchester', 'Liverpool'])

    merged = ((p1,p2,p3)| beam.Flatten())

    merged | beam.Map(logging.info)

**- Partition**  

It splits a PCollection into a fixed number of partitions. A partition function returns the index of the partition each element belongs to, so each subset can then be processed by its own branch of the pipeline.


In [None]:
countries = ['Spain', 'USA', 'Switzerland']

def partition_fn(country,num_countries):
    return countries.index(country['country'])

with beam.Pipeline(InteractiveRunner()) as p:

        p1,p2,p3 = (
                p 
                | "PCollection" >> beam.Create([
                        {'country': 'Spain', 'city': 'Valencia'},
                        {'country': 'Spain', 'city': 'Barcelona'},
                        {'country': 'USA', 'city': 'New York'},
                        {'country': 'Switzerland', 'city': 'Zurich'},
                        {'country': 'Switzerland', 'city': 'Geneva'}  
                ])
                | "partition" >> beam.Partition(partition_fn, len(countries))
        )

        p2 |  beam.Map(logging.info)

## Introducing complexity

#### A. When to Use DoFn Instead of Map

In [None]:
"""
Exercise 05: This exercise shows how to achieve the same transformation
    using both Map and ParDo with DoFn.

Use Map for simple transformations where the logic is inline and does not require lifecycle methods.
Use ParDo with DoFn for more complex operations requiring setup, cleanup, or state management.
"""

In [None]:
""" Map """
with beam.Pipeline(InteractiveRunner()) as pipeline:

    (
        pipeline
            | "Input PCollection: Read Text From File" >> beam.io.ReadFromText('./input_text.txt')
            | "FlatMap" >> beam.FlatMap(lambda z: z.split())
            | "Map" >> beam.Map(lambda x: x.upper())
            | "Output PCollection: Print" >> beam.Map(logging.info)
    )

In [None]:
"""ParDo with DoFn"""

class UpperCaseDoFn(beam.DoFn):

    def setup(self):
        logging.info("Loading model...")
        self.model = lambda x: x.upper()

    def process(self, element):
        yield self.model(element)

    def teardown(self):
        logging.info("Releasing model resources.")

with beam.Pipeline(InteractiveRunner()) as pipeline:

    (
        pipeline
            | "Input PCollection: Read Text From File" >> beam.io.ReadFromText('./input_text.txt')
            | "FlatMap" >> beam.FlatMap(lambda z: z.split())
            | "DoFn" >> beam.ParDo(UpperCaseDoFn())
            | "Output PCollection: Print" >> beam.Map(logging.info)
    )


### **Exploring the DoFn Lifecycle**

The lifecycle of a **DoFn** in Apache Beam refers to the stages that an instance of the `DoFn` class goes through from initialization to completion within the context of a *ParDo* transformation. Below are the key phases of the DoFn lifecycle:

1. **Setup (Initialization):**  
   - **Goal:** This phase occurs once for each instance of `DoFn` before processing begins.  
   - **Process:** Initialization tasks, such as allocating resources, establishing connections, or loading models, are performed in the `setup(self)` method.

2. **Processing Elements (Process):**  
   - **Goal:** This phase executes the main logic for each input element in the data bundle.  
   - **Process:** The core processing logic is implemented in the `process(self, element)` method. This method is called for each element in the input `PCollection` and defines how the data is transformed.

3. **Start Bundle and Finish Bundle:**  
   - **Goal:** These phases occur once before (`start_bundle(self)`) and after (`finish_bundle(self)`) processing all elements in a bundle. A bundle represents a chunk of data processed in parallel.  
   - **Process:** Bundle-specific setup and cleanup tasks, such as initializing or finalizing temporary states or resources used within the bundle, are performed here.

4. **Teardown (Finalization):**  
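   - **Goal:** This phase occurs once per `DoFn` instance, when the runner discards the instance after it has finished processing. Runners call it on a best-effort basis, so it should not be the only place where critical work happens.  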
   - **Process:** The `teardown(self)` method handles final resource cleanup, such as closing database connections, releasing memory, or terminating external processes.

---

The lifecycle of a **DoFn** provides a structured framework for managing resources and processing logic in *ParDo* transformations. Each instance is created, initialized, processes elements, and performs cleanup in a controlled and efficient manner. This ensures proper resource management and facilitates robust, scalable data processing pipelines.


Use ParDo with DoFn for tasks requiring state, lifecycle, or multiple outputs.

In [None]:
"""
Exercise: Understanding the lifecycle methods (setup, start_bundle, process, finish_bundle, and teardown)
 and how they are invoked during pipeline execution
"""

class LifecycleDoFn(beam.DoFn):

    def setup(self):
        logging.info("Setting up resources.")

    def start_bundle(self):
        logging.info("Starting a bundle.")

    def process(self, element):
        logging.info(f"Processing element: {element}")
        yield element.upper()

    def finish_bundle(self):
        logging.info("Finishing a bundle.")

    def teardown(self):
        logging.info("Tearing down resources.")

with beam.Pipeline(InteractiveRunner()) as pipeline:

    (
        pipeline
            | "Input PCollection: Read Text From File" >> beam.io.ReadFromText('./input_text.txt')
            | "FlatMap" >> beam.FlatMap(lambda z: z.split())
            | "Apply Lifecycle DoFn" >> beam.ParDo(LifecycleDoFn())
            | "Output PCollection: Print" >> beam.Map(logging.info)
    )



### **DoFn: Stateful Processing**

- Allows operations that depend on previous or cumulative data.
- Reduces the need to rely on external systems, such as databases or caching systems.
- Efficient, as the state is stored locally for each key and window. For this reason, a stateful DoFn requires its input to be a PCollection of key-value pairs.


In [None]:
import apache_beam as beam
from apache_beam.transforms.userstate import CombiningValueStateSpec

"""
Exercise: The state is persisted across multiple calls to process()
"""

class StatefulDoFn(beam.DoFn):
    
    # Define a state spec to maintain counts
    state_spec = CombiningValueStateSpec("count", sum)

    def process(self, element, state=beam.DoFn.StateParam(state_spec)):
        k,v = element
        state.add(1)
        yield f"Key: {k}, Count: {state.read()}"
    
# Interactive pipeline
with beam.Pipeline(InteractiveRunner()) as pipeline:

    (
        pipeline
            | "Input PCollection: Read Text From File" >> beam.io.ReadFromText('./input_text.txt')
            | "FlatMap" >> beam.FlatMap(lambda z: z.split())
            | "Map" >> beam.Map(lambda x: (x.lower(),1))
            | "Count Events Per Key" >> beam.ParDo(StatefulDoFn())
            | "Output PCollection: Print" >> beam.Map(logging.info)
    )

### **Streaming** 

#### GCP Setup

- PubSub Topics

```
gcloud pubsub topics create <TOPIC_NAME>
```

- PubSub Subscriptions

```
gcloud pubsub subscriptions create <SUBSCRIPTION_NAME> --topic=<TOPIC_NAME>
```

- Google Cloud Storage Bucket  

```
gcloud storage buckets create gs://<BUCKET_NAME> --location=<REGION_ID>
```


In [20]:
# Variables
project_id = ""
subscription_name = ""
bq_dataset = ""
bq_table = ""
bucket_name = ""

**Window in Apache Beam**
  
A window in Apache Beam defines a time frame for **organizing and grouping data elements** during processing, enabling time-based operations.

---

**Types of Windows:**

- **Fixed Window:** 

  Divides elements into fixed time intervals, segmenting the PCollection into evenly spaced, time-based windows.

- **Sliding Window:**  

  Creates overlapping windows with a specified size and period (how often a new window starts), enabling continuous analysis of data over time.

- **Session Window:**  

  Groups elements based on **contiguous temporal activity**, where windows are dynamically defined by an **inactivity gap** between events, making it ideal for capturing logical sessions in streaming data.


##### - **Fixed Windows**

In [None]:
import apache_beam as beam
import json
import logging
from apache_beam.options.pipeline_options import PipelineOptions

def decode_message(msg):

    output = msg.decode('utf-8')

    logging.info("New PubSub Message: %s", output)
    
    return json.loads(output)['temp']

class OutputDoFn(beam.DoFn):

    def process(self, element):
        yield element

def run():
    with beam.Pipeline(options=PipelineOptions(streaming=True, save_main_session=True)) as p:
        (
            p 
            | "ReadFromPubSub" >> beam.io.ReadFromPubSub(subscription=f'projects/{project_id}/subscriptions/{subscription_name}')
            | "Decode msg" >> beam.Map(decode_message)
            | "Fixed Window" >> beam.WindowInto(beam.window.FixedWindows(10))
            | "Combine" >> beam.CombineGlobally(sum).without_defaults()
            | "Print" >> beam.Map(print)
        )

# Run Process
logging.info("The process started")
run()
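
The `decode_message` function above expects each Pub/Sub message to be UTF-8 encoded JSON with a `temp` field. As a rough illustration of that payload format, the sketch below builds such a message and decodes it again without touching Pub/Sub. The helper names `encode_reading` and `decode_temp` and the sample value are hypothetical and not part of the original notebook.

```python
import json


def encode_reading(temp):
    """Builds a payload in the format the pipeline expects: UTF-8 JSON bytes with a 'temp' field."""
    return json.dumps({"temp": temp}).encode("utf-8")


def decode_temp(msg):
    """Mirrors decode_message: bytes -> str -> dict -> the 'temp' value."""
    return json.loads(msg.decode("utf-8"))["temp"]


payload = encode_reading(21.5)  # hypothetical temperature reading
print(payload)
print(decode_temp(payload))
```

Publishing payloads of this shape to the topic is what lets the windowed `sum` operate on plain numbers.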

##### - **Sliding Windows**

In [None]:
import apache_beam as beam
import json
import logging
from apache_beam.options.pipeline_options import PipelineOptions

def decode_message(msg):

    output = msg.decode('utf-8')

    logging.info("New PubSub Message: %s", output)

    # Parse the JSON payload before reading the 'temp' field
    return json.loads(output)['temp']

class OutputDoFn(beam.DoFn):

    def process(self, element):
        yield element

def run():
    with beam.Pipeline(options=PipelineOptions(streaming=True, save_main_session=True)) as p:
        (
            p 
            | "ReadFromPubSub" >> beam.io.ReadFromPubSub(subscription=f'projects/{project_id}/subscriptions/{subscription_name}')
            | "Decode msg" >> beam.Map(decode_message)
            | "Sliding Window" >> beam.WindowInto(beam.window.SlidingWindows(size=60, period=20))
            # Elements are plain numbers, not key-value pairs, so combine globally per window
            | "Combine" >> beam.CombineGlobally(sum).without_defaults()
            | "Print" >> beam.Map(print)
        )

# Run Process
logging.info("The process started")
run()

#### PubSub to BigQuery

In [None]:
import apache_beam as beam
import json
import logging
from apache_beam.options.pipeline_options import PipelineOptions

def decode_message(msg):

    output = msg.decode('utf-8')

    logging.info("New PubSub Message: %s", output)

    return json.loads(output)

def run():
    with beam.Pipeline(options=PipelineOptions(
        streaming=True,
        save_main_session=True,
        job_name = "pubsub-to-bigquery-edem",
        project=project_id,
        runner="DataflowRunner",
        temp_location=f"gs://{bucket_name}/tmp",
        staging_location=f"gs://{bucket_name}/staging",
        region="europe-southwest1"
    )) as p:
        
        (
            p 
            | "ReadFromPubSub" >> beam.io.ReadFromPubSub(subscription=f'projects/{project_id}/subscriptions/{subscription_name}')
            | "decode msg" >> beam.Map(decode_message)
            | "Write to BigQuery" >> beam.io.WriteToBigQuery(
                table = f"{project_id}:{bq_dataset}.{bq_table}", # Required Format: PROJECT_ID:DATASET.TABLE
                schema='name:STRING', # Required Format: field_name:TYPE
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
            )
        )

# Run Process
logging.info("The process started")
run()