Skip to content

Commit

Permalink
Merge 43d9f07 into 882e709
Browse files Browse the repository at this point in the history
  • Loading branch information
baroquebobcat committed May 8, 2015
2 parents 882e709 + 43d9f07 commit a687cd9
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 76 deletions.
82 changes: 48 additions & 34 deletions src/python/pants/base/workunit.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

import os
import re
import threading
import time
import uuid
from contextlib import contextmanager

from six.moves import range

Expand Down Expand Up @@ -100,23 +102,28 @@ def __init__(self, run_info_dir, parent, name, labels=None, cmd=''):
if self.parent:
self.parent.children.append(self)

self._lock = threading.Lock()

def has_label(self, label):
return label in self.labels

def start(self):
"""Mark the time at which this workunit started."""
self.start_time = time.time()
with self._lock:
self.start_time = time.time()

def end(self):
"""Mark the time at which this workunit ended."""
self.end_time = time.time()
for output in self._outputs.values():
output.close()
return self.path(), self.duration(), self._self_time(), self.has_label(WorkUnit.TOOL)
with self._lock:
self.end_time = time.time()
for output in self._outputs.values():
output.close()
return self.path(), self._duration(), self._self_time(), self.has_label(WorkUnit.TOOL)

def outcome(self):
"""Returns the outcome of this workunit."""
return self._outcome
with self._lock:
return self._outcome

def set_outcome(self, outcome):
"""Set the outcome of this work unit.
Expand All @@ -126,10 +133,10 @@ def set_outcome(self, outcome):
worst outcome of any of its subunits and any outcome set on it directly."""
if outcome not in range(0, 5):
raise Exception('Invalid outcome: {}'.format(outcome))

if outcome < self._outcome:
self._outcome = outcome
if self.parent: self.parent.set_outcome(self._outcome)
with self._lock:
if outcome < self._outcome:
self._outcome = outcome
if self.parent: self.parent.set_outcome(self._outcome)

_valid_name_re = re.compile(r'\w+')

Expand All @@ -138,38 +145,44 @@ def output(self, name):
m = WorkUnit._valid_name_re.match(name)
if not m or m.group(0) != name:
raise Exception('Invalid output name: {}'.format(name))
if name not in self._outputs:
workunit_name = re.sub(r'\W', '_', self.name)
path = os.path.join(self.run_info_dir,
'tool_outputs', '{workunit_name}-{id}.{output_name}'
.format(workunit_name=workunit_name,
id=self.id,
output_name=name))
safe_mkdir_for(path)
self._outputs[name] = FileBackedRWBuf(path)
self._output_paths[name] = path
return self._outputs[name]

def outputs(self):
"""Returns the map of output name -> output buffer."""
return self._outputs

def output_paths(self):
"""Returns the map of output name -> path of the output file."""
return self._output_paths
with self._lock:
if name not in self._outputs:
workunit_name = re.sub(r'\W', '_', self.name)
path = os.path.join(self.run_info_dir,
'tool_outputs', '{workunit_name}-{id}.{output_name}'
.format(workunit_name=workunit_name,
id=self.id,
output_name=name))
safe_mkdir_for(path)
self._outputs[name] = FileBackedRWBuf(path)
self._output_paths[name] = path
return self._outputs[name]

@contextmanager
def safe_outputs(self):
with self._lock:
outputs = {k: v for k, v in self._outputs.items() if not v.is_closed()}
yield outputs

def duration(self):
"""Returns the time (in fractional seconds) spent in this workunit and its children."""
with self._lock:
return self._duration()

def _duration(self):
# For use under a lock
return (self.end_time or time.time()) - self.start_time

def start_time_string(self):
"""A convenient string representation of start_time."""
return time.strftime('%H:%M:%S', time.localtime(self.start_time))
with self._lock:
return time.strftime('%H:%M:%S', time.localtime(self.start_time))

def start_delta_string(self):
"""A convenient string representation of how long after the run started we started."""
delta = int(self.start_time) - int(self.root().start_time)
return '{:02}:{:02}'.format(int(delta / 60), delta % 60)
with self._lock:
delta = int(self.start_time) - int(self.root().start_time)
return '{:02}:{:02}'.format(int(delta / 60), delta % 60)

def root(self):
ret = self
Expand All @@ -196,7 +209,8 @@ def unaccounted_time(self):
This assumes that all major work should be done in leaves.
TODO: Is this assumption valid?
"""
return 0 if len(self.children) == 0 else self._self_time()
with self._lock:
return 0 if len(self.children) == 0 else self._self_time()

def to_dict(self):
"""Useful for providing arguments to templates."""
Expand All @@ -210,4 +224,4 @@ def to_dict(self):

def _self_time(self):
"""Returns the time spent in this workunit outside of any children."""
return self.duration() - sum([child.duration() for child in self.children])
return self._duration() - sum([child.duration() for child in self.children])
25 changes: 17 additions & 8 deletions src/python/pants/reporting/html_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import cgi
import os
import re
import threading
import uuid
from collections import defaultdict, namedtuple

Expand Down Expand Up @@ -53,6 +54,7 @@ def __init__(self, run_tracker, settings):
# We redirect stdout, stderr etc. of tool invocations to these files.
self._output_files = defaultdict(dict) # workunit_id -> {path -> fileobj}.
self._linkify_memo = {}
self._output_lock = threading.Lock()

def report_path(self):
"""The path to the main report file."""
Expand All @@ -65,11 +67,12 @@ def open(self):

def close(self):
"""Implementation of Reporter callback."""
self._report_file.close()
# Make sure everything's closed.
for files in self._output_files.values():
for f in files.values():
f.close()
with self._output_lock:
self._report_file.close()
# Make sure everything's closed.
for files in self._output_files.values():
for f in files.values():
f.close()

def start_workunit(self, workunit):
"""Implementation of Reporter callback."""
Expand Down Expand Up @@ -177,7 +180,9 @@ def fix_detail_id(e, _id):

def handle_output(self, workunit, label, s):
"""Implementation of Reporter callback."""
if os.path.exists(self._html_dir): # Make sure we're not immediately after a clean-all.
if not os.path.exists(self._html_dir): # Make sure we're not immediately after a clean-all.
return
with self._output_lock:
path = os.path.join(self._html_dir, '{}.{}'.format(workunit.id, label))
output_files = self._output_files[workunit.id]
if path not in output_files:
Expand Down Expand Up @@ -253,13 +258,17 @@ def _render_message(self, *msg_elements):

def _emit(self, s):
"""Append content to the main report file."""
if os.path.exists(self._html_dir): # Make sure we're not immediately after a clean-all.
if not os.path.exists(self._html_dir): # Make sure we're not immediately after a clean-all.
return
with self._output_lock:
self._report_file.write(s)
self._report_file.flush() # We must flush in the same thread as the write.

def _overwrite(self, filename, s):
"""Overwrite a file with the specified contents."""
if os.path.exists(self._html_dir): # Make sure we're not immediately after a clean-all.
if not os.path.exists(self._html_dir): # Make sure we're not immediately after a clean-all.
return
with self._output_lock:
with open(os.path.join(self._html_dir, filename), 'w') as f:
f.write(s)

Expand Down
9 changes: 5 additions & 4 deletions src/python/pants/reporting/plaintext_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,11 @@ def end_workunit(self, workunit):

if workunit.outcome() != WorkUnit.SUCCESS and not self._show_output(workunit):
# Emit the suppressed workunit output, if any, to aid in debugging the problem.
for name, outbuf in workunit.outputs().items():
self.emit(self._prefix(workunit, b'\n==== {} ====\n'.format(name)))
self.emit(self._prefix(workunit, outbuf.read_from(0)))
self.flush()
with workunit.safe_outputs() as outputs:
for name, outbuf in outputs.items():
self.emit(self._prefix(workunit, b'\n==== {} ====\n'.format(name)))
self.emit(self._prefix(workunit, outbuf.read_from(0)))
self.flush()

def do_handle_log(self, workunit, level, *msg_elements):
"""Implementation of Reporter callback."""
Expand Down
62 changes: 32 additions & 30 deletions src/python/pants/reporting/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,69 +46,71 @@ def __init__(self):
self._reporters = {} # name -> Reporter instance.

# We synchronize on this, to support parallel execution.
self._lock = threading.Lock()
self._reporters_lock = threading.Lock()
self._workunits_lock = threading.Lock()

def _reporter_list(self):
with self._reporters_lock:
return self._reporters.values()

def open(self):
with self._lock:
for reporter in self._reporters.values():
reporter.open()
for reporter in self._reporter_list():
reporter.open()
self._emitter_thread.start()

# Note that if you addr/remove reporters after open() has been called you have
# to ensure that their state is set up correctly. Best only to do this with
# stateless reporters, such as ConsoleReporter.

def add_reporter(self, name, reporter):
with self._lock:
with self._reporters_lock:
self._reporters[name] = reporter

def remove_reporter(self, name):
with self._lock:
with self._reporters_lock:
ret = self._reporters[name]
del self._reporters[name]
return ret

def start_workunit(self, workunit):
with self._lock:
self._workunits[workunit.id] = workunit
for reporter in self._reporters.values():
reporter.start_workunit(workunit)
self._workunits[workunit.id] = workunit
for reporter in self._reporter_list():
reporter.start_workunit(workunit)

def log(self, workunit, level, *msg_elements):
"""Log a message.
Each element of msg_elements is either a message string or a (message, detail) pair.
"""
with self._lock:
for reporter in self._reporters.values():
reporter.handle_log(workunit, level, *msg_elements)
for reporter in self._reporter_list():
reporter.handle_log(workunit, level, *msg_elements)

def end_workunit(self, workunit):
with self._lock:
self._notify() # Make sure we flush everything reported until now.
for reporter in self._reporters.values():
reporter.end_workunit(workunit)
self._notify() # Make sure we flush everything reported until now.
for reporter in self._reporter_list():
reporter.end_workunit(workunit)
with self._workunits_lock:
if workunit.id in self._workunits:
del self._workunits[workunit.id]

def flush(self):
with self._lock:
self._notify()
self._notify()

def close(self):
self._emitter_thread.stop()
with self._lock:
self._notify() # One final time.
for reporter in self._reporters.values():
reporter.close()
self._notify() # One final time.
for reporter in self._reporter_list():
reporter.close()

def _notify(self):
# Notify for output in all workunits. Note that output may be coming in from workunits other
# than the current one, if work is happening in parallel.
# Assumes self._lock is held by the caller.
for workunit in self._workunits.values():
for label, output in workunit.outputs().items():
s = output.read()
if len(s) > 0:
for reporter in self._reporters.values():
reporter.handle_output(workunit, label, s)
with self._workunits_lock:
workunits = self._workunits.values()
for workunit in workunits:
with workunit.safe_outputs() as outputs:
for label, output in outputs.items():
s = output.read()
if len(s) > 0:
for reporter in self._reporter_list():
reporter.handle_output(workunit, label, s)
5 changes: 5 additions & 0 deletions src/python/pants/rwbuf/read_write_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ def __init__(self, io):
self._lock = threading.Lock()
self._io = io
self._readpos = 0
self._closed = False

def is_closed(self):
return self._closed

def read(self, size=-1):
with self._lock:
Expand All @@ -42,6 +46,7 @@ def flush(self):
self._io.flush()

def close(self):
self._closed = True
self._io.close()

def do_write(self, s):
Expand Down
9 changes: 9 additions & 0 deletions tests/python/pants_test/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ python_tests(
]
)

python_tests(
name = 'test_read_write_buffer',
sources = ['test_read_write_buffer.py'],
dependencies = [
'src/python/pants/rwbuf',
]
)

target(
name = 'integration',
dependencies = [
Expand All @@ -105,6 +113,7 @@ target(
dependencies = [
':test_binary_util',
':test_maven_layout',
':test_read_write_buffer',
':test_thrift_util',
'tests/python/pants_test/android',
'tests/python/pants_test/authentication:netrc',
Expand Down
28 changes: 28 additions & 0 deletions tests/python/pants_test/test_read_write_buffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# coding=utf-8
# Copyright 2015 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from __future__ import (absolute_import, division, generators, nested_scopes, print_function,
unicode_literals, with_statement)

import unittest

from pants.rwbuf.read_write_buffer import InMemoryRWBuf


class ReadWriteBufferTest(unittest.TestCase):

def test_closed_buffer_is_closed(self):
buff = InMemoryRWBuf()
buff.write('hello')
buff.close()

self.assertTrue(buff.is_closed())

def test_read_from_buffer(self):
buff = InMemoryRWBuf()
buff.write('hello')

ret = buff.read()

self.assertEqual('hello', ret)

0 comments on commit a687cd9

Please sign in to comment.