Skip to content

Commit

Permalink
Merge pull request #11 from aldanor/better-pytest-watch
Browse files Browse the repository at this point in the history
Multiple directories support, norecursedirs, passing args to pytest
  • Loading branch information
joeyespo committed Jun 13, 2015
2 parents cd236e6 + 0e6e14a commit de5d34d
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 48 deletions.
49 changes: 36 additions & 13 deletions pytest_watch/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,27 @@
Implements the command-line interface for pytest-watch.
All positional arguments after `--` are passed directly to py.test executable.
Usage:
ptw [options] [<directory>]
ptw [options] [-v | -q] [<directories>...] [-- <args>...]
Options:
-h --help Show this help.
--version Show version.
--ignore=<dirs> Command-separated list of directories to ignore when descending
(if relative: starting from the root of each watched dir).
-c --clear Automatically clear the screen before each run.
--onpass=<cmd> Run arbitrary programs on pass.
--onfail=<cmd> Run arbitrary programs on failure.
--onpass=<cmd> Run arbitrary command on pass.
--onfail=<cmd> Run arbitrary command on failure.
--nobeep Do not beep on failure.
--poll Use polling instead of events (useful in VMs)
-p --poll Use polling instead of events (useful in VMs).
--ext=<exts> Comma-separated list of file extensions that trigger a
new test run when changed (default: .py)
new test run when changed (default: .py).
--no-spool Disable event spooling (default: 200ms cooldown).
-v --verbose Increase verbosity of the output.
-q --quiet Decrease verbosity of the output.
"""

import sys
Expand All @@ -33,14 +40,30 @@ def main(argv=None):
"""The entry point of the application."""
colorama.init()

if argv is None:
argv = sys.argv[1:]
usage = '\n\n\n'.join(__doc__.split('\n\n\n')[1:])
usage = __doc__[__doc__.find('Usage:'):]
version = 'pytest-watch ' + __version__

argv = argv if argv is not None else sys.argv[1:]
args = docopt(usage, argv=argv, version=version)

extensions = args['--ext'].split(',') if args['--ext'] else []
return watch(args['<directory>'], args['--clear'], not args['--nobeep'],
args['--onpass'], args['--onfail'], args['--poll'],
extensions)
directories, pytest_args = args['<directories>'], []
if '--' in directories:
ix = directories.index('--')
directories, pytest_args = directories[:ix], directories[(ix + 1):]

ignore = (args['--ignore'] or '').split(',')
extensions = [
'.' * (not ext.startswith('.')) + ext
for ext in (args['--ext'] or '.py').split(',')
]

return watch(directories=directories,
ignore=ignore,
auto_clear=args['--clear'],
beep_on_failure=not args['--nobeep'],
onpass=args['--onpass'],
onfail=args['--onfail'],
poll=args['--poll'],
extensions=extensions,
args=pytest_args,
spool=not args['--no-spool'],
verbose=1 - bool(args['--quiet']) + bool(args['--verbose']))
42 changes: 42 additions & 0 deletions pytest_watch/spooler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# -*- coding: utf-8

from multiprocessing import Queue, Process, Event


class Timer(Process):
def __init__(self, interval, function, args=[], kwargs={}):
super(Timer, self).__init__()
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.finished = Event()

def cancel(self):
self.finished.set()

def run(self):
self.finished.wait(self.interval)
if not self.finished.is_set():
self.function(*self.args, **self.kwargs)
self.finished.set()


class EventSpooler(object):
def __init__(self, cooldown, callback):
self.cooldown = cooldown
self.callback = callback
self.inbox = Queue()
self.outbox = Queue()

def enqueue(self, event):
self.inbox.put(event)
Timer(self.cooldown, self.process).start()

def process(self):
self.outbox.put(self.inbox.get())
if self.inbox.empty():
events = []
while not self.outbox.empty():
events.append(self.outbox.get())
self.callback(events)
138 changes: 103 additions & 35 deletions pytest_watch/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,88 @@
import time
import subprocess

from colorama import Fore
from .spooler import EventSpooler

from colorama import Fore, Style
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
from watchdog.events import FileSystemEventHandler
from watchdog.events import (FileSystemEventHandler, FileModifiedEvent,
FileCreatedEvent, FileMovedEvent, FileDeletedEvent)


EVENT_NAMES = {
FileModifiedEvent: 'modified',
FileCreatedEvent: 'created',
FileMovedEvent: 'moved',
FileDeletedEvent: 'deleted',
}
WATCHED_EVENTS = list(EVENT_NAMES)
DEFAULT_EXTENSIONS = ['.py']
CLEAR_COMMAND = 'cls' if os.name == 'nt' else 'clear'
BEEP_CHARACTER = '\a'
STYLE_NORMAL = Fore.WHITE + Style.NORMAL + Style.DIM
STYLE_HIGHLIGHT = Fore.CYAN + Style.NORMAL + Style.BRIGHT


class ChangeHandler(FileSystemEventHandler):
"""Listens for changes to files and re-runs tests after each change."""
def __init__(self, directory=None, auto_clear=False, beep_on_failure=True,
onpass=None, onfail=None, extensions=[]):
def __init__(self, auto_clear=False, beep_on_failure=True,
onpass=None, onfail=None, extensions=[], args=None,
spool=True, verbose=1):
super(ChangeHandler, self).__init__()
self.directory = os.path.abspath(directory or '.')
self.auto_clear = auto_clear
self.beep_on_failure = beep_on_failure
self.onpass = onpass
self.onfail = onfail
self.extensions = extensions or DEFAULT_EXTENSIONS
self.args = args or []
self.spooler = None
if spool:
self.spooler = EventSpooler(0.2, self.on_queued_events)
self.verbose = verbose

def on_queued_events(self, events):
summary = []
for event in events:
paths = [event.src_path]
if isinstance(event, FileMovedEvent):
paths.append(event.dest_path)
event_name = EVENT_NAMES[type(event)]
paths = tuple(map(os.path.relpath, paths))
if any(os.path.splitext(path)[1].lower() in self.extensions for path in paths):
summary.append((event_name, paths))
if summary:
self.run(sorted(set(summary)))

def on_any_event(self, event):
if event.is_directory:
return
ext = os.path.splitext(event.src_path)[1].lower()
if ext in self.extensions:
self.run(event.src_path)

def run(self, filename=None):
"""Called when a file is changed to re-run the tests with nose."""
if self.auto_clear:
subprocess.call(CLEAR_COMMAND, cwd=self.directory, shell=True)
elif filename:
print()
print(Fore.CYAN + 'Change detected in ' + filename + Fore.RESET)
print()
print('Running unit tests...')
if isinstance(event, tuple(WATCHED_EVENTS)):
if self.spooler is not None:
self.spooler.enqueue(event)
else:
self.on_queued_events([event])

def run(self, summary=None):
"""Called when a file is changed to re-run the tests with py.test."""
if self.auto_clear:
subprocess.call(CLEAR_COMMAND)
command = ' '.join(['py.test'] + self.args)
if summary and not self.auto_clear:
print()
exit_code = subprocess.call('py.test', cwd=self.directory, shell=True)
if self.verbose:
highlight = lambda arg: STYLE_HIGHLIGHT + arg + STYLE_NORMAL
msg = 'Running: {}'.format(highlight(command))
if summary:
if self.verbose > 1:
msg = 'Changes detected in files:\n{}\n\nRerunning: {}'.format(
'\n'.join(' {:9s}'.format(event_name + ':') + ' ' +
' -> '.join(map(highlight, paths))
for event_name, paths in summary),
highlight(command)
)
else:
msg = 'Changes detected, rerunning: {}'.format(highlight(command))
print(STYLE_NORMAL + msg + Fore.RESET + Style.NORMAL)
exit_code = subprocess.call(['py.test'] + self.args)
passed = exit_code == 0

# Beep if failed
Expand All @@ -59,19 +99,44 @@ def run(self, filename=None):
os.system(self.onfail)


def watch(directory=None, auto_clear=False, beep_on_failure=True,
onpass=None, onfail=None, poll=False, extensions=[]):
"""
Starts a server to render the specified file or directory
containing a README.
"""
if directory and not os.path.isdir(directory):
raise ValueError('Directory not found: ' + directory)
directory = os.path.abspath(directory or '')
def watch(directories=[], ignore=[], auto_clear=False, beep_on_failure=True,
onpass=None, onfail=None, poll=False, extensions=[], args=[],
spool=True, verbose=1):
if not directories:
directories = ['.']
directories = [os.path.abspath(directory) for directory in directories]
for directory in directories:
if not os.path.isdir(directory):
raise ValueError('Directory not found: ' + directory)
recursive_dirs = directories
non_recursive_dirs = []
if ignore:
non_recursive_dirs = []
recursive_dirs = []
for directory in directories:
subdirs = [
os.path.join(directory, d)
for d in os.listdir(directory)
if os.path.isdir(d)
]
ok_subdirs = [
subd for subd in subdirs
if not any(os.path.samefile(os.path.join(directory, d), subd)
for d in ignore)
]
if len(subdirs) == len(ok_subdirs):
recursive_dirs.append(directory)
else:
non_recursive_dirs.append(directory)
recursive_dirs.extend(ok_subdirs)

recursive_dirs = sorted(set(recursive_dirs))
non_recursive_dirs = sorted(set(non_recursive_dirs))

# Initial run
event_handler = ChangeHandler(directory, auto_clear, beep_on_failure,
onpass, onfail, extensions)
event_handler = ChangeHandler(auto_clear, beep_on_failure,
onpass, onfail, extensions, args,
spool, verbose)
event_handler.run()

# Setup watchdog
Expand All @@ -80,13 +145,16 @@ def watch(directory=None, auto_clear=False, beep_on_failure=True,
else:
observer = Observer()

observer.schedule(event_handler, path=directory, recursive=True)
observer.start()
for directory in recursive_dirs:
observer.schedule(event_handler, path=directory, recursive=True)
for directory in non_recursive_dirs:
observer.schedule(event_handler, path=directory, recursive=False)

# Watch and run tests until interrupted by user
try:
observer.start()
while True:
time.sleep(1)
observer.join()
except KeyboardInterrupt:
observer.stop()
observer.join()

0 comments on commit de5d34d

Please sign in to comment.