Skip to content

Commit

Permalink
Added support for receiving notifications when AQ messages are availa…
Browse files Browse the repository at this point in the history
…ble to be

dequeued.
  • Loading branch information
anthony-tuininga committed May 30, 2018
1 parent 6583cdf commit 2112982
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 165 deletions.
57 changes: 44 additions & 13 deletions doc/src/connection.rst
Original file line number Diff line number Diff line change
Expand Up @@ -480,11 +480,18 @@ Connection Object
This attribute is an extension to the DB API definition.


.. method:: Connection.subscribe(namespace=cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE, protocol=cx_Oracle.SUBSCR_PROTO_OCI, callback=None, timeout=0, operations=OPCODE_ALLOPS, port=0, qos=0, ipAddress=None, groupingClass=0, groupingValue=0, groupingType=cx_Oracle.SUBSCR_GROUPING_TYPE_SUMMARY)
.. method:: Connection.subscribe(namespace=cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE, protocol=cx_Oracle.SUBSCR_PROTO_OCI, callback=None, timeout=0, operations=OPCODE_ALLOPS, port=0, qos=0, ipAddress=None, groupingClass=0, groupingValue=0, groupingType=cx_Oracle.SUBSCR_GROUPING_TYPE_SUMMARY, name=None)

Return a new :ref:`subscription object <subscrobj>` using the connection.
Currently the namespace and protocol parameters cannot have any other
meaningful values.
Return a new :ref:`subscription object <subscrobj>` that receives
notifications for events that take place in the database that match the
given parameters.

The namespace parameter specifies the namespace the subscription uses. It
can be one of :data:`cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE` or
:data:`cx_Oracle.SUBSCR_NAMESPACE_AQ`.

The protocol parameter specifies the protocol to use when notifications are
sent. Currently the only valid value is :data:`cx_Oracle.SUBSCR_PROTO_OCI`.

The callback is expected to be a callable that accepts a single parameter.
A :ref:`message object <msgobjects>` is passed to this callback whenever a
Expand All @@ -496,11 +503,12 @@ Connection Object

The operations parameter enables filtering of the messages that are sent
(insert, update, delete). The default value will send notifications for all
operations.
operations. This parameter is only used when the namespace is set to
:data:`cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE`.

The port parameter specifies the listening port for callback notifications
from the database server. If not specified, an unused port will be selected
by the database.
by the Oracle Client libraries.

The qos parameter specifies quality of service options. It should be one or
more of the following flags, OR'ed together:
Expand All @@ -510,9 +518,10 @@ Connection Object
:data:`cx_Oracle.SUBSCR_QOS_QUERY`,
:data:`cx_Oracle.SUBSCR_QOS_BEST_EFFORT`.

The ipAddress parameter specifies the IP address (IPv4 or IPv6) to bind for
callback notifications from the database server. If not specified, the
client IP address will be determined by the Oracle Client libraries.
The ipAddress parameter specifies the IP address (IPv4 or IPv6) in standard
string notation to bind for callback notifications from the database
server. If not specified, the client IP address will be determined by the
Oracle Client libraries.

The groupingClass parameter specifies what type of grouping of
notifications should take place. Currently, if set, this value can only be
Expand All @@ -522,18 +531,30 @@ Connection Object
values :data:`cx_Oracle.SUBSCR_GROUPING_TYPE_SUMMARY` (the default) or
:data:`cx_Oracle.SUBSCR_GROUPING_TYPE_LAST`.

The name parameter is used to identify the subscription and is specific to
the selected namespace. If the namespace parameter is
:data:`cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE` then the name is optional and
can be any value. If the namespace parameter is
:data:`cx_Oracle.SUBSCR_NAMESPACE_AQ`, however, the name must be in the
format '<QUEUE_NAME>' for single consumer queues and
'<QUEUE_NAME>:<CONSUMER_NAME>' for multiple consumer queues, and identifies
the queue that will be monitored for messages. The queue name may include
the schema, if needed.

*New in version 6.4:* The parameters ipAddress, groupingClass,
groupingValue and groupingType were added.
groupingValue, groupingType and name were added.

.. note::

This method is an extension to the DB API definition.

.. note::

Do not close the connection before the subscription object is deleted
or the subscription object will not be deregistered in the database.
This is done automatically if connection.close() is never called.
The subscription can be deregistered in the database by calling the
function :meth:`~Connection.unsubscribe()`. If this method is not
called and the connection that was used to create the subscription is
explictly closed using the function :meth:`~Connection.close()`, the
subscription will not be deregistered in the database.


.. attribute:: Connection.tnsentry
Expand All @@ -546,6 +567,16 @@ Connection Object
This attribute is an extension to the DB API definition.


.. method:: Connection.unsubscribe(subscr)

Unsubscribe from events in the database that were originally subscribed to
using :meth:`~Connection.subscribe()`. The connection used to unsubscribe
should be the same one used to create the subscription, or should access
the same database and be connected as the same user name.

.. versionadded:: 6.4


.. attribute:: Connection.username

This read-only attribute returns the name of the user which established the
Expand Down
13 changes: 11 additions & 2 deletions doc/src/module.rst
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,12 @@ values for the :attr:`Message.type` attribute of the messages that are sent
for subscriptions created by the :meth:`Connection.subscribe()` method.


.. data:: EVENT_AQ

This constant is used to specify that one or more messages are available
for dequeuing on the queue specified when the subscription was created.


.. data:: EVENT_DEREG

This constant is used to specify that the subscription has been
Expand Down Expand Up @@ -847,12 +853,15 @@ These constants are extensions to the DB API definition. They are possible
values for the namespace parameter of the :meth:`Connection.subscribe()`
method.

.. data:: SUBSCR_NAMESPACE_AQ

This constant is used to specify that notifications should be sent when a
queue has messages available to dequeue.

.. data:: SUBSCR_NAMESPACE_DBCHANGE

This constant is used to specify that database change notification or query
change notification messages are to be sent. This is the default value and
currently the only value that is supported.
change notification messages are to be sent. This is the default value.


Subscription Protocols
Expand Down
55 changes: 42 additions & 13 deletions doc/src/subscription.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,23 @@ Subscription Object
This attribute was never intended to be exposed.


.. attribute:: Subscription.ipAddress

This read-only attribute returns the IP address used for callback
notifications from the database server. If not set during construction,
this value is None.

.. versionadded:: 6.4


.. attribute:: Subscription.name

This read-only attribute returns the name used to register the subscription
when it was created.

.. versionadded:: 6.4


.. attribute:: Subscription.namespace

This read-only attribute returns the namespace used to register the
Expand All @@ -43,13 +60,6 @@ Subscription Object
subscription.


.. attribute:: Subscription.ipAddress

This read-only attribute returns the IP address used for callback
notifications from the database server. If not set during construction,
this value is None.


.. attribute:: Subscription.port

This read-only attribute returns the port used for callback notifications
Expand Down Expand Up @@ -103,12 +113,6 @@ Message Objects
the notification.


.. attribute:: Message.txid

This read-only attribute returns the id of the transaction that generated
the notification.


.. attribute:: Message.queries

This read-only attribute returns a list of message query objects that give
Expand All @@ -117,6 +121,25 @@ Message Objects
:data:`~cx_Oracle.SUBSCR_QOS_QUERY` when the subscription was created.


.. attribute:: Message.queueName

This read-only attribute returns the name of the queue which generated the
notification. It will only be populated if the subscription was created
with the namespace :data:`cx_Oracle.SUBSCR_NAMESPACE_AQ`.

.. versionadded:: 6.4


.. attribute:: Message.consumerName

This read-only attribute returns the name of the consumer which generated
the notification. It will be populated if the subscription was created with
the namespace :data:`cx_Oracle.SUBSCR_NAMESPACE_AQ` and the queue is a
multiple consumer queue.

.. versionadded:: 6.4


.. attribute:: Message.subscription

This read-only attribute returns the subscription object for which this
Expand All @@ -131,6 +154,12 @@ Message Objects
:data:`~cx_Oracle.SUBSCR_QOS_QUERY` when the subscription was created.


.. attribute:: Message.txid

This read-only attribute returns the id of the transaction that generated
the notification.


.. attribute:: Message.type

This read-only attribute returns the type of message that has been sent.
Expand Down
51 changes: 12 additions & 39 deletions samples/AdvancedQueuing.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#------------------------------------------------------------------------------
# Copyright 2016, 2017, Oracle and/or its affiliates. All rights reserved.
# Copyright 2016, 2018, Oracle and/or its affiliates. All rights reserved.
#
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved.
#
Expand All @@ -10,7 +10,7 @@
#------------------------------------------------------------------------------
# AdvancedQueuing.py
# This script demonstrates how to use advanced queuing using cx_Oracle. It
# creates a simple type and enqueues and dequeues a few objects.
# makes use of a simple type and queue created in the sample setup.
#
# This script requires cx_Oracle 5.3 and higher.
#------------------------------------------------------------------------------
Expand All @@ -29,44 +29,17 @@
connection = cx_Oracle.Connection(SampleEnv.MAIN_CONNECT_STRING)
cursor = connection.cursor()

# drop queue table, if present
cursor.execute("""
select count(*)
from user_tables
where table_name = :name""", name = QUEUE_TABLE_NAME)
count, = cursor.fetchone()
if count > 0:
print("Dropping queue table...")
cursor.callproc("dbms_aqadm.drop_queue_table", (QUEUE_TABLE_NAME, True))

# drop type, if present
cursor.execute("""
select count(*)
from user_types
where type_name = :name""", name = BOOK_TYPE_NAME)
count, = cursor.fetchone()
if count > 0:
print("Dropping books type...")
cursor.execute("drop type %s" % BOOK_TYPE_NAME)

# create type
print("Creating books type...")
cursor.execute("""
create type %s as object (
title varchar2(100),
authors varchar2(100),
price number(5,2)
);""" % BOOK_TYPE_NAME)

# create queue table and quueue and start the queue
print("Creating queue table...")
cursor.callproc("dbms_aqadm.create_queue_table",
(QUEUE_TABLE_NAME, BOOK_TYPE_NAME))
cursor.callproc("dbms_aqadm.create_queue", (QUEUE_NAME, QUEUE_TABLE_NAME))
cursor.callproc("dbms_aqadm.start_queue", (QUEUE_NAME,))
# dequeue all existing messages to ensure the queue is empty, just so that
# the results are consistent
booksType = connection.gettype(BOOK_TYPE_NAME)
book = booksType.newobject()
options = connection.deqoptions()
options.wait = cx_Oracle.DEQ_NO_WAIT
messageProperties = connection.msgproperties()
while connection.deq(QUEUE_NAME, options, messageProperties, book):
pass

# enqueue a few messages
booksType = connection.gettype(BOOK_TYPE_NAME)
book1 = booksType.newobject()
book1.TITLE = "The Fellowship of the Ring"
book1.AUTHORS = "Tolkien, J.R.R."
Expand All @@ -76,7 +49,6 @@
book2.AUTHORS = "Rowling, J.K."
book2.PRICE = decimal.Decimal("7.99")
options = connection.enqoptions()
messageProperties = connection.msgproperties()
for book in (book1, book2):
print("Enqueuing book", book.TITLE)
connection.enq(QUEUE_NAME, options, messageProperties, book)
Expand All @@ -88,4 +60,5 @@
options.wait = cx_Oracle.DEQ_NO_WAIT
while connection.deq(QUEUE_NAME, options, messageProperties, book):
print("Dequeued book", book.TITLE)
connection.commit()

47 changes: 47 additions & 0 deletions samples/AdvancedQueuingNotification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#------------------------------------------------------------------------------
# Copyright 2018, Oracle and/or its affiliates. All rights reserved.
#------------------------------------------------------------------------------

#------------------------------------------------------------------------------
# AdvancedQueuingNotification.py
# This script demonstrates using advanced queuing notification. Once this
# script is running, use another session to enqueue a few messages to the
# "BOOKS" queue. This is most easily accomplished by running the
# AdvancedQueuing sample.
#
# This script requires cx_Oracle 6.4 and higher.
#------------------------------------------------------------------------------

from __future__ import print_function

import cx_Oracle
import SampleEnv
import threading
import time

registered = True

def callback(message):
global registered
print("Message type:", message.type)
if message.type == cx_Oracle.EVENT_DEREG:
print("Deregistration has taken place...")
registered = False
return
print("Queue name:", message.queueName)
print("Consumer name:", message.consumerName)

connection = cx_Oracle.Connection(SampleEnv.MAIN_CONNECT_STRING, events = True)
sub = connection.subscribe(namespace = cx_Oracle.SUBSCR_NAMESPACE_AQ,
name = "BOOKS", callback = callback, timeout = 300)
print("Subscription:", sub)
print("--> Connection:", sub.connection)
print("--> Callback:", sub.callback)
print("--> Namespace:", sub.namespace)
print("--> Protocol:", sub.protocol)
print("--> Timeout:", sub.timeout)

while registered:
print("Waiting for notifications....")
time.sleep(5)

16 changes: 16 additions & 0 deletions samples/sql/SetupSamples.sql
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ create or replace type &main_user..udt_Building as object (
);
/

create or replace type &main_user..udt_Book as object (
Title varchar2(100),
Authors varchar2(100),
Price number(5,2)
);
/

-- create tables

create table &main_user..TestNumbers (
Expand Down Expand Up @@ -147,6 +154,15 @@ create table &main_user..Ptab (
mydata varchar(20)
);

-- create queue table and queues for demonstrating advanced queuing
begin
dbms_aqadm.create_queue_table('&main_user..BOOK_QUEUE',
'&main_user..UDT_BOOK');
dbms_aqadm.create_queue('&main_user..BOOKS', '&main_user..BOOK_QUEUE');
dbms_aqadm.start_queue('&main_user..BOOKS');
end;
/

-- populate tables

begin
Expand Down
Loading

0 comments on commit 2112982

Please sign in to comment.