**Demonstrating Apache Beam using the Breast Cancer Gene Expression Profiles (METABRIC) dataset**

**Understanding the domain and data selection**

In [14]:
import csv
import io

import pandas as pd

# Upload the dataset through the Colab file picker and load it into a DataFrame.
from google.colab import files

uploaded = files.upload()
if not uploaded:
    raise ValueError('No file was uploaded; expected the mutation CSV.')

# Parse the bytes returned by the upload itself instead of re-reading the fixed
# path 'mutation.csv': Colab renames a re-uploaded file (for example to
# 'mutation (1).csv'), so the fixed path can silently load a stale copy.
uploaded_bytes = next(iter(uploaded.values()))
mutation_df = pd.read_csv(io.BytesIO(uploaded_bytes))

Saving mutation.csv to mutation (1).csv


In [15]:
# Preview the first five rows to sanity-check the load
mutation_df.head(5)

Unnamed: 0,patient_id,age_at_diagnosis,type_of_breast_surgery,cancer_type,cancer_type_detailed,cellularity,chemotherapy,pam50_+_claudin-low_subtype,cohort,er_status_measured_by_ihc,...,integrative_cluster,primary_tumor_laterality,lymph_nodes_examined_positive,mutation_count,nottingham_prognostic_index,oncotree_code,overall_survival_months,overall_survival,pr_status,radio_therapy
0,0,75.65,MASTECTOMY,Breast Cancer,Breast Invasive Ductal Carcinoma,,0,claudin-low,1,Positve,...,4ER+,Right,10,,6.044,IDC,140.5,1,Negative,1
1,2,43.19,BREAST CONSERVING,Breast Cancer,Breast Invasive Ductal Carcinoma,High,0,LumA,1,Positve,...,4ER+,Right,0,2.0,4.02,IDC,84.633333,1,Positive,1
2,5,48.87,MASTECTOMY,Breast Cancer,Breast Invasive Ductal Carcinoma,High,1,LumB,1,Positve,...,3,Right,1,2.0,4.03,IDC,163.7,0,Positive,0
3,6,47.68,MASTECTOMY,Breast Cancer,Breast Mixed Ductal and Lobular Carcinoma,Moderate,1,LumB,1,Positve,...,9,Right,3,1.0,4.05,MDLC,164.933333,1,Positive,1
4,8,76.97,MASTECTOMY,Breast Cancer,Breast Mixed Ductal and Lobular Carcinoma,High,1,LumB,1,Positve,...,9,Right,8,2.0,6.08,MDLC,41.366667,0,Positive,1


In [16]:
!pip install apache-beam



**Composite Transform**

In [17]:
import apache_beam as beam
# Composite transform: drop records with a missing mutation_count, then average
# mutation_count for each cancer_type_detailed.
class ComputeAverageMutationCount(beam.PTransform):
    """Compute the mean ``mutation_count`` per ``cancer_type_detailed``.

    Input:  a PCollection of record dicts that contain the keys
            ``cancer_type_detailed`` and ``mutation_count``.
    Output: a PCollection of ``(cancer_type_detailed, mean_mutation_count)``
            tuples; records whose ``mutation_count`` is NaN/None are ignored.
    """

    def expand(self, pcoll):
        return (
            pcoll
            | 'FilterValidMutationCount' >> beam.Filter(lambda record: not pd.isna(record['mutation_count']))
            | 'ExtractCancerTypeAndMutationCount' >> beam.Map(lambda record: (record['cancer_type_detailed'], record['mutation_count']))
            # A plain callable handed to CombinePerKey must be associative because
            # Beam re-applies it to partially combined results; a lambda returning
            # (sum, len) would then be fed its own tuples and fail. MeanCombineFn
            # keeps a proper (sum, count) accumulator and emits the mean directly.
            | 'ComputeAverage' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
        )

# Turn the DataFrame into one dict per row so the rows can be fed to Beam from memory
mutation_records = mutation_df.to_dict('records')

**Pipeline IO**

In [19]:
# Custom CombineFn that averages its inputs through a running (sum, count) pair
class AverageCombiner(beam.CombineFn):
    """Average the combined values using a ``(sum, count)`` accumulator."""

    def create_accumulator(self):
        """Return the starting accumulator: a zero sum and a zero count."""
        return 0.0, 0

    def add_input(self, accumulator, input_value):
        """Fold one input value into the accumulator and return the new pair."""
        running_total, seen = accumulator
        new_total = running_total + input_value
        new_seen = seen + 1
        return new_total, new_seen

    def merge_accumulators(self, accumulators):
        """Combine several ``(sum, count)`` pairs into a single pair."""
        partial_totals, partial_counts = zip(*accumulators)
        return sum(partial_totals), sum(partial_counts)

    def extract_output(self, accumulator):
        """Return sum / count, or 0 when the count is zero."""
        total, seen = accumulator
        # A zero count yields 0 rather than raising ZeroDivisionError.
        return total / seen if seen else 0

# Composite transform: drop records with a missing mutation_count, then average
# mutation_count for each cancer_type_detailed using AverageCombiner.
class ComputeAverageMutationCount(beam.PTransform):
    """Emit ``(cancer_type_detailed, average mutation_count)`` pairs.

    Records whose ``mutation_count`` is missing (per ``pd.isna``) are removed
    before the per-key average is computed by ``AverageCombiner``.
    """

    def expand(self, pcoll):
        with_mutation_count = (
            pcoll
            | 'FilterValidMutationCount' >> beam.Filter(lambda row: not pd.isna(row['mutation_count']))
        )
        keyed_counts = (
            with_mutation_count
            | 'ExtractCancerTypeAndMutationCount' >> beam.Map(lambda row: (row['cancer_type_detailed'], row['mutation_count']))
        )
        return keyed_counts | 'ComputeAverageMutationCount' >> beam.CombinePerKey(AverageCombiner())

# Convert the dataframe to a list of dictionaries (each dictionary represents a row)
mutation_records = mutation_df.to_dict(orient='records')


def format_csv_line(fields):
    """Render an iterable of field values as one CSV line (no trailing newline).

    The csv module handles quoting, so a field that contains a comma or a
    quote character still produces a well-formed row.
    """
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator='').writerow(fields)
    return buffer.getvalue()


# Apply the composite transform and write the per-type averages to output.csv
with beam.Pipeline() as pipeline:
    (
        pipeline
        | 'ReadFromMemory' >> beam.Create(mutation_records)
        | 'ComputeAverageMutationCount' >> ComputeAverageMutationCount()
        # WriteToText emits the string form of each element, so the
        # (cancer_type_detailed, average) tuples are turned into CSV rows first.
        | 'FormatAsCSV' >> beam.Map(format_csv_line)
        | 'WriteToCSV' >> beam.io.WriteToText(
            'output',
            file_name_suffix='.csv',
            # An empty shard template drops the '-SSSSS-of-NNNNN' infix, giving a
            # single file named output.csv instead of
            # 'output.csv-00000-of-00001.csv'.
            shard_name_template='',
            header='cancer_type_detailed,average_mutation_count',
        )
    )


**Triggers and Windowing**

In [20]:
# Window length handed to FixedWindows below.
# NOTE(review): Beam documents FixedWindows sizes in seconds, and no timestamps
# are attached to the records here - confirm the windowing has the intended effect.
window_size = 1000

with beam.Pipeline() as pipeline:
    records = pipeline | 'ReadFromMemory' >> beam.Create(mutation_records)
    windowed_records = records | 'WindowIntoBatches' >> beam.WindowInto(
        beam.window.FixedWindows(window_size)
    )
    # Each record dict is written out as one line of text.
    windowed_records | 'WriteToCSV' >> beam.io.WriteToText('windowed_output.csv')


**ParDo**

In [22]:
class TagWithCancerTypeAndCount(beam.DoFn):
    """DoFn that maps a record to a ``(cancer_type, mutation_count)`` pair."""

    def process(self, record):
        """Yield one tuple built from the record's two fields."""
        cancer_type = record['cancer_type']
        mutation_count = record['mutation_count']
        yield cancer_type, mutation_count

# Run the DoFn over every record and write the resulting pairs as text
with beam.Pipeline() as pipeline:
    source_records = pipeline | 'ReadFromMemory' >> beam.Create(mutation_records)
    tagged_pairs = source_records | 'TagWithCancerTypeAndCount' >> beam.ParDo(TagWithCancerTypeAndCount())
    tagged_pairs | 'WriteToCSV' >> beam.io.WriteToText('tagged_output.csv')
