Skip to content
This repository has been archived by the owner on Aug 17, 2023. It is now read-only.

Commit

Permalink
Merge pull request #31 from Psycojoker/watcher_testing
Browse files Browse the repository at this point in the history
Fix PEP issues in hamlpy_watcher and add tests
  • Loading branch information
rowanseymour committed Dec 1, 2016
2 parents 13dbeeb + ad0feee commit cc7695b
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 24 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
.DS_Store
.idea/
.tox/
.watcher_test/
build/
htmlcov/
env/
Expand Down
57 changes: 33 additions & 24 deletions hamlpy/hamlpy_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
from . import hamlpy
from . import nodes as hamlpynodes

try:
str = unicode
except NameError:
pass

class Options(object):
CHECK_INTERVAL = 3 # in seconds
Expand All @@ -29,8 +25,9 @@ class Options(object):
# dict of compiled files [fullpath : timestamp]
compiled = dict()


class StoreNameValueTagPair(argparse.Action):
def __call__(self, parser, namespace, values, option_string = None):
def __call__(self, parser, namespace, values, option_string=None):
tags = getattr(namespace, 'tags', {})
if tags is None:
tags = {}
Expand All @@ -41,16 +38,24 @@ def __call__(self, parser, namespace, values, option_string = None):
setattr(namespace, 'tags', tags)

arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('-v', '--verbose', help = 'Display verbose output', action = 'store_true')
arg_parser.add_argument('-i', '--input-extension', metavar = 'EXT', default = '.hamlpy', help = 'The file extensions to look for.', type = str, nargs = '+')
arg_parser.add_argument('-ext', '--extension', metavar = 'EXT', default = Options.OUTPUT_EXT, help = 'The output file extension. Default is .html', type = str)
arg_parser.add_argument('-r', '--refresh', metavar = 'S', default = Options.CHECK_INTERVAL, help = 'Refresh interval for files. Default is {} seconds. Ignored if the --once flag is set.'.format(Options.CHECK_INTERVAL), type = int)
arg_parser.add_argument('input_dir', help = 'Folder to watch', type = str)
arg_parser.add_argument('output_dir', help = 'Destination folder', type = str, nargs = '?')
arg_parser.add_argument('--tag', help = 'Add self closing tag. eg. --tag macro:endmacro', type = str, nargs = 1, action = StoreNameValueTagPair)
arg_parser.add_argument('--attr-wrapper', dest = 'attr_wrapper', type = str, choices = ('"', "'"), default = "'", action = 'store', help = "The character that should wrap element attributes. This defaults to ' (an apostrophe).")
arg_parser.add_argument('--jinja', help = 'Makes the necessary changes to be used with Jinja2.', default = False, action = 'store_true')
arg_parser.add_argument('--once', help = 'Runs the compiler once and exits on completion. Returns a non-zero exit code if there were any compile errors.', default = False, action = 'store_true')
arg_parser.add_argument('-v', '--verbose', help='Display verbose output', action='store_true')
arg_parser.add_argument('-i', '--input-extension', metavar='EXT', default='.hamlpy',
help='The file extensions to look for.', type=str, nargs='+')
arg_parser.add_argument('-ext', '--extension', metavar='EXT', default=Options.OUTPUT_EXT,
help='The output file extension. Default is .html', type=str)
arg_parser.add_argument('-r', '--refresh', metavar='S', default=Options.CHECK_INTERVAL, type=int,
help='Refresh interval for files. Default is {} seconds. Ignored if the --once flag is set.'.format(Options.CHECK_INTERVAL))
arg_parser.add_argument('input_dir', help='Folder to watch', type=str)
arg_parser.add_argument('output_dir', help='Destination folder', type=str, nargs='?')
arg_parser.add_argument('--tag', type=str, nargs=1, action=StoreNameValueTagPair,
help='Add self closing tag. eg. --tag macro:endmacro')
arg_parser.add_argument('--attr-wrapper', dest='attr_wrapper', type=str, choices=('"', "'"), default="'", action='store',
help="The character that should wrap element attributes. This defaults to ' (an apostrophe).")
arg_parser.add_argument('--jinja', default=False, action='store_true',
help='Makes the necessary changes to be used with Jinja2.')
arg_parser.add_argument('--once', default=False, action='store_true',
help='Runs the compiler once and exits on completion. Returns a non-zero exit code if there were any compile errors.')


def watched_extension(extension):
"""Return True if the given extension is one of the watched extensions"""
Expand All @@ -59,9 +64,9 @@ def watched_extension(extension):
return True
return False


def watch_folder():
"""Main entry point. Expects one or two arguments (the watch folder + optional destination folder)."""
argv = sys.argv[1:] if len(sys.argv) > 1 else []
args = arg_parser.parse_args(sys.argv[1:])
compiler_args = {}

Expand Down Expand Up @@ -95,9 +100,9 @@ def watch_folder():
hamlpynodes.TagNode.may_contain.pop(k, None)

hamlpynodes.TagNode.self_closing.update({
'macro' : 'endmacro',
'call' : 'endcall',
'raw' : 'endraw'
'macro': 'endmacro',
'call': 'endcall',
'raw': 'endraw'
})

hamlpynodes.TagNode.may_contain['for'] = 'else'
Expand All @@ -120,6 +125,7 @@ def watch_folder():
# allow graceful exit (no stacktrace output)
sys.exit(0)


def _watch_folder(folder, destination, compiler_args):
"""Compares "modified" timestamps against the "compiled" dict, calls compiler
if necessary. Returns a tuple of the number of files hit and the number
Expand All @@ -140,17 +146,19 @@ def _watch_folder(folder, destination, compiler_args):
os.makedirs(compiled_folder)

compiled_path = _compiled_path(compiled_folder, filename)
if not fullpath in compiled or compiled[fullpath] < mtime or not os.path.isfile(compiled_path):
if fullpath not in compiled or compiled[fullpath] < mtime or not os.path.isfile(compiled_path):
compiled[fullpath] = mtime
total_files += 1
if not compile_file(fullpath, compiled_path, compiler_args):
num_failed += 1

return (total_files, num_failed)
return total_files, num_failed


def _compiled_path(destination, filename):
return os.path.join(destination, filename[:filename.rfind('.')] + Options.OUTPUT_EXT)


def compile_file(fullpath, outfile_name, compiler_args):
"""Calls HamlPy compiler. Returns True if the file was compiled and
written successfully."""
Expand All @@ -159,10 +167,10 @@ def compile_file(fullpath, outfile_name, compiler_args):
try:
if Options.DEBUG:
print("Compiling %s -> %s" % (fullpath, outfile_name))
haml_lines = codecs.open(fullpath, 'r', encoding = 'utf-8').read().splitlines()
haml_lines = codecs.open(fullpath, 'r', encoding='utf-8').read().splitlines()
compiler = hamlpy.Compiler(compiler_args)
output = compiler.process_lines(haml_lines)
outfile = codecs.open(outfile_name, 'w', encoding = 'utf-8')
outfile = codecs.open(outfile_name, 'w', encoding='utf-8')
outfile.write(output)

return True
Expand All @@ -173,5 +181,6 @@ def compile_file(fullpath, outfile_name, compiler_args):

return False

if __name__ == '__main__':

if __name__ == '__main__': # pragma: no cover
watch_folder()
88 changes: 88 additions & 0 deletions hamlpy/test/test_watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from __future__ import print_function, unicode_literals

import os
import shutil
import sys
import time
import unittest

from mock import patch

from hamlpy.hamlpy_watcher import watch_folder


WORKING_DIR = '.watcher_test'
INPUT_DIR = WORKING_DIR + os.sep + 'input'
OUTPUT_DIR = WORKING_DIR + os.sep + 'output'


class ScriptExit(Exception):
def __init__(self, exit_code):
self.exit_code = exit_code


class WatcherTest(unittest.TestCase):

def test_watch_folder(self):
# remove working directory if it exists and re-create it
if os.path.exists(WORKING_DIR):
shutil.rmtree(WORKING_DIR)

os.makedirs(INPUT_DIR)
os.makedirs(OUTPUT_DIR)

# create some haml files for testing
self._write_file(INPUT_DIR + os.sep + 'test.haml', "%span{'class': 'test'}\n- macro\n")
self._write_file(INPUT_DIR + os.sep + 'error.haml', "%div{")

# run as once off pass - should return 1 for number of failed conversions
self._run_script([
'hamlpy_watcher.py',
INPUT_DIR, OUTPUT_DIR,
'--once', '--input-extension=.haml', '--verbose', '--tag=macro:endmacro'
], 1)

# check file without errors was converted
self.assertFileContents(OUTPUT_DIR + os.sep + 'test.html',
"<span class='test'></span>\n{% macro %}\n{% endmacro %}\n")

# run without output directory which should make it default to re-using the input directory
self._run_script([
'hamlpy_watcher.py',
INPUT_DIR,
'--once', '--input-extension=.haml', '--tag=macro:endmacro'
], 1)

self.assertFileContents(INPUT_DIR + os.sep + 'test.html',
"<span class='test'></span>\n{% macro %}\n{% endmacro %}\n")

# run in watch mode with 1 second refresh
self._run_script([
'hamlpy_watcher.py',
INPUT_DIR,
'--refresh=1', '--input-extension=.haml', '--tag=macro:endmacro'
], 1)

def assertFileContents(self, path, contents):
with open(path, 'r') as f:
self.assertEqual(f.read(), contents)

def _write_file(self, path, text):
with open(path, 'w') as f:
f.write(text)

def _run_script(self, script_args, expected_exit_code):
def raise_exception_with_code(code):
raise ScriptExit(code)

# patch sys.exit so it throws an exception so we can return execution to this test
# patch sys.argv to pass our arguments to the script
# patch time.sleep to be interrupted
with patch.object(sys, 'exit', side_effect=raise_exception_with_code), \
patch.object(sys, 'argv', script_args), \
patch.object(time, 'sleep', side_effect=KeyboardInterrupt), \
self.assertRaises(ScriptExit) as raises:

watch_folder()

assert raises.exception.exit_code == expected_exit_code

0 comments on commit cc7695b

Please sign in to comment.