Permalink
Browse files

Update file list handler.

  • Loading branch information...
1 parent 2220e90 commit e0a606e33cb2d801e5174c0a10230d42c8e838df Richard committed Feb 25, 2012
View
2 trunk/prodRoot/desktopApp/lib/compress/zipClass.py
@@ -46,6 +46,8 @@ def addfile(self, path, arcname=None):
# file system by this path, check zipfile line 541
#self.zfile.write(encode2Local(path), encode2Local(arcname))
self.zfile.write(encode2Local(path), encode2Local(arcname))
+ #return the info of the newly added file
+ return self.zfile.filelist[-1]
def addfiles(self, paths):
for path in paths:
View
0 trunk/prodRoot/localLibs/localFs/__init__.py
No changes.
View
112 trunk/prodRoot/localLibs/localFs/tmpFile.py
@@ -1,109 +1,19 @@
-import encZipInfoCollectionEnum as encZipInfoCollectionEnum
import localLibSys
import wwjufsdatabase.libs.utils.transform as transform
from localLibs.logSys.logSys import *
-import wwjufsdatabase.libs.utils.simplejson as json
-import desktopApp.lib.compress.zipClass as zipClass
import wwjufsdatabase.libs.utils.fileTools as fileTools
import wwjufsdatabase.libs.utils.misc as misc
import os
-MAX_FILE_CNT_IN_INFO_FILE = 10000
+def getStorgePathWithDateFolder(rootPath, ext = ".enc"):
+ gTimeV = time.gmtime()
+ yearStr = time.strftime("%Y", gTimeV)
+ monthStr = time.strftime("%m", gTimeV)
+ dayStr = time.strftime("%d", gTimeV)
+ dateTimeDir = yearStr+"/"+monthStr+"/"+dayStr
+ newEncDir = unicode(os.path.join(rootPath, dateTimeDir))
+ misc.ensureDir(newEncDir)
+ targetPath = transform.transformDirToInternal(
+ fileTools.getTimestampWithFreeName(newEncDir, ext))
+ return targetPath
-
-class encZipInfoCollection(encZipInfoCollectionEnum.encZipInfoCollectionEnum):
- def __init__(self, collectionId, logCollectionId, workingDir, passwd, dbInst):
- '''
- collectionId is the ID of the enc zip storage collection, it should be the dir path storing
- the enc zip files, and it will be used to retieve all enc zip files.
- '''
- super(encZipInfoCollection, self).__init__(collectionId, logCollectionId, workingDir, passwd, dbInst)
-
- ########################################
- #Internal used vars
- ########################################
- #The info of files in the current zip file
- self.zippedFileInfo = {}
-
- #File count for the current info storage, encZipCollection does
- #not have this
- self.fileCnt = 0
-
- self.zipStorageDir = transform.transformDirToInternal(collectionId)
- misc.ensureDir(self.zipStorageDir)
-
- def store(self, processingObj, pendingCollection):
- '''
- processingObj = {"fullPath": "D:/tmp/good.txt", "size":100}
- '''
- ncl(processingObj)
- #relaPath = transform.formatRelativePath(item.getRelaPath())
- relaPath = processingObj.getIdInCol()
- ncl('Got relaPath')
- if (pendingCollection.has_key(relaPath)) and (pendingCollection[relaPath] != processingObj["uuid"]):
- #Item exists in pending but uuid is not the same, update the uuid for the pending item
- pendingCollection[relaPath] = processingObj["uuid"]
- cl('Added to pending')
- fullPath = transform.transformDirToInternal(processingObj["fullPath"])
-
- #Add the file to zip
- try:
- #If there is already an item with the same name, ignore the current?
- existingElem = self.zippedFileInfo[relaPath]
- return
- except:
- pass
-
-
- if (self.fileCnt > MAX_FILE_CNT_IN_INFO_FILE):
- self.encInfoZip(pendingCollection)
-
- processingObj["parentEncZip"] = self.targetPath.replace(".zip", ".enc")
- self.zippedFileInfo[relaPath] = processingObj.getItemInfo()
- cl('return from store')
-
-
- def encInfoZip(self, pendingCollection):
- ############################
- # Save info for zipped files
- ############################
- logFilePath = transform.transformDirToInternal(
- fileTools.getTimestampWithFreeName(self.workingDir, '.log'))
- s = json.dumps(self.zippedFileInfo, sort_keys=True, indent=4)
- f = open(logFilePath,'w')
- f.write(s)
- f.close()
- logZipPath = logFilePath.replace(u'.log',u'.log.zip')
- logZip = zipClass.ZFile(logZipPath, 'w')
- logZip.addfile(unicode(logFilePath), os.path.basename(logFilePath))
- logZip.close()
-
- gTimeV = time.gmtime()
- yearStr = time.strftime("%Y", gTimeV)
- monthStr = time.strftime("%m", gTimeV)
- dayStr = time.strftime("%d", gTimeV)
- dateTimeDir = yearStr+"/"+monthStr+"/"+dayStr
- newEncDir = unicode(os.path.join(self.zipStorageDir, dateTimeDir))
- misc.ensureDir(newEncDir)
- targetPath = transform.transformDirToInternal(
- fileTools.getTimestampWithFreeName(newEncDir, '.enc'))
- self.encCopier.copy(logZipPath, targetPath.replace('.enc', '.encziplog'))
-
-
- ############################
- # Update state in storage state
- ############################
- self.updateZipLog(self.zippedFileInfo, pendingCollection)
- #Clean the current zipped file info
- self.zippedFileInfo = {}
-
-
-
- def updateZipLog(self, newLog, pendingCollection):
- for i in newLog:
- relaPath = transform.formatRelativePath(i)
- del pendingCollection[relaPath]
- self.logCollection.updateObjUuid(relaPath, newLog[i]["uuid"])
-
- def enumEnd(self, pendingCollection):
- self.encInfoZip(pendingCollection)
View
54 trunk/prodRoot/localLibs/services/fileListHandler.py
@@ -4,16 +4,24 @@
@author: Richard
'''
import beanstalkc
+import os
+
+from pprint import pprint
+
import localLibSys
from localLibs.windows.changeNotifyThread import changeNotifyThread
-import wwjufsdatabase.libs.utils.simplejson as json
-
+import wwjufsdatabase.libs.utils.simplejson as json
+from localLibs.storage.infoStorage.zippedCollectionWithInfo import zippedCollectionWithInfo
+from localLibs.localFs.tmpFile import getStorgePathWithDateFolder
+import desktopApp.lib.archiver.encryptionStorageBase as encryptionStorageBase
gBeanstalkdServerHost = '127.0.0.1'
gBeanstalkdServerPort = 11300
gMonitorServiceTubeName = "monitorQueue"
gFileListTubeName = "fileList"
+gMaxZippedCollectionSize = 0.5*1024
+
class changeNotifyForBeanstalkd(changeNotifyThread):
def callback(self, monitoringPath, fullPath, changeType):
itemDict = {"monitoringPath": monitoringPath, "fullPath": fullPath, "changeType":changeType}
@@ -23,18 +31,23 @@ def callback(self, monitoringPath, fullPath, changeType):
s = json.dumps(itemDict, sort_keys=True, indent=4)
job = beanstalk.put(s)
-
+gZipCollectionRoot = "d:/tmp/generating"
class fileListService(object):
'''
classdocs
'''
- def __init__(self):
+ def __init__(self, zipCollectionRoot = gZipCollectionRoot, passwd = "123", workingDir = "d:/tmp/working"):
'''
Constructor
'''
self.notifyThreads = []
- pass
+ self.storage = zippedCollectionWithInfo(workingDir)
+ self.zipCollectionRoot = zipCollectionRoot
+ self.encCopier = encryptionStorageBase.arc4EncSimpleCopier(passwd)
+ self.decCopier = encryptionStorageBase.arc4DecSimpleCopier(passwd)
+ self.curStorageSize = 0
+ self.addedList = []
def addItem(self, fullPath):
beanstalk = beanstalkc.Connection(host=gBeanstalkdServerHost, port=gBeanstalkdServerPort)
beanstalk.use(gFileListTubeName)
@@ -48,13 +61,36 @@ def startServer(self):
beanstalk.ignore('default')
while True:
job = beanstalk.reserve()
- print "got job", job.body
+ print "got job", job.body
item = json.loads(job.body)
#self.notifyThreads.append(changeNotifyForBeanstalkd(item["fullPath"]))
-
-
+ if not os.path.exists(item["fullPath"]):
+ print 'Path not exists'
+ job.delete()
+ continue
+ info = self.storage.addItem(item["fullPath"])
+ #print "zipped size", info.compress_size
+ self.curStorageSize += info.compress_size
+ self.addedList.append([job, item])
+ if self.curStorageSize > gMaxZippedCollectionSize:
+ zipFullPath = self.storage.finalizeZipFile()
+ targetPath = getStorgePathWithDateFolder(self.zipCollectionRoot)
+ self.encCopier.copy(zipFullPath, targetPath)
+ print 'old file zipped, new file created'
+ #pprint(beanstalk.stats_tube(gFileListTubeName))
+ #print self.addedList
+ for addedJob, addedItem in self.addedList:
+ #print dir(addedJob)
+ #print addedJob.state()
+ try:
+ addedJob.delete()
+ except:
+ pass
+ print "removed addedItem from tube", addedItem
+ self.addedList = []
+ #TODO: Remove tmp file.
if __name__ == "__main__":
- print 'starting fileListHandler'
+ print 'starting fileListHandler'
s = fileListService()
s.startServer()
View
15 trunk/prodRoot/localLibs/services/monitorService.py
@@ -4,11 +4,12 @@
@author: Richard
'''
import os
+import time
import beanstalkc
import localLibSys
from localLibs.windows.changeNotifyThread import changeNotifyThread
-import wwjufsdatabase.libs.utils.simplejson as json
-import wwjufsdatabase.libs.utils.transform as transform
+import wwjufsdatabase.libs.utils.simplejson as json
+import wwjufsdatabase.libs.utils.transform as transform
gBeanstalkdServerHost = '127.0.0.1'
@@ -17,8 +18,10 @@
gFileListTubeName = "fileList"
class changeNotifyForBeanstalkd(changeNotifyThread):
- def callback(self, monitoringPath, fullPath, changeType):
- itemDict = {"monitoringPath": monitoringPath, "fullPath": fullPath, "changeType":changeType}
+ def callback(self, pathToWatch, relativePath, changeType):
+ fullPath = transform.transformDirToInternal(os.path.join(pathToWatch, relativePath))
+ itemDict = {"monitoringPath": pathToWatch, "fullPath": fullPath, "changeType":changeType,
+ "timestamp": time.time()}
s = json.dumps(itemDict, sort_keys=True, indent=4)
beanstalk = beanstalkc.Connection(host=gBeanstalkdServerHost, port=gBeanstalkdServerPort)
beanstalk.use(gFileListTubeName)
@@ -52,7 +55,7 @@ def startServer(self):
beanstalk.ignore('default')
while True:
job = beanstalk.reserve()
- print "got job", job.body
+ print "got job", job.body
item = json.loads(job.body)
fullPath = transform.transformDirToInternal(item["fullPath"])
if not os.path.exists(item["fullPath"]) or self.notifyThreads.has_key(fullPath):
@@ -64,6 +67,6 @@ def startServer(self):
-if __name__ == "__main__":
+if __name__ == "__main__":
s = monitorService()
s.startServer()
View
4 trunk/prodRoot/localLibs/services/tests/monitorServiceTest.py
@@ -6,6 +6,6 @@
import localLibSys
import localLibs.services.monitorService as monitorService
-if __name__ == "__main__":
+if __name__ == "__main__":
s = monitorService.monitorService()
- s.addItem("d:/tmp")
+ s.addItem("d:/tmp/monitoring")
View
64 trunk/prodRoot/localLibs/storage/infoStorage/zippedCollectionWithInfo.py
@@ -1,33 +1,33 @@
-import localLibSys
-import wwjufsdatabase.libs.utils.simplejson as json
-import wwjufsdatabase.libs.utils.transform as transform
-import localLibs.objSys.ufsObj as ufsObj
-import wwjufsdatabase.libs.utils.fileTools as fileTools
-import desktopApp.lib.compress.zipClass as zipClass
-
-gWorkingDir = "d:/tmp"
-
-class zippedCollectionWithInfo(object):
- def __init__(self):
- self.collectionInfoDict = {}
- self.zipFile = None
- self.zipFilePath = None
- def addItem(self, fullPath):
- #Get file info and add info to info dict
- fullPath = transform.transformDirToInternal(fullPath)
- itemObj = ufsObj.detailedFsObj(fullPath)
- self.collectionInfoDict[itemObj.ufsUrl()] = itemObj.getItemInfo()
- #Add file to zip
- self.getZipFile().addfile(unicode(fullPath), unicode(fullPath))
- #If size exceed certain value, generate a package and submit info to database
- pass
- def getZipFile(self):
- if self.zipFile is None:
- self.zipFilePath = transform.transformDirToInternal(
- fileTools.getTimestampWithFreeName(gWorkingDir, '.zip'))
- self.zipFile = zipClass.ZFile(self.zipFilePath, 'w')
- return self.zipFile
- def finalizeZipFile(self):
- self.zipFile.close()
- self.zipFile = None
+import localLibSys
+import wwjufsdatabase.libs.utils.simplejson as json
+import wwjufsdatabase.libs.utils.transform as transform
+import localLibs.objSys.ufsObj as ufsObj
+import wwjufsdatabase.libs.utils.fileTools as fileTools
+import desktopApp.lib.compress.zipClass as zipClass
+
+gWorkingDir = "d:/tmp"
+
+class zippedCollectionWithInfo(object):
+ def __init__(self, workingDir = gWorkingDir):
+ self.collectionInfoDict = {}
+ self.zipFile = None
+ self.zipFilePath = None
+ self.workingDir = workingDir
+ def addItem(self, fullPath):
+ #Get file info and add info to info dict
+ fullPath = transform.transformDirToInternal(fullPath)
+ itemObj = ufsObj.detailedFsObj(fullPath)
+ self.collectionInfoDict[itemObj.ufsUrl()] = itemObj.getItemInfo()
+ #Add file to zip
+ return self.getZipFile().addfile(unicode(fullPath), unicode(fullPath))
+
+ def getZipFile(self):
+ if self.zipFile is None:
+ self.zipFilePath = transform.transformDirToInternal(
+ fileTools.getTimestampWithFreeName(self.workingDir, '.zip'))
+ self.zipFile = zipClass.ZFile(self.zipFilePath, 'w')
+ return self.zipFile
+ def finalizeZipFile(self):
+ self.zipFile.close()
+ self.zipFile = None
return self.zipFilePath
View
2 trunk/prodRoot/localLibs/windows/changeNotifyThread.py
@@ -62,7 +62,7 @@ def run ( self ):
#print full_filename, ACTIONS.get (action, "Unknown")
self.callback(self.path_to_watch, file, ACTIONS.get (action, "Unknown"))
- def callback(self, fullPath):
+ def callback(self, pathToWatch, relativePath, action):
pass
def exit(self):
View
2 trunk/prodRoot/startBeanstalkd.bat
@@ -1,2 +1,2 @@
-mkdir ..\..\..\beanstalkdData
+mkdir ..\..\..\beanstalkdData
..\otherBin\beanstalk\bin\beanstalkd.exe -b ../../../beanstalkdData -p 11300 -l 127.0.0.1
View
3 trunk/prodRoot/wwjufsdatabase/libs/utils/transform.py
@@ -83,3 +83,6 @@ def autoDecoder(orig):
shall decode the string to unicode
'''
return orig
+
+if __name__ == "__main__":
+ print transformDirToInternal('d:/tmp')

0 comments on commit e0a606e

Please sign in to comment.