Skip to content

Commit

Permalink
Addressed comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
archolewa committed Mar 1, 2017
1 parent 8f514bf commit 067af4d
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void onNext(String jobView) {
* @param asyncResponse The channel over which to send the response
*/
private void send(Response response, AsyncResponse asyncResponse) {
if (RequestLog.isStarted(RESPONSE_WORKFLOW_TIMER)) {
if (RequestLog.isRunning(RESPONSE_WORKFLOW_TIMER)) {
RequestLog.stopTiming(RESPONSE_WORKFLOW_TIMER);
}
asyncResponse.resume(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void onNext(PreResponse preResponse) {
* @param response The Response to send back to the user
*/
private void publishResponse(Response response) {
if (RequestLog.isStarted(RESPONSE_WORKFLOW_TIMER)) {
if (RequestLog.isRunning(RESPONSE_WORKFLOW_TIMER)) {
RequestLog.stopTiming(RESPONSE_WORKFLOW_TIMER);
}
asyncResponse.resume(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,7 @@ private void init() {
* @return the map containing all the recorded times per phase in milliseconds
*/
private Map<String, Long> getDurations() {
return times.values()
.stream()
.peek(
phase -> {
if (phase.isStarted()) {
LOG.warn(
"Exporting duration while timer is running. Measurement might be wrong: {}",
phase.getName()
);
}
}
)
.collect(Collectors.toMap(TimedPhase::getName, TimedPhase::getDuration));
return times.values().stream().collect(Collectors.toMap(TimedPhase::getName, TimedPhase::getDuration));
}

/**
Expand Down Expand Up @@ -174,15 +162,15 @@ private Map<String, Float> aggregateDurations() {
}

/**
* Check if a stopwatch is started.
* Check if a stopwatch is currently running.
*
* @param caller the caller to name this stopwatch with its class's simple name
*
* @return whether this stopwatch is started
*/

public static boolean isStarted(Object caller) {
return isStarted(caller.getClass().getSimpleName());
public static boolean isRunning(Object caller) {
return isRunning(caller.getClass().getSimpleName());
}

/**
Expand All @@ -192,10 +180,10 @@ public static boolean isStarted(Object caller) {
*
* @return whether this stopwatch is started
*/
public static boolean isStarted(String timePhaseName) {
public static boolean isRunning(String timePhaseName) {
RequestLog current = RLOG.get();
TimedPhase timePhase = current.times.get(timePhaseName);
return timePhase != null && timePhase.isStarted();
return timePhase != null && timePhase.isRunning();
}

/**
Expand Down Expand Up @@ -265,9 +253,6 @@ public static void stopTiming(Object caller) {
* @param stoppedPhase The phase that has been stopped, and whose duration needs to be stored
*/
public static void registerTime(TimedPhase stoppedPhase) {
if (stoppedPhase.isStarted()) {
LOG.warn("Timer '{}' is still running. Timings may be incorrect.", stoppedPhase.getName());
}
RequestLog.REGISTRY.timer(stoppedPhase.getName()).update(stoppedPhase.getDuration(), stoppedPhase.getUnit());
}

Expand Down Expand Up @@ -423,8 +408,8 @@ public static void accumulate(RequestLog ctx) {
.stream()
.filter(
e -> e.getKey().contains(DRUID_QUERY_TIMER) ||
(e.getKey().equals(REQUEST_WORKFLOW_TIMER) && !e.getValue().isStarted()) ||
(e.getKey().equals(RESPONSE_WORKFLOW_TIMER) && e.getValue().isStarted())
(e.getKey().equals(REQUEST_WORKFLOW_TIMER) && !e.getValue().isRunning()) ||
(e.getKey().equals(RESPONSE_WORKFLOW_TIMER) && e.getValue().isRunning())
)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
/**
* Represents a phase that is timed.
* TimedPhase is used to associate a Timer located in the registry with the exact duration of such a phase for a
* specific request.
* specific request. Times are in nanoseconds.
* <p>
* Note: This class is NOT thread-safe. Timers are intended to be started once by one thread, and stopped once by
* one thread (though those threads are not necessarily the same).
*/
public class TimedPhase implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(TimedPhase.class);
Expand All @@ -21,7 +24,6 @@ public class TimedPhase implements AutoCloseable {

/**
* Constructor.
* Times are in nanoseconds.
*
* @param name Name of the phase
*/
Expand All @@ -35,7 +37,7 @@ public TimedPhase(String name) {
* @return This phase after being started
*/
public TimedPhase start() {
if (isStarted()) {
if (!isRunning()) {
LOG.warn("Tried to start timer that is already running: {}", name);
} else {
start = System.nanoTime();
Expand All @@ -45,17 +47,44 @@ public TimedPhase start() {

/**
* Stop the phase.
* <p>
* This method just stops the timer. It does not register the time with the {@link RequestLog}. To register
* the timer, invoke {@link TimedPhase#registerTime()}. To do both with a single method call, see
* {@link TimedPhase#close()}
*
* @see TimedPhase#registerTime()
* @see TimedPhase#close()
*/
public void stop() {
if (!isStarted()) {
if (!isRunning()) {
LOG.warn("Tried to stop timer that has not been started: {}", name);
return;
}
duration += System.nanoTime() - start;
start = 0;
}

/**
* Registers the duration of this timer with the RequestLog.
* <p>
* It is highly recommended that you {@link TimedPhase#stop()}} the timer first. Otherwise, the timings may
* be inaccurate. To both stop and register the timer at once see {@link TimedPhase#close}.
*
* @see TimedPhase#stop()
* @see TimedPhase#close()
*/
public void registerTime() {
RequestLog.registerTime(this);
}

/**
* Return the duration of the timer in nanoseconds.
*
* @return The duration of the timer in nanoseconds
*/
public long getDuration() {
if (isRunning()) {
LOG.warn("Timer '{}' is still running. Timings may be incorrect.", getName());
}
return duration;
}

Expand All @@ -67,13 +96,23 @@ public TimeUnit getUnit() {
return TimeUnit.NANOSECONDS;
}

public boolean isStarted() {
public boolean isRunning() {
return start != 0;
}

/**
* Stops the timer, and registers the timer with the RequestLog.
* <p>
* This is primarily meant to be used by the try-with-resources block, which both stops the timer and registers it
* with the RequestLog, though it can of course be called manually as well. If you want to stop the timer, but
* don't want to register the timer just yet, then see {@link TimedPhase#stop}.
*
* @see TimedPhase#stop()
* @see TimedPhase#registerTime()
*/
@Override
public void close() {
stop();
RequestLog.registerTime(this);
registerTime();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public FailureCallback getFailureCallback(final DruidAggregationQuery<?> druidQu
return new FailureCallback() {
@Override
public void invoke(Throwable error) {
if (RequestLog.isStarted(REQUEST_WORKFLOW_TIMER)) {
if (RequestLog.isRunning(REQUEST_WORKFLOW_TIMER)) {
RequestLog.stopTiming(REQUEST_WORKFLOW_TIMER);
}
next.getFailureCallback(druidQuery).invoke(error);
Expand All @@ -51,7 +51,7 @@ public HttpErrorCallback getErrorCallback(final DruidAggregationQuery<?> druidQu
return new HttpErrorCallback() {
@Override
public void invoke(int statusCode, String reason, String responseBody) {
if (RequestLog.isStarted(REQUEST_WORKFLOW_TIMER)) {
if (RequestLog.isRunning(REQUEST_WORKFLOW_TIMER)) {
RequestLog.stopTiming(REQUEST_WORKFLOW_TIMER);
}
next.getErrorCallback(druidQuery).invoke(statusCode, reason, responseBody);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class WeightCheckResponseProcessorSpec extends Specification{
then: "no timer is stopped and proccessing continues to the next processor"
// This check is artifial. Is meant to check that there is no call to stopTiming() for this timer in the
// regular execution path of the WeightCheckResponseProcessor
RequestLog.isStarted(REQUEST_WORKFLOW_TIMER) == true
RequestLog.isRunning(REQUEST_WORKFLOW_TIMER) == true
1 * next.processResponse(json, groupByQuery, null)

}
Expand All @@ -57,7 +57,7 @@ class WeightCheckResponseProcessorSpec extends Specification{
wcrp.getFailureCallback(groupByQuery).invoke(t)

then: "The REQUEST_WORKFLOW_TIMER is stopped"
RequestLog.isStarted(REQUEST_WORKFLOW_TIMER) == false
RequestLog.isRunning(REQUEST_WORKFLOW_TIMER) == false

then: "and the failure callback of the next processor is called"
1 * next.getFailureCallback(groupByQuery) >> nextFail
Expand All @@ -77,7 +77,7 @@ class WeightCheckResponseProcessorSpec extends Specification{
wcrp.getErrorCallback(groupByQuery).invoke(statusCode, reason, body)

then: "The REQUEST_WORKFLOW_TIMER is stopped"
RequestLog.isStarted(REQUEST_WORKFLOW_TIMER) == false
RequestLog.isRunning(REQUEST_WORKFLOW_TIMER) == false

then: "and the http error callback of the next processor is called"
1 * next.getErrorCallback(groupByQuery) >> nextError
Expand Down

0 comments on commit 067af4d

Please sign in to comment.