Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Added optoin to rerun script to issue multiple concurrent tasks

  • Loading branch information...
commit 4c3970d8671010b4a918bb1577d2d49b546c9584 1 parent f77f842
Jeff Terrace jterrace authored
Showing with 48 additions and 14 deletions.
  1. +48 −14 scripts/rerun.py
62 scripts/rerun.py
View
@@ -9,33 +9,58 @@
'generate_metadata' : 'celery_tasks.generate_metadata.generate_metadata',
'generate_progressive' : 'celery_tasks.generate_progressive.generate_progressive'}
+running_tasks = []
+NUM_CONCURRENT_TASKS = None
+
+def emit_finished_tasks():
+ global running_tasks
+
+ tokeep = []
+ for (t, task_string) in running_tasks:
+ if t.state == 'SUCCESS' or t.state == 'FAILURE':
+ print 'Completed', task_string,
+ print t.state
+ if t.state == 'FAILED':
+ print 'Printing exception:'
+ print
+ print str(t.result)
+ print
+ else:
+ tokeep.append((t,task_string))
+
+ running_tasks = tokeep
+
+def wait_if_needed():
+ while len(running_tasks) >= NUM_CONCURRENT_TASKS:
+ emit_finished_tasks()
+ time.sleep(1)
+
+def wait_all():
+ while len(running_tasks) > 0:
+ emit_finished_tasks()
+ time.sleep(1)
+
def do_task(taskname, path, type, timestamp=None):
- print 'Issuing %s task for %s type=%s timestamp=%s...' % (taskname, path, type, str(timestamp)),
- sys.stdout.flush()
+ task_string = '%s task for %s type=%s timestamp=%s' % (taskname, path, type, str(timestamp))
+ print 'Issuing', task_string
t = send_task(task_names[taskname], args=[path, type])
- t.wait(propagate=False)
- print t.state
- if t.state == 'FAILED':
- print 'Printing exception:'
- print
- print str(t.result)
- print
- time.sleep(1)
+ running_tasks.append((t, task_string))
+ wait_if_needed()
def do_single(task, path, type=None):
metadata = get_file_metadata(path)
types_to_do = []
if type is None:
- for type in metadata['types']:
- types_to_do.append(type)
+ for t in metadata['types']:
+ types_to_do.append(t)
else:
if type not in metadata['types']:
print >> sys.stderr, 'Invalid type', type, 'for path', path
return
types_to_do.append(type)
- for type in types_to_do:
- do_task(task, path, type)
+ for t in types_to_do:
+ do_task(task, path, t)
def do_all(task, timestamp=None, type=None):
next_start = ""
@@ -52,7 +77,10 @@ def do_all(task, timestamp=None, type=None):
do_task(task, path, existing_type, timestamp)
def main():
+ global NUM_CONCURRENT_TASKS
+
parser = argparse.ArgumentParser(description='Reprocess tasks')
+ parser.add_argument('--concurrency', help='number of concurrent outstanding tasks', default=1, type=int)
parser.add_argument('task', help='task to execute', choices=task_names.keys())
subparsers = parser.add_subparsers()
all = subparsers.add_parser('all', help='reprocess all')
@@ -65,10 +93,16 @@ def main():
single.add_argument('--type', help='only reprocess this type of the file')
args = parser.parse_args()
+
+ NUM_CONCURRENT_TASKS = args.concurrency
+
parsing_result = vars(args)
to_execute = parsing_result['func']
del parsing_result['func']
+ del parsing_result['concurrency']
to_execute(**parsing_result)
+
+ wait_all()
def add_dirs():
thisdir = os.path.dirname( os.path.realpath( __file__ ) )
Please sign in to comment.
Something went wrong with that request. Please try again.