Permalink
Browse files

1. Added auto processor.

2. Added customizable delay for delay service.
  • Loading branch information...
1 parent 23731ec commit 63067bcbe35352f7e169e05ecf65b05e2487f9f3 @weijia committed Mar 27, 2012
@@ -0,0 +1,91 @@
+'''
+Created on 2012-02-13
+
+@author: Richard
+'''
+import beanstalkc
+import os
+import threading
+
+#from pprint import pprint
+
+import localLibSys
+from localLibs.storage.infoStorage.zippedCollectionWithInfo import zippedCollectionWithInfo
+from localLibs.storage.infoStorage.zippedInfoWithThumb import zippedInfoWithThumb
+from localLibs.localFs.tmpFile import getStorgePathWithDateFolder
+import localLibs.archiver.encryptionStorageBase as encryptionStorageBase
+from beanstalkServiceBaseV2 import beanstalkWorkingThread, beanstalkServiceApp
+import localLibs.objSys.objectDatabaseV3 as objectDatabase
+import wwjufsdatabase.libs.utils.misc as misc
+
+gBeanstalkdServerHost = '127.0.0.1'
+gBeanstalkdServerPort = 11300
+gMonitorServiceTubeName = "monitorQueue"
+gFileListTubeName = "fileListDelayed"
+
+gMaxZippedCollectionSize = 0.005*1024
+
+gZipCollectionRoot = "d:/tmp/generating"
+
+
+class autoProcessServiceThread(beanstalkWorkingThread):
+ def __init__ ( self, inputTubeName, appList):
+ '''
+ Constructor
+ '''
+ super(autoProcessServiceThread, self).__init__(inputTubeName)
+ self.appList
+
+ def processItem(self, job, item):
+ if not (item['monitoringPath'] in self.monitoringList):
+ self.monitoringList.append(item['monitoringPath'])
+ itemObj = self.dbInst.getFsObjFromFullPath(item["fullPath"])
+ #print itemObj["uuid"]
+ addedItemSize = self.storage.addItem(itemObj)
+ #print "zipped size", info.compress_size
+ self.curStorageSize += addedItemSize
+ #print "current size:", self.curStorageSize
+ if self.curStorageSize > gMaxZippedCollectionSize:
+ self.storage.addAdditionalInfo({"monitoringPathList": self.monitoringList})
+ zipFullPath = self.storage.finalizeZipFile()
+ targetPath = getStorgePathWithDateFolder(self.zipCollectionRoot)
+ self.encCopier.copy(zipFullPath, targetPath)
+ self.monitoringList = []
+ #print 'old file zipped, new file created'
+ #TODO: Remove tmp file.
+
+ #All jobs processed completely, return True
+ return True
+ #Return False as jobs are not processed completely
+ return True
+
+
+
+class autoProcessService(beanstalkServiceApp):
+ '''
+ classdocs
+ '''
+ def __init__(self, tubeName = "fileArchiveServiceTubeName"):
+ super(autoProcessService, self).__init__(tubeName)
+ self.taskDict = {}
+
+
+ def processItem(self, job, item):
+ #fullPath = transform.transformDirToInternal(item["fullPath"])
+ #monitoringFullPath = transform.transformDirToInternal(item["monitoringPath"])
+ workingDir = item["workingDir"]
+ misc.ensureDir(workingDir)
+ inputTubeName = item["inputTubeName"]
+ if self.taskDict.has_key(inputTubeName):
+ job.delete()
+ return False
+ t = autoProcessServiceThread(inputTubeName, zippedInfoWithThumb(workingDir))
+ self.taskDict[inputTubeName] = t
+ t.start()
+ return True
+
+
+if __name__ == "__main__":
+ workingDir = "d:/tmp/working"
+ s = autoProcessService()
+ s.startServer()
@@ -0,0 +1,45 @@
+'''
+Created on 2012-02-20
+
+@author: Richard
+'''
+import localLibSys
+import localLibs.services.folderScanner as folderScanner
+import localLibs.services.monitorServiceV2 as monitorService
+from localLibs.services.tubeDelayServiceV2 import tubeDelayService
+from localLibs.services.zippedCollectionListHandlerV2 import zippedCollectionListService
+from localLibs.services.autoProcessService import autoProcessService
+'''
+fileScanner->fileListTube
+monitorServiceV2->fileListTube
+'''
+
+
+gMonitoringPath = "D:\\codes\\nsn\\lte\\ueSim\\latest-uesim-codes"
+gWorkingDir = "D:/tmp/working/autoProcessWorkingDir"
+
+
+def autoProcess(workingDir = gWorkingDir,fullPath = gMonitoringPath):
+ inputTubeName = "collectionListTubeForAutoProcess"
+ delayedCollectionListTubeName = "delayedCollectionListTubeNameForAutoProcess"
+ '''
+ s1 = folderScanner.folderScanner()
+ s1.addItem({"command": "folderScanner", "fullPath":fullPath,
+ "targetTubeName": inputTubeName,"blackList":[]})
+ '''
+ s2 = monitorService.monitorService()
+ s2.addItem({"command": "monitorService", "fullPath":fullPath,
+ "targetTubeName": inputTubeName,"blackList":[]})
+
+ s3 = tubeDelayService()
+ s3.addItem({"inputTubeName":inputTubeName,
+ "outputTubeName": delayedCollectionListTubeName,
+ "delaySeconds": 15,
+ "blackList":[]})
+
+ s4 = autoProcessService()
+ s4.addItem({"inputTubeName":delayedCollectionListTubeName, "appsList":workingDir})
+
+
+if __name__ == "__main__":
+ autoProcess()
@@ -20,10 +20,11 @@
gDefaultTubeDelayServiceTubeName = 'tubeDelayServiceCmdTube'
class tubeDelayThread(beanstalkServiceApp, threading.Thread):
- def __init__ ( self, inputTubeName, outputTubeName, blackList = []):
+ def __init__ ( self, inputTubeName, outputTubeName, delaySeconds, blackList = []):
self.blackList = blackList
self.outputTubeName = outputTubeName
self.itemToProcess = {}
+ self.delaySeconds = delaySeconds
super(tubeDelayThread, self).__init__(inputTubeName)
threading.Thread.__init__(self)
@@ -85,12 +86,16 @@ def processItem(self, job, item):
blackList = item["blackList"]
inputTubeName = item["inputTubeName"]
outputTubeName = item["outputTubeName"]
+ try:
+ delaySeconds = item["delaySeconds"]
+ except KeyError:
+ delaySeconds = gItemDelayTime
if self.taskDict.has_key(inputTubeName):
#Job already exist, delete it
print "job already exist"
job.delete()
return False
- t = tubeDelayThread(inputTubeName, outputTubeName, blackList)
+ t = tubeDelayThread(inputTubeName, outputTubeName, delaySeconds, blackList)
self.taskDict[inputTubeName] = t
t.start()
return True

0 comments on commit 63067bc

Please sign in to comment.