Skip to content

Commit

Permalink
Create a stub class and singleton to represent main
Browse files Browse the repository at this point in the history
This is an updated patch from pull request #16 which will throw an
exception if coro is called without starting the event_loop(). The
main update is to bring it up to date with recent changes and make
the linux poller throw the correct exception in this case.
  • Loading branch information
markpeek committed Jul 23, 2014
1 parent 60c78a3 commit 218fd02
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 28 deletions.
44 changes: 25 additions & 19 deletions coro/_coro.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ include "fifo.pyx"

import sys

class YieldFromMain (Exception):
"attempt to yield from main"
pass

class ScheduleError (Exception):
"attempt to schedule an already-scheduled coroutine"
pass
Expand Down Expand Up @@ -192,7 +196,8 @@ cdef extern int SHRAP_STACK_PAD
# forward
#cdef public class sched [ object sched_object, type sched_type ]
cdef public class queue_poller [ object queue_poller_object, type queue_poller_type ]
cdef sched the_scheduler "the_scheduler"
cdef sched the_scheduler "_the_scheduler"
cdef main_stub the_main_coro "the_main_coro"
cdef queue_poller the_poller "the_poller"

cdef int default_selfishness
Expand Down Expand Up @@ -318,7 +323,7 @@ cdef public class coro [ object _coro_object, type _coro_type ]:
# save exception data
self.save_exception_data()
if not self.dead:
the_scheduler._current = None
the_scheduler._current = the_main_coro
the_scheduler._last = self
else:
# Beware. When this coroutine is 'dead', it's about to __swap()
Expand Down Expand Up @@ -684,6 +689,17 @@ cdef public class coro [ object _coro_object, type _coro_type ]:

self.waiting_joiners.wait()

cdef class main_stub (coro):
"""This class serves only one purpose - to catch attempts at yielding() from main,
which almost certainly means someone forgot to run inside the event loop."""

def __init__ (self):
self.name = b'main/scheduler'
self.id = -1

cdef __yield (self):
raise YieldFromMain ("is the event loop running?")

def get_live_coros():
"""Get the number of live coroutines.
Expand Down Expand Up @@ -878,7 +894,6 @@ ELSE:
# ================================================================================

cdef public class sched [ object sched_object, type sched_type ]:

def __init__ (self, stack_size=4*1024*1024):
self.stack_size = stack_size
# tried using mmap & MAP_STACK, always got ENOMEM
Expand All @@ -889,7 +904,7 @@ cdef public class sched [ object sched_object, type sched_type ]:
# <int>self.stack_base,
# <int>self.stack_base + stack_size
# ))
self._current = None
self._current = the_main_coro
self._last = None
self.pending = []
self.staging = []
Expand Down Expand Up @@ -1082,9 +1097,6 @@ cdef public class sched [ object sched_object, type sched_type ]:
"""
cdef timebomb tb
cdef event e
IF CORO_DEBUG:
# can't call with_timeout() from main...
assert self._current is not None

# Negative timeout is treated the same as 0.
if delta < 0:
Expand Down Expand Up @@ -1328,6 +1340,8 @@ IF COMPILE_LIO:
# python. 'global' doesn't do the trick. However, defining global
# functions to access them works...

# singletons
the_main_coro = main_stub()
the_scheduler = sched()
the_poller = queue_poller()
_the_scheduler = the_scheduler
Expand All @@ -1341,15 +1355,8 @@ def print_stderr (s):
:param s: A string to print.
"""
try:
current_thread = current()
if current_thread is None:
thread_id = -1
else:
thread_id = current().thread_id()
output = '%i:\t%s %s' % (
thread_id,
tsc_time_module.now_tsc().ctime(),
s)
timestamp = tsc_time_module.now_tsc().ctime()
output = '%i:\t%s %s' % (current().thread_id(), timestamp, s)
saved_stderr.write(output)
if not output.endswith('\n'):
saved_stderr.write('\n')
Expand Down Expand Up @@ -1433,9 +1440,8 @@ cdef void info(int sig):

co = the_scheduler._current
frame = _PyThreadState_Current.frame
if co:
stdio.fprintf (
stdio.stderr, 'coro %i "%s" at %s: %s %i\n',
if co is not the_main_coro:
stdio.fprintf(stdio.stderr, 'coro %i "%s" at %s: %s %i\n',
co.id,
co.name,
<bytes>frame.f_code.co_filename,
Expand Down
4 changes: 4 additions & 0 deletions coro/linux_poller.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,10 @@ cdef public class queue_poller [ object queue_poller_object, type queue_poller_t
cdef _register_event(self, event_key ek, unsigned int flags):
cdef int r
cdef epoll_event org_e

if self.ep_fd == -1:
raise YieldFromMain("is the event loop running?")

org_e.data.fd = ek.fd
org_e.events = ek.events | flags

Expand Down
12 changes: 3 additions & 9 deletions coro/local.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

# $Header: //prod/main/ap/shrapnel/coro/local.pyx#3 $

cdef object __dummy_main
__dummy_main = {}

cdef class ThreadLocal:

Expand Down Expand Up @@ -95,13 +93,9 @@ cdef class ThreadLocal:
cdef coro co

co = the_scheduler._current
if co is None:
# Really shouldn't use these until coro is started.
_tdict = __dummy_main
else:
if co._tdict is None:
co._tdict = {}
_tdict = co._tdict
if co._tdict is None:
co._tdict = {}
_tdict = co._tdict
ldict = PyDict_GET_ITEM_SAFE(_tdict, self.key, None)
if ldict is None:
ldict = {}
Expand Down
2 changes: 2 additions & 0 deletions test/runner
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ python test_isemaphore.py
python test_lio.py
python test_local.py
python test_lru.py
python test_main.py
python test_mutex.py
#python test_notify_of_close.py
python test_poller.py
python test_profile.py
python test_read_stream.py
#python test_readv.py
python test_reenter.py
python test_rw_lock.py
python test_semaphore.py
python test_socket.py
Expand Down
16 changes: 16 additions & 0 deletions test/test_main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import coro
import unittest

class CoroMain(unittest.TestCase):
def test_coro_main_connect(self):
s = coro.tcp_sock()
with self.assertRaises(coro.YieldFromMain):
s.connect (('127.0.0.1', 80))

def test_coro_main_yield(self):
s = coro.tcp_sock()
with self.assertRaises(coro.YieldFromMain):
coro.yield_slice()

if __name__ == '__main__':
unittest.main()
21 changes: 21 additions & 0 deletions test/test_reenter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import coro
import unittest

def exit_coro():
coro.sleep_relative(0.1)
coro.set_exit()


class CoroMain(unittest.TestCase):
def test_coro_main(self):
coro.set_print_exit_string(None)

for x in range(4):
try:
coro.spawn(exit_coro)
coro.event_loop()
except SystemExit:
pass

if __name__ == '__main__':
unittest.main()

0 comments on commit 218fd02

Please sign in to comment.