In [4]:
!pip install apache-beam




In [1]:
import warnings
# Ignore DeprecationWarning messages raised from this point on, so they do not
# clutter the cell outputs below.
warnings.filterwarnings('ignore', category=DeprecationWarning)
# google.colab is only importable inside a Colab runtime.
from google.colab import files

# Open the Colab upload widget; the saved output of this cell shows heart.csv
# being stored in the working directory.
# NOTE(review): `uploaded` is not used again in the visible cells.
uploaded = files.upload()


Saving heart.csv to heart.csv


Basic Pipeline with I/O:

In [26]:
import apache_beam as beam

def run_pipeline(input_path='heart.csv', output_path='output'):
    """Copy a text file line by line through a minimal Beam pipeline.

    Args:
        input_path: File read with ``beam.io.ReadFromText``; every line
            (the CSV header included) becomes one element.
        output_path: Exact name of the single file that is written.

    The pipeline runs when the ``with`` block exits.
    """
    with beam.Pipeline() as p:
        (p | 'Read from CSV' >> beam.io.ReadFromText(input_path)
           | 'Write to CSV' >> beam.io.WriteToText(
               output_path,
               # By default WriteToText appends a shard suffix such as
               # '-00000-of-00001', so no file called `output_path` would
               # exist for the following pd.read_csv(...) cell. An empty
               # template with one shard writes to exactly `output_path`.
               num_shards=1,
               shard_name_template=''))
    # NOTE(review): Beam does not promise element order; the header line
    # staying first presumably relies on local single-worker execution -
    # confirm before moving to a distributed runner.

run_pipeline()


In [27]:
import pandas as pd
# Load the file written by the pipeline back into a DataFrame for inspection.
# NOTE(review): beam.io.WriteToText adds a shard suffix (for example
# 'output-00000-of-00001') unless it is given shard_name_template='';
# confirm that a file named exactly 'output' exists before relying on this.
data = pd.read_csv('output')
# Print the column / dtype summary, then show the first rows as the cell result.
data.info()
data.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 303 entries, 0 to 302
Data columns (total 14 columns):
 #   Column    Non-Null Count  Dtype  
---  ------    --------------  -----  
 0   age       303 non-null    int64  
 1   sex       303 non-null    int64  
 2   cp        303 non-null    int64  
 3   trtbps    303 non-null    int64  
 4   chol      303 non-null    int64  
 5   fbs       303 non-null    int64  
 6   restecg   303 non-null    int64  
 7   thalachh  303 non-null    int64  
 8   exng      303 non-null    int64  
 9   oldpeak   303 non-null    float64
 10  slp       303 non-null    int64  
 11  caa       303 non-null    int64  
 12  thall     303 non-null    int64  
 13  output    303 non-null    int64  
dtypes: float64(1), int64(13)
memory usage: 33.3 KB


Unnamed: 0,age,sex,cp,trtbps,chol,fbs,restecg,thalachh,exng,oldpeak,slp,caa,thall,output
0,63,1,3,145,233,1,0,150,0,2.3,0,0,1,1
1,37,1,2,130,250,0,1,187,0,3.5,0,0,2,1
2,41,0,1,130,204,0,0,172,0,1.4,2,0,2,1
3,56,1,1,120,236,0,1,178,0,0.8,2,0,2,1
4,57,0,0,120,354,0,1,163,1,0.6,2,0,2,1


Using ParDo for Custom Processing:


In [22]:
import apache_beam as beam

class FilterAge(beam.DoFn):
    """Keep the CSV header line and every data row whose age exceeds a threshold.

    Each element is expected to be one raw text line of the CSV; the age is
    parsed from the first comma-separated field.

    Args:
        min_age: A row is kept only when its age is strictly greater than
            this value. Defaults to 50, the threshold that was hard-coded.
        header_prefix: A line starting with this text is treated as the
            header row and passed through unchanged. Defaults to 'age'.
    """

    def __init__(self, min_age=50, header_prefix='age'):
        super().__init__()
        self.min_age = min_age
        self.header_prefix = header_prefix

    def process(self, element):
        """Yield ``element`` when it is the header or a row with age > min_age.

        Raises:
            ValueError: If a non-blank, non-header line does not begin with
                an integer field.
        """
        # A blank line carries no record; skip it rather than failing in
        # int('') further down.
        if not element.strip():
            return

        # Pass the header row through so the written file keeps its column names.
        if element.startswith(self.header_prefix):
            yield element
            return

        # Age is assumed to be the first column of the row.
        age = int(element.split(',', 1)[0])
        if age > self.min_age:
            yield element

def run_filter_pipeline(input_path='heart.csv', output_path='filtered_output'):
    """Read a CSV as text, apply ``FilterAge`` and write the surviving lines.

    Args:
        input_path: File read with ``beam.io.ReadFromText``, one element per line.
        output_path: Exact name of the single file that is written.

    The pipeline runs when the ``with`` block exits.
    """
    with beam.Pipeline() as p:
        (p | 'Read from heart.csv' >> beam.io.ReadFromText(input_path)
           | 'Filter Age' >> beam.ParDo(FilterAge())
           | 'Write to filtered_output.csv' >> beam.io.WriteToText(
               output_path,
               # Without these two arguments WriteToText names the file
               # '<output_path>-00000-of-00001', which the following
               # pd.read_csv('filtered_output') cell cannot find.
               num_shards=1,
               shard_name_template=''))
    # NOTE(review): Beam does not promise element order; the header line
    # staying first presumably relies on local single-worker execution.

run_filter_pipeline()



In [23]:
import pandas as pd
# Load the filtered lines back into a DataFrame for inspection.
# NOTE(review): beam.io.WriteToText adds a shard suffix (for example
# 'filtered_output-00000-of-00001') unless it is given shard_name_template='';
# confirm that a file named exactly 'filtered_output' exists.
data = pd.read_csv('filtered_output')
# Print the column / dtype summary, then show the first rows as the cell result.
data.info()
data.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 208 entries, 0 to 207
Data columns (total 14 columns):
 #   Column    Non-Null Count  Dtype  
---  ------    --------------  -----  
 0   age       208 non-null    int64  
 1   sex       208 non-null    int64  
 2   cp        208 non-null    int64  
 3   trtbps    208 non-null    int64  
 4   chol      208 non-null    int64  
 5   fbs       208 non-null    int64  
 6   restecg   208 non-null    int64  
 7   thalachh  208 non-null    int64  
 8   exng      208 non-null    int64  
 9   oldpeak   208 non-null    float64
 10  slp       208 non-null    int64  
 11  caa       208 non-null    int64  
 12  thall     208 non-null    int64  
 13  output    208 non-null    int64  
dtypes: float64(1), int64(13)
memory usage: 22.9 KB


Unnamed: 0,age,sex,cp,trtbps,chol,fbs,restecg,thalachh,exng,oldpeak,slp,caa,thall,output
0,63,1,3,145,233,1,0,150,0,2.3,0,0,1,1
1,56,1,1,120,236,0,1,178,0,0.8,2,0,2,1
2,57,0,0,120,354,0,1,163,1,0.6,2,0,2,1
3,57,1,0,140,192,0,1,148,0,0.4,1,0,1,1
4,56,0,1,140,294,0,0,153,0,1.3,1,0,2,1


Windowing and Triggers:

In [19]:
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AfterCount, AccumulationMode

def run_windowed_pipeline(input_path='heart.csv', output_path='windowed_output'):
    """Read a text file, assign fixed windows with a trigger, and write the lines.

    Args:
        input_path: File read with ``beam.io.ReadFromText``, one element per line.
        output_path: Exact name of the single file that is written.

    The windowing step uses ``FixedWindows(10)`` with a watermark trigger that
    may fire early after 5 elements, in accumulating mode.

    NOTE(review): the input is a bounded text file and no timestamps are
    assigned here, so presumably every line falls into the same window and,
    with no GroupByKey/Combine after the windowing step, the trigger has no
    visible effect on the written file - confirm. The saved output of the
    next cell (303 rows, same as the input) is consistent with that.
    """
    with beam.Pipeline() as p:
        (p | 'Read from Stream' >> beam.io.ReadFromText(input_path)
           | 'Window into Fixed Intervals' >> beam.WindowInto(
               FixedWindows(10),
               trigger=AfterWatermark(early=AfterCount(5)),
               accumulation_mode=AccumulationMode.ACCUMULATING)
           | 'Write to windowed_output.csv' >> beam.io.WriteToText(
               output_path,
               # Without these two arguments WriteToText names the file
               # '<output_path>-00000-of-00001', which the following
               # pd.read_csv('windowed_output') cell cannot find.
               num_shards=1,
               shard_name_template=''))

run_windowed_pipeline()



In [21]:
import pandas as pd
# Load the windowed pipeline's output back into a DataFrame for inspection.
# NOTE(review): beam.io.WriteToText adds a shard suffix (for example
# 'windowed_output-00000-of-00001') unless it is given shard_name_template='';
# confirm that a file named exactly 'windowed_output' exists.
data = pd.read_csv('windowed_output')
# Print the column / dtype summary, then show the first rows as the cell result.
data.info()
data.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 303 entries, 0 to 302
Data columns (total 14 columns):
 #   Column    Non-Null Count  Dtype  
---  ------    --------------  -----  
 0   age       303 non-null    int64  
 1   sex       303 non-null    int64  
 2   cp        303 non-null    int64  
 3   trtbps    303 non-null    int64  
 4   chol      303 non-null    int64  
 5   fbs       303 non-null    int64  
 6   restecg   303 non-null    int64  
 7   thalachh  303 non-null    int64  
 8   exng      303 non-null    int64  
 9   oldpeak   303 non-null    float64
 10  slp       303 non-null    int64  
 11  caa       303 non-null    int64  
 12  thall     303 non-null    int64  
 13  output    303 non-null    int64  
dtypes: float64(1), int64(13)
memory usage: 33.3 KB


Unnamed: 0,age,sex,cp,trtbps,chol,fbs,restecg,thalachh,exng,oldpeak,slp,caa,thall,output
0,63,1,3,145,233,1,0,150,0,2.3,0,0,1,1
1,37,1,2,130,250,0,1,187,0,3.5,0,0,2,1
2,41,0,1,130,204,0,0,172,0,1.4,2,0,2,1
3,56,1,1,120,236,0,1,178,0,0.8,2,0,2,1
4,57,0,0,120,354,0,1,163,1,0.6,2,0,2,1


Composite Transforms:

In [24]:
class AddNewColumn(beam.PTransform):
    """Composite transform: run the age filter, then append a constant field.

    The text ',NEW_COLUMN_VALUE' is appended to every line that ``FilterAge``
    emits. Because the header line is passed through by the filter, it gains
    the same suffix, which is what names the extra column when the result is
    read back as CSV.
    """

    def expand(self, pcoll):
        """Return ``pcoll`` filtered by ``FilterAge`` with the suffix added to each line."""
        kept_lines = pcoll | 'Filter Age' >> beam.ParDo(FilterAge())
        with_extra_field = kept_lines | 'Add Column' >> beam.Map(
            lambda line: line + ',NEW_COLUMN_VALUE')
        return with_extra_field

def run_composite_pipeline(input_path='heart.csv', output_path='composite_output'):
    """Read a CSV as text, apply the ``AddNewColumn`` composite, and write the result.

    Args:
        input_path: File read with ``beam.io.ReadFromText``, one element per line.
        output_path: Exact name of the single file that is written.

    The pipeline runs when the ``with`` block exits.
    """
    with beam.Pipeline() as p:
        (p | 'Read from heart.csv' >> beam.io.ReadFromText(input_path)
           | 'Composite Transform' >> AddNewColumn()
           | 'Write to composite_output.csv' >> beam.io.WriteToText(
               output_path,
               # Without these two arguments WriteToText names the file
               # '<output_path>-00000-of-00001', which the following
               # pd.read_csv('composite_output') cell cannot find.
               num_shards=1,
               shard_name_template=''))
    # NOTE(review): Beam does not promise element order; the header line
    # staying first presumably relies on local single-worker execution.

run_composite_pipeline()


In [25]:
import pandas as pd
# Load the composite pipeline's output back into a DataFrame for inspection.
# NOTE(review): beam.io.WriteToText adds a shard suffix (for example
# 'composite_output-00000-of-00001') unless it is given shard_name_template='';
# confirm that a file named exactly 'composite_output' exists.
data = pd.read_csv('composite_output')
# Print the column / dtype summary, then show the first rows as the cell result.
data.info()
data.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 208 entries, 0 to 207
Data columns (total 15 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   age               208 non-null    int64  
 1   sex               208 non-null    int64  
 2   cp                208 non-null    int64  
 3   trtbps            208 non-null    int64  
 4   chol              208 non-null    int64  
 5   fbs               208 non-null    int64  
 6   restecg           208 non-null    int64  
 7   thalachh          208 non-null    int64  
 8   exng              208 non-null    int64  
 9   oldpeak           208 non-null    float64
 10  slp               208 non-null    int64  
 11  caa               208 non-null    int64  
 12  thall             208 non-null    int64  
 13  output            208 non-null    int64  
 14  NEW_COLUMN_VALUE  208 non-null    object 
dtypes: float64(1), int64(13), object(1)
memory usage: 24.5+ KB


Unnamed: 0,age,sex,cp,trtbps,chol,fbs,restecg,thalachh,exng,oldpeak,slp,caa,thall,output,NEW_COLUMN_VALUE
0,63,1,3,145,233,1,0,150,0,2.3,0,0,1,1,NEW_COLUMN_VALUE
1,56,1,1,120,236,0,1,178,0,0.8,2,0,2,1,NEW_COLUMN_VALUE
2,57,0,0,120,354,0,1,163,1,0.6,2,0,2,1,NEW_COLUMN_VALUE
3,57,1,0,140,192,0,1,148,0,0.4,1,0,1,1,NEW_COLUMN_VALUE
4,56,0,1,140,294,0,0,153,0,1.3,1,0,2,1,NEW_COLUMN_VALUE
