In [1]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.transforms.trigger import AfterWatermark, AfterCount
import logging

class PrintMessages(beam.DoFn):
    def process(self, element):
        print(element)
        yield element

class EnrichWithLookup(beam.DoFn):
    def __init__(self, lookup_data):
        self.lookup_data = lookup_data

    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        key = element['msg'].split(',')[0]
        if key in self.lookup_data:
            yield beam.window.TimestampedValue(
                {'msg': f"{element['msg']},{self.lookup_data[key]}"}, timestamp)
        else:
            yield beam.window.TimestampedValue(
                {'msg': f"{element['msg']},Unknown"}, timestamp)

def run():
    pipeline_options = PipelineOptions()

    # Set the Google Cloud project and specify the Dataflow runner
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = 'my-another-394512'
    google_cloud_options.job_name = 'pubsub-to-bq-batch1'
    google_cloud_options.staging_location = 'gs://maniprakash-bucket/staging'
    google_cloud_options.temp_location = 'gs://maniprakash-bucket/temp'
    google_cloud_options.region = 'europe-west2'

    # Enable streaming mode
    pipeline_options.view_as(beam.options.pipeline_options.StandardOptions).streaming = True
    pipeline_options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner'

    # Create the pipeline
    pipeline = beam.Pipeline(options=pipeline_options)

    # Define the lookup data as a dictionary within the code
    lookup_data = {
        'key1': 'value1_enriched',
        'key3': 'value3_enriched'
    }

    # Read messages from Pub/Sub topic
    messages = (
        pipeline
        | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(subscription='projects/my-another-394512/subscriptions/my-topic-sub')
        | 'Decode message' >> beam.Map(lambda x: x.decode('utf-8'))
        | 'Format Messages' >> beam.Map(lambda x: {'msg': x})
    )

    # Apply windowing and triggers
    windowed_messages = messages | 'Apply Windowing' >> beam.WindowInto(
        beam.window.FixedWindows(60),  # 1-minute window
        trigger=AfterWatermark(early=AfterCount(10), late=AfterCount(20)),  # Batch trigger
        accumulation_mode=beam.trigger.AccumulationMode.DISCARDING
    )

    # Apply lookup enrichment using ParDo
    enriched_messages = windowed_messages | 'Enrich with Lookup' >> beam.ParDo(
        EnrichWithLookup(lookup_data=lookup_data)
    )

    # Print messages to console
    enriched_messages | 'Print Enriched Messages' >> beam.ParDo(PrintMessages())

    # Write messages to BigQuery
    table_spec = 'my-another-394512.mydataset.pubsub'
    enriched_messages | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
        table_spec,
        schema='msg:STRING',  # Define your schema here
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
    )

    pipeline.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()


  is_streaming_pipeline = p.options.view_as(StandardOptions).streaming
INFO:apache_beam.runners.direct.direct_runner:Running pipeline with DirectRunner.
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.


{'msg': 'key1,data1\n,value1_enriched'}
{'msg': 'key2,data2,Unknown'}
{'msg': 'key3,data3,value3_enriched'}
