@@ -1,3 +1,4 @@
+import logging
import os
import threading
import _fsevents
@@ -10,11 +11,33 @@
IN_MOVED_FROM = 0x 00000040
IN_MOVED_TO = 0x 00000080
+# flags from FSEvents to match event types:
+FSE_CREATED_FLAG = 0x 0100
+FSE_MODIFIED_FLAG = 0x 1000
+FSE_REMOVED_FLAG = 0x 0200
+FSE_RENAMED_FLAG = 0x 0800
+
+
+# loggin
+def logger_init ():
+ log = logging.getLogger(" fsevents" )
+ console_handler = logging.StreamHandler()
+ console_handler.setFormatter(
+ logging.Formatter(" [%(asctime)s %(name)s %(levelname)s ] %(message)s " ))
+ log.addHandler(console_handler)
+ log.setLevel(20 )
+ return log
+
+log = logger_init()
+
+
class Observer (threading .Thread ):
event = None
runloop = None
- def __init__ (self ):
+ def __init__ (self , latency = 0.01 , process_asap = False ):
+ self .process_asap = process_asap
+ self .latency = latency
self .streams = set ()
self .schedulings = {}
self .lock = threading.Lock()
@@ -45,28 +68,47 @@ def run(self):
def _schedule (self , stream ):
if not stream.paths:
- raise ValueError (" No paths to observe." )
+ msg = " No paths to observe."
+ log.error(msg)
+ raise ValueError (msg)
+
if stream.file_events:
callback = FileEventCallback(stream.callback, stream.paths)
else :
def callback (paths , masks ):
for path, mask in zip (paths, masks):
stream.callback(path, mask)
- _fsevents.schedule(self , stream, callback, stream.paths)
+ _fsevents.schedule(self , stream, callback, stream.paths,
+ stream.raw_file_events, latency = self .latency)
def schedule (self , stream ):
- self .lock.acquire()
- try :
+
+ def schedule_callback () :
if self .streams is None :
self ._schedule(stream)
elif stream in self .streams:
- raise ValueError (" Stream already scheduled." )
+ msg = " Stream already scheduled."
+ log.error(msg)
+ raise ValueError (msg)
else :
self .streams.add(stream)
if self .event is not None :
self .event.set()
- finally :
- self .lock.release()
+ if self .process_asap and self .is_alive():
+ while self .streams is not None :
+ pass
+
+ # decide if we want to block and thefore listen for events before all
+ # streams have been added or do the oposite
+ if self .process_asap:
+ log.debug(' Processing events asap' )
+ schedule_callback()
+ else :
+ self .lock.acquire()
+ try :
+ schedule_callback()
+ finally :
+ self .lock.release()
def unschedule (self , stream ):
self .lock.acquire()
@@ -86,9 +128,11 @@ def stop(self):
self .event = None
event.set()
+
class Stream (object ):
def __init__ (self , callback , * paths , ** options ):
file_events = options.pop(' file_events' , False )
+ raw_file_events = options.pop(' raw_file_events' , False )
assert len (options) == 0 , " Invalid option(s): %s " % repr (options.keys())
for path in paths:
if not isinstance (path, str ):
@@ -98,6 +142,8 @@ def __init__(self, callback, *paths, **options):
self .callback = callback
self .paths = list (paths)
self .file_events = file_events
+ self .raw_file_events = raw_file_events
+
class FileEvent (object ):
__slots__ = ' mask' , ' cookie' , ' name'
@@ -110,6 +156,7 @@ def __init__(self, mask, cookie, name):
def __repr__ (self ):
return repr ((self .mask, self .cookie, self .name))
+
class FileEventCallback (object ):
def __init__ (self , callback , paths ):
self .snapshots = {}
@@ -122,7 +169,9 @@ def __call__(self, paths, masks):
events = []
deleted = {}
- for path in sorted (paths):
+ paths_masks = zip (paths, masks)
+ log.debug(' Processing paths with masks:%s ' , paths_masks)
+ for path, mask in sorted (paths_masks):
path = path.rstrip(' /' )
snapshot = self .snapshots[path]
@@ -138,21 +187,32 @@ def __call__(self, paths, masks):
pass
observed = set (current)
-
for name, snap_stat in snapshot.items():
filename = os.path.join(path, name)
if name in observed:
+ log.debug(' File "%s " is observed' )
stat = current[name]
- if stat.st_mtime > snap_stat.st_mtime:
- events.append(FileEvent(IN_MODIFY , None , filename))
- elif stat.st_ctime > snap_stat.st_ctime:
- events.append(FileEvent(IN_ATTRIB , None , filename))
+ if stat.st_mtime != snap_stat.st_mtime:
+ event = FileEvent(IN_MODIFY , None , filename)
+ log.debug(' Appending event "%s "' , event)
+ events.append(event)
+ if not mask & FSE_MODIFIED_FLAG :
+ log.debug(" No matching flag for detected modify" )
+ elif stat.st_ctime != snap_stat.st_ctime:
+ event = FileEvent(IN_ATTRIB , None , filename)
+ log.debug(' Appending event "%s "' , event)
+ events.append(event)
observed.discard(name)
else :
event = FileEvent(IN_DELETE , None , filename)
deleted[snap_stat.st_ino] = event
+ log.debug(' Appending event "%s "' , event)
events.append(event)
+ if ((not mask & FSE_REMOVED_FLAG ) and
+ (not mask & FSE_RENAMED_FLAG )):
+ log.debug(" delete detected with no "
+ " delete or rename flag" )
for name in observed:
stat = current[name]
@@ -163,15 +223,31 @@ def __call__(self, paths, masks):
self .cookie += 1
event.mask = IN_MOVED_FROM
event.cookie = self .cookie
- event = FileEvent(IN_MOVED_TO , self .cookie, filename)
+ moved_to_event = FileEvent(IN_MOVED_TO , self .cookie,
+ filename)
+ log.debug(' Appending event "%s "' , event)
+ events.append(moved_to_event)
+ if not mask & FSE_RENAMED_FLAG :
+ log.debug(' Rename detected without matching flag' )
else :
- event = FileEvent(IN_CREATE , None , filename)
+ in_create_event = FileEvent(IN_CREATE , None , filename)
+ log.debug(' Appending event "%s "' , in_create_event)
+ events.append(in_create_event)
+ modified_event = FileEvent(IN_MODIFY , None , filename)
+ log.debug(' Appending event "%s "' , modified_event)
+ events.append(modified_event)
+
+ if not mask & FSE_MODIFIED_FLAG :
+ log.debug(' Adding IN_MODIFY event when the flag was'
+ ' missing. Possible reason was a copy.' )
+
+ if not mask & FSE_CREATED_FLAG :
+ log.debug(" Create detected from snapshot"
+ " but event is not marked as create" )
if os.path.isdir(filename):
self .snapshot(filename)
- events.append(event)
-
snapshot.clear()
snapshot.update(current)
this does not look safe at all