* See [Quickstart: Using Client Libraries](https://cloud.google.com/pubsub/docs/quickstart-client-libraries)
* The JSON key file of the service account is located in /Users/ken/Develop/smartq-config/rd2/gke-testbed/smartq
* export GOOGLE_APPLICATION_CREDENTIALS=/Users/ken/Develop/smartq-config/rd2/gke-testbed/smartq/organic-byway-253306-7cf81f31b25e.json
* install client library: `pip install --upgrade google-cloud-pubsub`

In [3]:
#
# Initial Set-Up
#
# Shared configuration read by every cell below; run this cell first.
#
import os

# Identifiers of the GCP project, topic and subscription used in this notebook.
project_id = "organic-byway-253306"
topic_id = "smartq-20210925-devreq"
subscription_id = "pub-sub-subscriprion-test"

# Honor GOOGLE_APPLICATION_CREDENTIALS when it is already exported (as the
# introduction of this notebook instructs) instead of unconditionally
# overwriting it with a path that exists on one developer's machine only.
# The original machine-specific path is kept as the fallback, so behavior is
# unchanged when the variable is unset or empty.
google_credential = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") or (
    "/Users/huzhongwei/Development/smartq-config/rd2/gke-testbed/smartq/organic-byway-253306-7cf81f31b25e.json"
)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = google_credential

In [4]:
#
# Create the subscription programmatically
#
# The first execution failed with the following error:
#  debug_error_string = "{"created":"@1633143199.813087000",
#                         "description":"Error received from peer ipv4:172.217.27.138:443",
#                         "file":"src/core/lib/surface/call.cc",
#                         "file_line":1070,
#                         "grpc_message":"User not authorized to perform this action.",
#                         "grpc_status":7}"
#
# It succeeded once the Pub/Sub Editor role was added to this service account.
#

from google.cloud import pubsub_v1

# The publisher client is used here only to build the fully qualified topic name.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Request payload: the subscription to create and the topic it attaches to.
create_request = {"name": subscription_path, "topic": topic_path}

# Using the subscriber as a context manager calls close() automatically,
# which closes the underlying gRPC channel when done.
with subscriber:
    subscription = subscriber.create_subscription(request=create_request)

print(f"Subscription created: {subscription}")

Subscription created: name: "projects/organic-byway-253306/subscriptions/pub-sub-subscriprion-test"
topic: "projects/organic-byway-253306/topics/smartq-20210925-devreq"
push_config {
}
ack_deadline_seconds: 10
message_retention_duration {
  seconds: 604800
}
expiration_policy {
  ttl {
    seconds: 2678400
  }
}



In [None]:
#
# Publish messages to the topic.
# "projects/organic-byway-253306/topics/smartq-20210925-devreq" is a topic
# created in the production project: organic-byway-253306
#
# The first execution failed with the following error:
# debug_error_string = "{"created":"@1633141307.387916000",
#   "description":"Error received from peer ipv4:172.217.160.74:443",
#   "file":"src/core/lib/surface/call.cc",
#   "file_line":1070,
#   "grpc_message":"User not authorized to perform this action.","grpc_status":7}"
#
# It succeeded once the Pub/Sub Publisher role was added to this service account.
#

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
# `topic_path` builds the fully qualified identifier
# `projects/{project_id}/topics/{topic_id}`.
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
    # The payload must be a bytestring, so the text is encoded right away.
    data = f"Message number {n}".encode("utf-8")
    # publish() hands back a future; print whatever it resolves to.
    future = publisher.publish(topic_path, data)
    print(future.result())

print(f"Published messages to {topic_path}.")

In [12]:
#
# Receive messages.
# Before running this code, create a subscription in the GCP console first.
# The subscription id of my created subscription is "pub-sub-subscriprion-test"
#
# The first execution failed with the following error:
#   PermissionDenied: 403 User not authorized to perform this action
#
# It succeeded once the Pub/Sub Subscriber role was added to this service account.
#

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# How long, in seconds, the subscriber listens for messages.
timeout = 900.0

subscriber = pubsub_v1.SubscriberClient()
# `subscription_path` builds the fully qualified identifier
# `projects/{project_id}/subscriptions/{subscription_id}`.
subscription_path = subscriber.subscription_path(project_id, subscription_id)


def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    """Print the received message, then acknowledge it."""
    print(f"Received {message}.")
    message.ack()


streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Using the subscriber as a context manager calls close() automatically when done.
with subscriber:
    try:
        # Without `timeout`, result() would block indefinitely
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        # Trigger the shutdown, then block until the shutdown is complete.
        streaming_pull_future.cancel()
        streaming_pull_future.result()

Listening for messages on projects/organic-byway-253306/subscriptions/pub-sub-subscriprion-test..

Received Message {
  data: b'Message number 1'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'Message number 3'
  ordering_key: ''
  attributes: {}
}.Received Message {
  data: b'Message number 7'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'Message number 9'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'Message number 6'
  ordering_key: ''
  attributes: {}
}.Received Message {
  data: b'Message number 4'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'Message number 2'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'Message number 5'
  ordering_key: ''
  attributes: {}
}.
Received Message {
  data: b'Message number 8'
  ordering_key: ''
  attributes: {}
}.




KeyboardInterrupt: 

In [7]:
#
# Delete the subscription.
#
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Request payload naming the subscription to remove.
delete_request = {"subscription": subscription_path}

# Using the subscriber as a context manager calls close() automatically,
# which closes the underlying gRPC channel when done.
with subscriber:
    subscriber.delete_subscription(request=delete_request)

print(f"Subscription deleted: {subscription_path}.")

Subscription deleted: projects/organic-byway-253306/subscriptions/pub-sub-subscriprion-test.
