Permalink
Fetching contributors…
Cannot retrieve contributors at this time
640 lines (526 sloc) 19.8 KB
""" Patch utility to apply unified diffs
Brute-force line-by-line non-recursive parsing
Copyright (c) 2008-2010 anatoly techtonik
Available under the terms of MIT license
NOTE: This version has been patched by Alex Stewart <alex@foogod.com> for
Python 3.x support and other misc fixups.
Project home: http://code.google.com/p/python-patch/
$Id: patch.py 92 2010-07-02 06:04:57Z techtonik $
$HeadURL: http://python-patch.googlecode.com/svn/trunk/patch.py $
"""
__author__ = "techtonik.rainforce.org"
__version__ = "10.04-2.pAS1"
import copy
import logging
import re
from logging import debug, info, warning
import sys
try:
# cStringIO doesn't support unicode in 2.5
from StringIO import StringIO
except ImportError:
# StringIO has been renamed to 'io' in 3.x
from io import StringIO
from os.path import exists, isfile, abspath
from os import unlink
_open = open
if sys.version_info >= (3,):
# Open files with universal newline support but no newline translation (3.x)
def open(filename, mode='r'):
return _open(filename, mode, newline='')
else:
# Open files with universal newline support but no newline translation (2.x)
def open(filename, mode='r'):
return _open(filename, mode + 'b')
# Python 3.x has changed iter.next() to be next(iter) instead, so for
# backwards compatibility, we'll just define a next() function under 2.x
def next(iter):
return iter.next()
#------------------------------------------------
# Logging is controlled by "python_patch" logger
debugmode = False
logger = logging.getLogger("python_patch")
loghandler = logging.StreamHandler()
logger.addHandler(loghandler)
debug = logger.debug
info = logger.info
warning = logger.warning
# If called as a library, don't log info/debug messages by default.
logger.setLevel(logging.WARN)
#------------------------------------------------
# constants for patch types
DIFF = PLAIN = "plain"
HG = MERCURIAL = "mercurial"
SVN = SUBVERSION = "svn"
def fromfile(filename):
""" Parse patch file and return Patch() object
"""
info("reading patch from file %s" % filename)
fp = open(filename, "r")
patch = Patch(fp)
fp.close()
return patch
def fromstring(s):
""" Parse text string and return Patch() object
"""
return Patch( StringIO(s) )
class HunkInfo(object):
""" Parsed hunk data container (hunk starts with @@ -R +R @@) """
def __init__(self):
self.startsrc=None #: line count starts with 1
self.linessrc=None
self.starttgt=None
self.linestgt=None
self.invalid=False
self.text=[]
def copy(self):
return copy.copy(self)
# def apply(self, estream):
# """ write hunk data into enumerable stream
# return strings one by one until hunk is
# over
#
# enumerable stream are tuples (lineno, line)
# where lineno starts with 0
# """
# pass
class Patch(object):
def __init__(self, stream=None):
# define Patch data members
# table with a row for every source file
#: list of source filenames
self.source=None
self.target=None
#: list of lists of hunks
self.hunks=None
#: file endings statistics for every hunk
self.hunkends=None
#: headers for each file
self.header=None
#: patch type - one of constants
self.type = None
if stream:
self.parse(stream)
def copy(self):
return copy.copy(self)
def parse(self, stream):
""" parse unified diff """
self.header = []
self.source = []
self.target = []
self.hunks = []
self.hunkends = []
# define possible file regions that will direct the parser flow
headscan = False # scanning header before the patch body
filenames = False # lines starting with --- and +++
hunkhead = False # @@ -R +R @@ sequence
hunkbody = False #
hunkskip = False # skipping invalid hunk mode
headscan = True
lineends = dict(lf=0, crlf=0, cr=0)
nextfileno = 0
nexthunkno = 0 #: even if index starts with 0 user messages number hunks from 1
# hunkinfo holds parsed values, hunkactual - calculated
hunkinfo = HunkInfo()
hunkactual = dict(linessrc=None, linestgt=None)
fe = enumerate(stream)
for lineno, line in fe:
# read out header
if headscan:
header = ''
try:
while not line.startswith("--- "):
header += line
lineno, line = next(fe)
except StopIteration:
# this is actually a loop exit
continue
self.header.append(header)
headscan = False
# switch to filenames state
filenames = True
# hunkskip and hunkbody code skipped until definition of hunkhead is parsed
if hunkbody:
# process line first
if re.match(r"^[- \+\\]", line):
# gather stats about line endings
if line.endswith("\r\n"):
self.hunkends[nextfileno-1]["crlf"] += 1
elif line.endswith("\n"):
self.hunkends[nextfileno-1]["lf"] += 1
elif line.endswith("\r"):
self.hunkends[nextfileno-1]["cr"] += 1
if line.startswith("-"):
hunkactual["linessrc"] += 1
elif line.startswith("+"):
hunkactual["linestgt"] += 1
elif not line.startswith("\\"):
hunkactual["linessrc"] += 1
hunkactual["linestgt"] += 1
hunkinfo.text.append(line)
# todo: handle \ No newline cases
else:
warning("invalid hunk no.%d at %d for target file %s" % (nexthunkno, lineno+1, self.target[nextfileno-1]))
# add hunk status node
self.hunks[nextfileno-1].append(hunkinfo.copy())
self.hunks[nextfileno-1][nexthunkno-1]["invalid"] = True
# switch to hunkskip state
hunkbody = False
hunkskip = True
# check exit conditions
if hunkactual["linessrc"] > hunkinfo.linessrc or hunkactual["linestgt"] > hunkinfo.linestgt:
warning("extra hunk no.%d lines at %d for target %s" % (nexthunkno, lineno+1, self.target[nextfileno-1]))
# add hunk status node
self.hunks[nextfileno-1].append(hunkinfo.copy())
self.hunks[nextfileno-1][nexthunkno-1]["invalid"] = True
# switch to hunkskip state
hunkbody = False
hunkskip = True
elif hunkinfo.linessrc == hunkactual["linessrc"] and hunkinfo.linestgt == hunkactual["linestgt"]:
self.hunks[nextfileno-1].append(hunkinfo.copy())
# switch to hunkskip state
hunkbody = False
hunkskip = True
# detect mixed window/unix line ends
ends = self.hunkends[nextfileno-1]
if ((ends["cr"]!=0) + (ends["crlf"]!=0) + (ends["lf"]!=0)) > 1:
warning("inconsistent line ends in patch hunks for %s" % self.source[nextfileno-1])
if debugmode:
debuglines = dict(ends)
debuglines.update(file=self.target[nextfileno-1], hunk=nexthunkno)
debug("crlf: %(crlf)d lf: %(lf)d cr: %(cr)d\t - file: %(file)s hunk: %(hunk)d" % debuglines)
if hunkskip:
match = re.match("^@@ -(\d+)(,(\d+))? \+(\d+)(,(\d+))?", line)
if match:
# switch to hunkhead state
hunkskip = False
hunkhead = True
elif line.startswith("--- "):
# switch to filenames state
hunkskip = False
filenames = True
if debugmode and len(self.source) > 0:
debug("- %2d hunks for %s" % (len(self.hunks[nextfileno-1]), self.source[nextfileno-1]))
if filenames:
if line.startswith("--- "):
if nextfileno in self.source:
warning("skipping invalid patch for %s" % self.source[nextfileno])
del self.source[nextfileno]
# double source filename line is encountered
# attempt to restart from this second line
re_filename = "^--- ([^\t]+)"
match = re.match(re_filename, line)
# todo: support spaces in filenames
if match:
self.source.append(match.group(1).strip())
else:
warning("skipping invalid filename at line %d" % lineno)
# switch back to headscan state
filenames = False
headscan = True
elif not line.startswith("+++ "):
if nextfileno in self.source:
warning("skipping invalid patch with no target for %s" % self.source[nextfileno])
del self.source[nextfileno]
else:
# this should be unreachable
warning("skipping invalid target patch")
filenames = False
headscan = True
else:
if nextfileno in self.target:
warning("skipping invalid patch - double target at line %d" % lineno)
del self.source[nextfileno]
del self.target[nextfileno]
nextfileno -= 1
# double target filename line is encountered
# switch back to headscan state
filenames = False
headscan = True
else:
re_filename = "^\+\+\+ ([^\t]+)"
match = re.match(re_filename, line)
if not match:
warning("skipping invalid patch - no target filename at line %d" % lineno)
# switch back to headscan state
filenames = False
headscan = True
else:
self.target.append(match.group(1).strip())
nextfileno += 1
# switch to hunkhead state
filenames = False
hunkhead = True
nexthunkno = 0
self.hunks.append([])
self.hunkends.append(lineends.copy())
continue
if hunkhead:
match = re.match("^@@ -(\d+)(,(\d+))? \+(\d+)(,(\d+))?", line)
if not match:
if nextfileno-1 not in self.hunks:
warning("skipping invalid patch with no hunks for file %s" % self.target[nextfileno-1])
# switch to headscan state
hunkhead = False
headscan = True
continue
else:
# switch to headscan state
hunkhead = False
headscan = True
else:
hunkinfo.startsrc = int(match.group(1))
hunkinfo.linessrc = 1
if match.group(3): hunkinfo.linessrc = int(match.group(3))
hunkinfo.starttgt = int(match.group(4))
hunkinfo.linestgt = 1
if match.group(6): hunkinfo.linestgt = int(match.group(6))
hunkinfo.invalid = False
hunkinfo.text = []
hunkactual["linessrc"] = hunkactual["linestgt"] = 0
# switch to hunkbody state
hunkhead = False
hunkbody = True
nexthunkno += 1
continue
if not hunkskip:
warning("patch file incomplete - %s" % filename)
# sys.exit(?)
else:
# duplicated message when an eof is reached
if debugmode and len(self.source) > 0:
debug("- %2d hunks for %s" % (len(self.hunks[nextfileno-1]), self.source[nextfileno-1]))
info("total files: %d total hunks: %d" % (len(self.source), sum([len(hset) for hset in self.hunks])))
def apply(self):
""" apply parsed patch """
total = len(self.source)
for fileno, filename in enumerate(self.source):
f2patch = filename
if not exists(f2patch):
f2patch = self.target[fileno]
if not exists(f2patch):
warning("source/target file does not exist\n--- %s\n+++ %s" % (filename, f2patch))
continue
if not isfile(f2patch):
warning("not a file - %s" % f2patch)
continue
filename = f2patch
info("processing %d/%d:\t %s" % (fileno+1, total, filename))
# validate before patching
f2fp = open(filename)
hunkno = 0
hunk = self.hunks[fileno][hunkno]
hunkfind = []
hunkreplace = []
validhunks = 0
canpatch = False
for lineno, line in enumerate(f2fp):
if lineno+1 < hunk.startsrc:
continue
elif lineno+1 == hunk.startsrc:
hunkfind = [x[1:].rstrip("\r\n") for x in hunk.text if x[0] in " -"]
hunkreplace = [x[1:].rstrip("\r\n") for x in hunk.text if x[0] in " +"]
#pprint(hunkreplace)
hunklineno = 0
# todo \ No newline at end of file
# check hunks in source file
if lineno+1 < hunk.startsrc+len(hunkfind)-1:
if line.rstrip("\r\n") == hunkfind[hunklineno]:
hunklineno+=1
else:
debug("hunk no.%d doesn't match source file %s" % (hunkno+1, filename))
# file may be already patched, but we will check other hunks anyway
hunkno += 1
if hunkno < len(self.hunks[fileno]):
hunk = self.hunks[fileno][hunkno]
continue
else:
break
# check if processed line is the last line
if lineno+1 == hunk.startsrc+len(hunkfind)-1:
debug("file %s hunk no.%d -- is ready to be patched" % (filename, hunkno+1))
hunkno+=1
validhunks+=1
if hunkno < len(self.hunks[fileno]):
hunk = self.hunks[fileno][hunkno]
else:
if validhunks == len(self.hunks[fileno]):
# patch file
canpatch = True
break
else:
if hunkno < len(self.hunks[fileno]):
warning("premature end of source file %s at hunk %d" % (filename, hunkno+1))
f2fp.close()
if validhunks < len(self.hunks[fileno]):
if self._match_file_hunks(filename, self.hunks[fileno]):
warning("already patched %s" % filename)
else:
warning("source file is different - %s" % filename)
if canpatch:
backupname = filename+".orig"
if exists(backupname):
warning("can't backup original file to %s - aborting" % backupname)
else:
import shutil
shutil.move(filename, backupname)
if self.write_hunks(backupname, filename, self.hunks[fileno]):
info("successfully patched %s" % filename)
unlink(backupname)
else:
warning("error patching file %s" % filename)
shutil.copy(filename, filename+".invalid")
warning("invalid version is saved to %s" % filename+".invalid")
# todo: proper rejects
shutil.move(backupname, filename)
# todo: check for premature eof
def can_patch(self, filename):
""" Check if specified filename can be patched. Returns None if file can
not be found among source filenames. False if patch can not be applied
clearly. True otherwise.
:returns: True, False or None
"""
idx = self._get_file_idx(filename, source=True)
if idx == None:
return None
return self._match_file_hunks(filename, self.hunks[idx])
def _match_file_hunks(self, filepath, hunks):
matched = True
fp = open(abspath(filepath))
class NoMatch(Exception):
pass
lineno = 1
line = fp.readline()
hno = None
try:
for hno, h in enumerate(hunks):
# skip to first line of the hunk
while lineno < h.starttgt:
if not len(line): # eof
debug("check failed - premature eof before hunk: %d" % (hno+1))
raise NoMatch
line = fp.readline()
lineno += 1
for hline in h.text:
if hline.startswith("-"):
continue
if not len(line):
debug("check failed - premature eof on hunk: %d" % (hno+1))
# todo: \ No newline at the end of file
raise NoMatch
if line.rstrip("\r\n") != hline[1:].rstrip("\r\n"):
debug("file is not patched - failed hunk: %d" % (hno+1))
raise NoMatch
line = fp.readline()
lineno += 1
except NoMatch:
matched = False
# todo: display failed hunk, i.e. expected/found
fp.close()
return matched
def patch_stream(self, instream, hunks):
""" Generator that yields stream patched with hunks iterable
Converts lineends in hunk lines to the best suitable format
autodetected from input
"""
# todo: At the moment substituted lineends may not be the same
# at the start and at the end of patching. Also issue a
# warning/throw about mixed lineends (is it really needed?)
hunks = iter(hunks)
srclineno = 1
lineends = {'\n':0, '\r\n':0, '\r':0}
def get_line():
"""
local utility function - return line from source stream
collecting line end statistics on the way
"""
line = instream.readline()
# 'U' mode works only with text files
if line.endswith("\r\n"):
lineends["\r\n"] += 1
elif line.endswith("\n"):
lineends["\n"] += 1
elif line.endswith("\r"):
lineends["\r"] += 1
return line
for hno, h in enumerate(hunks):
debug("hunk %d" % (hno+1))
# skip to line just before hunk starts
while srclineno < h.startsrc:
yield get_line()
srclineno += 1
for hline in h.text:
# todo: check \ No newline at the end of file
if hline.startswith("-") or hline.startswith("\\"):
get_line()
srclineno += 1
continue
else:
if not hline.startswith("+"):
get_line()
srclineno += 1
line2write = hline[1:]
# detect if line ends are consistent in source file
if sum([bool(lineends[x]) for x in lineends]) == 1:
newline = [x for x in lineends if lineends[x] != 0][0]
yield line2write.rstrip("\r\n")+newline
else: # newlines are mixed
yield line2write
for line in instream:
yield line
def write_hunks(self, srcname, tgtname, hunks):
src = open(srcname, "r")
tgt = open(tgtname, "w")
debug("processing target file %s" % tgtname)
tgt.writelines(self.patch_stream(src, hunks))
tgt.close()
src.close()
return True
def _get_file_idx(self, filename, source=None):
""" Detect index of given filename within patch.
:param filename:
:param source: search filename among sources (True),
targets (False), or both (None)
:returns: int or None
"""
filename = abspath(filename)
if source == True or source == None:
for i,fnm in enumerate(self.source):
if filename == abspath(fnm):
return i
if source == False or source == None:
for i,fnm in enumerate(self.target):
if filename == abspath(fnm):
return i
if __name__ == "__main__":
from optparse import OptionParser
from os.path import exists
import sys
opt = OptionParser(usage="%prog [options] unipatch-file", version="python-patch %s" % __version__)
opt.add_option("-d", "--debug", action="store_true", dest="debugmode", help="Print debugging messages")
opt.add_option("-q", "--quiet", action="store_true", dest="quiet", help="Only print messages on warning/error")
(options, args) = opt.parse_args()
if not args:
opt.print_version()
opt.print_help()
sys.exit()
debugmode = options.debugmode
patchfile = args[0]
if not exists(patchfile) or not isfile(patchfile):
sys.exit("patch file does not exist - %s" % patchfile)
if debugmode:
loglevel = logging.DEBUG
logformat = "%(levelname)8s %(message)s"
elif options.quiet:
loglevel = logging.WARN
logformat = "%(message)s"
else:
loglevel = logging.INFO
logformat = "%(message)s"
logger.setLevel(loglevel)
loghandler.setFormatter(logging.Formatter(logformat))
patch = fromfile(patchfile)
#pprint(patch)
patch.apply()
# todo: document and test line ends handling logic - patch.py detects proper line-endings
# for inserted hunks and issues a warning if patched file has incosistent line ends