In [1]:
!pip install apache-beam[gcp]



In [6]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterCount, AccumulationMode


# Define a simple DoFn (data processing function)
class FilterRecords(beam.DoFn):
    def process(self, element):
        if element['Clade'] == 'Hadrosauridae':
            yield element

# Define a composite transform
class FilterAndNormalize(beam.PTransform):
    def expand(self, pcoll):
        return (
            pcoll
            | 'Filter' >> beam.ParDo(FilterRecords())
            # Add more transformations here
        )

# Custom ParDo to print the records
class PrintElement(beam.DoFn):
    def process(self, element):
        print(element)

# Create Pipeline options
options = PipelineOptions()

# Create the Pipeline
with beam.Pipeline(options=options) as p:

    # Read and filter records
    records = (
        p
        | 'Read from Text' >> beam.io.ReadFromText('dinosaur.csv', skip_header_lines=1)
        | 'Split CSV' >> beam.Map(lambda line: line.split(','))
        | 'To Dict' >> beam.Map(lambda fields: {"Clade": fields[0], "Genus": fields[1], "Species": fields[2]})
        | 'Filter and Normalize' >> FilterAndNormalize()
    )

    # Windowing and Triggers
    windowed_records = (
    records
    | 'Window' >> beam.WindowInto(
        FixedWindows(60),
        trigger=AfterCount(5),
        accumulation_mode=AccumulationMode.DISCARDING)
    )

    # Write to Text
    _ = (
        windowed_records
        | 'Write to Text' >> beam.io.WriteToText('filtered_dinosaur')
    )

    # Print (for debugging)
    _ = windowed_records | 'Print' >> beam.ParDo(PrintElement())




{'Clade': 'Hadrosauridae', 'Genus': '"""Kritosaurus"""', 'Species': 'australis'}
{'Clade': 'Hadrosauridae', 'Genus': '"“Kritosaurus"""', 'Species': ''}
{'Clade': 'Hadrosauridae', 'Genus': '(indeterminate)', 'Species': ''}
{'Clade': 'Hadrosauridae', 'Genus': 'Anatosaurus', 'Species': 'copei'}
{'Clade': 'Hadrosauridae', 'Genus': 'Anatotitan', 'Species': ''}
{'Clade': 'Hadrosauridae', 'Genus': 'Anatosaurus', 'Species': 'copei'}
{'Clade': 'Hadrosauridae', 'Genus': 'Edmontosaurus', 'Species': ''}
{'Clade': 'Hadrosauridae', 'Genus': 'Anatotitan', 'Species': ''}
{'Clade': 'Hadrosauridae', 'Genus': 'Anatotitan', 'Species': 'copei'}
{'Clade': 'Hadrosauridae', 'Genus': 'Anatotitan', 'Species': 'copei'}
{'Clade': 'Hadrosauridae', 'Genus': 'Anatosaurus', 'Species': 'copei'}
{'Clade': 'Hadrosauridae', 'Genus': 'Diclonius', 'Species': 'mirabilis'}
{'Clade': 'Hadrosauridae', 'Genus': 'Anatotitan', 'Species': 'copei'}
{'Clade': 'Hadrosauridae', 'Genus': 'Anatotitan', 'Species': ''}
{'Clade': 'Hadrosau