Skip to content

Commit

Permalink
Respond to comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Clay Reimann committed Jan 26, 2017
1 parent f013e1a commit 6a283a6
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms.
package com.yahoo.bard.webservice.druid.client.impl;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MappingJsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import static com.yahoo.bard.webservice.web.ErrorMessageFormat.DRUID_URL_INVALID;
import static com.yahoo.bard.webservice.web.handlers.workflow.DruidWorkflow.REQUEST_WORKFLOW_TIMER;
import static com.yahoo.bard.webservice.web.handlers.workflow.DruidWorkflow.RESPONSE_WORKFLOW_TIMER;

import com.yahoo.bard.webservice.application.MetricRegistryFactory;
import com.yahoo.bard.webservice.druid.client.DruidServiceConfig;
import com.yahoo.bard.webservice.druid.client.DruidWebService;
Expand All @@ -22,6 +18,16 @@
import com.yahoo.bard.webservice.logging.RequestLogUtils;
import com.yahoo.bard.webservice.logging.blocks.DruidResponse;
import com.yahoo.bard.webservice.web.handlers.RequestContext;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MappingJsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;

import org.asynchttpclient.AsyncCompletionHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
Expand All @@ -32,17 +38,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import static com.yahoo.bard.webservice.web.ErrorMessageFormat.DRUID_URL_INVALID;
import static com.yahoo.bard.webservice.web.handlers.workflow.DruidWorkflow.REQUEST_WORKFLOW_TIMER;
import static com.yahoo.bard.webservice.web.handlers.workflow.DruidWorkflow.RESPONSE_WORKFLOW_TIMER;
import javax.ws.rs.core.Response.Status;

/**
* Represents the druid web service endpoint.
Expand Down Expand Up @@ -296,13 +299,13 @@ public void postDruidQuery(
) {
long seqNum = druidQuery.getContext().getSequenceNumber();
String entityBody;
RequestLog.startTiming("DruidQuerySerializationSeq" + seqNum);
RequestLogUtils.startTiming("DruidQuerySerializationSeq" + seqNum);
try {
entityBody = writer.writeValueAsString(druidQuery);
} catch (JsonProcessingException e) {
throw new IllegalStateException(e);
} finally {
RequestLog.stopTiming("DruidQuerySerializationSeq" + seqNum);
RequestLogUtils.stopTiming("DruidQuerySerializationSeq" + seqNum);
}

long totalQueries = druidQuery.getContext().getNumberOfQueries();
Expand All @@ -312,7 +315,7 @@ public void postDruidQuery(

if (!(druidQuery instanceof WeightEvaluationQuery)) {
if (context.getNumberOfOutgoing().decrementAndGet() == 0) {
RequestLog.stopTiming(REQUEST_WORKFLOW_TIMER);
RequestLogUtils.stopTiming(REQUEST_WORKFLOW_TIMER);
}
outstanding = context.getNumberOfIncoming();
timerName = DRUID_QUERY_TIMER + String.format(format, seqNum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms.
package com.yahoo.bard.webservice.logging;

import com.codahale.metrics.MetricRegistry;
import static com.yahoo.bard.webservice.druid.client.impl.AsyncDruidWebServiceImpl.DRUID_QUERY_ALL_TIMER;
import static com.yahoo.bard.webservice.druid.client.impl.AsyncDruidWebServiceImpl.DRUID_QUERY_MAX_TIMER;
import static com.yahoo.bard.webservice.druid.client.impl.AsyncDruidWebServiceImpl.DRUID_QUERY_TIMER;

import com.yahoo.bard.webservice.application.MetricRegistryFactory;
import com.yahoo.bard.webservice.logging.blocks.Durations;
import com.yahoo.bard.webservice.logging.blocks.Preface;
import com.yahoo.bard.webservice.logging.blocks.Threads;
import com.yahoo.bard.webservice.web.ErrorMessageFormat;

import com.codahale.metrics.MetricRegistry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
Expand All @@ -22,9 +28,6 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.yahoo.bard.webservice.druid.client.impl.AsyncDruidWebServiceImpl.DRUID_QUERY_ALL_TIMER;
import static com.yahoo.bard.webservice.druid.client.impl.AsyncDruidWebServiceImpl.DRUID_QUERY_MAX_TIMER;
import static com.yahoo.bard.webservice.druid.client.impl.AsyncDruidWebServiceImpl.DRUID_QUERY_TIMER;

/**
* The RequestLog holds data that we would like to log about the request. In particular, various timings are
Expand Down Expand Up @@ -69,54 +72,6 @@ protected RequestLog(RequestLog rl) {
MDC.put(LOG_ID_KEY, logId);
}

/**
* Represents a phase that is timed.
* TimedPhase is used to associate a Timer located in the registry with the exact duration of such a phase for a
* specific request.
*/
protected static class TimedPhase {
protected final String name;
protected long start;
protected long duration;

/**
* Constructor.
*
* @param name Name of the phase
*/
protected TimedPhase(String name) {
this.name = name;
}

/**
* Start the phase.
*/
protected void start() {
if (start != 0) {
LOG.warn("Tried to start timer that is already running: {}", name);
return;
}
start = System.nanoTime();
}

/**
* Stop the phase.
*/
protected void stop() {
if (start == 0) {
LOG.warn("Tried to stop timer that has not been started: {}", name);
return;
}
duration += System.nanoTime() - start;
REGISTRY.timer(name).update(duration, TimeUnit.NANOSECONDS);
start = 0;
}

protected boolean isStarted() {
return start != 0;
}
}

/**
* Resets the contents of a request log at the calling thread.
*/
Expand All @@ -129,6 +84,21 @@ protected void clear() {
MDC.remove(LOG_ID_KEY);
}

/**
* Copys the data from source into the current request log
* @param source the request log to copy from
*/
protected void restoreFrom(RequestLog source) {
clear();
logId = source.logId;
info = source.info;
mostRecentTimer = source.mostRecentTimer;
times.putAll(source.times);
threadIds.addAll(source.threadIds);
threadIds.add(Thread.currentThread().getName());

}

/**
* Creates a new and empty request log at the calling thread.
*/
Expand Down Expand Up @@ -158,6 +128,25 @@ protected void init() {
MDC.put(LOG_ID_KEY, logId);
}

/**
* Retrieve a {@link TimedPhase} if it exists
*
* @param phaseName the name of the timed phase
* @return the phase
*/
protected TimedPhase getPhase(String phaseName) {
return times.get(phaseName);
}

/**
* Add a timed phase to the request log
*
* @param phase the phase to be timed
*/
protected void putPhase(TimedPhase phase) {
times.put(phase.name, phase);
}

/**
* Adds the durations in milliseconds of all the recorded timed phases to a map.
*
Expand All @@ -167,11 +156,10 @@ protected Map<String, Long> getDurations() {
return times.values()
.stream()
.peek(phase -> {
if (phase.start != 0) {
LOG.warn(
"Exporting duration while timer is running. Measurement might be wrong: {}", phase.name
);
if (!phase.isRunning()) {
return;
}
LOG.warn("Exporting duration while timer is running. Measurement might be wrong: {}", phase.name);
})
.collect(Collectors.toMap(phase -> phase.name, phase -> phase.duration));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import com.yahoo.bard.webservice.config.SystemConfig;
import com.yahoo.bard.webservice.config.SystemConfigProvider;
import com.yahoo.bard.webservice.logging.RequestLog.TimedPhase;
import com.yahoo.bard.webservice.logging.blocks.Durations;
import com.yahoo.bard.webservice.logging.blocks.Threads;
import org.slf4j.Logger;
Expand Down Expand Up @@ -61,8 +60,8 @@ public static boolean isStarted(Object caller) {
*/
public static boolean isStarted(String timePhaseName) {
RequestLog current = RLOG.get();
TimedPhase timePhase = current.times.get(timePhaseName);
return timePhase != null && timePhase.isStarted();
TimedPhase timePhase = current.getPhase(timePhaseName);
return timePhase != null && timePhase.isRunning();
}

/**
Expand Down Expand Up @@ -91,15 +90,15 @@ public static void startTiming(Object caller) {
*/
public static void startTiming(String timePhaseName) {
RequestLog current = RLOG.get();
TimedPhase timePhase = current.times.get(timePhaseName);
TimedPhase timePhase = current.getPhase(timePhaseName);
if (timePhase == null) {
// If it was the first phase in general, create logging context as well
if (current.info == null) {
current.init();
}

timePhase = new TimedPhase(timePhaseName);
current.times.put(timePhaseName, timePhase);
current.putPhase(timePhase);
}
current.mostRecentTimer = timePhase;
timePhase.start();
Expand Down Expand Up @@ -139,7 +138,7 @@ public static void stopTiming(Object caller) {
* @param timePhaseName the name of this stopwatch
*/
public static void stopTiming(String timePhaseName) {
TimedPhase timePhase = RLOG.get().times.get(timePhaseName);
TimedPhase timePhase = RLOG.get().getPhase(timePhaseName);
if (timePhase == null) {
LOG.warn("Tried to stop non-existent phase: {}", timePhaseName);
return;
Expand Down Expand Up @@ -267,13 +266,7 @@ public static void addIdPrefix(String idPrefix) {
*/
public static void restore(RequestLog ctx) {
RequestLog current = RLOG.get();
current.clear();
current.logId = ctx.logId;
current.info = ctx.info;
current.mostRecentTimer = ctx.mostRecentTimer;
current.times.putAll(ctx.times);
current.threadIds.addAll(ctx.threadIds);
current.threadIds.add(Thread.currentThread().getName());
current.restoreFrom(ctx);
MDC.put(LOG_ID_KEY, current.logId);
}

Expand All @@ -300,8 +293,8 @@ public static void accumulate(RequestLog ctx) {
.stream()
.filter(
e -> e.getKey().contains(DRUID_QUERY_TIMER) ||
(e.getKey().equals(REQUEST_WORKFLOW_TIMER) && !e.getValue().isStarted()) ||
(e.getKey().equals(RESPONSE_WORKFLOW_TIMER) && e.getValue().isStarted())
(e.getKey().equals(REQUEST_WORKFLOW_TIMER) && !e.getValue().isRunning()) ||
(e.getKey().equals(RESPONSE_WORKFLOW_TIMER) && e.getValue().isRunning())
)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2017 Yahoo Inc. All rights reserved.
*/
package com.yahoo.bard.webservice.logging;

import com.yahoo.bard.webservice.application.MetricRegistryFactory;

import com.codahale.metrics.MetricRegistry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
* Represents a phase that is timed.
* TimedPhase is used to associate a Timer located in the registry with the exact duration of such a phase for a
* specific request.
*/
public class TimedPhase {
private static final MetricRegistry REGISTRY = MetricRegistryFactory.getRegistry();
private static final Logger LOG = LoggerFactory.getLogger(RequestLog.class);

protected final String name;
protected long start;
protected long duration;

/**
* Constructor.
*
* @param name Name of the phase
*/
protected TimedPhase(String name) {
this.name = name;
}

/**
* Start the phase.
*/
protected void start() {
if (start != 0) {
LOG.warn("Tried to start timer that is already running: {}", name);
return;
}
start = System.nanoTime();
}

/**
* Stop the phase.
*/
protected void stop() {
if (start == 0) {
LOG.warn("Tried to stop timer that has not been started: {}", name);
return;
}
duration += System.nanoTime() - start;
REGISTRY.timer(name).update(duration, TimeUnit.NANOSECONDS);
start = 0;
}

protected boolean isRunning() {
return start != 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public class BardLoggingFilter implements ContainerRequestFilter, ContainerRespo
*/
@Override
public void filter(ContainerRequestContext request) throws IOException {
FiliTimingFilter.appendRequestId(request.getHeaders());
FiliInitializationFilter.appendRequestId(request.getHeaders());
RequestLogUtils.startTimingRequest();
RequestLogUtils.startTiming(this);
RequestLogUtils.record(new Preface(request));

Expand All @@ -109,7 +110,7 @@ public void filter(ContainerRequestContext request) throws IOException {
@Override
public void filter(ContainerRequestContext request, ContainerResponseContext response)
throws IOException {
FiliTimingFilter.appendRequestId(request.getHeaders());
FiliInitializationFilter.appendRequestId(request.getHeaders());
RequestLogUtils.startTiming(this);
StringBuilder debugMsgBuilder = new StringBuilder();

Expand Down Expand Up @@ -170,7 +171,7 @@ public void filter(ContainerRequestContext request, ContainerResponseContext res
*/
@Override
public void filter(ClientRequestContext request) throws IOException {
FiliTimingFilter.appendRequestId(request.getStringHeaders());
FiliInitializationFilter.appendRequestId(request.getStringHeaders());
RequestLogUtils.startTiming(CLIENT_TOTAL_TIMER);
request.setProperty(PROPERTY_NANOS, System.nanoTime());
}
Expand Down
Loading

0 comments on commit 6a283a6

Please sign in to comment.