Permalink
Browse files

Add new tube delay service codes.

  • Loading branch information...
1 parent 13eb2ff commit 55112dda079e3173e00883b8f697fc8aab5084a7 Richard committed Mar 4, 2012
@@ -9,7 +9,7 @@
#'collectionServiceApp.py',
#'collectionMonitorNonRealtimeNotifierServiceV2',
#'syncXmlRpcServerV2,
- 'tubeDelayService',
+ 'tubeDelayServiceV2',
'monitorServiceV2',
'folderScanner'
]
@@ -0,0 +1,60 @@
+'''
+Created on 2012-02-13
+
+@author: Richard
+'''
+import os
+import time
+import beanstalkc
+import localLibSys
+from localLibs.windows.changeNotifyThread import changeNotifyThread
+import wwjufsdatabase.libs.utils.simplejson as json
+import wwjufsdatabase.libs.utils.transform as transform
+
+
+gBeanstalkdServerHost = '127.0.0.1'
+gBeanstalkdServerPort = 11300
+gMonitorServiceTubeName = "monitorQueue"
+
+
+class beanstalkServiceBase(object):
+ '''
+ classdocs
+ '''
+ def __init__(self, tubeName = gMonitorServiceTubeName):
+ '''
+ Constructor
+ '''
+ self.tubeName = tubeName
+ self.beanstalk = beanstalkc.Connection(host=gBeanstalkdServerHost, port=gBeanstalkdServerPort)
+
+ def addItem(self, itemDict):
+ beanstalk = beanstalkc.Connection(host=gBeanstalkdServerHost, port=gBeanstalkdServerPort)
+ beanstalk.use(self.tubeName)
+ s = json.dumps(itemDict, sort_keys=True, indent=4)
+ job = beanstalk.put(s)
+
+ def watchTube(self):
+ self.beanstalk.watch(self.tubeName)
+ self.beanstalk.ignore('default')
+
+
+class beanstalkServiceApp(beanstalkServiceBase):
+ def startServer(self):
+ self.watchTube()
+ #!!!Not working. Kick all items to active when start, as we bury them in the previous processing
+ #kickedItemNum = beanstalk.kick(gMaxMonitoringItems)
+ #print kickedItemNum
+ while True:
+ job = self.beanstalk.reserve()
+ print "got job", job.body
+ item = json.loads(job.body)
+ print item
+ self.processItem(job, item)
+ def processItem(self, job, item):
+ job.delete()
+
+
+if __name__ == "__main__":
+ s = beanstalkServiceBase()
+ s.startServer()
@@ -51,7 +51,7 @@ def __init__(self, tubeName = gMonitorServiceTubeName):
def processCmd(self, job, item):
fullPath = transform.transformDirToInternal(item["fullPath"])
blackList = item["blackList"]
- targetTube = item["targetTube"]
+ targetTube = item["targetTubeName"]
if not os.path.exists(fullPath) or self.notifyThreads.has_key(fullPath):
job.delete()
t = changeNotifyForBeanstalkd(fullPath, targetTube, blackList)
@@ -6,17 +6,23 @@
import localLibSys
import localLibs.services.folderScanner as folderScanner
import localLibs.services.monitorServiceV2 as monitorService
+from localLibs.services.tubeDelayServiceV2 import tubeDelayService
'''
fileScanner->fileListTube
monitorServiceV2->fileListTube
'''
zippedListTubeName = "zippedListTube"
+delayedZippedInfoListTubeName = "delayedZippedInfoListTube"
if __name__ == "__main__":
s = folderScanner.folderScanner()
- s.addItem({"command": "folderScanner", "fullPath":"d:/tmp/target",
+ s.addItem({"command": "folderScanner", "fullPath":"d:/tmp/generating",
"targetTubeName": zippedListTubeName,"blackList":[]})
s = monitorService.monitorService()
- s.addItem({"command": "folderScanner", "fullPath":"d:/tmp/target",
- "targetTubeName": zippedListTubeName,"blackList":[]})
+ s.addItem({"command": "folderScanner", "fullPath":"d:/tmp/generating",
+ "targetTubeName": zippedListTubeName,"blackList":[]})
+
+ s = tubeDelayService('tubeDelayServiceCmdTube')
+ s.addItem({"inputTubeName":zippedListTubeName,
+ "outputTubeName": delayedZippedInfoListTubeName,"blackList":[]})
@@ -0,0 +1,100 @@
+'''
+Created on 2012-02-13
+
+@author: Richard
+'''
+import os
+import time
+import threading
+import beanstalkc
+import localLibSys
+import wwjufsdatabase.libs.utils.transform as transform
+from beanstalkServiceBaseV2 import beanstalkServiceBase, beanstalkServiceApp
+
+gBeanstalkdServerHost = '127.0.0.1'
+gBeanstalkdServerPort = 11300
+gInputTubeName = "fileListTube"
+gOutputTubeName = "fileListDelayed"
+gItemDelayTime = 5
+gDefaultTtr = 3600*24
+
+
+class tubeDelayThread(beanstalkServiceApp, threading.Thread):
+ def __init__ ( self, inputTubeName, outputTubeName, blackList = []):
+ self.blackList = blackList
+ self.outputTubeName = outputTubeName
+ self.itemToProcess = {}
+ super(tubeDelayThread, self).__init__(inputTubeName)
+ threading.Thread.__init__(self)
+
+ def run(self):
+ self.outputBeanstalk = beanstalkc.Connection(host=gBeanstalkdServerHost, port=gBeanstalkdServerPort)
+ self.outputBeanstalk.use(self.outputTubeName)
+ self.outputBeanstalk.ignore(self.outputTubeName)
+ #self.outputBeanstalk.ignore('default')
+ self.startServer()
+
+ def processItem(self, job, item):
+ monitoringFullPath = transform.transformDirToInternal(item["monitoringPath"])
+ fullPath = transform.transformDirToInternal(item["fullPath"])
+ #Check if item exists in local file sytem
+ if not os.path.exists(fullPath):
+ job.delete()
+ return
+ if not self.itemToProcess.has_key(monitoringFullPath):
+ self.itemToProcess[monitoringFullPath] = {}
+ #############################################
+ # Start processing
+ #############################################
+ #If the full path already in tube, check if the timestamp is updated
+ if self.itemToProcess[monitoringFullPath].has_key(fullPath):
+ savedItem = self.itemToProcess[monitoringFullPath][fullPath]
+ if savedItem["timestamp"] == item["timestamp"]:
+ #Item not updated for time out time, add it to output queue
+ self.outputBeanstalk.put(job.body)
+ print "output item:", item
+ job.delete()
+ elif savedItem["timestamp"] < item["timestamp"]:
+ #Received a new notification for an path, update saved info
+ self.itemToProcess[monitoringFullPath][fullPath] = item
+ job.release(priority = beanstalkc.DEFAULT_PRIORITY, delay = gItemDelayTime)
+ print "item updated"
+ else:
+ job.delete()
+ else:
+ #New notification, add it
+ self.itemToProcess[monitoringFullPath][fullPath] = item
+ #print item, job, gItemDelayTime
+ #priority is necessary to avoid error for requesting priority to be an int in beanstalkc
+ job.release(priority = beanstalkc.DEFAULT_PRIORITY, delay = gItemDelayTime)
+ print "new item added"
+
+
+class tubeDelayService(beanstalkServiceApp):
+ '''
+ classdocs
+ '''
+ def __init__(self, tubeName):
+ super(tubeDelayService, self).__init__(tubeName)
+ self.taskDict = {}
+
+ def processItem(self, job, item):
+ #fullPath = transform.transformDirToInternal(item["fullPath"])
+ #monitoringFullPath = transform.transformDirToInternal(item["monitoringPath"])
+ blackList = item["blackList"]
+ inputTubeName = item["inputTubeName"]
+ outputTubeName = item["outputTubeName"]
+ if not os.path.exists(inputTubeName) or self.taskDict.has_key(inputTubeName):
+ job.delete()
+ t = tubeDelayThread(inputTubeName, outputTubeName, blackList)
+ self.taskDict[inputTubeName] = t
+ t.start()
+ job.delete()
+
+
+
+
+
+if __name__ == "__main__":
+ s = tubeDelayService('tubeDelayServiceCmdTube')
+ s.startServer()
Binary file not shown.

0 comments on commit 55112dd

Please sign in to comment.