HBASE-7370 Remove Writable From ScanMetrics.
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1423435 13f79535-47bb-0310-9956-ffa450edef68
elliottneilclark committed Dec 18, 2012
1 parent e50eeb3 commit 36db216
Showing 10 changed files with 1,281 additions and 120 deletions.

Two large diffs are not rendered by default.

32 changes: 32 additions & 0 deletions hbase-protocol/src/main/protobuf/MapReduce.proto
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// This file includes protocol buffers used in MapReduce only.

option java_package = "org.apache.hadoop.hbase.protobuf.generated";
option java_outer_classname = "MapReduceProtos";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

import "hbase.proto";

message ScanMetrics {

repeated NameInt64Pair metrics = 1;

}
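For reference, the conversion this commit routes through ProtobufUtil.toScanMetrics plausibly maps each named counter onto one entry of the repeated metrics field, using the NameInt64Pair message added to hbase.proto just below. A minimal, hypothetical sketch of that direction (assuming hbase.proto generates HBaseProtos; the committed helper may be structured differently):

import java.util.Map;

import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;

public class ScanMetricsToProto {
  // Drains the client-side counters and wraps each one in a NameInt64Pair.
  public static MapReduceProtos.ScanMetrics toScanMetrics(ScanMetrics scanMetrics) {
    MapReduceProtos.ScanMetrics.Builder builder = MapReduceProtos.ScanMetrics.newBuilder();
    // getMetricsMap() resets the counters, so each value is reported exactly once.
    for (Map.Entry<String, Long> e : scanMetrics.getMetricsMap().entrySet()) {
      builder.addMetrics(NameInt64Pair.newBuilder()
          .setName(e.getKey())
          .setValue(e.getValue())
          .build());
    }
    return builder.build();
  }
}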
5 changes: 5 additions & 0 deletions hbase-protocol/src/main/protobuf/hbase.proto
@@ -263,3 +263,8 @@ message BytesBytesPair {
required bytes first = 1;
required bytes second = 2;
}

message NameInt64Pair {
optional string name = 1;
optional int64 value = 2;
}
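Going the other way, a deserializer can rebuild a client-side ScanMetrics from these pairs via the new setCounter method added further down in this commit. A hedged sketch of what the ProtobufUtil.toScanMetrics(byte[]) overload used by TableRecordReaderImpl below likely does:

import com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;

public class ScanMetricsFromProto {
  public static ScanMetrics toScanMetrics(byte[] bytes) throws InvalidProtocolBufferException {
    MapReduceProtos.ScanMetrics pScanMetrics = MapReduceProtos.ScanMetrics.parseFrom(bytes);
    ScanMetrics scanMetrics = new ScanMetrics();
    for (NameInt64Pair pair : pScanMetrics.getMetricsList()) {
      if (pair.hasName() && pair.hasValue()) {
        // setCounter is a no-op for names the client-side class does not define.
        scanMetrics.setCounter(pair.getName(), pair.getValue());
      }
    }
    return scanMetrics;
  }
}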
org/apache/hadoop/hbase/client/ClientScanner.java
@@ -34,6 +34,8 @@
import org.apache.hadoop.hbase.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -215,7 +217,7 @@ private boolean nextScanner(int nbRows, final boolean done)
callable.withRetries();
this.currentRegion = callable.getHRegionInfo();
if (this.scanMetrics != null) {
this.scanMetrics.countOfRegions.inc();
this.scanMetrics.countOfRegions.incrementAndGet();
}
} catch (IOException e) {
close();
@@ -249,8 +251,8 @@ private void writeScanMetrics() throws IOException {
return;
}
final DataOutputBuffer d = new DataOutputBuffer();
scanMetrics.write(d);
scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, d.getData());
MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
}

public Result next() throws IOException {
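The two changed lines in writeScanMetrics above replace Writable serialization with a protobuf payload stored as an attribute on the Scan itself. A hedged usage sketch of that flow from the caller's side, assuming the existing SCAN_ATTRIBUTES_METRICS_ENABLE opt-in attribute on Scan (the wrapper class and method names here are illustrative):

import java.io.IOException;

import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanMetricsUsage {
  // Returns the serialized MapReduceProtos.ScanMetrics the scanner published,
  // or null if no metrics were recorded.
  public static byte[] scanWithMetrics(HTable table, Scan scan) throws IOException {
    // Opt in; ClientScanner only allocates a ScanMetrics when this attribute is set.
    scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
    ResultScanner scanner = table.getScanner(scan);
    try {
      for (Result r : scanner) {
        // consume results; the scanner updates its counters as it iterates
      }
    } finally {
      // finishing the scan lets ClientScanner publish its metrics
      // via writeScanMetrics() shown above
      scanner.close();
    }
    return scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
  }
}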
@@ -329,7 +331,7 @@ public Result next() throws IOException {
}
long currentTime = System.currentTimeMillis();
if (this.scanMetrics != null ) {
this.scanMetrics.sumOfMillisSecBetweenNexts.inc(currentTime-lastNext);
this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime-lastNext);
}
lastNext = currentTime;
if (values != null && values.length > 0) {
org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -102,9 +102,9 @@ public void connect(boolean reload) throws IOException {
// HConnectionManager will call instantiateServer with reload==true
// if and only if for retries.
if (reload && this.scanMetrics != null) {
this.scanMetrics.countOfRPCRetries.inc();
this.scanMetrics.countOfRPCRetries.incrementAndGet();
if (isRegionServerRemote) {
this.scanMetrics.countOfRemoteRPCRetries.inc();
this.scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
}
}
}
@@ -197,7 +197,7 @@ private void checkIfRegionServerIsRemote() throws UnknownHostException {
// when what we need is to open scanner against new location.
// Attach NSRE to signal client that it needs to resetup scanner.
if (this.scanMetrics != null) {
this.scanMetrics.countOfNSRE.inc();
this.scanMetrics.countOfNSRE.incrementAndGet();
}
throw new DoNotRetryIOException("Reset scanner", ioe);
} else if (ioe instanceof RegionServerStoppedException) {
@@ -220,9 +220,9 @@ private void incRPCcallsMetrics() {
if (this.scanMetrics == null) {
return;
}
this.scanMetrics.countOfRPCcalls.inc();
this.scanMetrics.countOfRPCcalls.incrementAndGet();
if (isRegionServerRemote) {
this.scanMetrics.countOfRemoteRPCcalls.inc();
this.scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
}
}

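ScannerCallable splits each RPC counter into a local and a remote variant based on its isRegionServerRemote flag, so the real checkIfRegionServerIsRemote() above decides which pair of counters a call lands in. A rough, standalone illustration of that kind of check (hypothetical code showing only the idea, not the patch's implementation):

import java.net.InetAddress;
import java.net.UnknownHostException;

public class RemoteCheck {
  // A call counts as remote unless the region is served from this host.
  public static boolean isRemote(String regionServerHostname) throws UnknownHostException {
    String localHostname = InetAddress.getLocalHost().getHostName();
    return !regionServerHostname.equalsIgnoreCase(localHostname);
  }
}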
org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
@@ -18,158 +18,125 @@

package org.apache.hadoop.hbase.client.metrics;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;

import com.google.common.collect.ImmutableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.metrics.util.MetricsBase;
import org.apache.hadoop.metrics.util.MetricsRegistry;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;


/**
* Provides client-side metrics related to scan operations
* The data can be passed to mapreduce framework or other systems.
* Currently metrics framework won't be able to support the scenario
* where multiple scan instances run on the same machine trying to
* update the same metric. We use metrics objects in the class,
* so that it can be easily switched to metrics framework later when it supports
* this scenario.
* We use atomic longs so that one thread can increment,
* while another atomically resets to zero after the values are reported
* to hadoop's counters.
*
* Some of these metrics are general for any client operation, such as put.
* However, there is no need for this. So they are defined under scan operation
* for now.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ScanMetrics implements Writable {
public class ScanMetrics {


private static final byte SCANMETRICS_VERSION = (byte)1;
private static final Log LOG = LogFactory.getLog(ScanMetrics.class);
private MetricsRegistry registry = new MetricsRegistry();

/**
* Hash to hold the String -> Atomic Long mappings.
*/
private final Map<String, AtomicLong> counters = new HashMap<String, AtomicLong>();

// AtomicLongs to hold the metrics values. These are all updated through ClientScanner and
// ScannerCallable. They are atomic longs so that atomic getAndSet can be used to reset the
// values after progress is passed to hadoop's counters.


/**
* number of RPC calls
*/
public final MetricsTimeVaryingLong countOfRPCcalls =
new MetricsTimeVaryingLong("RPC_CALLS", registry);
public final AtomicLong countOfRPCcalls = createCounter("RPC_CALLS");

/**
* number of remote RPC calls
*/
public final MetricsTimeVaryingLong countOfRemoteRPCcalls =
new MetricsTimeVaryingLong("REMOTE_RPC_CALLS", registry);
public final AtomicLong countOfRemoteRPCcalls = createCounter("REMOTE_RPC_CALLS");

/**
* sum of milliseconds between sequential next calls
*/
public final MetricsTimeVaryingLong sumOfMillisSecBetweenNexts =
new MetricsTimeVaryingLong("MILLIS_BETWEEN_NEXTS", registry);
public final AtomicLong sumOfMillisSecBetweenNexts = createCounter("MILLIS_BETWEEN_NEXTS");

/**
* number of NotServingRegionException caught
*/
public final MetricsTimeVaryingLong countOfNSRE =
new MetricsTimeVaryingLong("NOT_SERVING_REGION_EXCEPTION", registry);
public final AtomicLong countOfNSRE = createCounter("NOT_SERVING_REGION_EXCEPTION");

/**
* number of bytes in Result objects from region servers
*/
public final MetricsTimeVaryingLong countOfBytesInResults =
new MetricsTimeVaryingLong("BYTES_IN_RESULTS", registry);
public final AtomicLong countOfBytesInResults = createCounter("BYTES_IN_RESULTS");

/**
* number of bytes in Result objects from remote region servers
*/
public final MetricsTimeVaryingLong countOfBytesInRemoteResults =
new MetricsTimeVaryingLong("BYTES_IN_REMOTE_RESULTS", registry);
public final AtomicLong countOfBytesInRemoteResults = createCounter("BYTES_IN_REMOTE_RESULTS");

/**
* number of regions
*/
public final MetricsTimeVaryingLong countOfRegions =
new MetricsTimeVaryingLong("REGIONS_SCANNED", registry);
public final AtomicLong countOfRegions = createCounter("REGIONS_SCANNED");

/**
* number of RPC retries
*/
public final MetricsTimeVaryingLong countOfRPCRetries =
new MetricsTimeVaryingLong("RPC_RETRIES", registry);
public final AtomicLong countOfRPCRetries = createCounter("RPC_RETRIES");

/**
* number of remote RPC retries
*/
public final MetricsTimeVaryingLong countOfRemoteRPCRetries =
new MetricsTimeVaryingLong("REMOTE_RPC_RETRIES", registry);
public final AtomicLong countOfRemoteRPCRetries = createCounter("REMOTE_RPC_RETRIES");

/**
* constructor
*/
public ScanMetrics () {
public ScanMetrics() {
}

/**
* serialize all the MetricsTimeVaryingLong
*/
public void write(DataOutput out) throws IOException {
out.writeByte(SCANMETRICS_VERSION);
Collection<MetricsBase> mbs = registry.getMetricsList();

// we only handle MetricsTimeVaryingLong for now.
int metricsCount = 0;
for (MetricsBase mb : mbs) {
if ( mb instanceof MetricsTimeVaryingLong) {
metricsCount++;
} else {
throw new IOException("unsupported metrics type. metrics name: "
+ mb.getName() + ", metrics description: " + mb.getDescription());
}
}

out.writeInt(metricsCount);
for (MetricsBase mb : mbs) {
out.writeUTF(mb.getName());
out.writeLong(((MetricsTimeVaryingLong) mb).getCurrentIntervalValue());
}
private AtomicLong createCounter(String counterName) {
AtomicLong c = new AtomicLong(0);
counters.put(counterName, c);
return c;
}

public void readFields(DataInput in) throws IOException {
int version = in.readByte();
if (version > (int)SCANMETRICS_VERSION) {
throw new IOException("version " + version + " not supported");
}

int metricsCount = in.readInt();
for (int i=0; i<metricsCount; i++) {
String metricsName = in.readUTF();
long v = in.readLong();
MetricsBase mb = registry.get(metricsName);
if ( mb instanceof MetricsTimeVaryingLong) {
((MetricsTimeVaryingLong) mb).inc(v);
} else {
LOG.warn("unsupported metrics type. metrics name: "
+ mb.getName() + ", metrics description: " + mb.getDescription());
}
public void setCounter(String counterName, long value) {
AtomicLong c = this.counters.get(counterName);
if (c != null) {
c.set(value);
}
}

public MetricsTimeVaryingLong[] getMetricsTimeVaryingLongArray() {
Collection<MetricsBase> mbs = registry.getMetricsList();
ArrayList<MetricsTimeVaryingLong> mlv =
new ArrayList<MetricsTimeVaryingLong>();
for (MetricsBase mb : mbs) {
if ( mb instanceof MetricsTimeVaryingLong) {
mlv.add((MetricsTimeVaryingLong) mb);
}
/**
* Get all of the values since the last time this function was called.
*
* Calling this function will reset all AtomicLongs in the instance back to 0.
*
* @return A Map of String -> Long for metrics
*/
public Map<String, Long> getMetricsMap() {
//Create a builder
ImmutableMap.Builder<String, Long> builder = ImmutableMap.builder();
//For every entry add the value and reset the AtomicLong back to zero
for (Map.Entry<String, AtomicLong> e : this.counters.entrySet()) {
builder.put(e.getKey(), e.getValue().getAndSet(0));
}
return mlv.toArray(new MetricsTimeVaryingLong[mlv.size()]);
//Build the immutable map so that people can't mess around with it.
return builder.build();
}

}
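One consequence of the getAndSet(0) drain in getMetricsMap() is that values are reported at most once: each call returns only what accumulated since the previous call, and increments that land between two drains are neither lost nor double counted. A small, hypothetical standalone demonstration:

import java.util.Map;

import org.apache.hadoop.hbase.client.metrics.ScanMetrics;

public class ScanMetricsDrainDemo {
  public static void main(String[] args) {
    ScanMetrics metrics = new ScanMetrics();
    metrics.countOfRPCcalls.incrementAndGet();
    metrics.countOfRPCcalls.incrementAndGet();
    metrics.sumOfMillisSecBetweenNexts.addAndGet(42);

    // First drain reports the accumulated values and resets them to zero.
    Map<String, Long> first = metrics.getMetricsMap();
    System.out.println(first.get("RPC_CALLS"));            // 2
    System.out.println(first.get("MILLIS_BETWEEN_NEXTS")); // 42

    // A second drain sees only what happened since the first one.
    metrics.countOfRPCcalls.incrementAndGet();
    System.out.println(metrics.getMetricsMap().get("RPC_CALLS")); // 1
  }
}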
org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
@@ -19,6 +19,7 @@

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,6 +33,7 @@
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.mapreduce.Counter;
@@ -262,19 +264,16 @@ private void updateCounters() throws IOException {
return;
}

DataInputBuffer in = new DataInputBuffer();
in.reset(serializedMetrics, 0, serializedMetrics.length);
ScanMetrics scanMetrics = new ScanMetrics();
scanMetrics.readFields(in);
MetricsTimeVaryingLong[] mlvs =
scanMetrics.getMetricsTimeVaryingLongArray();
ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(serializedMetrics);

try {
for (MetricsTimeVaryingLong mlv : mlvs) {
for (Map.Entry<String, Long> entry:scanMetrics.getMetricsMap().entrySet()) {
Counter ct = (Counter)this.getCounter.invoke(context,
HBASE_COUNTER_GROUP_NAME, mlv.getName());
ct.increment(mlv.getCurrentIntervalValue());
HBASE_COUNTER_GROUP_NAME, entry.getKey());

ct.increment(entry.getValue());
}

((Counter) this.getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
"NUM_SCANNER_RESTARTS")).increment(numRestarts);
} catch (Exception e) {
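A note on the reflective this.getCounter.invoke(...) calls in updateCounters above: the Counter is resolved through a java.lang.reflect.Method, presumably so one HBase build can run against mapreduce versions where getCounter(String, String) is declared on different TaskAttemptContext types. A hedged sketch of that lookup pattern (the helper name is illustrative, not from this patch):

import java.lang.reflect.Method;

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class CounterLookup {
  public static Counter findCounter(TaskAttemptContext context, String group, String name)
      throws Exception {
    // Resolve getCounter(String, String) on the runtime type, so the same jar
    // works whichever type declares the method in the running Hadoop version.
    Method getCounter = context.getClass().getMethod("getCounter", String.class, String.class);
    return (Counter) getCounter.invoke(context, group, name);
  }
}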
