diff --git a/EPFConfig.json b/EPFConfig.json index 7f4a91a..ec6c2b5 100644 --- a/EPFConfig.json +++ b/EPFConfig.json @@ -1,16 +1,16 @@ { - "fieldSep": "\u0001", - "recordSep": "\u0002\n", - "dbHost": "localhost", - "dbName": "epf", - "dbUser": "epfimporter", - "dbPassword": "epf123", - "tablePrefix": "epf", - "allowExtensions": false, + "fieldSep": "\u0001", + "recordSep": "\u0002\n", + "dbHost": "localhost", + "dbName": "epf", + "dbUser": "epfimporter", + "dbPassword": "epf123", + "tablePrefix": "epf", + "allowExtensions": false, "blackList": [ "^\\." - ], + ], "whiteList": [ ".*?" ] -} \ No newline at end of file +} diff --git a/EPFFlatConfig.json b/EPFFlatConfig.json index d946ab3..2bf0bd1 100644 --- a/EPFFlatConfig.json +++ b/EPFFlatConfig.json @@ -1,16 +1,16 @@ { - "fieldSep": "\t", - "recordSep": "\n", - "dbHost": "localhost", - "dbName": "epf", - "dbUser": "epfimporter", - "dbPassword": "epf123", - "tablePrefix": "epfflat", - "allowExtensions": true, + "fieldSep": "\t", + "recordSep": "\n", + "dbHost": "localhost", + "dbName": "epf", + "dbUser": "epfimporter", + "dbPassword": "epf123", + "tablePrefix": "epfflat", + "allowExtensions": true, "blackList": [ "^\\." - ], + ], "whiteList": [ ".*?" ] -} \ No newline at end of file +} diff --git a/EPFImporter.py b/EPFImporter.py index 336a512..9d5dc7d 100755 --- a/EPFImporter.py +++ b/EPFImporter.py @@ -2,39 +2,39 @@ # Copyright (c) 2010 Apple Inc. All rights reserved. -# IMPORTANT: This Apple software is supplied to you by Apple Inc. ("Apple") in -# consideration of your agreement to the following terms, and your use, -# installation, modification or redistribution of this Apple software -# constitutes acceptance of these terms. If you do not agree with these terms, +# IMPORTANT: This Apple software is supplied to you by Apple Inc. ("Apple") in +# consideration of your agreement to the following terms, and your use, +# installation, modification or redistribution of this Apple software +# constitutes acceptance of these terms. If you do not agree with these terms, # please do not use, install, modify or redistribute this Apple software. -# In consideration of your agreement to abide by the following terms, and subject -# to these terms, Apple grants you a personal, non-exclusive license, under Apple's -# copyrights in this original Apple software (the "Apple Software"), to use, -# reproduce, modify and redistribute the Apple Software, with or without -# modifications, in source and/or binary forms; provided that if you redistribute -# the Apple Software in its entirety and without modifications, you must retain -# this notice and the following text and disclaimers in all such redistributions -# of the Apple Software. Neither the name, trademarks, service marks or logos of -# Apple Inc. may be used to endorse or promote products derived from the Apple -# Software without specific prior written permission from Apple. Except as -# expressly stated in this notice, no other rights or licenses, express or implied, -# are granted by Apple herein, including but not limited to any patent rights that -# may be infringed by your derivative works or by other works in which the Apple +# In consideration of your agreement to abide by the following terms, and subject +# to these terms, Apple grants you a personal, non-exclusive license, under Apple's +# copyrights in this original Apple software (the "Apple Software"), to use, +# reproduce, modify and redistribute the Apple Software, with or without +# modifications, in source and/or binary forms; provided that if you redistribute +# the Apple Software in its entirety and without modifications, you must retain +# this notice and the following text and disclaimers in all such redistributions +# of the Apple Software. Neither the name, trademarks, service marks or logos of +# Apple Inc. may be used to endorse or promote products derived from the Apple +# Software without specific prior written permission from Apple. Except as +# expressly stated in this notice, no other rights or licenses, express or implied, +# are granted by Apple herein, including but not limited to any patent rights that +# may be infringed by your derivative works or by other works in which the Apple # Software may be incorporated. -# The Apple Software is provided by Apple on an "AS IS" basis. APPLE MAKES NO -# WARRANTIES, EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE IMPLIED -# WARRANTIES OF NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR -# PURPOSE, REGARDING THE APPLE SOFTWARE OR ITS USE AND OPERATION ALONE OR IN -# COMBINATION WITH YOUR PRODUCTS. - -# IN NO EVENT SHALL APPLE BE LIABLE FOR ANY SPECIAL, INDIRECT, INCIDENTAL OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE -# GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# ARISING IN ANY WAY OUT OF THE USE, REPRODUCTION, MODIFICATION AND/OR DISTRIBUTION -# OF THE APPLE SOFTWARE, HOWEVER CAUSED AND WHETHER UNDER THEORY OF CONTRACT, TORT -# (INCLUDING NEGLIGENCE), STRICT LIABILITY OR OTHERWISE, EVEN IF APPLE HAS BEEN +# The Apple Software is provided by Apple on an "AS IS" basis. APPLE MAKES NO +# WARRANTIES, EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE IMPLIED +# WARRANTIES OF NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE, REGARDING THE APPLE SOFTWARE OR ITS USE AND OPERATION ALONE OR IN +# COMBINATION WITH YOUR PRODUCTS. + +# IN NO EVENT SHALL APPLE BE LIABLE FOR ANY SPECIAL, INDIRECT, INCIDENTAL OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE +# GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# ARISING IN ANY WAY OUT OF THE USE, REPRODUCTION, MODIFICATION AND/OR DISTRIBUTION +# OF THE APPLE SOFTWARE, HOWEVER CAUSED AND WHETHER UNDER THEORY OF CONTRACT, TORT +# (INCLUDING NEGLIGENCE), STRICT LIABILITY OR OTHERWISE, EVEN IF APPLE HAS BEEN # ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import EPFIngester @@ -67,10 +67,10 @@ # INCREMENTAL_STATUS_PATH = "./EPFStatusIncremental.json" # FULL_STATUS_DICT = {"tablePrefix":None, "dirsToImport":[], "dirsLeft":[], "currentDict":{}} # INCREMENTAL_STATUS_DICT = {"tablePrefix":None, "dirsToImport":[], "dirsLeft":[], "currentDict":{}} -# +# # STATUS_MAP = {"full":(FULL_STATUS_DICT, FULL_STATUS_PATH), # "incremental":(INCREMENTAL_STATUS_DICT, INCREMENTAL_STATUS_PATH)} - + #Create a directory for rotating logs try: @@ -130,34 +130,34 @@ def doImport(directoryPath, fieldDelim='\x01'): """ Perform a full import of the EPF files in the directory specified by directoryPath. - + importMode can be 'full' or 'incremental' - - whiteList is a sequence of regular expressions. Only files whose basenames (i.e., the last - element in the path) match one or more of the regexes in whiteList will be imported. For - example, whiteList=[".*song.*", ".*video.*"] would result in all files containing "song" or + + whiteList is a sequence of regular expressions. Only files whose basenames (i.e., the last + element in the path) match one or more of the regexes in whiteList will be imported. For + example, whiteList=[".*song.*", ".*video.*"] would result in all files containing "song" or "video" anywhere in the filename being imported, and the rest being ignored. To import only exact matches, precede the name with a caret (^) and follow it with a dollar sign ($), e.g. "^video$". - + The default is for all files to be whitelisted. - - blackList works similarly; any filenames matching any of the items in blackList will be - excluded from the import, even if they are matched in whiteList. By default, any filename - with a dot (".") in it will be excluded. Since EPF filenames never include a dot, this permits + + blackList works similarly; any filenames matching any of the items in blackList will be + excluded from the import, even if they are matched in whiteList. By default, any filename + with a dot (".") in it will be excluded. Since EPF filenames never include a dot, this permits placing any file with an extension (e.g., .txt) in the directory without disrupting the import. - + Returns a list of any files for which the import failed (empty if all succeeded) - """ + """ #Exclude files with a dot (for example, the invisible .DSStore files HFS+ uses) if not allowExtensions: - blackList.append(r'.*\..*?') - + blackList.append(r'.*\..*?') + wListRe = (r"|".join(whiteList) if whiteList else r"$a^") #The latter can never match anything bListRe = (r"|".join(blackList) if blackList else r"$a^") #The latter can never match anything wMatcher = re.compile(wListRe) bMatcher = re.compile(bListRe) - + dirPath = os.path.abspath(directoryPath) fileList = os.listdir(dirPath) #filter the list down to the entries matching our whitelist/blacklist @@ -166,7 +166,7 @@ def doImport(directoryPath, filesLeft = copy.copy(fileList) filesImported = [] failedFiles = [] - + SNAPSHOT_DICT['tablePrefix'] = tablePrefix SNAPSHOT_DICT['wList'] = whiteList SNAPSHOT_DICT['bList'] = blackList @@ -175,7 +175,7 @@ def doImport(directoryPath, SNAPSHOT_DICT['dirsLeft'].remove(dirPath) except ValueError: pass - + currentDict = SNAPSHOT_DICT['currentDict'] currentDict['recordSep'] = recordDelim currentDict['fieldSep'] = fieldDelim @@ -184,11 +184,11 @@ def doImport(directoryPath, currentDict['filesLeft'] = filesLeft currentDict['filesImported'] = filesImported currentDict['failedFiles'] = failedFiles - - + + _dumpDict(SNAPSHOT_DICT, SNAPSHOT_PATH) pathList = [os.path.join(dirPath, fileName) for fileName in fileList] - + startTime = datetime.datetime.now() LOGGER.info("Starting import of %s...", dirPath) for aPath in pathList: @@ -199,8 +199,8 @@ def doImport(directoryPath, EPFIngester.__warningregistry__.clear() except AttributeError: pass - - try: + + try: ing = EPFIngester.Ingester(aPath, tablePrefix=tablePrefix, dbHost=dbHost, @@ -215,7 +215,7 @@ def doImport(directoryPath, failedFiles.append(fName) _dumpDict(SNAPSHOT_DICT, SNAPSHOT_PATH) continue - + try: ing.ingest(skipKeyViolators=skipKeyViolators) filesLeft.remove(fName) @@ -225,11 +225,11 @@ def doImport(directoryPath, failedFiles.append(fName) _dumpDict(SNAPSHOT_DICT, SNAPSHOT_PATH) continue - + endTime = datetime.datetime.now() ts = str(endTime - startTime) dirName = os.path.basename(dirPath) - LOGGER.info("Import of %s completed at: %s", dirName, + LOGGER.info("Import of %s completed at: %s", dirName, endTime.strftime(EPFIngester.DATETIME_FORMAT)) LOGGER.info("Total import time for %s: %s" , dirName, ts[:len(ts)-4]) if (failedFiles): @@ -258,7 +258,7 @@ def resumeImport(currentDict, wList = ["^%s$" % aFile for aFile in filesLeft] #anchor the regexes for exact matches filesImported = currentDict['filesImported'] bList = ["^%s$" % aFile for aFile in filesImported] #anchor the regexes for exact matches - + failedFiles = doImport(dirPath, tablePrefix=tablePrefix, dbHost=dbHost, @@ -270,7 +270,7 @@ def resumeImport(currentDict, recordDelim=recordDelim, fieldDelim=fieldDelim) return failedFiles - + def _dumpDict(aDict, filePath): """ @@ -283,7 +283,7 @@ def _dumpDict(aDict, filePath): with open(filePath, mode='w+') as f: json.dump(aDict, f, indent=4) - + def main(): """ Entry point for command-line execution @@ -314,12 +314,12 @@ def main(): recordSep='\n', fieldSep='\t') _dumpDict(flatOptions, FLAT_CONFIG_PATH) - + #Command-line parsing usage = """usage: %prog [-fxrak] [-d db_host] [-u db_user] [-p db_password] [-n db_name] - [-s record_separator] [-t field_separator] [-w regex [-w regex2 [...]]] + [-s record_separator] [-t field_separator] [-w regex [-w regex2 [...]]] [-b regex [-b regex2 [...]]] source_directory [source_directory2 ...]""" - + op = optparse.OptionParser(version="%prog " + VERSION, description=DESCRIPTION, usage=usage) op.add_option('-f', '--flat', action='store_true', dest='isFlat', default=False, help="""Import EPF Flat files, using values from EPFFlat.config if not overridden""") @@ -347,33 +347,33 @@ def main(): help="""A regular expression to add to the whiteList; repeated -b arguments will append""") op.add_option('-k', '--skipkeyviolators', action='store_true', dest='skipKeyViolators', default=False, help="""Ignore inserts which would violate a primary key constraint; only applies to full imports""") - + (options, args) = op.parse_args() #parse command-line options - + if not args and not options.isResume: #no directory args were given, and we're not in resume mode op.print_usage() sys.exit() - + #roll over the log file, so each import has its own log for aHandler in LOGGER.handlers: try: aHandler.doRollover() except AttributeError: pass #only the file handler has a doRollover() method - + configPath = (FLAT_CONFIG_PATH if options.isFlat else CONFIG_PATH) with open(configPath) as configFile: configDict = json.load(configFile) - + #iterate through the options dict. #For each entry which is None, replace it with the value from the config file optDict = options.__dict__ for aKey in optDict.keys(): if (not optDict[aKey]) and (configDict.has_key(aKey)): optDict[aKey] = configDict[aKey] - + failedFilesDict = {} - + #bind these to locals here; they will be rebound later if this is a resume dirsToImport = args tablePrefix = options.tablePrefix @@ -382,11 +382,11 @@ def main(): recordSep = options.recordSep fieldSep = options.fieldSep allowExtensions = options.allowExtensions - + global SNAPSHOT_DICT, SNAPSHOT_PATH SNAPSHOT_DICT['dirsToImport'] = copy.copy(dirsToImport) SNAPSHOT_DICT['dirsLeft'] = copy.copy(dirsToImport) - + startTime = datetime.datetime.now() #call the appropriate import function @@ -396,7 +396,7 @@ def main(): tablePrefix = SNAPSHOT_DICT['tablePrefix'] currentDict = SNAPSHOT_DICT['currentDict'] LOGGER.info("Resuming import for %s", currentDict['dirPath']) - + failedFiles = resumeImport(currentDict, tablePrefix=tablePrefix, dbHost=options.dbHost, @@ -414,7 +414,7 @@ def main(): dirsToImport = SNAPSHOT_DICT['dirsLeft'] wList = SNAPSHOT_DICT['wList'] bList = SNAPSHOT_DICT['bList'] - + #non-resume if dirsToImport: LOGGER.info("Beginning import for the following directories:\n %s", "\n ".join(dirsToImport)) @@ -436,15 +436,15 @@ def main(): if failedFiles: failedFilesDict[dirName] = failedFiles - + endTime = datetime.datetime.now() ts = str(endTime - startTime) - + if failedFilesDict: failedList = [" %s/%s" % (str(aKey), str(failedFilesDict[aKey])) for aKey in failedFilesDict.keys()] failedString = "\n".join(failedList) LOGGER.warning("The following files encountered errors and were not imported:\n %s", failedString) - + LOGGER.info("Total import time for all directories: %s", ts[:len(ts)-4]) #Execute @@ -452,4 +452,3 @@ def main(): main() - \ No newline at end of file diff --git a/EPFIngester.py b/EPFIngester.py index af64c90..c4d34e7 100644 --- a/EPFIngester.py +++ b/EPFIngester.py @@ -1,38 +1,38 @@ # Copyright (c) 2010 Apple Inc. All rights reserved. -# IMPORTANT: This Apple software is supplied to you by Apple Inc. ("Apple") in -# consideration of your agreement to the following terms, and your use, -# installation, modification or redistribution of this Apple software -# constitutes acceptance of these terms. If you do not agree with these terms, +# IMPORTANT: This Apple software is supplied to you by Apple Inc. ("Apple") in +# consideration of your agreement to the following terms, and your use, +# installation, modification or redistribution of this Apple software +# constitutes acceptance of these terms. If you do not agree with these terms, # please do not use, install, modify or redistribute this Apple software. -# In consideration of your agreement to abide by the following terms, and subject -# to these terms, Apple grants you a personal, non-exclusive license, under Apple's -# copyrights in this original Apple software (the "Apple Software"), to use, -# reproduce, modify and redistribute the Apple Software, with or without -# modifications, in source and/or binary forms; provided that if you redistribute -# the Apple Software in its entirety and without modifications, you must retain -# this notice and the following text and disclaimers in all such redistributions -# of the Apple Software. Neither the name, trademarks, service marks or logos of -# Apple Inc. may be used to endorse or promote products derived from the Apple -# Software without specific prior written permission from Apple. Except as -# expressly stated in this notice, no other rights or licenses, express or implied, -# are granted by Apple herein, including but not limited to any patent rights that -# may be infringed by your derivative works or by other works in which the Apple +# In consideration of your agreement to abide by the following terms, and subject +# to these terms, Apple grants you a personal, non-exclusive license, under Apple's +# copyrights in this original Apple software (the "Apple Software"), to use, +# reproduce, modify and redistribute the Apple Software, with or without +# modifications, in source and/or binary forms; provided that if you redistribute +# the Apple Software in its entirety and without modifications, you must retain +# this notice and the following text and disclaimers in all such redistributions +# of the Apple Software. Neither the name, trademarks, service marks or logos of +# Apple Inc. may be used to endorse or promote products derived from the Apple +# Software without specific prior written permission from Apple. Except as +# expressly stated in this notice, no other rights or licenses, express or implied, +# are granted by Apple herein, including but not limited to any patent rights that +# may be infringed by your derivative works or by other works in which the Apple # Software may be incorporated. -# The Apple Software is provided by Apple on an "AS IS" basis. APPLE MAKES NO -# WARRANTIES, EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE IMPLIED -# WARRANTIES OF NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR -# PURPOSE, REGARDING THE APPLE SOFTWARE OR ITS USE AND OPERATION ALONE OR IN -# COMBINATION WITH YOUR PRODUCTS. - -# IN NO EVENT SHALL APPLE BE LIABLE FOR ANY SPECIAL, INDIRECT, INCIDENTAL OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE -# GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# ARISING IN ANY WAY OUT OF THE USE, REPRODUCTION, MODIFICATION AND/OR DISTRIBUTION -# OF THE APPLE SOFTWARE, HOWEVER CAUSED AND WHETHER UNDER THEORY OF CONTRACT, TORT -# (INCLUDING NEGLIGENCE), STRICT LIABILITY OR OTHERWISE, EVEN IF APPLE HAS BEEN +# The Apple Software is provided by Apple on an "AS IS" basis. APPLE MAKES NO +# WARRANTIES, EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE IMPLIED +# WARRANTIES OF NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE, REGARDING THE APPLE SOFTWARE OR ITS USE AND OPERATION ALONE OR IN +# COMBINATION WITH YOUR PRODUCTS. + +# IN NO EVENT SHALL APPLE BE LIABLE FOR ANY SPECIAL, INDIRECT, INCIDENTAL OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE +# GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# ARISING IN ANY WAY OUT OF THE USE, REPRODUCTION, MODIFICATION AND/OR DISTRIBUTION +# OF THE APPLE SOFTWARE, HOWEVER CAUSED AND WHETHER UNDER THEORY OF CONTRACT, TORT +# (INCLUDING NEGLIGENCE), STRICT LIABILITY OR OTHERWISE, EVEN IF APPLE HAS BEEN # ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import EPFParser @@ -57,9 +57,9 @@ class Ingester(object): #Supress warnings that occur when we do a 'DROP TABLE IF EXISTS'; we expect these, #so there's no point in cluttering up the output with them. warnings.filterwarnings('ignore', 'Unknown table.*') - + def __init__(self, - filePath, + filePath, tablePrefix=None, dbHost='localhost', dbUser='epfimporter', @@ -92,7 +92,7 @@ def __init__(self, self.lastRecordCheck = 0 self.lastTimeCheck = datetime.datetime.now() - + def updateStatusDict(self): self.statusDict['fileName'] = self.fileName self.statusDict['filePath'] = self.filePath @@ -102,7 +102,7 @@ def updateStatusDict(self): self.statusDict['abortTime'] = (str(self.abortTime) if self.abortTime else None) self.statusDict['didAbort'] = self.didAbort - + def ingest(self, skipKeyViolators=False): """ Perform a full or incremental ingest, depending on self.parser.exportMode @@ -112,11 +112,11 @@ def ingest(self, skipKeyViolators=False): else: self.ingestFull(skipKeyViolators=skipKeyViolators) - + def ingestFull(self, skipKeyViolators=False): """ Perform a full ingest of the file at self.filePath. - + This is done as follows: 1. Create a new table with a temporary name 2. Populate the new table @@ -139,8 +139,8 @@ def ingestFull(self, skipKeyViolators=False): self.endTime = datetime.datetime.now() self.updateStatusDict() LOGGER.info("Full ingest of %s took %s", self.tableName, str(self.endTime - self.startTime)) - - + + def ingestFullResume(self, fromRecord=0, skipKeyViolators=False): """ Resume an interrupted full ingest, continuing from fromRecord. @@ -159,12 +159,12 @@ def ingestFullResume(self, fromRecord=0, skipKeyViolators=False): endTime = datetime.datetime.now() ts = str(self.endTime - self.startTime) LOGGER.info("Resumed full ingest of %s took %s", self.tableName, ts[:len(ts)-4]) - + def ingestIncremental(self, fromRecord=0, skipKeyViolators=False): """ Update the table with the data in the file at filePath. - + If the file to ingest has < 500,000 records, we do a simple REPLACE operation on the existing table. If it's larger than that, we use the following 3-step process: 1. Create a temporary table, and populate it exactly as though it were a Full ingest @@ -183,7 +183,7 @@ def ingestIncremental(self, fromRecord=0, skipKeyViolators=False): fileColCount = len(self.parser.columnNames) assert (tableColCount <= fileColCount) #It's possible for the existing table #to have fewer columns than the file we're importing, but it should never have more. - + if fileColCount > tableColCount: #file has "extra" columns LOGGER.warn("File contains additional columns not in the existing table. These will not be imported.") self.parser.columnNames = self.parser.columnNames[:tableColCount] #trim the columnNames @@ -192,15 +192,15 @@ def ingestIncremental(self, fromRecord=0, skipKeyViolators=False): s = ("Resuming" if fromRecord else "Beginning") LOGGER.info("%s incremental ingest of %s (%i records)", s, self.tableName, self.parser.recordsExpected) self.startTime = datetime.datetime.now() - + #Different ingest techniques are faster depending on the size of the input. #If there are a large number of records, it's much faster to do a prune-and-merge technique; #for fewer records, it's faster to update the existing table. try: if self.parser.recordsExpected < 500000: #update table in place self._populateTable(self.tableName, - resumeNum=fromRecord, - isIncremental=True, + resumeNum=fromRecord, + isIncremental=True, skipKeyViolators=skipKeyViolators) else: #Import as full, then merge the proper records into a new table self._createTable(self.incTableName) @@ -212,7 +212,7 @@ def ingestIncremental(self, fromRecord=0, skipKeyViolators=False): LOGGER.info("Applying primary key constraints...") self._applyPrimaryKeyConstraints(self.unionTableName) self._renameAndDrop(self.unionTableName, self.tableName) - + except MySQLdb.Error, e: #LOGGER.error("Error %d: %s", e.args[0], e.args[1]) LOGGER.error("Fatal error encountered while ingesting '%s'", self.filePath) @@ -226,35 +226,35 @@ def ingestIncremental(self, fromRecord=0, skipKeyViolators=False): ts = str(self.endTime - self.startTime) LOGGER.info("Incremental ingest of %s took %s", self.tableName, ts[:len(ts)-4]) self.updateStatusDict() - - + + def connect(self): """ Establish a connection to the database, returning the connection object. """ conn = MySQLdb.connect( - charset='utf8', - host=self.dbHost, - user=self.dbUser, - passwd=self.dbPassword, + charset='utf8', + host=self.dbHost, + user=self.dbUser, + passwd=self.dbPassword, db=self.dbName) return conn - - + + def tableExists(self, tableName=None, connection=None): """ Convenience method which returns True if tableName exists in the db, False if not. - + If tableName is None, uses self.tableName. - + If a connection object is specified, this method uses it and does not close it; if not, it creates one using connect(), uses it, and then closes it. """ - + exStr = """SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = %s AND table_name = %s""" - + tableName = (tableName if tableName else self.tableName) conn = (connection if connection else self.connect()) cur = conn.cursor() @@ -265,14 +265,14 @@ def tableExists(self, tableName=None, connection=None): if not connection: conn.close() return doesExist - - + + def columnCount(self, tableName=None, connection=None): """ Convenience method for returning the number of columns in tableName. - + If tableName is None, uses self.tableName. - + If a connection object is specified, this method uses it and does not close it; if not, it creates one using connect(), uses it, and then closes it. """ @@ -287,11 +287,11 @@ def columnCount(self, tableName=None, connection=None): conn.close() return colCount - + def _createTable(self, tableName): """ Connect to the db and create a table named self.tableName_TMP, dropping previous one if it exists. - + Also adds primary key constraint to the new table. """ conn = self.connect() @@ -307,14 +307,14 @@ def _createTable(self, tableName): #set the primary key conn.close() self._applyPrimaryKeyConstraints(tableName) - + def _applyPrimaryKeyConstraints(self, tableName): """ Apply the primary key specified in parser to tableName. """ pkLst = self.parser.primaryKey - + if pkLst: conn = self.connect() cur = conn.cursor() @@ -322,13 +322,13 @@ def _applyPrimaryKeyConstraints(self, tableName): exStr = """ALTER TABLE %s ADD CONSTRAINT PRIMARY KEY (%s)""" % (tableName, pkStr) cur.execute(exStr) conn.close() - + def _escapeRecords(self, recordList, connection=None): """ Appropriately escape the contents of a list of records (as returned by the parser) so that there are no illegal characters (e.g. internal quotes) in the SQL query. - + This is done here rather than in the parser because it uses the literal() method of the connection object. """ @@ -338,12 +338,12 @@ def _escapeRecords(self, recordList, connection=None): escRec = [conn.literal(aField) for aField in aRec] escapedRecords.append(escRec) return escapedRecords - + def _populateTable(self, tableName, resumeNum=0, isIncremental=False, skipKeyViolators=False): """ Populate tableName with data fetched by the parser, first advancing to resumePos. - + For Full imports, if skipKeyViolators is True, any insertions which would violate the primary key constraint will be skipped and won't log errors. """ @@ -352,10 +352,10 @@ def _populateTable(self, tableName, resumeNum=0, isIncremental=False, skipKeyVio ignoreString = ("IGNORE" if (skipKeyViolators and not isIncremental) else "") exStrTemplate = """%s %s INTO %s %s VALUES %s""" colNamesStr = "(%s)" % (", ".join(self.parser.columnNames)) - + self.parser.seekToRecord(resumeNum) #advance to resumeNum conn = self.connect() - + while (True): #By default, we concatenate 200 inserts into a single INSERT statement. #a large batch size per insert improves performance, until you start hitting max_packet_size issues. @@ -363,10 +363,10 @@ def _populateTable(self, tableName, resumeNum=0, isIncremental=False, skipKeyVio records = self.parser.nextRecords(maxNum=200) if (not records): break - + escapedRecords = self._escapeRecords(records) #This will sanitize the records stringList = ["(%s)" % (", ".join(aRecord)) for aRecord in escapedRecords] - + cur = conn.cursor() colVals = unicode(", ".join(stringList), 'utf-8') exStr = exStrTemplate % (commandString, ignoreString, tableName, colNamesStr, colVals) @@ -388,12 +388,12 @@ def _populateTable(self, tableName, resumeNum=0, isIncremental=False, skipKeyVio conn.close() - + def _checkProgress(self, recordGap=5000, timeGap=datetime.timedelta(0, 120, 0)): """ Checks whether recordGap or more records have been ingested since the last check; if so, checks whether timeGap seconds have elapsed since the last check. - + If both checks pass, returns self.lastRecordIngested; otherwise returns None. """ if self.lastRecordIngested - self.lastRecordCheck >= recordGap: @@ -403,7 +403,7 @@ def _checkProgress(self, recordGap=5000, timeGap=datetime.timedelta(0, 120, 0)): self.lastRecordCheck = self.lastRecordIngested return self.lastRecordCheck return None - + def _dropTable(self, tableName): """A convenience method that just connects, drops tableName if it exists, and disconnects""" @@ -411,7 +411,7 @@ def _dropTable(self, tableName): cur = conn.cursor() cur.execute("""DROP TABLE IF EXISTS %s""" % tableName) conn.close() - + def _renameAndDrop(self, sourceTable, targetTable): """ @@ -436,8 +436,8 @@ def _renameAndDrop(self, sourceTable, targetTable): #Drop sourceTable so it's not hanging around #drop the old table cur.execute("""DROP TABLE IF EXISTS %s""" % targetOld) - - + + def _createUnionTable(self): """ After incremental ingest data has been written to self.incTableName, union the pruned @@ -449,7 +449,7 @@ def _createUnionTable(self): exStr = """CREATE TABLE %s %s""" % (self.unionTableName, self._incrementalUnionString()) cur.execute(exStr) conn.close() - + def _incrementalWhereClause(self): """ @@ -462,23 +462,23 @@ def _incrementalWhereClause(self): whereClause = "WHERE %s.export_date <= %s.export_date AND %s" % (self.tableName, self.incTableName, joinedString) return whereClause - + def _incrementalSelectString(self): """ Creates and returns the appropriate SELECT statement used when pruning the target table during an incremental ingest """ whereClause = self._incrementalWhereClause() - selectString = ("SELECT * FROM %s WHERE 0 = (SELECT COUNT(*) FROM %s %s)" % + selectString = ("SELECT * FROM %s WHERE 0 = (SELECT COUNT(*) FROM %s %s)" % (self.tableName, self.incTableName, whereClause)) return selectString - + def _incrementalUnionString(self): """ Creates and returns the appropriate UNION string used when merging the pruned table with the temporary incrmental table. - + The ingest and pruning process should preclude any dupes, so we can use ALL, which should be faster. """ selectString = self._incrementalSelectString() diff --git a/EPFLogger.conf b/EPFLogger.conf index a5673dd..e91143f 100644 --- a/EPFLogger.conf +++ b/EPFLogger.conf @@ -8,7 +8,7 @@ level = INFO keys = root [formatter_simpleFormatter] -datefmt = +datefmt = format = %(asctime)s [%(levelname)s]: %(message)s [handlers] diff --git a/EPFParser.py b/EPFParser.py index 0e646b6..796cd46 100644 --- a/EPFParser.py +++ b/EPFParser.py @@ -1,38 +1,38 @@ # Copyright (c) 2010 Apple Inc. All rights reserved. -# IMPORTANT: This Apple software is supplied to you by Apple Inc. ("Apple") in -# consideration of your agreement to the following terms, and your use, -# installation, modification or redistribution of this Apple software -# constitutes acceptance of these terms. If you do not agree with these terms, +# IMPORTANT: This Apple software is supplied to you by Apple Inc. ("Apple") in +# consideration of your agreement to the following terms, and your use, +# installation, modification or redistribution of this Apple software +# constitutes acceptance of these terms. If you do not agree with these terms, # please do not use, install, modify or redistribute this Apple software. -# In consideration of your agreement to abide by the following terms, and subject -# to these terms, Apple grants you a personal, non-exclusive license, under Apple's -# copyrights in this original Apple software (the "Apple Software"), to use, -# reproduce, modify and redistribute the Apple Software, with or without -# modifications, in source and/or binary forms; provided that if you redistribute -# the Apple Software in its entirety and without modifications, you must retain -# this notice and the following text and disclaimers in all such redistributions -# of the Apple Software. Neither the name, trademarks, service marks or logos of -# Apple Inc. may be used to endorse or promote products derived from the Apple -# Software without specific prior written permission from Apple. Except as -# expressly stated in this notice, no other rights or licenses, express or implied, -# are granted by Apple herein, including but not limited to any patent rights that -# may be infringed by your derivative works or by other works in which the Apple +# In consideration of your agreement to abide by the following terms, and subject +# to these terms, Apple grants you a personal, non-exclusive license, under Apple's +# copyrights in this original Apple software (the "Apple Software"), to use, +# reproduce, modify and redistribute the Apple Software, with or without +# modifications, in source and/or binary forms; provided that if you redistribute +# the Apple Software in its entirety and without modifications, you must retain +# this notice and the following text and disclaimers in all such redistributions +# of the Apple Software. Neither the name, trademarks, service marks or logos of +# Apple Inc. may be used to endorse or promote products derived from the Apple +# Software without specific prior written permission from Apple. Except as +# expressly stated in this notice, no other rights or licenses, express or implied, +# are granted by Apple herein, including but not limited to any patent rights that +# may be infringed by your derivative works or by other works in which the Apple # Software may be incorporated. -# The Apple Software is provided by Apple on an "AS IS" basis. APPLE MAKES NO -# WARRANTIES, EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE IMPLIED -# WARRANTIES OF NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR -# PURPOSE, REGARDING THE APPLE SOFTWARE OR ITS USE AND OPERATION ALONE OR IN -# COMBINATION WITH YOUR PRODUCTS. - -# IN NO EVENT SHALL APPLE BE LIABLE FOR ANY SPECIAL, INDIRECT, INCIDENTAL OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE -# GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# ARISING IN ANY WAY OUT OF THE USE, REPRODUCTION, MODIFICATION AND/OR DISTRIBUTION -# OF THE APPLE SOFTWARE, HOWEVER CAUSED AND WHETHER UNDER THEORY OF CONTRACT, TORT -# (INCLUDING NEGLIGENCE), STRICT LIABILITY OR OTHERWISE, EVEN IF APPLE HAS BEEN +# The Apple Software is provided by Apple on an "AS IS" basis. APPLE MAKES NO +# WARRANTIES, EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE IMPLIED +# WARRANTIES OF NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE, REGARDING THE APPLE SOFTWARE OR ITS USE AND OPERATION ALONE OR IN +# COMBINATION WITH YOUR PRODUCTS. + +# IN NO EVENT SHALL APPLE BE LIABLE FOR ANY SPECIAL, INDIRECT, INCIDENTAL OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE +# GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# ARISING IN ANY WAY OUT OF THE USE, REPRODUCTION, MODIFICATION AND/OR DISTRIBUTION +# OF THE APPLE SOFTWARE, HOWEVER CAUSED AND WHETHER UNDER THEORY OF CONTRACT, TORT +# (INCLUDING NEGLIGENCE), STRICT LIABILITY OR OTHERWISE, EVEN IF APPLE HAS BEEN # ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import os @@ -46,17 +46,17 @@ class SubstringNotFoundException(Exception): """ Exception thrown when a comment character or other tag is not found in a situation where it's required. """ - + class Parser(object): """ Parses an EPF file. - + During initialization, all the file db metadata is stored, and the file seek position is set to the beginning of the first data record. The Parser object can then be used directly by an Ingester to create and populate the table. - + typeMap is a dictionary mapping datatype strings in the file to corresponding types for the database being used. The default map is for MySQL. """ @@ -84,9 +84,9 @@ def __init__(self, filePath, typeMap={"CLOB":"LONGTEXT"}, recordDelim='\x02\n', self.commentChar = Parser.commentChar self.recordDelim = recordDelim self.fieldDelim = fieldDelim - + self.eFile = open(filePath, mode="rU") #this will throw an exception if filePath does not exist - + #Seek to the end and parse the recordsWritten line self.eFile.seek(-40, os.SEEK_END) str = self.eFile.read() #reads from -40 to end of file @@ -97,18 +97,18 @@ def __init__(self, filePath, typeMap={"CLOB":"LONGTEXT"}, recordDelim='\x02\n', #Extract the column names line1 = self.nextRowString(ignoreComments=False) self.columnNames = self.splitRow(line1, requiredPrefix=self.commentChar) - + #We'll now grab the rest of the header data, without assuming a particular order primStart = self.commentChar+Parser.primaryKeyTag dtStart = self.commentChar+Parser.dataTypesTag exStart = self.commentChar+Parser.exportModeTag - + #Grab the next 6 lines, which should include all the header comments firstRows=[] for j in range(6): firstRows.append(self.nextRowString(ignoreComments=False)) firstRows = [aRow for aRow in firstRows if aRow] #strip None rows (possible if the file is < 6 rows) - + #Loop through the rows, extracting the header info for aRow in firstRows: if aRow.startswith(primStart): @@ -131,30 +131,30 @@ def __init__(self, filePath, typeMap={"CLOB":"LONGTEXT"}, recordDelim='\x02\n', self.numberColumns.append(j) #Build a dictionary of column names to data types self.typeMap = dict(zip(self.columnNames, self.dataTypes)) - - + + def setSeekPos(self, pos=0): """ Sets the underlying file's seek position. - + This is useful for resuming a partial ingest that was interrupted for some reason. """ self.eFile.seek(pos) - - + + def getSeekPos(self): """ Gets the underlying file's seek position. """ return self.eFile.tell() - + seekPos = property(fget=getSeekPos, fset=setSeekPos, doc="Seek position of the underlying file") - + def seekToRecord(self, recordNum): """ Set the seek position to the beginning of the recordNumth record. - + Seeks to the beginning of the file if recordNum <=0, or the end if it's greater than the number of records. """ @@ -165,14 +165,14 @@ def seekToRecord(self, recordNum): for j in range(recordNum): self.advanceToNextRecord() - + def nextRowString(self, ignoreComments=True): """ Returns (as a string) the next row of data (as delimited by self.recordDelim), ignoring comments if ignoreComments is True. - + Leaves the delimiters in place. - + Unfortunately Python doesn't allow line-based reading with user-supplied line separators (http://bugs.python.org/issue1152248), so we use normal line reading and then concatenate when we hit 0x02. @@ -196,8 +196,8 @@ def nextRowString(self, ignoreComments=True): else: rowString = "".join(lst) #concatenate the lines into a single string, which is the full content of the row return rowString - - + + def advanceToNextRecord(self): """ Performs essentially the same task as nextRowString, but without constructing or returning anything. @@ -212,13 +212,13 @@ def advanceToNextRecord(self): if (ln.find(self.recordDelim) != -1): #last textual line of this record break self.latestRecordNum += 1 - - + + def splitRow(self, rowString, requiredPrefix=None): """ Given rowString, strips requiredPrefix and self.recordDelim, then splits on self.fieldDelim, returning the resulting list. - + If requiredPrefix is not present in the row, throws a SubstringNotFound exception """ if (requiredPrefix): @@ -230,7 +230,7 @@ def splitRow(self, rowString, requiredPrefix=None): str = rowString.partition(self.recordDelim)[0] return str.split(self.fieldDelim) - + def nextRecord(self): """ Returns the next row of data as a list, or None if we're out of data. @@ -241,7 +241,7 @@ def nextRecord(self): rec = self.splitRow(rowString) rec = rec[:len(self.columnNames)] #if there are more data records than column names, #trim any surplus records via a slice - + #replace empty strings with NULL for i in range(len(rec)): val = rec[i] @@ -258,8 +258,8 @@ def nextRecord(self): return rec else: return None - - + + def nextRecords(self, maxNum=100): """ Returns the next maxNum records (or fewer if EOF) as a list of lists. @@ -271,8 +271,8 @@ def nextRecords(self, maxNum=100): break records.append(lst) return records - - + + def nextRecordDict(self): """ Returns the next row of data as a dictionary, keyed by the column names.