In [24]:
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.runners import DataflowRunner
import google.auth
import json

In [2]:
# Local pipeline: no runner is given, so Beam's default (DirectRunner) is used.
# The only non-default option is streaming mode, needed for the Pub/Sub source.
options = pipeline_options.PipelineOptions()
standard_options = options.view_as(pipeline_options.StandardOptions)
standard_options.streaming = True
p = beam.Pipeline(options=options)

In [3]:
# Raw Pub/Sub payloads (bytes). Printing is a side branch: beam.Map(print)
# emits print's return value (None), so its output must not be what is kept
# as `messages`.
messages = p | "read" >> beam.io.ReadFromPubSub(topic="projects/our-shield-373717/topics/collect-data")

# Debug branch: print each payload as it arrives.
messages | "print" >> beam.Map(print);



In [4]:
# Start the local streaming pipeline. Keep the result handle in a named
# variable (rather than relying on the cell output cache) so the still-running
# job can be inspected or waited on later (e.g. direct_result.wait_until_finish()).
# The handle is also the cell's displayed output.
direct_result = p.run()
direct_result

<apache_beam.runners.direct.direct_runner.DirectPipelineResult at 0x7f7bd36da3a0>

#### Apache Beam - Dataflow runner

In [1]:
import apache_beam as beam
from apache_beam.runners import DataflowRunner
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

In [2]:
options = pipeline_options.PipelineOptions()
# All views share the same underlying option storage, so one view is enough.
gcp_options = options.view_as(GoogleCloudOptions)

# Use the default project of the current Google Cloud environment.
# google.auth.default() returns (credentials, project_id); the credentials are
# not needed here. They are unpacked into a named variable instead of `_`,
# because assigning `_` stops IPython from updating its `_` last-output variable.
_credentials, gcp_options.project = google.auth.default()

# Cloud Storage location used for staging and temporary files.
dataflow_gcs_location = 'gs://another_bucket_pubsub/dataflow'

# Staging location: where the pipeline package and SDK binary are staged.
gcp_options.staging_location = '%s/staging' % dataflow_gcs_location

# Temporary location: temporary files and intermediate results written before
# output reaches the sink.
gcp_options.temp_location = '%s/temp' % dataflow_gcs_location

# Required for Pub/Sub: an unbounded source needs streaming mode.
options.view_as(pipeline_options.StandardOptions).streaming = True

# NOTE(review): no runner is passed, so this Pipeline uses Beam's default runner
# (DirectRunner) despite the Dataflow-oriented options above. The complete
# pipeline further down passes DataflowRunner() and a region explicitly.
p = beam.Pipeline(options=options)

In [3]:
# BigQuery table spec in `project:dataset.table` form. The project must appear
# only once; repeating it ("project:project.dataset.table") makes Beam read the
# project as "our-shield-373717:our-shield-373717".
table_spec = 'our-shield-373717:dataset_order.string_table'
# String schema "name:TYPE". Beam splits on ':' without stripping the type, so
# there must be no space after the colon.
table_schema = 'test_values:STRING'

In [4]:
class TextToDict(beam.DoFn):
    """Wrap one Pub/Sub message payload into a BigQuery row for `test_values`.

    ReadFromPubSub (without attributes) emits payloads as bytes, as the printed
    b'...' output in this notebook shows. The payload is decoded as UTF-8 so the
    STRING column receives text rather than a bytes object.
    """

    def process(self, element):
        """Yield a single row dict ``{'test_values': <payload text>}``.

        Args:
            element: the message payload, bytes (or an already-decoded str).
        """
        text = element.decode('utf-8') if isinstance(element, bytes) else element
        # Single debug print (the original printed each element twice).
        print("message: {}".format(text))
        yield {'test_values': text}

In [5]:
# Parse each Pub/Sub payload into a row dict. `messages` must remain the dict
# PCollection: beam.Map(print) outputs print's return value (None), so ending
# the chain with it would hand None elements to the BigQuery write that
# consumes `messages`.
messages = (p | "read" >> beam.io.ReadFromPubSub(topic="projects/our-shield-373717/topics/collect-data")
              | "encode in dict" >> beam.ParDo(TextToDict()))

# Debug side branch: print every row dict.
messages | "print" >> beam.Map(print);



In [6]:
# Append the row dicts to `table_spec`, creating the table from `table_schema`
# if it does not exist yet. The WriteResult is the cell's displayed output.
# NOTE(review): this section never calls p.run(), so the pipeline is only
# constructed here, not executed.
messages | beam.io.WriteToBigQuery(
    table=table_spec,
    schema=table_schema,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
)

<apache_beam.io.gcp.bigquery.WriteResult at 0x7f79b8c47b50>

#### Pipeline complète fonctionnelle

In [13]:
# google.auth.default() returns (credentials, project_id). project_id is None
# when the environment has no default project; fail fast here rather than
# building "projects/None/..." topic and table names below.
_credentials, project = google.auth.default()
if not project:
    raise RuntimeError(
        "No default Google Cloud project found; set GOOGLE_CLOUD_PROJECT "
        "or run `gcloud config set project <id>`."
    )
project

'our-shield-373717'

In [35]:
# Resources for the streaming job, all derived from the default `project`.
topic = f"projects/{project}/topics/collect-data"   # Pub/Sub topic to read from
table = f"{project}:dataset_order.order_table"       # BigQuery target, project:dataset.table
schema = "customer_id:integer,order_id:integer,order_amount:float"
bucket = "gs://spark_bucket_rbgt"                    # staging / temp files
region = "us-east1"

# Dataflow pipeline options. `flags` is the list of command-line style
# arguments to parse. Passing an empty list (instead of the default None)
# keeps Beam from parsing the kernel's sys.argv; everything is set via keywords.
options = PipelineOptions(
    flags=[],
    streaming=True,  # required for the unbounded Pub/Sub source
    project=project,
    region=region,
    staging_location="%s/staging" % bucket,
    temp_location="%s/temp" % bucket,
)

p = beam.Pipeline(DataflowRunner(), options=options)

def to_order_row(message):
    """Parse one Pub/Sub order message into a BigQuery row dict.

    Args:
        message: the JSON payload emitted by ReadFromPubSub, e.g.
            b'{"customer_id": 41.0, "order_id": 7766.0, "order_amount": 67.66}'.

    Returns:
        The decoded dict. The sample messages carry the ids as JSON floats
        (41.0); integral floats are converted to int so they match the
        INTEGER columns declared in `schema`.
    """
    # Imported locally: Beam pickles this function for the Dataflow workers,
    # where the notebook's top-level imports may not exist (save_main_session
    # is not set).
    import json

    row = json.loads(message)
    for key in ("customer_id", "order_id"):
        value = row.get(key)
        if isinstance(value, float) and value.is_integer():
            row[key] = int(value)
    return row


elements = (p | "Read Topic" >> ReadFromPubSub(topic=topic)
              | "To Dict" >> beam.Map(to_order_row))

elements | "Write To BigQuery" >> WriteToBigQuery(
    table=table,
    schema=schema,
    create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=BigQueryDisposition.WRITE_APPEND,
)

# Submit the streaming job and keep its handle; it runs until cancelled.
pipeline = p.run()


[notice] A new release of pip available: 22.3.1 -> 23.0
[notice] To update, run: /jupyter/.kernels/apache-beam-2.44.0/bin/python -m pip install --upgrade pip

[notice] A new release of pip available: 22.3.1 -> 23.0
[notice] To update, run: /jupyter/.kernels/apache-beam-2.44.0/bin/python -m pip install --upgrade pip


b'{"customer_id": 41.0, "order_id": 7766.0, "order_amount": 67.66}'
b'{"customer_id": 28.0, "order_id": 2239.0, "order_amount": 19.92}'
b'{"customer_id": 74.0, "order_id": 9053.0, "order_amount": 64.47}'
b'{"customer_id": 84.0, "order_id": 5460.0, "order_amount": 47.38}'
b'{"customer_id": 5.0, "order_id": 2441.0, "order_amount": 88.12}'
b'{"customer_id": 33.0, "order_id": 4759.0, "order_amount": 89.42}'
b'{"customer_id": 8.0, "order_id": 6873.0, "order_amount": 38.29}'
b'{"customer_id": 23.0, "order_id": 7702.0, "order_amount": 20.4}'
b'{"customer_id": 60.0, "order_id": 1261.0, "order_amount": 24.44}'
b'{"customer_id": 36.0, "order_id": 3321.0, "order_amount": 66.97}'
b'{"customer_id": 57.0, "order_id": 199.0, "order_amount": 0.94}'
b'{"customer_id": 31.0, "order_id": 8219.0, "order_amount": 25.48}'
b'{"customer_id": 27.0, "order_id": 5312.0, "order_amount": 24.34}'
b'{"customer_id": 25.0, "order_id": 4657.0, "order_amount": 8.23}'
b'{"customer_id": 4.0, "order_id": 2218.0, "order_amou

In [34]:
# Stop the streaming Dataflow job started by `pipeline = p.run()`. The Pub/Sub
# source is unbounded, so the job does not stop on its own. The returned job
# state is shown as the cell output.
pipeline.cancel()

'CANCELLING'