Permalink
Cannot retrieve contributors at this time
Join GitHub today
GitHub is home to over 31 million developers working together to host and review code, manage projects, and build software together.
Sign up
Fetching contributors…

#------------------------------------------------------------------------------ | |
# Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved. | |
# | |
# Portions Copyright 2007-2015, Anthony Tuininga. All rights reserved. | |
# | |
# Portions Copyright 2001-2007, Computronix (Canada) Ltd., Edmonton, Alberta, | |
# Canada. All rights reserved. | |
#------------------------------------------------------------------------------ | |
#------------------------------------------------------------------------------ | |
# CQN.py | |
# This script demonstrates using continuous query notification in Python, a | |
# feature that is available in Oracle 11g and later. Once this script is | |
# running, use another session to insert, update or delete rows from the table | |
# cx_Oracle.TestTempTable and you will see the notification of that change. | |
# | |
# This script requires cx_Oracle 5.3 and higher. | |
#------------------------------------------------------------------------------ | |
from __future__ import print_function | |
import cx_Oracle | |
import SampleEnv | |
import threading | |
import time | |
registered = True | |
def callback(message): | |
global registered | |
print("Message type:", message.type) | |
if not message.registered: | |
print("Deregistration has taken place...") | |
registered = False | |
return | |
print("Message database name:", message.dbname) | |
print("Message tranasction id:", message.txid) | |
print("Message queries:") | |
for query in message.queries: | |
print("--> Query ID:", query.id) | |
print("--> Query Operation:", query.operation) | |
for table in query.tables: | |
print("--> --> Table Name:", table.name) | |
print("--> --> Table Operation:", table.operation) | |
if table.rows is not None: | |
print("--> --> Table Rows:") | |
for row in table.rows: | |
print("--> --> --> Row RowId:", row.rowid) | |
print("--> --> --> Row Operation:", row.operation) | |
print("-" * 60) | |
print("=" * 60) | |
connection = cx_Oracle.connect(SampleEnv.GetMainConnectString(), events = True) | |
sub = connection.subscribe(callback = callback, timeout = 1800, | |
qos = cx_Oracle.SUBSCR_QOS_QUERY | cx_Oracle.SUBSCR_QOS_ROWIDS) | |
print("Subscription:", sub) | |
print("--> Connection:", sub.connection) | |
print("--> Callback:", sub.callback) | |
print("--> Namespace:", sub.namespace) | |
print("--> Protocol:", sub.protocol) | |
print("--> Timeout:", sub.timeout) | |
print("--> Operations:", sub.operations) | |
print("--> Rowids?:", bool(sub.qos & cx_Oracle.SUBSCR_QOS_ROWIDS)) | |
queryId = sub.registerquery("select * from TestTempTable") | |
print("Registered query:", queryId) | |
while registered: | |
print("Waiting for notifications....") | |
time.sleep(5) | |