Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix stackoverflow and logcontexts from linearizer #2532

Merged
merged 1 commit into from
Oct 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 22 additions & 2 deletions synapse/util/async.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,35 @@ def queue(self, key):
except:
logger.exception("Unexpected exception in Linearizer")

logger.info("Acquired linearizer lock %r for key %r", self.name, key)
logger.info("Acquired linearizer lock %r for key %r", self.name,
key)

# if the code holding the lock completes synchronously, then it
# will recursively run the next claimant on the list. That can
# relatively rapidly lead to stack exhaustion. This is essentially
# the same problem as http://twistedmatrix.com/trac/ticket/9304.
#
# In order to break the cycle, we add a cheeky sleep(0) here to
# ensure that we fall back to the reactor between each iteration.
#
# (There's no particular need for it to happen before we return
# the context manager, but it needs to happen while we hold the
# lock, and the context manager's exit code must be synchronous,
# so actually this is the only sensible place.
yield run_on_reactor()

else:
logger.info("Acquired uncontended linearizer lock %r for key %r",
self.name, key)

@contextmanager
def _ctx_manager():
try:
yield
finally:
logger.info("Releasing linearizer lock %r for key %r", self.name, key)
new_defer.callback(None)
with PreserveLoggingContext():
new_defer.callback(None)
current_d = self.key_to_defer.get(key)
if current_d is new_defer:
self.key_to_defer.pop(key, None)
Expand Down
28 changes: 24 additions & 4 deletions tests/util/test_linearizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from synapse.util import async, logcontext
from tests import unittest

from twisted.internet import defer
Expand All @@ -38,7 +37,28 @@ def test_linearizer(self):
with cm1:
self.assertFalse(d2.called)

self.assertTrue(d2.called)

with (yield d2):
pass

def test_lots_of_queued_things(self):
# we have one slow thing, and lots of fast things queued up behind it.
# it should *not* explode the stack.
linearizer = Linearizer()

@defer.inlineCallbacks
def func(i, sleep=False):
with logcontext.LoggingContext("func(%s)" % i) as lc:
with (yield linearizer.queue("")):
self.assertEqual(
logcontext.LoggingContext.current_context(), lc)
if sleep:
yield async.sleep(0)

self.assertEqual(
logcontext.LoggingContext.current_context(), lc)

func(0, sleep=True)
for i in xrange(1, 100):
func(i)

return func(1000)