setStatus and counter increment compatibility
plelevier committed Aug 26, 2013
1 parent c75c74d commit ffc0cdf
Showing 2 changed files with 15 additions and 16 deletions.
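Every change in this commit follows one pattern: direct calls on the Hadoop Context object, such as context.setStatus(...) and context.getCounter(...).increment(...), are rerouted through the static wrappers in elephant-bird's com.twitter.elephantbird.util.HadoopCompat. The reason is binary compatibility: mapreduce types such as Counter and TaskAttemptContext were concrete classes in Hadoop 1 but became interfaces in Hadoop 2, so a jar compiled against one version can fail against the other at link time (typically with an IncompatibleClassChangeError). The sketch below shows the same pattern in a self-contained mapper; TimedMapper and its counter names are hypothetical, and only the two HadoopCompat calls mirror what this commit does.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.twitter.elephantbird.util.HadoopCompat;

// Hypothetical mapper, not part of this commit; it shows the same
// HadoopCompat wrapper pattern the diff applies below.
public class TimedMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        long startTime = System.currentTimeMillis();
        context.write(key, value);
        // Route counter and status updates through HadoopCompat instead of
        // calling the Context methods directly, so one compiled jar runs
        // against both Hadoop 1 and Hadoop 2.
        HadoopCompat.incrementCounter(
                context.getCounter("total", "mapper-time(ms)"),
                System.currentTimeMillis() - startTime);
        HadoopCompat.setStatus(context, "mapped key " + key.get());
    }
}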
EtlMapper.java
@@ -6,17 +6,18 @@
 import org.apache.hadoop.mapreduce.Mapper;
 
 import com.linkedin.camus.etl.kafka.common.EtlKey;
+import com.twitter.elephantbird.util.HadoopCompat;
 
 /**
  * KafkaETL mapper
- * 
+ *
  * input -- EtlKey, AvroWrapper
- * 
+ *
  * output -- EtlKey, AvroWrapper
- * 
+ *
  */
 public class EtlMapper extends Mapper<EtlKey, AvroWrapper<Object>, EtlKey, AvroWrapper<Object>> {
 
     @Override
     public void map(EtlKey key, AvroWrapper<Object> val, Context context) throws IOException, InterruptedException {
         long startTime = System.currentTimeMillis();
@@ -25,6 +26,6 @@ public void map(EtlKey key, AvroWrapper<Object> val, Context context) throws IOException, InterruptedException {
 
         long endTime = System.currentTimeMillis();
         long mapTime = ((endTime - startTime));
-        context.getCounter("total", "mapper-time(ms)").increment(mapTime);
+        HadoopCompat.incrementCounter(context.getCounter("total", "mapper-time(ms)"), mapTime);
     }
 }
EtlRecordReader.java
@@ -219,7 +219,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
            statusMsg += statusMsg.length() > 0 ? "; " : "";
            statusMsg += request.getTopic() + ":" + request.getNodeId() + ":"
                    + request.getPartition();
-           context.setStatus(statusMsg);
+           HadoopCompat.setStatus(context, statusMsg);
 
            if (reader != null) {
                closeReader();
@@ -233,8 +233,8 @@
 
            while (reader.getNext(key, msgValue)) {
                context.progress();
-               mapperContext.getCounter("total", "data-read").increment(msgValue.getLength());
-               mapperContext.getCounter("total", "event-count").increment(1);
+               HadoopCompat.incrementCounter(mapperContext.getCounter("total", "data-read"), msgValue.getLength());
+               HadoopCompat.incrementCounter(mapperContext.getCounter("total", "event-count"), 1);
                byte[] bytes = getBytes(msgValue);
 
                // check the checksum of message
@@ -280,32 +280,30 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
                }
 
                if (timeStamp < beginTimeStamp) {
-                   mapperContext.getCounter("total", "skip-old").increment(1);
+                   HadoopCompat.incrementCounter(mapperContext.getCounter("total", "skip-old"), 1);
                } else if (endTimeStamp == 0) {
                    DateTime time = new DateTime(timeStamp);
                    statusMsg += " begin read at " + time.toString();
-                   context.setStatus(statusMsg);
+                   HadoopCompat.setStatus(context, statusMsg);
                    System.out.println(key.getTopic() + " begin read at " + time.toString());
                    endTimeStamp = (time.plusHours(this.maxPullHours)).getMillis();
                } else if (timeStamp > endTimeStamp || System.currentTimeMillis() > maxPullTime) {
                    statusMsg += " max read at " + new DateTime(timeStamp).toString();
-                   context.setStatus(statusMsg);
+                   HadoopCompat.setStatus(context, statusMsg);
                    System.out.println(key.getTopic() + " max read at "
                            + new DateTime(timeStamp).toString());
-                   mapperContext.getCounter("total", "request-time(ms)").increment(
-                           reader.getFetchTime());
+                   HadoopCompat.incrementCounter(mapperContext.getCounter("total", "request-time(ms)"), reader.getFetchTime());
                    closeReader();
                }
 
                long secondTime = System.currentTimeMillis();
                value.datum(wrapper.getRecord());
                long decodeTime = ((secondTime - tempTime));
 
-               mapperContext.getCounter("total", "decode-time(ms)").increment(decodeTime);
+               HadoopCompat.incrementCounter(mapperContext.getCounter("total", "decode-time(ms)"), decodeTime);
 
                if (reader != null) {
-                   mapperContext.getCounter("total", "request-time(ms)").increment(
-                           reader.getFetchTime());
+                   HadoopCompat.incrementCounter(mapperContext.getCounter("total", "request-time(ms)"), reader.getFetchTime());
                }
                return true;
            }
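A note on why a reflective shim works where a plain recompile does not: a call to counter.increment(n) compiles to an invokevirtual instruction against Hadoop 1's Counter class but to invokeinterface against Hadoop 2's Counter interface, and the JVM rejects the mismatched instruction with IncompatibleClassChangeError. java.lang.reflect.Method.invoke dispatches correctly in either case. The following is an illustrative sketch of that technique only; CounterShim is a hypothetical name, and this is not elephant-bird's actual source.

import java.lang.reflect.Method;

import org.apache.hadoop.mapreduce.Counter;

// Illustrative shim, NOT elephant-bird's implementation: resolve the
// method once by reflection so the call links on Hadoop 1 and Hadoop 2.
public final class CounterShim {

    private static final Method INCREMENT;

    static {
        try {
            INCREMENT = Counter.class.getMethod("increment", long.class);
        } catch (NoSuchMethodException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private CounterShim() {
    }

    public static void incrementCounter(Counter counter, long amount) {
        try {
            // Method.invoke dispatches correctly whether Counter is a
            // class (Hadoop 1) or an interface (Hadoop 2).
            INCREMENT.invoke(counter, amount);
        } catch (Exception e) {
            throw new IllegalStateException("Could not increment counter", e);
        }
    }
}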
