Permalink
Browse files

Reload file if realpath changes, inode changes or if the file is trun…

…cated
  • Loading branch information...
1 parent 6fc0a54 commit c2d401fbbfc011439e9f7bed13666127fdf6c388 @silas silas committed Oct 15, 2010
Showing with 86 additions and 30 deletions.
  1. +86 −30 scribe_log
View
@@ -10,17 +10,76 @@ from thrift.transport.TTransport import TTransportException
from thrift.protocol import TBinaryProtocol
from scribe import scribe
-class ScribeLogError(Exception): pass
-
-def tail_file(file):
- while True:
- where = file.tell()
- line = file.readline()
- if not line:
- time.sleep(1.0)
- file.seek(where)
+class Error(Exception): pass
+class FileError(Error): pass
+
+class Tail(object):
+
+ def __init__(self, path, sleep=1.0, reopen_count=5):
+ self.path = path
+ self.sleep = sleep
+ self.reopen_count = reopen_count
+
+ def __iter__(self):
+ while True:
+ pos = self.file.tell()
+ line = self.file.readline()
+ if not line:
+ self.wait(pos)
+ else:
+ yield line
+
+ def open(self, tail=True):
+ try:
+ self.real_path = os.path.realpath(self.path)
+ self.inode = os.stat(self.path).st_ino
+ except OSError, error:
+ raise FileError(error)
+ try:
+ self.file = open(self.real_path)
+ except IOError, error:
+ raise FileError(error)
+ if tail:
+ self.file.seek(0, 2)
+
+ def close(self):
+ try:
+ self.file.close()
+ except Exception:
+ pass
+
+ def reopen(self):
+ self.close()
+ reopen_count = self.reopen_count
+ while reopen_count >= 0:
+ reopen_count -= 1
+ try:
+ self.open(tail=False)
+ return True
+ except FileError:
+ time.sleep(self.sleep)
+ return False
+
+ def check(self, pos):
+ try:
+ if self.real_path != os.path.realpath(self.path):
+ return True
+ stat = os.stat(self.path)
+ if self.inode != stat.st_ino:
+ return True
+ if pos > stat.st_size:
+ return True
+ except OSError:
+ return True
+ return False
+
+ def wait(self, pos):
+ if self.check(pos):
+ if not self.reopen():
+ raise Error('Unable to reopen file: %s' % self.path)
else:
- yield line
+ self.file.seek(pos)
+ time.sleep(self.sleep)
def scribe_fix_legacy():
global scribe
@@ -29,7 +88,7 @@ def scribe_fix_legacy():
return old_log_entry(kwargs)
scribe.LogEntry = new_log_entry
-def handle(file, category, host='127.0.0.1', port=1463, prefix='', postfix=''):
+def handle(path, category, host='127.0.0.1', port=1463, prefix='', postfix=''):
result = 0
socket = TSocket.TSocket(host=host, port=port)
@@ -41,13 +100,13 @@ def handle(file, category, host='127.0.0.1', port=1463, prefix='', postfix=''):
)
client = scribe.Client(iprot=protocol, oprot=protocol)
- transport.open()
-
try:
- log_file = open(file)
+ transport.open()
+
+ tail = Tail(path)
try:
- log_file.seek(0, 2)
- for line in tail_file(log_file):
+ tail.open()
+ for line in tail:
try:
log_entry = scribe.LogEntry(
category=category,
@@ -61,21 +120,19 @@ def handle(file, category, host='127.0.0.1', port=1463, prefix='', postfix=''):
)
result = client.Log(messages=[log_entry])
finally:
- log_file.close()
- except IOError, error:
- raise ScribeLogError('IOError: %s' % error)
-
- try:
- transport.close()
- except:
- pass
+ tail.close()
+ finally:
+ try:
+ transport.close()
+ except Exception:
+ pass
if result == scribe.ResultCode.OK:
pass
elif result == scribe.ResultCode.TRY_LATER:
- raise ScribeLogError('Scribe Error: TRY LATER')
+ raise Error('Scribe Error: TRY LATER')
else:
- raise ScribeLogError('Scribe Error: Unknown error code (%s)' % result)
+ raise Error('Scribe Error: Unknown error code (%s)' % result)
if __name__ == '__main__':
parser = optparse.OptionParser()
@@ -120,13 +177,12 @@ if __name__ == '__main__':
help='add to the end of each log line',
metavar='POSTFIX',
)
-
- (options, args) = parser.parse_args()
+ options, args = parser.parse_args()
if options.file and options.category:
try:
handle(
- file=options.file,
+ path=options.file,
category=options.category,
host=options.host,
port=options.port,
@@ -135,7 +191,7 @@ if __name__ == '__main__':
)
except KeyboardInterrupt:
sys.exit(0)
- except (ScribeLogError, TTransportException), error:
+ except (Error, TTransportException), error:
print >> sys.stderr, error
sys.exit(1)
else:

0 comments on commit c2d401f

Please sign in to comment.