In [8]:
from azure.servicebus import (
    ServiceBusReceiveMode,
    NEXT_AVAILABLE_SESSION,
    AutoLockRenewer,
)
from azure.servicebus.aio import ServiceBusClient
from azure.identity.aio import DefaultAzureCredential

FULLY_QUALIFIED_NAMESPACE = ""
TOPIC_NAME = ""
SUBSCRIPTION_NAME = ""
credential = DefaultAzureCredential()


async def run():
    async with ServiceBusClient(
        fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
        credential=credential,
        logging_enable=True,
    ) as servicebus_client:
        with servicebus_client.get_subscription_receiver(
            topic_name=TOPIC_NAME,
            ServiceBusReceiveMode=ServiceBusReceiveMode.RECEIVE_AND_DELETE,
            session_id=NEXT_AVAILABLE_SESSION,
            subscription_name=SUBSCRIPTION_NAME,
            prefetch_count=20,
        ) as receiver:
            received_msgs = receiver.receive_messages(
                max_message_count=10, max_wait_time=5
            )
            for message in received_msgs:
                receiver.complete_message(message)


print(
    f"Starting Receiving Messages from SB: {FULLY_QUALIFIED_NAMESPACE} TOPIC: {TOPIC_NAME}, Subscription Name: {SUBSCRIPTION_NAME}"
)
print("--------------------------------------------------------------------")
await run()
print("--------------------------------------------------------------------")
print("Completed")

Starting Receiving Messages from SB: sb-ea2-core-jcza6-int.servicebus.windows.net TOPIC: sbt-posting, Subscription Name: sbs-backward-sync-02
--------------------------------------------------------------------


TypeError: 'ServiceBusReceiver' object does not support the context manager protocol

In [25]:
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import (
    ServiceBusReceiveMode,
    NEXT_AVAILABLE_SESSION,
    AutoLockRenewer,
)
from azure.identity.aio import DefaultAzureCredential

FULLY_QUALIFIED_NAMESPACE = ""
TOPIC_NAME = ""
SUBSCRIPTION_NAME = ""
credential = DefaultAzureCredential()


async def run():
    async with ServiceBusClient(
        fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
        credential=credential,
        logging_enable=True,
    ) as servicebus_client:
        receiver = servicebus_client.get_subscription_receiver(
            topic_name=TOPIC_NAME,
            subscription_name=SUBSCRIPTION_NAME,
            ServiceBusReceiveMode=ServiceBusReceiveMode.RECEIVE_AND_DELETE,
            session_id=NEXT_AVAILABLE_SESSION,
        )

        async with receiver:
            while True:
                received_msgs = await receiver.receive_messages(
                    max_message_count=26, max_wait_time=5
                )
                for msg in received_msgs:
                    await receiver.complete_message(msg)


print(
    f"Starting Receiving Messages from SB: {FULLY_QUALIFIED_NAMESPACE} TOPIC: {TOPIC_NAME}, Subscription Name: {SUBSCRIPTION_NAME}"
)
print("--------------------------------------------------------------------")
await run()
print("--------------------------------------------------------------------")
print("Completed")

Starting Receiving Messages from SB: sb-ea2-core-jcza6-int.servicebus.windows.net TOPIC: sbt-posting, Subscription Name: sbs-backward-sync-02
--------------------------------------------------------------------
--------------------------------------------------------------------
Completed


## Receive and Delete from Deadletter


In [3]:
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusSubQueue
from azure.identity.aio import DefaultAzureCredential

FULLY_QUALIFIED_NAMESPACE = ""
TOPIC_NAME = ""
SUBSCRIPTION_NAME = ""
credential = DefaultAzureCredential()


async def run():
    async with ServiceBusClient(
        fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
        credential=credential,
        logging_enable=True,
    ) as servicebus_client:
        dlq_receiver = servicebus_client.get_subscription_receiver(
            topic_name=TOPIC_NAME,
            subscription_name=SUBSCRIPTION_NAME,
            sub_queue=ServiceBusSubQueue.DEAD_LETTER,
            prefetch_count=20,
        )
        completed = False
        async with dlq_receiver:
            while not completed:
                peekMessages = await dlq_receiver.peek_messages(max_message_count=10)
                if not peekMessages:
                    completed = True
                    continue
                received_msgs = await dlq_receiver.receive_messages(
                    max_message_count=26, max_wait_time=5
                )
                for msg in received_msgs:
                    await dlq_receiver.complete_message(msg)


print(
    f"Starting Receiving Messages from Service Bus SB: {FULLY_QUALIFIED_NAMESPACE}, TOPIC: {TOPIC_NAME}, DeadLetter"
)
print("--------------------------------------------------------------------")
await run()
print("--------------------------------------------------------------------")
print("Completed")

Starting Receiving Messages from Service Bus SB: sb-ea2-core-jcza6-int.servicebus.windows.net, TOPIC: sbt-posting
--------------------------------------------------------------------
--------------------------------------------------------------------
Completed


# Send To sbt-Posting with Session Id


In [1]:
import json
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage
from azure.identity.aio import DefaultAzureCredential

FULLY_QUALIFIED_NAMESPACE = ""
TOPIC_NAME = ""
credential = DefaultAzureCredential()


async def run():
    async with ServiceBusClient(
        fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
        credential=credential,
        logging_enable=True,
    ) as servicebus_client:
        sender = servicebus_client.get_topic_sender(topic_name=TOPIC_NAME)
        with open("single-record.json") as json_file:
            data = json.load(json_file)
            count = 0
            for record in data:
                count += 1
                print(
                    f"{count}: {record['upc']} - {record['unit']} - {record['auditDocumentType']}"
                )
                session_id = f"posting-{record['unit']}-{record['upc']}"
                custom_properties = {"auditDocumentType": record["auditDocumentType"]}
                message = json.dumps(record)
                await send_message(sender, message, session_id, custom_properties)


async def send_message(sender, message, session_id, custom_properties):
    # Create a Service Bus message
    print(session_id)
    message = ServiceBusMessage(
        session_id=session_id, body=message, application_properties=custom_properties
    )
    # send the message to the topic
    resp = await sender.send_messages(message)
    print("Sent message")


print(
    f"Starting Sending Messages to Service Bus SB: {FULLY_QUALIFIED_NAMESPACE}, TOPIC: {TOPIC_NAME}"
)
print("--------------------------------------------------------------------")
await run()
print("--------------------------------------------------------------------")
print("Completed")

Starting Sending Messages to Service Bus SB: sb-ea2-core-6t6td-cert.servicebus.windows.net, TOPIC: sbt-posting
--------------------------------------------------------------------
1: 88692615318 - 653 - productInventory
posting-653-88692615318
Sent message
--------------------------------------------------------------------
Completed


# Send To sbt-posting


In [None]:
import json
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage
from azure.identity.aio import DefaultAzureCredential

FULLY_QUALIFIED_NAMESPACE = ""
TOPIC_NAME = ""
credential = DefaultAzureCredential()


async def run():
    async with ServiceBusClient(
        fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
        credential=credential,
        logging_enable=True,
    ) as servicebus_client:
        sender = servicebus_client.get_topic_sender(topic_name=TOPIC_NAME)
        with open("single-record.json") as json_file:
            data = json.load(json_file)
            count = 0
            for record in data:
                count += 1
                print(
                    f"{count}: {record['upc']} - {record['unit']} - {record['auditDocumentType']}"
                )
                custom_properties = {"auditDocumentType": record["auditDocumentType"]}
                message = json.dumps(record)
                await send_message(sender, message, custom_properties)


async def send_message(sender, message, custom_properties):
    # Create a Service Bus message
    message = ServiceBusMessage(message, application_properties=custom_properties)
    # send the message to the topic
    resp = await sender.send_messages(message)
    print("Sent message")


print(
    f"Starting Sending Messages to Service Bus SB: {FULLY_QUALIFIED_NAMESPACE}, TOPIC: {TOPIC_NAME}"
)
print("--------------------------------------------------------------------")
await run()
print("--------------------------------------------------------------------")
print("Completed")

Starting Sending Messages to Service Bus SB: sb-ea2-core-jcza6-int.servicebus.windows.net, TOPIC: sbt-posting
--------------------------------------------------------------------
1: 886926015423 - 265 - categoryInventory


Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000001D2A2322120>


Sent message
--------------------------------------------------------------------
Completed
