Skip to content

Commit

Permalink
Chg: restart pyinotify if no data comes in.
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Raspaud <martin.raspaud@smhi.se>
  • Loading branch information
mraspaud committed Mar 9, 2015
1 parent 60a8dc7 commit b885ef2
Showing 1 changed file with 44 additions and 16 deletions.
60 changes: 44 additions & 16 deletions trollcast/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,40 +278,66 @@ def read(self, data, f_elev=None):
FORMATS = [CADU, HRPT]


class FileWatcher(object):
class FileWatcher(Thread):

def __init__(self, holder, uri, schedule_reader):

Thread.__init__(self)
self._loop = True
self._error_event = Event()
self._wm = WatchManager()
self._holder = holder
self._uri = uri
self._schedule_reader = schedule_reader
self._notifier = ThreadedNotifier(self._wm,
_EventHandler(holder, uri,
schedule_reader))
_EventHandler(self._holder,
self._uri,
self._schedule_reader,
self._error_event))
self._path, self._pattern = os.path.split(urlparse(uri).path)

def start(self):
"""Start the file watcher
"""
self._notifier.start()
self._wm.add_watch(self._path, IN_OPEN | IN_CLOSE_WRITE | IN_MODIFY)
Thread.start(self)

def run(self):
while self._loop:
if self._error_event.wait(1):
self._error_event.clear()
self._notifier.stop()
del self._notifier
self._notifier = ThreadedNotifier(
self._wm,
_EventHandler(self._holder,
self._uri,
self._schedule_reader,
self._error_event))
self._notifier.start()
self._wm.add_watch(
self._path, IN_OPEN | IN_CLOSE_WRITE | IN_MODIFY)

def stop(self):
"""Stop the file watcher
"""
self._notifier.stop()
self._loop = False


class _EventHandler(ProcessEvent):

"""Watch files
"""

def __init__(self, holder, uri, schedule_reader):
def __init__(self, holder, uri, schedule_reader, error_event):
ProcessEvent.__init__(self)
self._holder = holder
self._uri = uri
self._current_pass = None
self._schedule_reader = schedule_reader
self._loop = True
self._error_event = error_event

self._path, self._pattern = os.path.split(urlparse(self._uri).path)

Expand All @@ -322,33 +348,35 @@ def __init__(self, holder, uri, schedule_reader):
self.sat = None
self.time = None
self.current_event = None
self._current_pass_timer = None
self._pass_end_timer = None

if self._schedule_reader.next_pass:
next_pass_in = (self._schedule_reader.next_pass[0]
- datetime.utcnow())
if next_pass_in.seconds > 0:
self._timer = Timer(next_pass_in.seconds + 5,
logger.error,
["Reception expected but not started"])
self.error)
self._timer.start()

def start_receiving(self, event):
def error(self):
logger.critical("Reception expected but not started")
self._error_event.set()

def set_reception_active(self, event):
self._receiving = True
if self._timer is not None:
self._timer.cancel()
if self._current_pass_timer is not None:
self._current_pass_timer.cancel()
self._current_pass_timer = Timer(60, self.clean_up, event)
if self._pass_end_timer is not None:
self._pass_end_timer.cancel()
self._pass_end_timer = Timer(60, self.clean_up, event)

def stop_receiving(self):
self._receiving = False
if self._schedule_reader.next_pass:
next_pass_in = (self._schedule_reader.next_pass[0]
- datetime.utcnow())
self._timer = Timer(next_pass_in.seconds + 5,
logger.error,
["Reception expected but not started"])
self.error)
self._timer.start()

def _reader(self, pathname, current_pass):
Expand Down Expand Up @@ -421,7 +449,7 @@ def process_IN_OPEN(self, event):
except KeyError:
logger.info("Could not retrieve satellite name from filename")

self.start_receiving(event)
self.set_reception_active(event)
return self._fp is not None

def process_IN_MODIFY(self, event):
Expand All @@ -438,7 +466,7 @@ def process_IN_MODIFY(self, event):
if not fnmatch(fname, globify(self._pattern)):
return

self.start_receiving(event)
self.set_reception_active(event)

for sat, key, elevation, qual, data in self._reader(event.pathname,
self._current_pass):
Expand Down

0 comments on commit b885ef2

Please sign in to comment.