Skip to content
Browse files

New, cached status implementation

This is much faster. On a large testset it takes 2sec vs. 42 in previous implementation
  • Loading branch information...
1 parent 06e288f commit 0718128c8e6b38eb1f3b977b1962be1d97732951 Luis Pedro Coelho committed May 4, 2010
Showing with 241 additions and 38 deletions.
  1. +1 −0 .gitignore
  2. +4 −34 jug/jug.py
  3. +6 −1 jug/options.py
  4. +3 −1 jug/p3.py
  5. 0 jug/subcommands/__init__.py
  6. +225 −0 jug/subcommands/status.py
  7. +2 −2 jug/task.py
View
1 .gitignore
@@ -1,3 +1,4 @@
+.jugstatus.sqlite3
*pyc
jugdata
build
View
38 jug/jug.py
@@ -36,9 +36,9 @@
from . import backends
from .backends import memoize_store
from .task import Task
+from .subcommands.status import status
from .options import print_out
-
def do_print(store):
'''
do_print(store)
@@ -166,37 +166,6 @@ def execute(store, aggressive_unload=False):
if not task_names:
print_out('<no tasks>')
-def status(store):
- '''
- status(store)
-
- Implements the status command.
- '''
- Task.store = memoize_store(store)
- task_names = set(t.name for t in task.alltasks)
- tasks = task.alltasks
- tasks_ready = defaultdict(int)
- tasks_finished = defaultdict(int)
- tasks_running = defaultdict(int)
- tasks_waiting = defaultdict(int)
- for t in tasks:
- if t.can_load():
- tasks_finished[t.name] += 1
- elif t.can_run():
- if t.is_locked():
- tasks_running[t.name] += 1
- else:
- tasks_ready[t.name] += 1
- else:
- tasks_waiting[t.name] += 1
-
- print_out('%-40s%12s%12s%12s%12s' % ('Task name','Waiting','Ready','Finished','Running'))
- print_out('-' * (40+12+12+12+12))
- for t in task_names:
- print_out('%-40s%12s%12s%12s%12s' % (t[:40],tasks_waiting[t],tasks_ready[t],tasks_finished[t],tasks_running[t]))
- print_out('.' * (40+12+12+12+12))
- print_out('%-40s%12s%12s%12s%12s' % ('Total:',sum(tasks_waiting.values()),sum(tasks_ready.values()),sum(tasks_finished.values()),sum(tasks_running.values())))
- print_out()
def cleanup(store):
'''
@@ -283,14 +252,15 @@ def _sigterm(_,__):
def main():
options.parse()
- store,jugmodule = init(options.jugfile, options.jugdir)
+ if options.cmd != 'status':
+ store,jugmodule = init(options.jugfile, options.jugdir)
if options.cmd == 'execute':
execute(store, options.aggressive_unload)
elif options.cmd == 'count':
do_print(store)
elif options.cmd == 'status':
- status(store)
+ status()
elif options.cmd == 'invalidate':
invalidate(store, options.invalid_name)
elif options.cmd == 'cleanup':
View
7 jug/options.py
@@ -34,6 +34,7 @@
from __future__ import division
import logging
import string
+import sys
from .p3 import nprint
@@ -44,6 +45,7 @@
invalid_name = None
argv = None
print_out = nprint
+status_mode = 'cached'
_Commands = ('execute','status','stats','cleanup','count','invalidate','shell')
_Usage_string = \
@@ -85,12 +87,13 @@ def parse():
Parse the command line options and set the option variables.
'''
import optparse
- global jugdir, jugfile, cmd, aggressive_unload, invalid_name, argv
+ global jugdir, jugfile, cmd, aggressive_unload, invalid_name, argv, status_mode
parser = optparse.OptionParser()
parser.add_option('--aggressive-unload',action='store_true',dest='aggressive_unload',default=False)
parser.add_option('--invalid',action='store',dest='invalid_name',default=None)
parser.add_option('--jugdir',action='store',dest='jugdir',default='jugdata/')
parser.add_option('--verbose',action='store',dest='verbosity',default='QUIET')
+ parser.add_option('--no-cache',action='store_true',dest='cache',default=False)
options,args = parser.parse_args()
if not args:
usage()
@@ -124,6 +127,8 @@ def parse():
invalid_name = options.invalid_name
argv = args
sys.argv = args
+ status_mode = ('no-cached' if options.cache else 'cached')
jugdir = options.jugdir
+
# vim: set ts=4 sts=4 sw=4 expandtab smartindent:
View
4 jug/p3.py
@@ -28,4 +28,6 @@ def nprint(*args):
Works like Python3k's print
'''
- print args
+ for arg in args:
+ print arg,
+ print
View
0 jug/subcommands/__init__.py
No changes.
View
225 jug/subcommands/status.py
@@ -0,0 +1,225 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+# Copyright (C) 2008-2010, Luis Pedro Coelho <lpc@cmu.edu>
+# vim: set ts=4 sts=4 sw=4 expandtab smartindent:
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+from collections import defaultdict
+import sqlite3
+
+import jug
+from ..task import recursive_dependencies
+from .. import task
+from .. import backends
+from .. import options
+from ..task import Task
+from ..options import print_out
+from ..backends import memoize_store
+
+unknown,waiting,ready,running,finished = range(5)
+_jugstatus_file = '.jugstatus.sqlite3'
+
+def _create_sqlite3(ht, deps, rdeps):
+ connection = sqlite3.connect(_jugstatus_file)
+ connection.execute('''
+ CREATE TABLE ht (
+ id INTEGER PRIMARY KEY,
+ name CHAR(128),
+ hash CHAR(128),
+ status INT );
+ ''')
+ connection.execute('''
+ CREATE TABLE dep (
+ source INT,
+ target INT);
+ ''')
+
+ for h in ht:
+ connection.execute('''
+ INSERT INTO ht(name, hash, status) VALUES(?,?,?)
+ ''', (h[1], h[2], h[3]))
+
+ for i,cdeps in deps.iteritems():
+ for cd in cdeps:
+ connection.execute('''
+ INSERT INTO dep(source, target) VALUES(?,?)
+ ''', (i,cd))
+ connection.commit()
+ connection.close()
+
+def _retrieve_sqlite3():
+ connection = sqlite3.connect(_jugstatus_file)
+ cursor = connection.execute('''SELECT * FROM ht''')
+ ht = cursor.fetchall()
+ cursor = connection.execute('''SELECT * FROM dep''')
+ deps = defaultdict(list)
+ rdeps = defaultdict(list)
+ for d0,d1 in cursor:
+ deps[d0].append(d1)
+ rdeps[d1].append(d0)
+ return ht, deps, rdeps
+
+def _save_dirty3(dirty):
+ connection = sqlite3.connect(_jugstatus_file)
+ for id,status in dirty:
+ connection.execute('''UPDATE ht SET STATUS = ? WHERE id = ?''', (status, id))
+ connection.commit()
+
+
+def _load_jugfile():
+ store,_ = jug.init(options.jugfile, options.jugdir)
+ h2idx = {}
+ ht = []
+ deps = {}
+ rdeps = {}
+ for i,t in enumerate(task.alltasks):
+ hash = t.hash()
+ curdeps = [h2idx[dep.hash()] for dep in recursive_dependencies(t, 1)]
+ if curdeps:
+ deps[i] = curdeps
+ h = [i, t.name, hash, unknown]
+ ht.append(h)
+ h2idx[hash] = i
+
+ for k,v in deps.iteritems():
+ for rv in v:
+ if rv not in rdeps:
+ rdeps[rv] = [k]
+ else:
+ rdeps[rv].append(k)
+ return store, ht, deps, rdeps
+
+
+def _update_status(store, ht, deps, rdeps):
+ tasks_waiting = defaultdict(int)
+ tasks_ready = defaultdict(int)
+ tasks_running = defaultdict(int)
+ tasks_finished = defaultdict(int)
+
+ store = memoize_store(store)
+ dirty = []
+ active = []
+ for h in ht:
+ _,name,hash,status = h
+ if status == finished:
+ tasks_finished[name] += 1
+ else:
+ active.append(h)
+
+ for i,name,hash,status in active:
+ nstatus = None
+ if store.can_load(hash):
+ tasks_finished[name] += 1
+ nstatus = finished
+ else:
+ can_run = True
+ for dep in deps.get(i, []):
+ _,_,dhash,dstatus = ht[dep]
+ if dstatus != finished and not store.can_load(dhash):
+ can_run = False
+ break
+ if can_run:
+ lock = store.getlock(hash)
+ if lock.is_locked():
+ tasks_running[name] += 1
+ nstatus = running
+ else:
+ tasks_ready[name] += 1
+ nstatus = ready
+ else:
+ tasks_waiting[name] += 1
+ nstatus = waiting
+ assert nstatus is not None, '_update_status: nstatus not assigned'
+ if status != nstatus:
+ dirty.append((i,nstatus))
+ return tasks_waiting, tasks_ready, tasks_running, tasks_finished, dirty
+
+
+def _print_status(waiting, ready, running, finished):
+ names = set(waiting.keys())
+ names.update(ready.keys())
+ names.update(running.keys())
+ names.update(finished.keys())
+ format = '%-40s%12s%12s%12s%12s'
+ format_size = 40 +12 +12 +12 +12
+ print_out(format % ('Task name','Waiting','Ready','Finished','Running'))
+ print_out('-' * format_size)
+
+ for n in names:
+ n_cut = n[:40]
+ print_out(format % (n_cut,waiting[n],ready[n],finished[n],running[n]))
+ print_out('.' * format_size)
+ print_out(format % ('Total:',sum(waiting.values()),sum(ready.values()),sum(finished.values()),sum(running.values())))
+ print_out()
+
+
+def _status_cached():
+ create, update = range(2)
+ try:
+ ht, deps, rdeps = _retrieve_sqlite3()
+ store = backends.select(options.jugdir)
+ mode = update
+ except:
+ store, ht, deps, rdeps = _load_jugfile()
+ mode = create
+
+ tw,tre,tru,tf,dirty = _update_status(store, ht, deps, rdeps)
+ _print_status(tw, tre, tru, tf)
+ if mode == update:
+ _save_dirty3(dirty)
+ else:
+ for i,nstatus in dirty:
+ _,name,hash,_ = ht[i]
+ ht[i] = (i, name, hash, nstatus)
+ _create_sqlite3(ht, deps, rdeps)
+
+
+def _status_nocache():
+ store,jugmodule = jug.init(options.jugfile, options.jugdir)
+ Task.store = memoize_store(store)
+
+ task_names = set(t.name for t in task.alltasks)
+ tasks_waiting = defaultdict(int)
+ tasks_ready = defaultdict(int)
+ tasks_running = defaultdict(int)
+ tasks_finished = defaultdict(int)
+ for t in task.alltasks:
+ if t.can_load():
+ tasks_finished[t.name] += 1
+ elif t.can_run():
+ if t.is_locked():
+ tasks_running[t.name] += 1
+ else:
+ tasks_ready[t.name] += 1
+ else:
+ tasks_waiting[t.name] += 1
+ _print_status(tasks_waiting, tasks_ready, tasks_running, tasks_finished)
+
+
+def status():
+ '''
+ status(store)
+
+ Implements the status command.
+ '''
+ if options.status_mode == 'cached':
+ _status_cached()
+ else:
+ _status_nocache()
View
4 jug/task.py
@@ -289,16 +289,16 @@ def recursive_dependencies(t, max_level=-1):
if type(t) is dict:
t = t.itervalues()
for d in t:
+ if type(d) is Task:
+ yield d
for dd in recursive_dependencies(d, max_level):
yield dd
elif type(t) is Task:
- yield t
if max_level:
for dep in itertools.chain(t.dependencies, t.kwdependencies.itervalues()):
for d in recursive_dependencies(dep, max_level-1):
yield d
-
def value(elem):
'''
value = value(task)

0 comments on commit 0718128

Please sign in to comment.
Something went wrong with that request. Please try again.