In [None]:
!pip install apache-beam[gcp]


Collecting apache-beam[gcp]
  Downloading apache_beam-2.60.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.6 kB)
Collecting crcmod<2.0,>=1.7 (from apache-beam[gcp])
  Downloading crcmod-1.7.tar.gz (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.7/89.7 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting dill<0.3.2,>=0.3.1.1 (from apache-beam[gcp])
  Downloading dill-0.3.1.1.tar.gz (151 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.0/152.0 kB[0m [31m8.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting cloudpickle~=2.2.1 (from apache-beam[gcp])
  Downloading cloudpickle-2.2.1-py3-none-any.whl.metadata (6.9 kB)
Collecting fastavro<2,>=0.23.6 (from apache-beam[gcp])
  Downloading fastavro-1.9.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Collecting fasteners<1.0,>=0.

In [None]:
# Seed the working directory (e.g. the Colab VM) with a tiny three-line corpus
# that the pipeline below reads as 'input.txt'.
sample_lines = (
    "his exercise is to learn apache beam\n",
    "I will understand apache beam after this assignment\n",
    "Lets find out what apache beam is\n",
)

with open('input.txt', 'w') as corpus_file:
    # Each entry already carries its own trailing newline.
    corpus_file.writelines(sample_lines)


In [None]:
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows
from apache_beam.options.pipeline_options import PipelineOptions
import time

class CountWords(beam.PTransform):
    """Composite transform that turns lines of text into ``(word, count)`` pairs.

    Input:  a PCollection of strings (one line of text per element).
    Output: a PCollection of ``(word, total)`` tuples, where words are the
            whitespace-separated tokens of the input lines.
    """

    def expand(self, pcoll):
        # Break every line into its whitespace-separated tokens.
        words = pcoll | "Split words" >> beam.FlatMap(lambda line: line.split())
        # Attach a unit count to each token so the counts can be summed per key.
        word_ones = words | "Pair with 1" >> beam.Map(lambda word: (word, 1))
        # Add up the unit counts for each distinct word.
        return word_ones | "Group and sum" >> beam.CombinePerKey(sum)

class ProcessWords(beam.DoFn):
    """Render each ``(word, count)`` pair as one human-readable output line."""

    def process(self, element):
        word, count = element
        yield f"Word: {word}, Count: {count}"


# Create a pipeline
options = PipelineOptions()

with beam.Pipeline(options=options) as p:
    # Step 1: Read the input text file (one element per line).
    lines = p | "Read Input" >> beam.io.ReadFromText('input.txt')

    # Steps 2-3: Windowing and trigger.
    # Windowing/trigger settings only influence the grouping that comes AFTER
    # them, so they are configured once, up front, on the raw lines. Applying
    # them after the counts were already aggregated has no effect on the counts.
    # The text source attaches no event timestamps of its own, so all lines are
    # expected to share a single window and the totals match a plain word count.
    windowed_lines = lines | "Apply Fixed Window" >> beam.WindowInto(
        FixedWindows(60),  # 60-second windows
        # Repeatedly(...) re-arms the trigger after every firing; a bare
        # AfterProcessingTime finishes after its first firing, and anything
        # arriving in the window afterwards would be dropped.
        trigger=beam.transforms.trigger.Repeatedly(
            beam.transforms.trigger.AfterProcessingTime(10)  # 10 seconds
        ),
        # Each firing emits only the data seen since the previous firing.
        accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING,
    )

    # Step 4: Use the custom composite transform for (per-window) word counting.
    word_counts = windowed_lines | "Count words" >> CountWords()

    # Step 5: ParDo - format the windowed counts (previously the un-windowed
    # counts were formatted and the windowed branch was silently discarded).
    processed = word_counts | "Process Words with ParDo" >> beam.ParDo(ProcessWords())

    # Step 6: Write the formatted lines to sharded text files named 'output.txt-*'.
    processed | "Write Output" >> beam.io.WriteToText('output.txt')







In [None]:
# Print the pipeline's result file: the 'output.txt' prefix passed to the write
# step plus the shard suffix of the single shard (00000 of 00001).
!cat output.txt-00000-of-00001


Word: his, Count: 1
Word: exercise, Count: 1
Word: is, Count: 2
Word: to, Count: 1
Word: learn, Count: 1
Word: apache, Count: 3
Word: beam, Count: 3
Word: I, Count: 1
Word: will, Count: 1
Word: understand, Count: 1
Word: after, Count: 1
Word: this, Count: 1
Word: assignment, Count: 1
Word: Lets, Count: 1
Word: find, Count: 1
Word: out, Count: 1
Word: what, Count: 1
