Skip to content

Commit

Permalink
[pymorse] mv poll thread class to stream module
Browse files Browse the repository at this point in the history
  • Loading branch information
PierrickKoch committed Nov 25, 2014
1 parent 8b50ddd commit 743e17b
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 25 deletions.
30 changes: 9 additions & 21 deletions bindings/pymorse/src/pymorse/pymorse.py
Expand Up @@ -297,7 +297,7 @@ def done(evt):
import threading

from .future import MorseExecutor
from .stream import Stream, StreamJSON
from .stream import Stream, StreamJSON, PollThread

logger = logging.getLogger("pymorse")
logger.setLevel(logging.WARNING)
Expand Down Expand Up @@ -434,20 +434,8 @@ def cancel_all():
condition.notify_all()
del ResponseCallback._conditions[:] # clear list

class PollThread(threading.Thread):
def __init__(self, timeout=0.01):
threading.Thread.__init__(self)
self.keep_polling = True
self.timeout = timeout
def run(self):
while asyncore.socket_map and self.keep_polling:
asyncore.poll(self.timeout, asyncore.socket_map)
def syncstop(self, timeout=None):
self.keep_polling = False
return self.join(timeout)

class Morse(object):
_asyncore_thread = None
poll_thread = None
def __init__(self, host = "localhost", port = 4000):
""" Creates an instance of the MORSE simulator proxy.
Expand All @@ -459,9 +447,9 @@ def __init__(self, host = "localhost", port = 4000):
self.host = host
self.simulator_service = Stream(host, port)
self.simulator_service_id = 0
if not Morse._asyncore_thread:
Morse._asyncore_thread = PollThread()
Morse._asyncore_thread.start()
if not Morse.poll_thread:
Morse.poll_thread = PollThread()
Morse.poll_thread.start()
logger.debug("Morse thread started")
else:
logger.debug("Morse thread was already started")
Expand Down Expand Up @@ -603,14 +591,14 @@ def close(self, cancel_async_services = False, wait_publishers = True):
logger.info('Waiting for all asynchronous requests to complete...')
self.executor.shutdown(wait = True)
# Close all other asyncore sockets (StreanJSON)
if Morse._asyncore_thread:
Morse._asyncore_thread.syncstop(TIMEOUT)
if Morse.poll_thread:
Morse.poll_thread.syncstop(TIMEOUT)
asyncore.close_all()
Morse._asyncore_thread = None # in case we want to re-create
Morse.poll_thread = None # in case we want to re-create
logger.info('Done. Bye bye!')

def spin(self):
Morse._asyncore_thread.join()
Morse.poll_thread.join()


#####################################################################
Expand Down
19 changes: 15 additions & 4 deletions bindings/pymorse/src/pymorse/stream.py
@@ -1,16 +1,15 @@
"""
import asyncore
import threading
from stream import Stream
from stream import Stream, PollThread
s = Stream('python.org', 80)
threading.Thread( target = asyncore.loop, kwargs = {'timeout': .1} ).start()
PollThread().start()
s.is_up()
s.publish("GET /\r\n")
s.get(.5) or s.last()
"""
import json
import socket
import logging
import asyncore
import asynchat
import threading
# Double-ended queue, thread-safe append/pop.
Expand All @@ -22,6 +21,18 @@

MSG_SEPARATOR=b"\n"

class PollThread(threading.Thread):
def __init__(self, timeout=0.01):
threading.Thread.__init__(self)
self.keep_polling = True
self.timeout = timeout
def run(self):
while asyncore.socket_map and self.keep_polling:
asyncore.poll(self.timeout, asyncore.socket_map)
def syncstop(self, timeout=None):
self.keep_polling = False
return self.join(timeout)

class StreamB(asynchat.async_chat):
""" Asynchrone I/O stream handler (raw bytes)
Expand Down

0 comments on commit 743e17b

Please sign in to comment.