In [32]:
#Create the Options first


import google.auth
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
#from apache_beam.utils.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions

# Build the options from an explicit empty flag list. A bare PipelineOptions()
# parses sys.argv, which inside a Jupyter kernel typically holds the kernel's
# own launch arguments rather than pipeline flags.
options = PipelineOptions(flags=[])
# All views returned by view_as() share the same underlying option values, so
# one GoogleCloudOptions view is enough for every Google Cloud setting below.
gcp_options = options.view_as(GoogleCloudOptions)

# Use the project of the Application Default Credentials in this environment;
# google.auth.default() returns a (credentials, project_id) pair.
_, gcp_options.project = google.auth.default()
if not gcp_options.project:
    raise ValueError(
        'google.auth.default() did not return a project; configure a default '
        'Google Cloud project before building the pipeline.')

# Streaming mode, so the pipeline can consume unbounded sources such as Kafka.
options.view_as(StandardOptions).streaming = True

# Google Cloud region in which the Dataflow job runs.
gcp_options.region = 'us-west1'

# Cloud Storage prefix under which this notebook's Dataflow files are kept.
dataflow_gcs_location = 'gs://gcp-dataeng-demos-soumya/dataflow'

# Staging location: used to stage the Dataflow pipeline and SDK binary.
gcp_options.staging_location = f'{dataflow_gcs_location}/staging'
# Temp location: temporary files / intermediate results written during the job.
gcp_options.temp_location = f'{dataflow_gcs_location}/temp'
# Directory for the job's output files.
# NOTE(review): not referenced by the BigQuery pipeline shown in this notebook.
output_gcs_location = f'{dataflow_gcs_location}/output'

In [33]:
import apache_beam as beam
from apache_beam.runners import DataflowRunner
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.kafka import WriteToKafka
import json

import os

# Kafka broker address. It can be overridden with the KAFKA_BOOTSTRAP_SERVERS
# environment variable, so the notebook can target another cluster without
# editing code; the default is the original broker.
bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', '34.28.118.32:9094')
# Topic read as the 'details' side of the join.
topic = 'my-topic'
# Topic read as the 'type' side of the join.
topic2 = 'my-second-topic'
# NOTE(review): not referenced by the pipeline in this cell; presumably meant as
# a WriteToKafka destination.
write_topic = 'my-fifth-topic'


def add_key(x):
    """Pair a record with its ``"id"`` so it can be grouped by key.

    Args:
        x: A mapping with an ``"id"`` entry (here, a parsed JSON message).

    Returns:
        An ``(id, record)`` tuple; the record itself is passed through unchanged.
    """
    record_id = x["id"]
    return record_id, x

def extract_value(x):
    """Flatten one CoGroupByKey result into a BigQuery row dict.

    Args:
        x: A ``(key, groups)`` pair where ``groups['details']`` and
            ``groups['type']`` are the lists of records collected for that key
            from the two joined streams.

    Returns:
        A dict with ``Id``, ``Time`` and ``v1``..``v4`` taken from the first
        ``details`` record, and ``type`` taken from the first ``type`` record.

    Raises:
        IndexError: if either group is empty (the key appeared in only one
            stream in the window); such keys should be filtered out beforehand.
        KeyError: if an expected field is missing from a record.
    """
    groups = x[1]
    first_detail = groups['details'][0]
    row = {'Id': first_detail['id']}
    for field in ('Time', 'v1', 'v2', 'v3', 'v4'):
        row[field] = first_detail[field]
    # The 'type' side is read last, matching the original evaluation order.
    row['type'] = groups['type'][0]['type']
    return row


def _read_keyed_stream(pipeline, kafka_topic, stream_name):
    """Read one Kafka topic and return its records keyed by id in 60-second windows.

    Each Kafka element's value (index 1 of the element) is decoded as UTF-8,
    parsed as JSON, keyed with ``add_key`` and assigned to fixed 60-second
    windows, so it can be joined with another stream by ``CoGroupByKey``.
    Transform labels are "Read From <topic>" and "<step> <stream_name>".
    """
    return (
        pipeline
        | f"Read From {kafka_topic}" >> ReadFromKafka(
            consumer_config={'bootstrap.servers': bootstrap_servers},
            topics=[kafka_topic])
        | f"decode {stream_name}" >> beam.Map(lambda record: record[1].decode('utf8'))
        | f"convert {stream_name} to dict" >> beam.Map(json.loads)
        | f"Add key to {stream_name}" >> beam.Map(add_key)
        | f"window {stream_name}" >> beam.WindowInto(beam.window.FixedWindows(60))
    )


def _has_both_sides(joined):
    """Return True when a CoGroupByKey result has records on both join sides."""
    _, groups = joined
    return bool(groups['details']) and bool(groups['type'])


p = beam.Pipeline(options=options)

# The 'details' side of the join comes from `topic`, the 'type' side from `topic2`.
stage4 = _read_keyed_stream(p, topic, "stream1")
stage8 = _read_keyed_stream(p, topic2, "stream2")

# Join both streams on id within each window; every element becomes
# (id, {'details': [...], 'type': [...]}).
stage9 = {'details': stage4, 'type': stage8} | "Join streams by id" >> beam.CoGroupByKey()

# Keep only ids seen on both sides in the window. extract_value indexes [0] into
# each side, so an unmatched id would otherwise raise IndexError and fail the bundle.
stage9a = (
    stage9
    | "Keep ids present in both streams" >> beam.Filter(_has_both_sides)
    | "Make BQ Rows" >> beam.Map(extract_value)
)

# WriteToBigQuery is itself a PTransform, so it is applied directly rather than
# being wrapped in beam.io.Write.
stage10 = stage9a | "Write to Bigquery" >> beam.io.WriteToBigQuery(
    'my-table-kafka-join',
    dataset='gcpdataset',
    project='mimetic-parity-378803',
    schema='Id:INTEGER,Time:INTEGER,v1:FLOAT,v2:FLOAT,v3:FLOAT,v4:FLOAT,type:STRING',
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
)

# Submit the pipeline to Cloud Dataflow with the options configured earlier.
pipeline_result = DataflowRunner().run_pipeline(p, options=options)


[notice] A new release of pip available: 22.3.1 -> 23.0.1
[notice] To update, run: /jupyter/.kernels/apache-beam-2.44.0/bin/python -m pip install --upgrade pip

[notice] A new release of pip available: 22.3.1 -> 23.0.1
[notice] To update, run: /jupyter/.kernels/apache-beam-2.44.0/bin/python -m pip install --upgrade pip


It is extremely important to convert the message format correctly. The cells below simulate that conversion on in-memory sample records, so you can verify the transformation before writing the streaming pipeline.

In [15]:
# Sample messages for the 'details' stream: one record per id with Time, the
# v1..v4 features and a prediction.
details=[{"Time":1677408322,"id":55,"prediction":5.886671895114313,"v1":2.5482885634280708,"v2":6.5662471947042995,"v3":3.3788261063647185,"v4":7.512364419803077},
         {"Time":1677408324,"id":56,"prediction":6.852204930810261,"v1":8.397503891009848,"v2":7.711458203146031,"v3":3.5309954258360428,"v4":7.777160683323142},
         {"Time":1677408326,"id":57,"prediction":4.389202073205944,"v1":2.5313847734214967,"v2":5.813404233608326,"v3":4.307676969561568,"v4":3.452699145684147},
         {"Time":1677408328,"id":58,"prediction":4.509872699831276,"v1":2.8418497314679034,"v2":9.736096662787373,"v3":2.3352376634728156,"v4":3.1947427377129163},
         {"Time":1677408330,"id":59,"prediction":5.193421876372719,"v1":3.459238625371251,"v2":3.6355624266369144,"v3":1.2968845222475518,"v4":8.578300429584864},
         {"Time":1677408332,"id":60,"prediction":6.300016729544078,"v1":6.502576845294443,"v2":7.219677016754944,"v3":6.892670719170741,"v4":4.595056064781055},
         {"Time":1677408334,"id":61,"prediction":4.652668234891298,"v1":3.1315997556612185,"v2":1.2480402070898364,"v3":5.008117107285823,"v4":5.718662714303655},
         {"Time":1677408336,"id":62,"prediction":4.99960561785389,"v1":5.567064178446625,"v2":5.7290311917768175,"v3":4.280728153157806,"v4":4.282186289266305},
         {"Time":1677408338,"id":63,"prediction":2.892951987182519,"v1":1.4567589296541619,"v2":3.51945801160021,"v3":1.423785229019623,"v4":3.290622307977935},
         {"Time":1677408340,"id":64,"prediction":6.8995805952053075,"v1":9.301593097252756,"v2":5.448280941097005,"v3":5.255558897397265,"v4":7.50774357010363}]

# Sample messages for the 'type' stream: the transaction type for each id in
# `details` (deliberately out of id order).
# NOTE(review): this name shadows the built-in type() for the rest of the kernel
# session; renaming it (together with the beam.Create(type) call that reads it)
# would be safer.
type=[{"id":55,"type":"Debit"},{"id":60,"type":"Credit"},
      {"id":56,"type":"Debit"},{"id":61,"type":"Credit"},
      {"id":57,"type":"Debit"},{"id":62,"type":"Credit"},
      {"id":58,"type":"Debit"},{"id":63,"type":"Credit"},
      {"id":59,"type":"Debit"},{"id":64,"type":"Credit"}
     ]

In [31]:
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
import json


def add_key(x):
    """Key a record by its ``"id"`` field.

    Returns an ``(id, record)`` pair with the record passed through unchanged,
    which is the shape ``beam.CoGroupByKey`` consumes below.
    """
    keyed = (x["id"], x)
    return keyed

def makeRow(x):
    """Turn a joined ``(id, {'details': [...], 'type': [...]})`` pair into a BigQuery row.

    Uses the first record on each side of the join: ``Id``, ``Time`` and
    ``v1``..``v4`` come from ``details``, and ``type`` comes from ``type``.
    Raises IndexError if a side is empty and KeyError if a field is missing.
    """
    detail = x[1]['details'][0]
    detail_fields = {
        'Id': detail['id'],
        'Time': detail['Time'],
        'v1': detail['v1'],
        'v2': detail['v2'],
        'v3': detail['v3'],
        'v4': detail['v4'],
    }
    # Appending 'type' via dict(...) keeps it as the last key, as in the row schema.
    return dict(detail_fields, type=x[1]['type'][0]['type'])

# NOTE(review): this reuses `options` from the first cell (streaming=True plus
# Dataflow region/locations); confirm that is intended for a local
# InteractiveRunner simulation.
p = beam.Pipeline(InteractiveRunner(), options=options)

# Stream 1: the sample 'details' records, keyed by id and put in 60-second windows.
stage1 = p | "create stream1" >> beam.Create(details)
stage2 = stage1 | "Add key to stream1" >> beam.Map(add_key)
stage3 = stage2 | "window stream1" >> beam.WindowInto(beam.window.FixedWindows(60))

# Stream 2: the sample 'type' records, keyed and windowed the same way.
stage4 = p | "create stream2" >> beam.Create(type)
stage5 = stage4 | "Add key to stream2" >> beam.Map(add_key)
stage6 = stage5 | "window stream2" >> beam.WindowInto(beam.window.FixedWindows(60))

# Join on id; each element becomes (id, {'details': [...], 'type': [...]}).
stage7 = ({'details': stage3, 'type': stage6} | beam.CoGroupByKey())

# Drop ids missing from either side: makeRow indexes [0] into both lists and
# would raise IndexError for them (e.g. if the sample data is edited).
stage8 = (
    stage7
    | "Keep ids present in both streams" >> beam.Filter(
        lambda kv: kv[1]['details'] and kv[1]['type'])
    | "Make BQ Rows" >> beam.Map(makeRow)
)

# Show each stage with its window assignment, so the windowing and join can be inspected.
ib.show(stage3, include_window_info=True)
ib.show(stage6, include_window_info=True)
ib.show(stage7, include_window_info=True)
ib.show(stage8, include_window_info=True)