In [4]:
!pip install -q apache-beam

In [69]:
import csv
import io
import os
import time

import apache_beam as beam
import pandas as pd
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.trigger import AccumulationMode, AfterCount, AfterWatermark
from apache_beam.transforms.window import FixedWindows, SlidingWindows, TimestampedValue

In [5]:
# Only open the Colab upload picker when the CSV is not already on disk, so a
# "Restart & Run All" does not stop and wait for a manual upload every time.
if os.path.exists('bank_marketing_dataset.csv'):
    uploaded = {}  # nothing uploaded on this run; the existing file is reused
else:
    from google.colab import files
    uploaded = files.upload()

Saving bank_marketing_dataset.csv to bank_marketing_dataset.csv


In [8]:
# Load the full dataset with pandas for a quick look; later cells feed its rows
# (or a head() slice of them) into Beam via DataFrame.to_dict(orient='records').
df = pd.read_csv("bank_marketing_dataset.csv")
df.head(n=5)

Unnamed: 0,age,job,marital,education,default,housing,loan,contact,month,day_of_week,...,campaign,pdays,previous,poutcome,emp.var.rate,cons.price.idx,cons.conf.idx,euribor3m,nr.employed,subscribed
0,56,housemaid,married,basic.4y,no,no,no,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
1,57,services,married,high.school,unknown,no,no,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
2,37,services,married,high.school,no,yes,no,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
3,40,admin.,married,basic.6y,no,no,no,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no
4,56,services,married,high.school,no,no,yes,telephone,may,mon,...,1,999,0,nonexistent,1.1,93.994,-36.4,4.857,5191.0,no


In [12]:
# Distinct marital-status values present in the data: the set of keys any
# marital-status encoding has to handle.
print(df.marital.unique())

['married' 'single' 'divorced' 'unknown']


In [21]:
# First 100 rows only, to keep the printed Beam demos below short.
df_100 = df.iloc[:100]

In [25]:
class PreprocessBankData(beam.PTransform):
    """Ordinal-encode the ``education`` and ``marital`` fields of record dicts.

    Input: a PCollection of dicts that contain at least ``education`` and
    ``marital`` keys (e.g. rows from ``DataFrame.to_dict(orient='records')``).
    Output: a PCollection of NEW dicts in which those two fields are replaced by
    integer codes; every other field is copied through unchanged.
    """

    # Unrecognised education values fall back to -1, the same code as 'unknown'.
    EDUCATION_MAP = {
        'illiterate': 0, 'basic.4y': 1, 'basic.6y': 2, 'basic.9y': 3,
        'high.school': 4, 'professional.course': 5, 'university.degree': 6, 'unknown': -1
    }
    # NOTE(review): 'single' and 'unknown' both encode to 0 (as do unrecognised
    # values), so they cannot be told apart after encoding.
    MARITAL_MAP = {'married': 1, 'single': 0, 'divorced': -1, 'unknown': 0}

    def expand(self, pcoll):
        return (
            pcoll
            | 'Encode Education' >> beam.Map(self.encode_education)
            | 'Encode Marital Status' >> beam.Map(self.encode_marital)
        )

    @staticmethod
    def encode_education(element):
        """Return a copy of ``element`` with ``education`` replaced by its ordinal code.

        The input dict is left untouched: Beam requires transforms to treat their
        input elements as immutable.
        """
        code = PreprocessBankData.EDUCATION_MAP.get(element['education'], -1)
        return {**element, 'education': code}

    @staticmethod
    def encode_marital(element):
        """Return a copy of ``element`` with ``marital`` replaced by its integer code.

        The input dict is left untouched (Beam input elements must not be mutated).
        """
        code = PreprocessBankData.MARITAL_MAP.get(element['marital'], 0)
        return {**element, 'marital': code}

# Run the education/marital encoder over the 100-row sample and print each
# encoded record.
sample_records = df_100.to_dict(orient='records')
with beam.Pipeline() as pipeline:
    encoded = (
        pipeline
        | 'Create Data' >> beam.Create(sample_records)
        | 'Preprocess Data' >> PreprocessBankData()
    )
    encoded | 'Print Results' >> beam.Map(print)

{'age': 56, 'job': 'housemaid', 'marital': 1, 'education': 1, 'default': 'no', 'housing': 'no', 'loan': 'no', 'contact': 'telephone', 'month': 'may', 'day_of_week': 'mon', 'duration': 261, 'campaign': 1, 'pdays': 999, 'previous': 0, 'poutcome': 'nonexistent', 'emp.var.rate': 1.1, 'cons.price.idx': 93.994, 'cons.conf.idx': -36.4, 'euribor3m': 4.857, 'nr.employed': 5191.0, 'subscribed': 'no'}
{'age': 57, 'job': 'services', 'marital': 1, 'education': 4, 'default': 'unknown', 'housing': 'no', 'loan': 'no', 'contact': 'telephone', 'month': 'may', 'day_of_week': 'mon', 'duration': 149, 'campaign': 1, 'pdays': 999, 'previous': 0, 'poutcome': 'nonexistent', 'emp.var.rate': 1.1, 'cons.price.idx': 93.994, 'cons.conf.idx': -36.4, 'euribor3m': 4.857, 'nr.employed': 5191.0, 'subscribed': 'no'}
{'age': 37, 'job': 'services', 'marital': 1, 'education': 4, 'default': 'no', 'housing': 'yes', 'loan': 'no', 'contact': 'telephone', 'month': 'may', 'day_of_week': 'mon', 'duration': 226, 'campaign': 1, 'pda

In [41]:
def parse_csv(line):
    """Parse one raw CSV line into a dict holding the fields this pipeline uses.

    Fields are taken by position: 0 = age, 1 = job, 2 = marital,
    3 = education, and the last column = subscribed.
    NOTE(review): assumes the file's column order matches the header shown by
    df.head() (age first, subscribed last) — confirm if the file changes.

    Returns None for a blank line (``csv.reader`` yields ``[]`` for it) or a
    line with fewer than five fields, so the downstream 'Filter None' step can
    drop it instead of the pipeline failing with IndexError.
    Raises ValueError if the age field is not an integer.
    """
    row = next(csv.reader([line]), [])
    if len(row) < 5:
        return None
    return {
        'age': int(row[0]),
        'job': row[1],
        'marital': row[2],
        'education': row[3],
        'subscribed': row[-1],
    }

def format_csv(element):
    """Render the five parsed fields of ``element`` as one CSV line (no newline).

    Uses the csv module so that fields containing commas or quotes are quoted
    and escaped. Plain values come out exactly as a simple comma-join of their
    ``str()`` forms would give.
    """
    buffer = io.StringIO()
    # lineterminator='' because WriteToText appends its own newline per element.
    csv.writer(buffer, lineterminator='').writerow([
        element['age'],
        element['job'],
        element['marital'],
        element['education'],
        element['subscribed'],
    ])
    return buffer.getvalue()

# Keep only subscribed customers from the raw file and write them out as CSV
# (file prefix 'subscribed', suffix '.csv') with a header row.
header = 'age,job,marital,education,subscribed'

with beam.Pipeline() as pipeline:
    parsed = (
        pipeline
        | 'Read CSV' >> beam.io.ReadFromText('bank_marketing_dataset.csv', skip_header_lines=1)
        | 'Parse CSV' >> beam.Map(parse_csv)
        | 'Filter None' >> beam.Filter(lambda rec: rec is not None)
    )
    csv_lines = (
        parsed
        | 'Filter Subscribed' >> beam.Filter(lambda rec: rec['subscribed'] == 'yes')
        | 'Format as CSV' >> beam.Map(format_csv)
    )
    csv_lines | 'Write to CSV' >> beam.io.WriteToText(
        'subscribed', file_name_suffix='.csv', header=header)

In [67]:
# Count subscribed customers per 5-second fixed window.
# NOTE(review): no event timestamps are attached here, so every element keeps
# the single default timestamp given by beam.Create and all of them land in ONE
# window (one total count). Attach timestamps with TimestampedValue before
# WindowInto if genuine per-window counts are wanted.
# The early AfterCount(1) firing plus the on-time firing, in ACCUMULATING mode,
# is why the same total can be printed more than once.
with beam.Pipeline() as pipeline:
    subscribed_rows = (
        pipeline
        | 'Create PCollection' >> beam.Create(df.to_dict(orient='records'))
        | 'Filter Subscribed' >> beam.Filter(lambda rec: rec.get('subscribed') == 'yes')
    )
    windowed_rows = subscribed_rows | 'Apply Fixed Windows' >> beam.WindowInto(
        FixedWindows(5),
        trigger=AfterWatermark(early=AfterCount(1)),
        accumulation_mode=AccumulationMode.ACCUMULATING,
    )
    window_counts = (
        windowed_rows
        | 'Count Subscribed' >> beam.combiners.Count.Globally().without_defaults()
    )
    window_counts | 'Print Results' >> beam.Map(print)

4640
4640


In [70]:
# Count subscribed customers in overlapping sliding windows (10 s long, a new
# one starting every 5 s).
# Every record is stamped with one fixed event time (t = 0 s) rather than the
# wall clock: time.time() stamps make window assignment depend on when the
# cell happens to run, so re-runs are not reproducible. With a single t = 0
# timestamp each record lies in exactly two overlapping windows, [-5 s, 5 s)
# and [0 s, 10 s), which is the overlap this cell demonstrates.
# To spread records across many windows, stamp them from a real event-time field.
with beam.Pipeline() as pipeline:
    (pipeline
     | 'Create PCollection' >> beam.Create(df.to_dict(orient='records'))
     | 'Filter Subscribed' >> beam.Filter(lambda x: x.get('subscribed') == 'yes')
     | 'With Timestamps' >> beam.Map(lambda x: TimestampedValue(x, 0))
     | 'Apply Sliding Windows' >> beam.WindowInto(
            SlidingWindows(size=10, period=5),
            trigger=AfterWatermark(early=AfterCount(1)),
            # Early AfterCount(1) firing + on-time firing, accumulating: a window
            # can print the same running count more than once.
            accumulation_mode=AccumulationMode.ACCUMULATING
        )
     | 'Count Subscribed' >> beam.combiners.Count.Globally().without_defaults()
     | 'Print Results' >> beam.Map(print))

4640
4640
4640
4640


In [72]:
class EncodeLoanData(beam.DoFn):
    """Binary-encode the ``housing`` and ``loan`` fields of a record dict.

    'yes' becomes 1; any other value ('no', 'unknown', ...) becomes 0.
    All other fields are copied through unchanged.
    """

    def process(self, element):
        # Yield a new dict rather than editing `element`: Beam forbids mutating
        # the input element passed to a DoFn.
        yield {
            **element,
            'housing': int(element['housing'] == 'yes'),
            'loan': int(element['loan'] == 'yes'),
        }

# Encode the housing/loan flags on the 100-row sample and print each result.
with beam.Pipeline() as pipeline:
    records = pipeline | 'Create Data' >> beam.Create(df_100.to_dict(orient='records'))
    encoded = records | 'Encode Loan Data' >> beam.ParDo(EncodeLoanData())
    encoded | 'Print Results' >> beam.Map(print)

{'age': 56, 'job': 'housemaid', 'marital': 'married', 'education': 'basic.4y', 'default': 'no', 'housing': 0, 'loan': 0, 'contact': 'telephone', 'month': 'may', 'day_of_week': 'mon', 'duration': 261, 'campaign': 1, 'pdays': 999, 'previous': 0, 'poutcome': 'nonexistent', 'emp.var.rate': 1.1, 'cons.price.idx': 93.994, 'cons.conf.idx': -36.4, 'euribor3m': 4.857, 'nr.employed': 5191.0, 'subscribed': 'no'}
{'age': 57, 'job': 'services', 'marital': 'married', 'education': 'high.school', 'default': 'unknown', 'housing': 0, 'loan': 0, 'contact': 'telephone', 'month': 'may', 'day_of_week': 'mon', 'duration': 149, 'campaign': 1, 'pdays': 999, 'previous': 0, 'poutcome': 'nonexistent', 'emp.var.rate': 1.1, 'cons.price.idx': 93.994, 'cons.conf.idx': -36.4, 'euribor3m': 4.857, 'nr.employed': 5191.0, 'subscribed': 'no'}
{'age': 37, 'job': 'services', 'marital': 'married', 'education': 'high.school', 'default': 'no', 'housing': 1, 'loan': 0, 'contact': 'telephone', 'month': 'may', 'day_of_week': 'mon'

In [74]:
# First 1000 rows, used as the input of the simulated stream below.
df_1000 = df.iloc[:1000]

In [75]:
class SimulateStream(beam.DoFn):
    """Re-emit each element stamped with the wall-clock time at which it is processed.

    Pausing before each element spreads the timestamps out, so later windowing
    has distinct times to work with. If elements are processed one after
    another, N elements take roughly N * ``delay_seconds`` seconds. With the
    default of 1 s, 1000 rows take over 16 minutes, so pass a smaller
    ``delay_seconds`` for quick runs.

    Args:
        delay_seconds: pause before emitting each element. 1.0 (the default)
            keeps the original pacing; values <= 0 disable the pause.
    """

    def __init__(self, delay_seconds=1.0):
        super().__init__()
        self.delay_seconds = delay_seconds

    def process(self, element):
        if self.delay_seconds > 0:
            time.sleep(self.delay_seconds)
        yield TimestampedValue(element, time.time())

class PrintElements(beam.DoFn):
    """Print each element together with its timestamp (as UTC), then pass it on unchanged."""

    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        utc_time = timestamp.to_utc_datetime()
        print(f"Element: {element}, Timestamp: {utc_time}")
        yield element

# Simulated stream: stamp the df_1000 rows with wall-clock time as they pass
# through, keep subscribers, assign 5 s fixed windows, and print each element.
# NOTE(review): window triggers only take effect at a grouping step
# (GroupByKey / Combine). Nothing here aggregates after WindowInto, so
# PrintElements sees every element exactly once and the trigger settings do
# not change what is printed.
streaming_options = PipelineOptions(streaming=True)
with beam.Pipeline(options=streaming_options) as pipeline:
    stamped = (
        pipeline
        | 'Create PCollection' >> beam.Create(df_1000.to_dict(orient='records'))
        | 'Simulate Streaming' >> beam.ParDo(SimulateStream())
    )
    windowed = (
        stamped
        | 'Filter Subscribed' >> beam.Filter(lambda rec: rec.get('subscribed') == 'yes')
        | 'Apply Fixed Windows' >> beam.WindowInto(
            FixedWindows(5),
            trigger=AfterWatermark(early=AfterCount(1)),
            accumulation_mode=AccumulationMode.ACCUMULATING,
        )
    )
    windowed | 'Print on Emission' >> beam.ParDo(PrintElements())



Element: {'age': 41, 'job': 'blue-collar', 'marital': 'divorced', 'education': 'basic.4y', 'default': 'unknown', 'housing': 'yes', 'loan': 'no', 'contact': 'telephone', 'month': 'may', 'day_of_week': 'mon', 'duration': 1575, 'campaign': 1, 'pdays': 999, 'previous': 0, 'poutcome': 'nonexistent', 'emp.var.rate': 1.1, 'cons.price.idx': 93.994, 'cons.conf.idx': -36.4, 'euribor3m': 4.857, 'nr.employed': 5191.0, 'subscribed': 'yes'}, Timestamp: 2024-10-21 00:21:33.760619
Element: {'age': 49, 'job': 'entrepreneur', 'marital': 'married', 'education': 'university.degree', 'default': 'unknown', 'housing': 'yes', 'loan': 'no', 'contact': 'telephone', 'month': 'may', 'day_of_week': 'mon', 'duration': 1042, 'campaign': 1, 'pdays': 999, 'previous': 0, 'poutcome': 'nonexistent', 'emp.var.rate': 1.1, 'cons.price.idx': 93.994, 'cons.conf.idx': -36.4, 'euribor3m': 4.857, 'nr.employed': 5191.0, 'subscribed': 'yes'}, Timestamp: 2024-10-21 00:21:41.768915
Element: {'age': 49, 'job': 'technician', 'marital'