In [None]:
!pip install apache-beam[interactive]

Collecting apache-beam[interactive]
  Downloading apache_beam-2.59.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.5 kB)
Collecting crcmod<2.0,>=1.7 (from apache-beam[interactive])
  Downloading crcmod-1.7.tar.gz (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting orjson<4,>=3.9.7 (from apache-beam[interactive])
  Downloading orjson-3.10.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (50 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.4/50.4 kB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting dill<0.3.2,>=0.3.1.1 (from apache-beam[interactive])
  Downloading dill-0.3.1.1.tar.gz (151 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdo

# Simple Batch Pipeline

#### This example reads a CSV file (`train.csv`, resolved relative to the working directory — copy it out of `sample_data` or adjust the path if it lives there), processes each line with the `process_line` function, and writes the output to a text file.

In [None]:
import apache_beam as beam


def process_line(line):
    """Format one CSV record as ``"Name: <field 0>, Age: <field 1>"``.

    The record is parsed with the standard ``csv`` module instead of a bare
    ``str.split(',')`` so that quoted fields containing commas
    (for example ``"Doe, John",42``) are kept intact.

    Args:
        line: A single line of CSV text.

    Returns:
        The formatted string built from the first two fields; any further
        fields are ignored.

    Raises:
        IndexError: If the line holds fewer than two fields (this includes
            an empty line).
    """
    # Imported locally so the function carries its own dependency with it
    # wherever the pipeline ends up invoking it.
    import csv

    fields = next(csv.reader([line]), [])
    if len(fields) < 2:
        # Same exception type the plain index lookup used to raise, but with
        # a message that identifies the offending record.
        raise IndexError(
            f"expected at least 2 comma-separated fields, got {len(fields)}: {line!r}"
        )

    return f"Name: {fields[0]}, Age: {fields[1]}"


# Build and run the batch pipeline. beam.Pipeline() is created without
# options, so it runs on the default (Direct) runner; leaving the `with`
# block is what triggers execution.
with beam.Pipeline() as pipeline:
    raw_rows = pipeline | 'Read CSV' >> beam.io.ReadFromText('train.csv')
    formatted_rows = raw_rows | 'Process Lines' >> beam.Map(process_line)
    # `lines` is bound to the result of the final write step, as before.
    lines = formatted_rows | 'Write Output' >> beam.io.WriteToText('output.txt')



In [None]:
class ProcessData(beam.PTransform):
    """Composite transform: upper-case every element, then keep those containing 'A'."""

    def expand(self, pcoll):
        """Apply the 'Map Elements' and 'Filter Elements' steps to ``pcoll``.

        Args:
            pcoll: Input PCollection whose elements support ``.upper()``
                and the ``in`` operator.

        Returns:
            The PCollection of upper-cased elements that contain 'A'.
        """

        def to_upper(element):
            return element.upper()

        def contains_a(element):
            # Runs on the already upper-cased element.
            return 'A' in element

        uppercased = pcoll | 'Map Elements' >> beam.Map(to_upper)
        return uppercased | 'Filter Elements' >> beam.Filter(contains_a)

# Exercise the ProcessData composite transform end to end:
# read test.csv, apply the transform, write the surviving lines.
with beam.Pipeline() as pipeline:
    source_lines = pipeline | 'Read Text File' >> beam.io.ReadFromText('test.csv')
    processed = source_lines | 'Process Data' >> ProcessData()
    # `output` is bound to the result of the final write step.
    output = processed | 'Write to Output' >> beam.io.WriteToText('processed_output.txt')


In [None]:
# Pass-through pipeline: copy the lines of test.csv to the output unchanged.
with beam.Pipeline() as pipeline:
    # Name the two labelled steps first, then chain them onto the pipeline.
    read_step = 'Read CSV' >> beam.io.ReadFromText('test.csv')
    write_step = 'Write to Output' >> beam.io.WriteToText('csv_output.txt')
    input_data = pipeline | read_step | write_step


In [None]:
def attach_event_timestamp(event):
    """Wrap an ``(name, 'YYYY-MM-DD HH:MM:SS')`` tuple with its event time.

    The elements produced by ``beam.Create`` in this cell carry their time
    only as a string inside the tuple, which windowing does not look at.
    This helper parses that string and returns the same element wrapped in
    a ``TimestampedValue`` so ``FixedWindows`` groups by the time shown.

    Args:
        event: A ``(name, timestamp_string)`` tuple.

    Returns:
        ``beam.window.TimestampedValue`` carrying the unchanged tuple and its
        event time in Unix seconds.

    Raises:
        ValueError: If the timestamp string does not match
            ``%Y-%m-%d %H:%M:%S``.
    """
    # Imported locally so the helper carries its own dependency with it.
    from datetime import datetime, timezone

    _, event_time = event
    parsed = datetime.strptime(event_time, '%Y-%m-%d %H:%M:%S')
    # The strings carry no zone information; they are interpreted as UTC so
    # the result does not depend on the machine's local time zone.
    unix_seconds = parsed.replace(tzinfo=timezone.utc).timestamp()
    return beam.window.TimestampedValue(event, unix_seconds)


with beam.Pipeline() as pipeline:
    events = (
        pipeline
        | 'Create Events' >> beam.Create([
            ('event1', '2024-10-01 12:01:00'),
            ('event2', '2024-10-01 12:02:00'),
            ('event3', '2024-10-01 12:04:00'),
        ])
        # Without this step the windowing below has no event times to work
        # with and cannot separate the events.
        | 'Attach Event Timestamps' >> beam.Map(attach_event_timestamp)
        | 'Window into Fixed Intervals' >> beam.WindowInto(
            beam.window.FixedWindows(60)  # 60-second fixed windows
        )
        | 'Print Results' >> beam.Map(print)
    )
