This repository has been archived by the owner on Jan 15, 2022. It is now read-only.

Commit

Merge pull request #160 from piyushnarang/task_counter_filters
Add support to filter task / job counters
vrushalivc committed Apr 10, 2017
2 parents e17ec79 + 75b8123 commit 2089d69
Showing 10 changed files with 322 additions and 33 deletions.
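
The change adds an includeCounter query parameter (alongside the existing include parameter) to the job and task REST endpoints, so callers can restrict which counters are serialized. As a rough illustration, with a hypothetical cluster, job ID, and counter name (counter filters use the group.counterName form):

GET /job/cluster1@dc1/job_201704100000_0001?include=jobName&includeCounter=FileSystemCounters.HDFS_BYTES_READ
GET /tasks/cluster1@dc1/job_201704100000_0001?includeCounter=FileSystemCounters.HDFS_BYTES_READ
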
2 changes: 2 additions & 0 deletions .gitignore
@@ -11,3 +11,5 @@ deploy.sh
# Where Maven generates its output.
*/target/*
*/build/*
*.idea/*
*.iml
4 changes: 2 additions & 2 deletions hraven-core/pom.xml
@@ -300,13 +300,13 @@
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
<scope>provided</scope>
</dependency>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
<scope>provided</scope>
</dependency>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
@@ -25,7 +25,6 @@

import com.twitter.hraven.datasource.JobHistoryService;
import com.twitter.hraven.util.ByteUtil;

/**
* Captures the details of tasks for a hadoop job
*/
@@ -72,6 +71,7 @@ public void setTaskId(String taskId) {
this.taskId = taskId;
}

@JsonProperty("taskType")
public String getType() {
return type;
}
@@ -18,6 +18,8 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.ArrayList;


import javax.ws.rs.ext.ContextResolver;
import javax.ws.rs.ext.Provider;
@@ -136,8 +138,13 @@ public void serialize(TaskDetails td, JsonGenerator jsonGenerator,
SerializationContext context = RestResource.serializationContext
.get();
Predicate<String> includeFilter = context.getTaskFilter();
Predicate<String> includeCounterFilter = context.getCounterFilter();

    if (includeCounterFilter != null && includeFilter == null) {
includeFilter = new SerializationContext.FieldNameFilter(new ArrayList<String>());
}

if (includeFilter == null) {
if (includeFilter == null && includeCounterFilter == null) {
// should generate the json for everything in the task details object
ObjectMapper om = new ObjectMapper();
om.registerModule(addJobMappings(createhRavenModule()));
@@ -157,8 +164,8 @@ public void serialize(TaskDetails td, JsonGenerator jsonGenerator,
filteredWrite("taskType", includeFilter, td.getType(), jsonGenerator);
filteredWrite("status", includeFilter, td.getStatus(), jsonGenerator);
filteredWrite("splits", includeFilter, td.getSplits(), jsonGenerator);
filteredWrite("counters", includeFilter, td.getCounters(),
jsonGenerator);
filteredCounterWrite("counters", includeFilter, includeCounterFilter,
td.getCounters(), jsonGenerator);
filteredWrite("taskAttemptId", includeFilter, td.getTaskAttemptId(),
jsonGenerator);
filteredWrite("trackerName", includeFilter, td.getTrackerName(),
@@ -190,8 +197,13 @@ public void serialize(JobDetails jd, JsonGenerator jsonGenerator,
SerializationContext context = RestResource.serializationContext
.get();
Predicate<String> includeFilter = context.getJobFilter();
Predicate<String> includeCounterFilter = context.getCounterFilter();

if (includeFilter == null) {
if (includeCounterFilter != null && includeFilter == null) {
includeFilter = new SerializationContext.FieldNameFilter(new ArrayList<String>());
}

if (includeFilter == null && includeCounterFilter == null) {
ObjectMapper om = new ObjectMapper();
om.registerModule(addJobMappings(createhRavenModule()));
om.writeValue(jsonGenerator, jd);
@@ -249,14 +261,14 @@ public void serialize(JobDetails jd, JsonGenerator jsonGenerator,
filteredWrite("megabyteMillis", includeFilter, jd.getMegabyteMillis(),
jsonGenerator);
filteredWrite("cost", includeFilter, jd.getCost(), jsonGenerator);
filteredWrite("counters", includeFilter, jd.getCounters(),
jsonGenerator);
filteredWrite("mapCounters", includeFilter, jd.getMapCounters(),
jsonGenerator);
filteredWrite("reduceCounters", includeFilter, jd.getReduceCounters(),
jsonGenerator);
filteredWrite("cost", includeFilter, jd.getCost(), jsonGenerator);
filteredWrite("configuration", includeFilter, jd.getConfiguration(), jsonGenerator);

filteredCounterWrite("counters", includeFilter, includeCounterFilter,
jd.getCounters(), jsonGenerator);
filteredCounterWrite("mapCounters", includeFilter, includeCounterFilter,
jd.getMapCounters(), jsonGenerator);
filteredCounterWrite("reduceCounters", includeFilter, includeCounterFilter,
jd.getReduceCounters(), jsonGenerator);
jsonGenerator.writeEndObject();
}
}
@@ -466,14 +478,94 @@ public static void filteredWrite(String member, Predicate<String> includeFilter,
}
}

/**
   * Checks whether the member should be filtered out; if the filter itself is
   * null, writes out that member as a String.
*
* @param member
* @param includeFilter
* @param taskObject
* @param jsonGenerator
* @throws JsonGenerationException
* @throws IOException
*/
public static void filteredWrite(String member, Predicate<String> includeFilter,
String taskObject, JsonGenerator jsonGenerator)
throws JsonGenerationException, IOException {
if (includeFilter != null) {
if (includeFilter.apply(member)) {
jsonGenerator.writeFieldName(member);
jsonGenerator.writeString(taskObject);
}
} else {
jsonGenerator.writeFieldName(member);
jsonGenerator.writeString(taskObject);
}
}


/**
   * Checks whether the counter member should be filtered out; when a counter
   * filter is present, only counters whose full "group.counterName" key matches
   * it are written, otherwise the field filter (or no filter at all) decides.
   *
   * @param member
   * @param includeFilter
   * @param includeCounterFilter
   * @param counterMap
   * @param jsonGenerator
   * @throws IOException
*/
public static void filteredCounterWrite(String member, Predicate<String> includeFilter,
Predicate<String> includeCounterFilter,
CounterMap counterMap, JsonGenerator jsonGenerator)
throws IOException {
if (includeFilter != null && includeCounterFilter == null) {
if (includeFilter.apply(member)) {
jsonGenerator.writeFieldName(member);
jsonGenerator.writeObject(counterMap);
}
} else {
if (includeCounterFilter != null) {
            // walk each counter group, build the full "group.counterName" key,
            // and write only the counters the counter filter accepts
boolean startObjectGroupMap = false;
jsonGenerator.writeFieldName(member);

String fullCounterName;
jsonGenerator.writeStartObject();

for (String group : counterMap.getGroups()) {
Map<String, Counter> groupMap = counterMap.getGroup(group);
for (Map.Entry<String, Counter> nameCounterEntry : groupMap.entrySet()) {
Counter counter = nameCounterEntry.getValue();
fullCounterName = group + "." + counter.getKey();
if (includeCounterFilter.apply(fullCounterName)) {
              if (!startObjectGroupMap) {
jsonGenerator.writeFieldName(group);
jsonGenerator.writeStartObject();
startObjectGroupMap = true;
}
jsonGenerator.writeFieldName(counter.getKey());
jsonGenerator.writeNumber(counter.getValue());
}
}
if (startObjectGroupMap) {
jsonGenerator.writeEndObject();
startObjectGroupMap = false;
}
}
jsonGenerator.writeEndObject();
}
}
}
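
For reference, here is a minimal, self-contained sketch of the matching idea used by filteredCounterWrite above, with plain maps standing in for hraven's CounterMap and a hard-coded list standing in for the request-supplied counter filter (the group and counter names are made up):

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CounterFilterSketch {
  public static void main(String[] args) {
    // group -> (counterName -> value), standing in for hraven's CounterMap
    Map<String, Map<String, Long>> counters = new LinkedHashMap<String, Map<String, Long>>();
    Map<String, Long> fsGroup = new LinkedHashMap<String, Long>();
    fsGroup.put("HDFS_BYTES_READ", 1024L);
    fsGroup.put("HDFS_BYTES_WRITTEN", 2048L);
    counters.put("FileSystemCounters", fsGroup);

    // stands in for the Predicate built from the includeCounter query values;
    // entries use the same "group.counterName" form filteredCounterWrite builds
    List<String> wanted = Arrays.asList("FileSystemCounters.HDFS_BYTES_READ");

    for (Map.Entry<String, Map<String, Long>> group : counters.entrySet()) {
      for (Map.Entry<String, Long> counter : group.getValue().entrySet()) {
        String fullCounterName = group.getKey() + "." + counter.getKey();
        if (wanted.contains(fullCounterName)) {
          // only counters that pass the filter are emitted to the JSON output
          System.out.println(fullCounterName + " = " + counter.getValue());
        }
      }
    }
  }
}
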

/**
* Writes out the flow object
*
* @param jsonGenerator
* @param aFlow
* @param selectedSerialization
* @param includeFilter
* @param includeJobFieldFilter
* @throws JsonGenerationException
* @throws IOException
*/
@@ -104,16 +104,23 @@ protected SerializationContext initialValue() {
@Produces(MediaType.APPLICATION_JSON)
public JobDetails getJobById(@PathParam("cluster") String cluster,
@PathParam("jobId") String jobId,
@QueryParam("include") List<String> includeFields) throws IOException {
@QueryParam("include") List<String> includeFields,
@QueryParam("includeCounter") List<String> includeCounters) throws IOException {
LOG.info("Fetching JobDetails for jobId=" + jobId);
Stopwatch timer = new Stopwatch().start();
Predicate<String> includeFilter = null;
if (includeFields != null && !includeFields.isEmpty()) {
includeFilter = new SerializationContext.FieldNameFilter(includeFields);
}

Predicate<String> includeCountersFilter = null;
if (includeCounters != null && !includeCounters.isEmpty()) {
includeCountersFilter = new SerializationContext.FieldNameFilter(includeCounters);
}

serializationContext.set(
new SerializationContext(SerializationContext.DetailLevel.EVERYTHING,
null, null, includeFilter, null));
null, null, includeFilter, null, includeCountersFilter));
JobHistoryService jobHistoryService =
new JobHistoryService(HBASE_CONF, HBASE_CONNECTION);
JobDetails jobDetails = jobHistoryService.getJobByJobID(cluster, jobId);
@@ -122,12 +129,14 @@ public JobDetails getJobById(@PathParam("cluster") String cluster,
LOG.info("For job/{cluster}/{jobId} with input query:" + " job/" + cluster
+ SLASH + jobId + "&"
+ StringUtil.buildParam("include", includeFields)
+ StringUtil.buildParam("includeCounter", includeCounters)
+ " fetched jobDetails for " + jobDetails.getJobName() + " in "
+ timer);
} else {
LOG.info("For job/{cluster}/{jobId} with input query:" + " job/" + cluster
+ SLASH + jobId + "&"
+ StringUtil.buildParam("include", includeFields)
+ StringUtil.buildParam("includeCounter", includeCounters)
+ " No jobDetails found, but spent " + timer);
}
// export latency metrics
@@ -143,17 +152,24 @@ public JobDetails getJobById(@PathParam("cluster") String cluster,
@Produces(MediaType.APPLICATION_JSON)
public List<TaskDetails> getJobTasksById(@PathParam("cluster") String cluster,
@PathParam("jobId") String jobId,
@QueryParam("include") List<String> includeFields) throws IOException {
@QueryParam("include") List<String> includeFields,
@QueryParam("includeCounter") List<String> includeCounters) throws IOException {
LOG.info("Fetching tasks info for jobId=" + jobId);
Stopwatch timer = new Stopwatch().start();

Predicate<String> includeFilter = null;
if (includeFields != null && !includeFields.isEmpty()) {
includeFilter = new SerializationContext.FieldNameFilter(includeFields);
}

Predicate<String> includeCountersFilter = null;
if (includeCounters != null && !includeCounters.isEmpty()) {
includeCountersFilter = new SerializationContext.FieldNameFilter(includeCounters);
}

serializationContext.set(
new SerializationContext(SerializationContext.DetailLevel.EVERYTHING,
null, null, null, includeFilter));
null, null, null, includeFilter, includeCountersFilter));

JobHistoryService jobHistoryService =
new JobHistoryService(HBASE_CONF, HBASE_CONNECTION);
@@ -164,11 +180,13 @@ public List<TaskDetails> getJobTasksById(@PathParam("cluster") String cluster,

if (tasks != null && !tasks.isEmpty()) {
LOG.info("For endpoint /tasks/" + cluster + "/" + jobId + "?"
+ StringUtil.buildParam("include", includeFields) + " fetched "
+ StringUtil.buildParam("include", includeFields)
+ StringUtil.buildParam("includeCounter", includeCounters) + " fetched "
+ tasks.size() + " tasks, spent time " + timer);
} else {
LOG.info("For endpoint /tasks/" + cluster + "/" + jobId + "?"
+ StringUtil.buildParam("include", includeFields)
+ StringUtil.buildParam("includeCounter", includeCounters)
+ ", found no tasks, spent time " + timer);
}
return tasks;
@@ -122,13 +122,15 @@ public boolean apply(String potentialKey) {
private final Predicate<String> configFilter;
private final Predicate<String> flowFilter;
private final Predicate<String> jobFilter;
private final Predicate<String> counterFilter;
private final Predicate<String> taskFilter;

public SerializationContext(DetailLevel serializationLevel) {
this.level = serializationLevel;
this.configFilter = null;
this.flowFilter = null;
this.jobFilter = null;
this.counterFilter = null;
this.taskFilter = null;
}

@@ -149,6 +151,30 @@ public SerializationContext(DetailLevel serializationLevel,
this.flowFilter = flowFilter;
this.jobFilter = jobFilter;
this.taskFilter = taskFilter;
this.counterFilter = null;
}

/**
   * Constructor to set the config filter, flow filter, job filter,
   * task filter and counter filter
   * @param serializationLevel
   * @param configFilter
   * @param flowFilter
   * @param jobFilter
   * @param taskFilter
   * @param counterFilter
*/
public SerializationContext(DetailLevel serializationLevel,
Predicate<String> configFilter,
Predicate<String> flowFilter,
Predicate<String> jobFilter,
Predicate<String> taskFilter,
Predicate<String> counterFilter) {
this.level = serializationLevel;
this.configFilter = configFilter;
this.flowFilter = flowFilter;
this.jobFilter = jobFilter;
this.taskFilter = taskFilter;
this.counterFilter = counterFilter;
}

public DetailLevel getLevel() {
Expand All @@ -170,4 +196,9 @@ public Predicate<String> getJobFilter() {
public Predicate<String> getTaskFilter() {
return taskFilter;
}

public Predicate<String> getCounterFilter() {
return counterFilter;
}

}
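
As a usage sketch of the new six-argument constructor, a caller that only wants counter filtering could pass null for the other filters. This assumes the Predicate type above is Guava's com.google.common.base.Predicate (as the apply(...) calls suggest) and that SerializationContext is visible to the caller; the counter name is illustrative:

import java.util.Arrays;
import java.util.List;

import com.google.common.base.Predicate;

public class CounterContextExample {
  public static SerializationContext counterOnlyContext() {
    // entries use the "group.counterName" form matched during serialization
    List<String> counterNames = Arrays.asList("FileSystemCounters.HDFS_BYTES_READ");
    Predicate<String> counterFilter =
        new SerializationContext.FieldNameFilter(counterNames);
    // argument order: level, configFilter, flowFilter, jobFilter, taskFilter, counterFilter
    return new SerializationContext(SerializationContext.DetailLevel.EVERYTHING,
        null, null, null, null, counterFilter);
  }
}
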