Skip to content

Commit

Permalink
Use BaseMonintorChange
Browse files Browse the repository at this point in the history
Use BaseChangeMonitor class the is inherits from a thread when run on windows,but from process otherwise.

Move the initialization of the database in the run code to avoid cross threading or pickling issues with the couchdb database object.
  • Loading branch information
jpoyau committed Feb 8, 2012
1 parent d94ad54 commit e629957
Showing 1 changed file with 16 additions and 12 deletions.
28 changes: 16 additions & 12 deletions LR/lr/lib/couch_change_monitor/change_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@
@author: jpoyau
'''
from multiprocessing import Process, Queue
from multiprocessing import Queue
from threading import Thread
import couchdb
import logging
from threading import Thread
import pprint
from base_change_handler import BaseChangeHandler
from base_change_monitor import BaseChangeMonitor

_DEFAULT_CHANGE_OPTIONS = {'feed': 'continuous',
'include_docs':True}
'include_docs':True}

log = logging.getLogger(__name__)

class MonitorChanges(Process):
class MonitorChanges(BaseChangeMonitor):
"""Class that monitors continously a couchdb database changes to apply a list
of handlers the changes of the database"""
# Number time the run will try to restart listening to the feed after an error without
Expand All @@ -30,8 +31,9 @@ def __init__(self, serverUrl, databaseName, changeHandlers=None, changeOptions
be pickeable and not tied in any way to the calling process otherwise to application
may get unstable.
"""
Process.__init__(self, None, None, "learningRegistryChangeMonitor", args, kwargs)
self._database = couchdb.Server(serverUrl)[databaseName]
BaseChangeMonitor.__init__(self, None, None, "learningRegistryChangeMonitor", args, kwargs)
self._serverUrl = serverUrl
self._databaseName = databaseName
self._callerThread = None
self._addHandlerQueue = Queue()
self._removeHandlerQueue = Queue()
Expand Down Expand Up @@ -127,9 +129,11 @@ def removeHandler(self, handler):
self._removeHandlerQueue.put(handler)

def run(self):
#initialize the database in run side
self._database = couchdb.Server(self._serverUrl)[self._databaseName]
# As long as we are running keep monitoring the change feed for changes.
log.debug("Start monitoring database : {0} changes PID: {1} since:{2}\n\n".format(
str(self._database), self.pid, self._lastChangeSequence))
log.debug("\n\nStart monitoring database : {0} changes PID: {1} since:{2}\n\n".format(
str(self._database), self.monitorId, self._lastChangeSequence))
self._errorCount = 0
while(self.is_alive() and self._errorCount < self._MAX_ERROR_RESTART):
try:
Expand All @@ -145,15 +149,15 @@ def run(self):
log.error("Change monitor for database {0} exceeded max errors\n\n".format(str(self._database)))

def terminate(self):
Process.terminate(self)
BaseChangeMonitor.terminate(self)
log.debug("\n\n------------I got terminated ...---------------\n\n")

def start(self, callerThread=None):
if isinstance(callerThread, Thread):
self._callerThread = callerThread
self._selfTerminatorThread()
Process.start(self)

BaseChangeMonitor.start(self)

if __name__=="__main__":
logging.basicConfig(level=logging.DEBUG)
Expand Down

0 comments on commit e629957

Please sign in to comment.