Skip to content

Commit

Permalink
Merge pull request #68 from zopefoundation/wrapped-thread-local-208
Browse files Browse the repository at this point in the history
Changed the implementation of ThreadTransactionManager
  • Loading branch information
jimfulton committed Oct 23, 2018
2 parents f63c0aa + ef966bb commit d2b90a7
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 5 deletions.
14 changes: 12 additions & 2 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ that it creates the transactions and keeps track of the current one. Whenever
an application wants to use the transaction machinery, it gets the current
transaction from the transaction manager before starting any operations

The default transaction manager for the transaction package is thread aware.
Each thread is associated with a unique transaction.
The default transaction manager, `transaction.manager`, is thread
local. You use it as a global variable, but every thread has it's own
copy. [#wrapped]_

Application developers will most likely never need to create their own
transaction managers.
Expand Down Expand Up @@ -141,3 +142,12 @@ Additional Documentation
integrations
api
developer


.. [#wrapped] The thread-local transaction manager,
`transaction.manager` wraps a regular transaction manager. You can
get the wrapped transaction manager using the `manager` attribute.
Implementers of data managers can use this **advanced** feature to
allow graceful shutdown from a central/main thread, by having their
`close` methods call `unregisterSynch` on the wrapped transaction
manager they obtained when created or opened.
68 changes: 65 additions & 3 deletions transaction/_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,74 @@ def run(self, func=None, tries=3):
return result


class ThreadTransactionManager(TransactionManager, threading.local):
"""Thread-aware transaction manager.
@implementer(ITransactionManager)
class ThreadTransactionManager(threading.local):
"""
Thread-local transaction manager.
A thread-local transaction manager can be used as a global
variable, but has a separate copy for each thread.
Each thread is associated with a unique transaction.
Advanced applications can use the `manager` attribute to get a
wrapped TransactionManager to allow cross-thread calls for
graceful shutdown of data managers.
"""

def __init__(self):
self.manager = TransactionManager()

@property
def explicit(self):
return self.manager.explicit

@explicit.setter
def explicit(self, v):
self.manager.explicit = v

def begin(self):
return self.manager.begin()

def get(self):
return self.manager.get()

def __enter__(self):
return self.manager.__enter__()

def commit(self):
return self.manager.commit()

def abort(self):
return self.manager.abort()

def __exit__(self, t, v, tb):
return self.manager.__exit__(t, v, tb)

def doom(self):
return self.manager.doom()

def isDoomed(self):
return self.manager.isDoomed()

def savepoint(self, optimistic=False):
return self.manager.savepoint(optimistic)

def registerSynch(self, synch):
return self.manager.registerSynch(synch)

def unregisterSynch(self, synch):
return self.manager.unregisterSynch(synch)

def clearSynchs(self):
return self.manager.clearSynchs()

def registeredSynchs(self):
return self.manager.registeredSynchs()

def attempts(self, number=3):
return self.manager.attempts(number)

def run(self, func=None, tries=3):
return self.manager.run(func, tries)

class Attempt(object):

Expand Down
104 changes: 104 additions & 0 deletions transaction/tests/test__manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import mock
import unittest

import zope.interface.verify

from .. import interfaces


class TransactionManagerTests(unittest.TestCase):

Expand All @@ -31,6 +36,10 @@ def _makePopulated(self):
nosub1 = DataObject(mgr, nost=1)
return mgr, sub1, sub2, sub3, nosub1

def test_interface(self):
zope.interface.verify.verifyObject(interfaces.ITransactionManager,
self._makeOne())

def test_ctor(self):
tm = self._makeOne()
self.assertTrue(tm._txn is None)
Expand Down Expand Up @@ -664,6 +673,101 @@ def test_notify_transaction_late_comers(self):
s.beforeCompletion.assert_called_with(t)
s.afterCompletion.assert_called_with(t)

def test_unregisterSynch_on_transaction_manager_from_serparate_thread(self):
# We should be able to get the underlying manager of the thread manager
# cand call methods from other threads.

import threading, transaction

started = threading.Event()
stopped = threading.Event()

synchronizer = self

class Runner(threading.Thread):

def __init__(self):
threading.Thread.__init__(self)
self.manager = transaction.manager.manager
self.setDaemon(True)
self.start()

def run(self):
self.manager.registerSynch(synchronizer)
started.set()
stopped.wait()

runner = Runner()
started.wait()
runner.manager.unregisterSynch(synchronizer)
stopped.set()
runner.join(1)


class TestThreadTransactionManager(unittest.TestCase):

def test_interface(self):
import transaction
zope.interface.verify.verifyObject(interfaces.ITransactionManager,
transaction.manager)

def test_sync_registration_thread_local_manager(self):
import transaction

sync = mock.MagicMock()
sync2 = mock.MagicMock()
self.assertFalse(transaction.manager.registeredSynchs())
transaction.manager.registerSynch(sync)
self.assertTrue(transaction.manager.registeredSynchs())
transaction.manager.registerSynch(sync2)
self.assertTrue(transaction.manager.registeredSynchs())
t = transaction.begin()
sync.newTransaction.assert_called_with(t)
transaction.abort()
sync.beforeCompletion.assert_called_with(t)
sync.afterCompletion.assert_called_with(t)
transaction.manager.unregisterSynch(sync)
self.assertTrue(transaction.manager.registeredSynchs())
transaction.manager.unregisterSynch(sync2)
self.assertFalse(transaction.manager.registeredSynchs())
sync.reset_mock()
transaction.begin()
transaction.abort()
sync.newTransaction.assert_not_called()
sync.beforeCompletion.assert_not_called()
sync.afterCompletion.assert_not_called()

self.assertFalse(transaction.manager.registeredSynchs())
transaction.manager.registerSynch(sync)
transaction.manager.registerSynch(sync2)
t = transaction.begin()
sync.newTransaction.assert_called_with(t)
self.assertTrue(transaction.manager.registeredSynchs())
transaction.abort()
sync.beforeCompletion.assert_called_with(t)
sync.afterCompletion.assert_called_with(t)
transaction.manager.clearSynchs()
self.assertFalse(transaction.manager.registeredSynchs())
sync.reset_mock()
transaction.begin()
transaction.abort()
sync.newTransaction.assert_not_called()
sync.beforeCompletion.assert_not_called()
sync.afterCompletion.assert_not_called()

def test_explicit_thread_local_manager(self):
import transaction.interfaces

self.assertFalse(transaction.manager.explicit)
transaction.abort()
transaction.manager.explicit = True
self.assertTrue(transaction.manager.explicit)
with self.assertRaises(transaction.interfaces.NoTransaction):
transaction.abort()
transaction.manager.explicit = False
transaction.abort()


class AttemptTests(unittest.TestCase):

def _makeOne(self, manager):
Expand Down

0 comments on commit d2b90a7

Please sign in to comment.