In [1]:
!pip install apache-beam[gcp]

Collecting apache-beam[gcp]
  Downloading apache_beam-2.59.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.5 kB)
Collecting crcmod<2.0,>=1.7 (from apache-beam[gcp])
  Downloading crcmod-1.7.tar.gz (89 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/89.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting orjson<4,>=3.9.7 (from apache-beam[gcp])
  Downloading orjson-3.10.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (50 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.4/50.4 kB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting dill<0.3.2,>=0.3.1.1 (from apache-beam[gcp])
  Downloading dill-0.3.1.1.tar.gz (151 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m9.6 MB/s

In [9]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Create a sample text file for input
with open('/content/sample-input.txt', 'w') as f:
    f.write("Hello, this is a sample file.\n")
    f.write("The name Manjunath appears here.\n")
    f.write("This is the third line with more data.\n")

input_file = '/content/sample-input.txt'

# Print the contents to verify
with open(input_file, 'r') as f:
    print(f.read())


Hello, this is a sample file.
The name Manjunath appears here.
This is the third line with more data.



In [13]:
#basic example
import apache_beam as beam

def print_element(element):
    print(element)

with beam.Pipeline() as pipeline:
    result = (
        pipeline
        | 'Create' >> beam.Create([1, 2, 3, 4, 5])
        | 'Print' >> beam.Map(print_element)
    )

1
2
3
4
5


In [23]:
#A Composite Transform is when multiple transformations are combined into a single one. Let's create a composite transform that performs filtering and mapping.
class MultiplyAndAdd(beam.PTransform):
    def expand(self, input_collection):
        return (input_collection
                | 'Multiply by 2' >> beam.Map(lambda x: x * 2)
                | 'Add 5' >> beam.Map(lambda x: x + 5))

with beam.Pipeline() as pipeline:
    result = (
        pipeline
        | 'Create Numbers' >> beam.Create([1, 2, 3, 4,5,6,7,8])
        | 'Multiply and Add' >> MultiplyAndAdd()
        | 'Print Results' >> beam.Map(print_element)
    )


7
9
11
13
15
17
19
21


In [11]:
# Create a sample text file for input, including your name "Manjunath"
with open('/content/sample-input.txt', 'w') as f:
    f.write("Hello, this is a sample file.\n")
    f.write("The name Manjunath appears here.\n")
    f.write("This is the third line with more data.\n")

# Use this file in your pipeline
input_file = '/content/sample-input.txt'

# Print the contents to verify
with open(input_file, 'r') as f:
    print(f.read())

In [15]:
#Pipeline IO
input_path = '/content/sample-input.txt'  # Ensure you have a sample text file in Colab or upload one
output_path = 'output.txt'

with beam.Pipeline() as pipeline:
    (
        pipeline
        | 'Read from File' >> beam.io.ReadFromText(input_path)
        | 'Transform Data' >> beam.Map(lambda x: x.upper())
        | 'Write to File' >> beam.io.WriteToText(output_path)
    )


In [17]:
#Triggers and Windowing
import apache_beam.transforms.window as window

def print_window_info(element, window=beam.DoFn.WindowParam):
    print(f'Element: {element} | Window: {window}')

with beam.Pipeline() as pipeline:
    (
        pipeline
        | 'Create Events' >> beam.Create([('event1', 1), ('event2', 2), ('event3', 3)])
        | 'Window' >> beam.WindowInto(window.FixedWindows(10))
        | 'Trigger' >> beam.transforms.combiners.Count.PerElement()
        | 'Print Elements' >> beam.ParDo(print_window_info)
    )

Element: (('event1', 1), 1) | Window: [-9223372036854.775, -9223372036850.0)
Element: (('event2', 2), 1) | Window: [-9223372036854.775, -9223372036850.0)
Element: (('event3', 3), 1) | Window: [-9223372036854.775, -9223372036850.0)


In [22]:
#ParDo Transform
class SplitWords(beam.DoFn):
    def process(self, element):
        words = element.split()
        for word in words:
            yield word

with beam.Pipeline() as pipeline:
    result = (
        pipeline
        | 'Create Sentences' >> beam.Create(['Apache Beam is powerful and usefull tool in ', 'Data pipelines'])
        | 'Split Words' >> beam.ParDo(SplitWords())
        | 'Print Results' >> beam.Map(print_element)
    )

Apache
Beam
is
powerful
and
usefull
tool
in
Data
pipelines
