Skip to content

Commit

Permalink
PubSub: adds region tags and updates existing to standard [(#1491)](G…
Browse files Browse the repository at this point in the history
  • Loading branch information
alixhami authored and plamut committed Jul 10, 2020
1 parent 9123079 commit 6f6c60c
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 8 deletions.
12 changes: 12 additions & 0 deletions samples/snippets/iam.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

def get_topic_policy(project, topic_name):
"""Prints the IAM policy for the given topic."""
# [START pubsub_get_topic_policy]
client = pubsub_v1.PublisherClient()
topic_path = client.topic_path(project, topic_name)

Expand All @@ -36,10 +37,12 @@ def get_topic_policy(project, topic_name):
print('Policy for topic {}:'.format(topic_path))
for binding in policy.bindings:
print('Role: {}, Members: {}'.format(binding.role, binding.members))
# [END pubsub_get_topic_policy]


def get_subscription_policy(project, subscription_name):
"""Prints the IAM policy for the given subscription."""
# [START pubsub_get_subscription_policy]
client = pubsub_v1.SubscriberClient()
subscription_path = client.subscription_path(project, subscription_name)

Expand All @@ -48,10 +51,12 @@ def get_subscription_policy(project, subscription_name):
print('Policy for subscription {}:'.format(subscription_path))
for binding in policy.bindings:
print('Role: {}, Members: {}'.format(binding.role, binding.members))
# [END pubsub_get_subscription_policy]


def set_topic_policy(project, topic_name):
"""Sets the IAM policy for a topic."""
# [START pubsub_set_topic_policy]
client = pubsub_v1.PublisherClient()
topic_path = client.topic_path(project, topic_name)

Expand All @@ -72,10 +77,12 @@ def set_topic_policy(project, topic_name):

print('IAM policy for topic {} set: {}'.format(
topic_name, policy))
# [END pubsub_set_topic_policy]


def set_subscription_policy(project, subscription_name):
"""Sets the IAM policy for a topic."""
# [START pubsub_set_subscription_policy]
client = pubsub_v1.SubscriberClient()
subscription_path = client.subscription_path(project, subscription_name)

Expand All @@ -96,10 +103,12 @@ def set_subscription_policy(project, subscription_name):

print('IAM policy for subscription {} set: {}'.format(
subscription_name, policy))
# [END pubsub_set_subscription_policy]


def check_topic_permissions(project, topic_name):
"""Checks to which permissions are available on the given topic."""
# [START pubsub_test_topic_permissions]
client = pubsub_v1.PublisherClient()
topic_path = client.topic_path(project, topic_name)

Expand All @@ -113,10 +122,12 @@ def check_topic_permissions(project, topic_name):

print('Allowed permissions for topic {}: {}'.format(
topic_path, allowed_permissions))
# [END pubsub_test_topic_permissions]


def check_subscription_permissions(project, subscription_name):
"""Checks to which permissions are available on the given subscription."""
# [START pubsub_test_subscription_permissions]
client = pubsub_v1.SubscriberClient()
subscription_path = client.subscription_path(project, subscription_name)

Expand All @@ -130,6 +141,7 @@ def check_subscription_permissions(project, subscription_name):

print('Allowed permissions for subscription {}: {}'.format(
subscription_path, allowed_permissions))
# [END pubsub_test_subscription_permissions]


if __name__ == '__main__':
Expand Down
14 changes: 14 additions & 0 deletions samples/snippets/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,35 +28,42 @@

def list_topics(project):
"""Lists all Pub/Sub topics in the given project."""
# [START pubsub_list_topics]
publisher = pubsub_v1.PublisherClient()
project_path = publisher.project_path(project)

for topic in publisher.list_topics(project_path):
print(topic)
# [END pubsub_list_topics]


def create_topic(project, topic_name):
"""Create a new Pub/Sub topic."""
# [START pubsub_create_topic]
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)

topic = publisher.create_topic(topic_path)

print('Topic created: {}'.format(topic))
# [END pubsub_create_topic]


def delete_topic(project, topic_name):
"""Deletes an existing Pub/Sub topic."""
# [START pubsub_delete_topic]
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)

publisher.delete_topic(topic_path)

print('Topic deleted: {}'.format(topic_path))
# [END pubsub_delete_topic]


def publish_messages(project, topic_name):
"""Publishes multiple messages to a Pub/Sub topic."""
# [START pubsub_quickstart_publisher]
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)

Expand All @@ -67,11 +74,13 @@ def publish_messages(project, topic_name):
publisher.publish(topic_path, data=data)

print('Published messages.')
# [END pubsub_quickstart_publisher]


def publish_messages_with_custom_attributes(project, topic_name):
"""Publishes multiple messages with custom attributes
to a Pub/Sub topic."""
# [START pubsub_publish_custom_attributes]
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)

Expand All @@ -84,11 +93,13 @@ def publish_messages_with_custom_attributes(project, topic_name):
topic_path, data, origin='python-sample', username='gcp')

print('Published messages with custom attributes.')
# [END pubsub_publish_custom_attributes]


def publish_messages_with_futures(project, topic_name):
"""Publishes multiple messages to a Pub/Sub topic and prints their
message IDs."""
# [START pubsub_publisher_concurrency_control]
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)

Expand All @@ -107,10 +118,12 @@ def publish_messages_with_futures(project, topic_name):
for future in futures:
# result() blocks until the message is published.
print(future.result())
# [END pubsub_publisher_concurrency_control]


def publish_messages_with_batch_settings(project, topic_name):
"""Publishes multiple messages to a Pub/Sub topic with batch settings."""
# [START pubsub_publisher_batch_settings]
# Configure the batch to publish once there is one kilobyte of data or
# 1 second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
Expand All @@ -127,6 +140,7 @@ def publish_messages_with_batch_settings(project, topic_name):
publisher.publish(topic_path, data=data)

print('Published messages.')
# [END pubsub_publisher_batch_settings]


if __name__ == '__main__':
Expand Down
4 changes: 2 additions & 2 deletions samples/snippets/quickstart.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@


def run_quickstart():
# [START pubsub_quickstart]
# [START pubsub_quickstart_create_topic]
# Imports the Google Cloud client library
from google.cloud import pubsub_v1

Expand All @@ -32,7 +32,7 @@ def run_quickstart():
topic = publisher.create_topic(topic_path)

print('Topic created: {}'.format(topic))
# [END pubsub_quickstart]
# [END pubsub_quickstart_create_topic]


if __name__ == '__main__':
Expand Down
32 changes: 26 additions & 6 deletions samples/snippets/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,29 @@

def list_subscriptions_in_topic(project, topic_name):
"""Lists all subscriptions for a given topic."""
# [START pubsub_list_topic_subscriptions]
subscriber = pubsub_v1.PublisherClient()
topic_path = subscriber.topic_path(project, topic_name)

for subscription in subscriber.list_topic_subscriptions(topic_path):
print(subscription)
# [END pubsub_list_topic_subscriptions]


def list_subscriptions_in_project(project):
"""Lists all subscriptions in the current project."""
# [START pubsub_list_subscriptions]
subscriber = pubsub_v1.SubscriberClient()
project_path = subscriber.project_path(project)

for subscription in subscriber.list_subscriptions(project_path):
print(subscription.name)
# [END pubsub_list_subscriptions]


def create_subscription(project, topic_name, subscription_name):
"""Create a new pull subscription on the given topic."""
# [START pubsub_create_pull_subscription]
subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path(project, topic_name)
subscription_path = subscriber.subscription_path(
Expand All @@ -56,16 +61,16 @@ def create_subscription(project, topic_name, subscription_name):
subscription_path, topic_path)

print('Subscription created: {}'.format(subscription))
# [END pubsub_create_pull_subscription]


def create_push_subscription(project,
topic_name,
subscription_name,
endpoint):
"""Create a new push subscription on the given topic.
For example, endpoint is
"https://my-test-project.appspot.com/push".
"""
"""Create a new push subscription on the given topic."""
# [START pubsub_create_push_subscription]
# endpoint = "https://my-test-project.appspot.com/push"
subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path(project, topic_name)
subscription_path = subscriber.subscription_path(
Expand All @@ -79,26 +84,30 @@ def create_push_subscription(project,

print('Push subscription created: {}'.format(subscription))
print('Endpoint for subscription is: {}'.format(endpoint))
# [END pubsub_create_push_subscription]


def delete_subscription(project, subscription_name):
"""Deletes an existing Pub/Sub topic."""
# [START pubsub_delete_subscription]
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)

subscriber.delete_subscription(subscription_path)

print('Subscription deleted: {}'.format(subscription_path))
# [END pubsub_delete_subscription]


def update_subscription(project, subscription_name, endpoint):
"""
Updates an existing Pub/Sub subscription's push endpoint URL.
Note that certain properties of a subscription, such as
its topic, are not modifiable. For example, endpoint is
"https://my-test-project.appspot.com/push".
its topic, are not modifiable.
"""
# [START pubsub_update_push_configuration]
# endpoint = "https://my-test-project.appspot.com/push"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
Expand All @@ -122,10 +131,13 @@ def update_subscription(project, subscription_name, endpoint):
print('Subscription updated: {}'.format(subscription_path))
print('New endpoint for subscription is: {}'.format(
result.push_config))
# [END pubsub_update_push_configuration]


def receive_messages(project, subscription_name):
"""Receives messages from a pull subscription."""
# [START pubsub_subscriber_async_pull]
# [START pubsub_quickstart_subscriber]
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
Expand All @@ -141,10 +153,13 @@ def callback(message):
print('Listening for messages on {}'.format(subscription_path))
while True:
time.sleep(60)
# [END pubsub_subscriber_async_pull]
# [END pubsub_quickstart_subscriber]


def receive_messages_with_custom_attributes(project, subscription_name):
"""Receives messages from a pull subscription."""
# [START pubsub_subscriber_sync_pull_custom_attributes]
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
Expand All @@ -165,10 +180,12 @@ def callback(message):
print('Listening for messages on {}'.format(subscription_path))
while True:
time.sleep(60)
# [END pubsub_subscriber_sync_pull_custom_attributes]


def receive_messages_with_flow_control(project, subscription_name):
"""Receives messages from a pull subscription with flow control."""
# [START pubsub_subscriber_flow_settings]
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
Expand All @@ -187,10 +204,12 @@ def callback(message):
print('Listening for messages on {}'.format(subscription_path))
while True:
time.sleep(60)
# [END pubsub_subscriber_flow_settings]


def listen_for_errors(project, subscription_name):
"""Receives messages and catches errors from a pull subscription."""
# [START pubsub_subscriber_error_listener]
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
Expand All @@ -210,6 +229,7 @@ def callback(message):
'Listening for messages on {} threw an Exception: {}.'.format(
subscription_name, e))
raise
# [END pubsub_subscriber_error_listener]


if __name__ == '__main__':
Expand Down

0 comments on commit 6f6c60c

Please sign in to comment.