Skip to content

Commit

Permalink
Replace time, add acceptable tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Adam DePrince committed Mar 13, 2013
1 parent 189768f commit f88e7e2
Showing 1 changed file with 53 additions and 26 deletions.
79 changes: 53 additions & 26 deletions autosync/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@
from Queue import Queue
# Duplicate gevent's semantics with thread
def spawn(func, *args, **kwargs):
lock = thread.allocate_lock()
def join_emulator(func, lock, *args, **kwargs):
func(*args, **kwargs)
lock.release()

lock = thread.allocate_lock()
lock.acquire(True)

args = (lock,) + tuple(args)
return thread.start_new_thread(func, args, kwargs)
args = (func, lock) + tuple(args)
thread.start_new_thread(join_emulator, args, kwargs)
return lock

def joinall(locks):
for lock in locks:
lock.acquire(True)
lock.release(True)

lock.release()


import autosync
Expand Down Expand Up @@ -79,39 +79,47 @@ def next(*iters):
results.append(i.next())
except StopIteration:
results.append(None)
if not results:
raise StopIteration
return results

original_sorted = sorted
def sorted(items, *args, **kwargs):
if hasattr(items, 'already_sorted') and items.already_sorted:
return item
return original_sorted(items, *args, **kwargs)
#original_sorted = sorted
#def sorted(items, *args, **kwargs):
# if hasattr(items, 'already_sorted') and items.already_sorted:
# return item
# return original_sorted(items, *args, **kwargs)


def merge(iter_a, iter_b, func_a, func_b, func_both):
next_a, next_b = next(iter_a, iter_b)
while True:
if not all((next_a, next_b)):
break
if next_a.key < next_b.key:
ka = next_a.key.encode('UTF-8')
kb = next_b.key.encode('UTF-8')
if ka < kb:
func_a(next_a)
next_a = next(iter_a)[0]
continue
if next_a.key > next_b.key:
if ka > kb:
func_b(next_b)
next_b = next(iter_b)[0]
continue
if next_a.size != next_b.size:
if next_a.md5 != next_b.md5:
func_both(next_a)
continue
print "Skipping", next_a, next_b
next_a, next_b = next(iter_a, iter_b)


while next_a:
print "Cleaning up a"
next_a = next(iter_a)[0]
if next_a: func_a(next_a)

while next_b:
print "Cleaning up b"
next_b = next(iter_b)[0]
if next_b: func_b(next_b)

Expand All @@ -125,6 +133,13 @@ def __init__(self, actor, files, prefix, que):
self.que = que
self.prefix = prefix

RE = None

def acceptable(self, s):
if self.RE is None:
self.RE = re.compile(FLAGS.source_filter)
return bool(self.RE.match(s))

def upload(self, key):
self.que.put(('u', key))

Expand All @@ -136,22 +151,22 @@ def filename_to_File(self, path):
name = path
if name.startswith(self.prefix):
name = name[len(self.prefix):]
while name.startswith('/'):
name = name[1:]
f = File(name, path)
return f


class SyncState(State):
RE = None

def acceptable(self, s):
if self.RE is None:
self.RE = re.compile(FLAGS.source_filter)
return bool(self.RE.match(s))

def add_path_to_que(self, path):
self.local_que.put(self.filename_to_File(path))
if self.acceptable(path):
try:
path = path.encode('utf-8')
except:
print "Can't encode:", path
return
self.local_que.put(self.filename_to_File(path))
else:
print "Rejecting", path

def walker(self,_ , dirname, fnames):
fnames.sort()
Expand All @@ -175,7 +190,7 @@ def run(self):
iter(sorted(self.actor.list())),
self.upload, self.delete, self.upload)
joinall((self.walker_job,))

print "Done phase 1"

class TrackState(State, ProcessEvent):
mask = EventsCodes.OP_FLAGS['IN_DELETE'] | EventsCodes.OP_FLAGS['IN_CREATE'] | EventsCodes.OP_FLAGS['IN_MODIFY']
Expand All @@ -188,20 +203,28 @@ def __init__(self, *args, **kwargs):
def process_IN_CREATE(self, event):
path = os.path.join(event.path, event.name)
if os.path.isfile(path):
self.upload(self.filename_to_File(path))
if self.acceptable(path):
print "Created", event
self.upload(self.filename_to_File(path))
elif os.path.isdir(path):
print "Created dir", event
self.wm.add_watch(path, self.mask, rec=True)

def process_IN_MODIFY(self, event):
path = os.path.join(event.path, event.name)
if os.path.isfile(path):
self.upload(self.filename_to_File(path))
if self.acceptable(path):
print "Modified", event
self.upload(self.filename_to_File(path))

def process_IN_DELETE(self, event):
path = os.path.join(event.path, event.name)
self.delete(self.filename_to_File(path))
if self.acceptable(path):
print "Deleted", event
self.delete(self.filename_to_File(path))

def run(self):
print "Starting track phase"
for f in self.files:
f = os.path.abspath(f)
self.wm.add_watch(f, self.mask, rec=True)
Expand All @@ -210,7 +233,11 @@ def run(self):
self.notifier.process_events()
if self.notifier.check_events(100):
self.notifier.read_events()
gevent.sleep(0)
if use_gevent:
gevent.sleep(0)
else:
import time
time.sleep(500)
except KeyboardInterrupt:
self.notifier.stop()

Expand Down

0 comments on commit f88e7e2

Please sign in to comment.