<a href="https://colab.research.google.com/github/schumbar/SJSU_CMPE255/blob/main/assignment_04/C_ApacheBeam/Part_C_ApacheBeam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Assignment 04: Apache Beam Data Engineering Assignment
### Part C: Apache Beam Features
By Shawn Chumbar
  
Please note that I have used ChatGPT to assist me with this assignment.
You can view all of the code that I have written for each task within the **Apache Beam Code** section. Alternatively, I have also written up a summary of each step within the **Conclusion** section with supporting code snippets.

Tasks:
1. Composite transform
2. Pipeline IO
3. Triggers
4. Windowing
5. ParDo

Sources:
1. [About Beam ML](https://beam.apache.org/documentation/ml/about-ml/)
2. [Get started with AI/ML pipelines](https://beam.apache.org/documentation/ml/overview/)
3. [Use RunInference with Sklearn](https://beam.apache.org/documentation/transforms/python/elementwise/runinference-sklearn/)
4. [Apache Beam Tutorial](https://www.macrometa.com/event-stream-processing/apache-beam-tutorial)
5. [Intro to Apache Beam - Python](https://colab.research.google.com/drive/1qrqbpRpfMtwosjcZQ3_qAWvBCXtzs-8D?usp=sharing)

Dataset Link:
[Healthcare Insurance](https://www.kaggle.com/datasets/willianoliveiragibin/healthcare-insurance)

In [1]:
!pip install apache_beam


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import pandas as pd
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

In [4]:
# Load the dataset
file_path = '/content/drive/MyDrive/SJSU/CMPE_255/assignment_04/datasets/insurance.csv'
data = pd.read_csv(file_path)
data.head()

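|   | age | sex | bmi | children | smoker | region | charges |
|---|---|---|---|---|---|---|---|
| 0 | 19 | female | 27.9 | 0 | yes | southwest | 16884.924 |
| 1 | 18 | male | 33.77 | 1 | no | southeast | 1725.5523 |
| 2 | 28 | male | 33.0 | 3 | no | southeast | 4449.462 |
| 3 | 33 | male | 22.705 | 0 | no | northwest | 21984.47061 |
| 4 | 32 | male | 28.88 | 0 | no | northwest | 3866.8552 |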


#### Manual EDA

In [5]:
data.columns

Index(['age', 'sex', 'bmi', 'children', 'smoker', 'region', 'charges'], dtype='object')

The dataset has the following columns:
* age
* sex
* bmi
* children
* smoker
* region
* charges




### Apache Beam Code

In [6]:


# Function to parse each CSV row into a dictionary
def parse_csv(line):
    fields = line.split(',')
    return {
        'age': int(fields[0]),
        'sex': fields[1],
        'bmi': float(fields[2]),
        'children': int(fields[3]),
        'smoker': fields[4],
        'region': fields[5],
        'charges': float(fields[6])
    }

# Format a (key, value) pair as a single CSV line: "key,value".
def to_csv_string(key, value):
    return f"{key},{value}"


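# Average the charges collected for one group.
# GroupByKey yields (key, iterable); the iterable is materialized into a list once
# so that both sum() and len() work regardless of the runner's iterable type.
def average_of_group(key_and_charges):
    key, charges = key_and_charges
    charges = list(charges)
    return key, sum(charges) / len(charges)


# Composite transform to calculate average charge by a grouping key (e.g., smoker status)
class CalculateAverageChargeByGroup(beam.PTransform):
    def __init__(self, group_key):
        super().__init__()
        self.group_key = group_key

    def expand(self, pcoll):
        group_key = self.group_key  # capture a plain string instead of the transform in the lambda
        return (
            pcoll
            | 'Extract Key Value' >> beam.Map(lambda elem: (elem[group_key], elem['charges']))
            | 'Group By Key' >> beam.GroupByKey()
            | 'Calculate Average' >> beam.Map(average_of_group)
        )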

# Define the pipeline
pipeline_options = PipelineOptions()
with beam.Pipeline(options=pipeline_options) as p:
    csv_lines = (
        p
        | 'Read from CSV' >> beam.io.ReadFromText(file_path, skip_header_lines=1)
        | 'Parse CSV to Dict' >> beam.Map(parse_csv)
    )

    # Calculate average charge by 'smoker' status as an example
    average_charge_by_smoker = (
        csv_lines
        | 'Average Charge by Smoker' >> CalculateAverageChargeByGroup('smoker')
    )
    # Convert the results to CSV lines and write them out. WriteToText treats the path
    # as a prefix and appends a shard suffix, e.g. output.csv-00000-of-00001.
    (
        average_charge_by_smoker
        | 'Format as CSV' >> beam.MapTuple(to_csv_string)
        | 'Write to File' >> beam.io.WriteToText('/content/drive/MyDrive/SJSU/CMPE_255/assignment_04/datasets/output.csv')
    )







In [7]:
file_path = '/content/drive/MyDrive/SJSU/CMPE_255/assignment_04/datasets/insurance.csv'

In [8]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
import logging

# Custom DoFn, applied with ParDo, that keeps only records whose charges exceed 1000.
class FilterData(beam.DoFn):
    def process(self, element):
        if element is None:  # safe_dict_read returns None for lines it could not parse
            return
        try:
            if float(element['charges']) > 1000:
                yield element
        except (KeyError, ValueError) as e:
            # Log and drop records whose 'charges' field is missing or not numeric.
            logging.error(f"Error processing {element}: {e}")

# Custom composite transform that combines multiple transforms.
# Note: CombinePerKey(sum) produces the total charges per region, not the average;
# beam.combiners.Mean.PerKey() would compute a true per-region average.
class ComputeAverageCharge(beam.PTransform):
    def expand(self, input_coll):
        return (
            input_coll
            | 'Extract and Convert Data' >> beam.Map(lambda elem: (elem['region'], float(elem['charges'])))
            | 'Combine by Key' >> beam.CombinePerKey(sum)  # total charges per region
        )

# Function to safely convert CSV lines to dictionaries
def safe_dict_read(line, headers):
    try:
        values = line.split(',')
        return dict(zip(headers, values))  # Pairing headers with corresponding values
    except Exception as e:
        logging.error(f"Failed to parse line: {line}, Error: {e}")
        return None  # Returning None to indicate failure of parsing this line

def run():
    # Defining the pipeline options
    pipeline_options = PipelineOptions(
        runner='DirectRunner'  # using a direct runner for simplicity
    )

    # Define the CSV headers
    csv_headers = ['age', 'sex', 'bmi', 'children', 'smoker', 'region', 'charges']

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | 'Read from CSV' >> beam.io.ReadFromText(file_path, skip_header_lines=1)  # Reading data from the source
            | 'Convert to dict' >> beam.Map(lambda line: safe_dict_read(line, csv_headers))
            | 'Filter records' >> beam.ParDo(FilterData())  # Using ParDo for a custom filtering operation
            | 'Window into' >> beam.WindowInto(FixedWindows(60))  # Fixed 60-second windows; rows read from a file share one default timestamp, so they land in a single window
            | 'Calculate average charge' >> ComputeAverageCharge()  # Custom composite transform; emits total charges per region
            | 'Write results' >> beam.Map(print)  # Print the results to the console
        )

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()


INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7cdc87e8abc0> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')


('southwest', 4012754.647620001)
('southeast', 5363689.763290002)
('northwest', 4035711.9965399993)
('northeast', 4343668.583308999)
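Because the dataset is already loaded into the pandas DataFrame `data`, the Beam results can be cross-checked with a `groupby`. A minimal sketch; `summarize_charges` is a hypothetical helper that reports the total, mean, and row count of `charges` per group. The second pipeline only sums rows with charges above 1000, so the cross-check applies the same filter.

```python
import pandas as pd


def summarize_charges(df: pd.DataFrame, key: str) -> pd.DataFrame:
    """Return the total, mean, and row count of 'charges' for each value of `key`."""
    return df.groupby(key)['charges'].agg(['sum', 'mean', 'count'])


# Usage with the notebook's DataFrame:
#   summarize_charges(data[data['charges'] > 1000], 'region')
#       -> 'sum' column should agree with the pipeline's per-region totals
#          (up to floating-point rounding)
#   summarize_charges(data, 'smoker')
#       -> 'mean' column should agree with output.csv from the first pipeline
```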


## Conclusion
The sections below show, with supporting snippets from the code above, how the pipelines cover each Apache Beam task in the assignment prompt:
1. Composite Transform
2. Pipeline I/O
3. Triggers
4. Windowing
5. ParDo

### Composite Transform

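A composite transform combines multiple transforms into one reusable component. In the second pipeline, `ComputeAverageCharge` is a composite transform: it subclasses `beam.PTransform` and overrides the `expand` method to chain a `Map` that extracts `(region, charges)` pairs with a `CombinePerKey(sum)` that totals the charges for each region. `CalculateAverageChargeByGroup` in the first pipeline is another composite transform, built from `Map`, `GroupByKey`, and a second `Map` that computes the average.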

Here's the relevant code snippet:

```python
class ComputeAverageCharge(beam.PTransform):
    def expand(self, input_coll):
        # In this transform, we're combining multiple steps:
        # 1. Extracting and converting data from each element.
        # 2. Combining data by key (in this case, by 'region').
        return (
            input_coll
            | 'Extract and Convert Data' >> beam.Map(lambda elem: (elem['region'], float(elem['charges'])))
            | 'Combine by Key' >> beam.CombinePerKey(sum)  # Summing up the charges for each region
        )
```

### Pipeline I/O

Pipeline I/O refers to the input and output operations within a pipeline. This is where we read data into our pipeline and output the processed data from our pipeline.

Both pipelines read their input from the CSV file with `ReadFromText`. The first pipeline writes its results to a file with `WriteToText`; the second prints its results to the console. In a real-world scenario, you would more likely write to a file system, a database, or another storage service.

```python
import apache_beam as beam

# file_path is the path to insurance.csv defined earlier in the notebook
with beam.Pipeline() as p:
    (
        p
        | 'Read from CSV' >> beam.io.ReadFromText(file_path, skip_header_lines=1)  # Input operation
        # ... [data processing steps] ...
        | 'Write results' >> beam.Map(print)  # Output operation
    )
```

### Triggers

In Apache Beam, a trigger determines when the aggregated result (a *pane*) of each window is emitted; custom triggers matter mainly in streaming pipelines. The pipelines in this notebook are batch pipelines and do not define a custom trigger, so Beam's default trigger applies: it fires when the watermark passes the end of each window, which in a batch job happens once all of the input has been read.

In a streaming context, a trigger is specified as part of the windowing strategy, together with an accumulation mode:

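```python
import apache_beam as beam
from apache_beam.transforms.trigger import AccumulationMode, AfterWatermark
from apache_beam.transforms.window import FixedWindows

windowing = beam.WindowInto(
    FixedWindows(60),
    trigger=AfterWatermark(),  # fire when the watermark passes the end of each window
    accumulation_mode=AccumulationMode.DISCARDING)  # each pane holds only elements not yet emitted

# Applied to a PCollection as: pcoll | 'Window' >> windowing
```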
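The accumulation mode only matters when a trigger fires more than once for the same window. The plain-Python model below (not Beam code) illustrates the difference: with `DISCARDING`, each pane covers only the elements that arrived since the previous firing; with `ACCUMULATING`, each pane covers everything seen so far in the window. The function `pane_outputs` and the pane contents are hypothetical.

```python
from itertools import accumulate


def pane_outputs(panes, accumulating):
    """Model the per-pane sums a trigger emits for one window.

    panes: list of lists, the elements that arrived before each trigger firing.
    accumulating=False models AccumulationMode.DISCARDING;
    accumulating=True models AccumulationMode.ACCUMULATING.
    """
    per_pane = [sum(pane) for pane in panes]
    return list(accumulate(per_pane)) if accumulating else per_pane


# A window whose trigger fired three times, after 2, 1, and 3 new elements arrived
panes = [[10, 20], [5], [1, 2, 3]]
print(pane_outputs(panes, accumulating=False))  # [30, 5, 6]
print(pane_outputs(panes, accumulating=True))   # [30, 35, 41]
```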

### Windowing

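Windowing divides a `PCollection` into finite groups (windows) based on each element's timestamp. The second pipeline uses fixed-time windowing, which groups elements into non-overlapping windows of a set duration (60 seconds here). Because `ReadFromText` does not attach event timestamps, every row gets the same default timestamp and falls into a single window, so the windowing has no visible effect on this batch job.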

```python
| 'Window into' >> beam.WindowInto(FixedWindows(60))  # Windowing the data into fixed intervals of 60 seconds
```

### ParDo

ParDo is a Beam transform for generic parallel processing. A `ParDo` takes a `DoFn` object that defines the processing function that should be applied to each element of the input `PCollection`.

In the script, `FilterData` is a `DoFn` applied to the pipeline using `ParDo`, which filters the elements based on a specific condition.

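```python
import apache_beam as beam

class FilterData(beam.DoFn):
    def process(self, element):
        # Emit the record only if its charges exceed 1000; otherwise emit nothing
        if float(element['charges']) > 1000:
            yield element

# Applying the ParDo in the pipeline, where `records` is the PCollection of parsed dicts
filtered = records | 'Filter records' >> beam.ParDo(FilterData())
```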

This `ParDo` operation is where each element is individually processed (filtered) through the user-defined function in `FilterData`.