forked from oracle/python-cx_Oracle
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_3000_subscription.py
123 lines (101 loc) · 4.32 KB
/
test_3000_subscription.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#------------------------------------------------------------------------------
# Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
#------------------------------------------------------------------------------
"""
3000 - Module for testing subscriptions
"""
import test_env
import cx_Oracle as oracledb
import threading
class SubscriptionData(object):
    """Accumulates the contents of subscription messages for later checks.

    ``CallbackHandler`` is meant to be passed as the ``callback`` of a
    subscription.  It records what each message reports and notifies
    ``condition`` once ``num_messages_expected`` messages have been seen or a
    deregistration event arrives, so that another thread can wait on
    ``condition`` instead of polling.
    """

    def __init__(self, num_messages_expected):
        """Create an empty collector.

        num_messages_expected is the number of non-deregistration messages
        after which a waiter on ``condition`` is notified.
        """
        # guards the attributes below and signals completion to a waiter
        self.condition = threading.Condition()
        self.num_messages_expected = num_messages_expected
        self.num_messages_received = 0
        # one entry per message: the operation reported for the table
        self.table_operations = []
        # one entry per row reported in a message, in arrival order
        self.row_operations = []
        self.rowids = []

    def CallbackHandler(self, message):
        """Record one message and notify the waiter when collection is done.

        Messages whose type is EVENT_DEREG carry nothing to record; they only
        trigger the notification.  Any other message must reference exactly
        one table (the single-element unpacking raises ValueError otherwise).
        """
        # The whole update runs with the condition held: a waiter that checks
        # the counters under the same lock never sees a half-recorded message,
        # and the lock is released even if recording raises.
        with self.condition:
            if message.type != oracledb.EVENT_DEREG:
                table, = message.tables
                self.table_operations.append(table.operation)
                for row in table.rows:
                    self.row_operations.append(row.operation)
                    self.rowids.append(row.rowid)
                self.num_messages_received += 1
            if message.type == oracledb.EVENT_DEREG or \
                    self.num_messages_received == self.num_messages_expected:
                self.condition.notify()
class TestCase(test_env.BaseTestCase):
    """Subscription tests run against the TestTempTable table."""

    def test_3000_subscription(self):
        "3000 - test Subscription for insert, update, delete and truncate"

        # skip if running on the Oracle Cloud, which does not support
        # subscriptions currently
        if self.is_on_oracle_cloud():
            message = "Oracle Cloud does not support subscriptions currently"
            self.skipTest(message)

        # truncate table in order to run test in known state
        self.cursor.execute("truncate table TestTempTable")

        # expected values: one table-level entry per statement executed
        # below; the final truncate is expected at table level only, so the
        # row-level lists have one entry fewer
        table_operations = [
            oracledb.OPCODE_INSERT,
            oracledb.OPCODE_UPDATE,
            oracledb.OPCODE_INSERT,
            oracledb.OPCODE_DELETE,
            oracledb.OPCODE_ALTER | oracledb.OPCODE_ALLROWS
        ]
        row_operations = [
            oracledb.OPCODE_INSERT,
            oracledb.OPCODE_UPDATE,
            oracledb.OPCODE_INSERT,
            oracledb.OPCODE_DELETE
        ]
        rowids = []

        # set up subscription on a separate connection that commits each
        # statement as it is executed
        data = SubscriptionData(5)
        connection = test_env.get_connection(threaded=True, events=True)
        sub = connection.subscribe(callback=data.CallbackHandler,
                                   timeout=10, qos=oracledb.SUBSCR_QOS_ROWIDS)
        sub.registerquery("select * from TestTempTable")
        connection.autocommit = True
        cursor = connection.cursor()

        # insert statement
        cursor.execute("""
                insert into TestTempTable (IntCol, StringCol)
                values (1, 'test')""")
        cursor.execute("select rowid from TestTempTable where IntCol = 1")
        rowids.extend(r for r, in cursor)

        # update statement
        cursor.execute("""
                update TestTempTable set
                    StringCol = 'update'
                where IntCol = 1""")
        cursor.execute("select rowid from TestTempTable where IntCol = 1")
        rowids.extend(r for r, in cursor)

        # second insert statement
        cursor.execute("""
                insert into TestTempTable (IntCol, StringCol)
                values (2, 'test2')""")
        cursor.execute("select rowid from TestTempTable where IntCol = 2")
        rowids.extend(r for r, in cursor)

        # delete statement; the deleted row is the one just inserted, so its
        # rowid is expected a second time
        cursor.execute("delete TestTempTable where IntCol = 2")
        rowids.append(rowids[-1])

        # truncate table
        cursor.execute("truncate table TestTempTable")

        # wait (at most 10 seconds) for all messages to be sent.  The
        # predicate is evaluated before blocking, so a notification that was
        # delivered before this point is not lost, and the "with" block
        # releases the condition afterwards so that a later callback cannot
        # block on a lock this thread still holds.
        with data.condition:
            data.condition.wait_for(
                lambda: (data.num_messages_received >=
                         data.num_messages_expected),
                timeout=10)

        # verify the correct messages were sent
        self.assertEqual(data.table_operations, table_operations)
        self.assertEqual(data.row_operations, row_operations)
        self.assertEqual(data.rowids, rowids)

        # test string format of subscription object is as expected
        fmt = "<cx_Oracle.Subscription on <cx_Oracle.Connection to %s@%s>>"
        expected = fmt % \
                (test_env.get_main_user(), test_env.get_connect_string())
        self.assertEqual(str(sub), expected)
if __name__ == "__main__":
    # executed directly as a script: hand control to the shared runner
    test_env.run_test_cases()