Skip to content

Commit

Permalink
KAFKA-3417: Wrap metric reporter calls in try/catch blocks (apache#3635)
Browse files Browse the repository at this point in the history
Prevent exceptions thrown by metric reporters from impacting request processing and other reporters.

Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
  • Loading branch information
mimaison authored and ying-zheng committed Jul 6, 2018
1 parent ddfa22a commit 5772bc7
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 16 deletions.
27 changes: 21 additions & 6 deletions clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
Expand Up @@ -524,8 +524,13 @@ public void addMetric(MetricName metricName, MetricValueProvider<?> metricValueP
/**
 * Removes the metric registered under the given name, if any, and notifies
 * every configured reporter of the removal.
 *
 * @param metricName the name of the metric to remove
 * @return the removed {@link KafkaMetric}, or {@code null} if no metric was
 *         registered under {@code metricName}
 */
public synchronized KafkaMetric removeMetric(MetricName metricName) {
    KafkaMetric metric = this.metrics.remove(metricName);
    if (metric != null) {
        // Notify each reporter inside its own try/catch so that one
        // misbehaving reporter cannot prevent the remaining reporters from
        // observing the removal, nor propagate into request processing
        // (KAFKA-3417).
        for (MetricsReporter reporter : reporters) {
            try {
                reporter.metricRemoval(metric);
            } catch (Exception e) {
                log.error("Error when removing metric from " + reporter.getClass().getName(), e);
            }
        }
    }
    return metric;
}
Expand All @@ -552,8 +557,13 @@ synchronized void registerMetric(KafkaMetric metric) {
if (this.metrics.containsKey(metricName))
throw new IllegalArgumentException("A metric named '" + metricName + "' already exists, can't register another one.");
this.metrics.put(metricName, metric);
for (MetricsReporter reporter : reporters)
reporter.metricChange(metric);
for (MetricsReporter reporter : reporters) {
try {
reporter.metricChange(metric);
} catch (Exception e) {
log.error("Error when registering metric on " + reporter.getClass().getName(), e);
}
}
}

/**
Expand Down Expand Up @@ -634,8 +644,13 @@ public void close() {
}
}

for (MetricsReporter reporter : this.reporters)
reporter.close();
for (MetricsReporter reporter : reporters) {
try {
reporter.close();
} catch (Exception e) {
log.error("Error when closing " + reporter.getClass().getName(), e);
}
}
}

}
12 changes: 12 additions & 0 deletions core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
Expand Up @@ -154,6 +154,18 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
skipResponseHeader(response)
}

/**
 * Builds a request from the given builder, sends it over the socket with the
 * supplied client id and correlation id, waits for the reply, strips the
 * response header and returns the parsed response body.
 */
def requestResponse(socket: Socket, clientId: String, correlationId: Int, requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Struct = {
  val key = requestBuilder.apiKey
  val built = requestBuilder.build()
  // Serialize the request together with its header and do the round trip.
  val serialized = built.serialize(new RequestHeader(key, built.version, clientId, correlationId))
  val bodyBuffer = skipResponseHeader(requestAndReceive(socket, serialized.array))
  key.parseResponse(built.version, bodyBuffer)
}

/**
* Serializes and sends the requestStruct to the given api.
* A ByteBuffer containing the response (without the response header) is returned.
Expand Down
@@ -0,0 +1,116 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package kafka.server

import java.net.Socket
import java.util.Properties

import kafka.utils.TestUtils
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.requests.{ListGroupsRequest,ListGroupsResponse}
import org.apache.kafka.common.metrics.MetricsReporter
import org.apache.kafka.common.metrics.KafkaMetric
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.protocol.Errors

import org.junit.Assert._
import org.junit.{Before, Test}
import org.junit.After
import java.util.concurrent.atomic.AtomicInteger

/*
 * This test checks that a reporter that throws an exception will not affect other reporters
 * and will not affect the broker's message handling.
 */
class KafkaMetricReporterExceptionHandlingTest extends BaseRequestTest {

  override def numBrokers: Int = 1

  override def propertyOverrides(properties: Properties): Unit = {
    // Register the throwing reporter first so a failure in it would be visible
    // in the well-behaved reporter's counts if exceptions were not isolated.
    val reporterClasses = Seq(
      classOf[KafkaMetricReporterExceptionHandlingTest.BadReporter],
      classOf[KafkaMetricReporterExceptionHandlingTest.GoodReporter]
    ).map(_.getName).mkString(",")
    properties.put(KafkaConfig.MetricReporterClassesProp, reporterClasses)
  }

  @Before
  override def setUp() {
    super.setUp()

    // A quota override is required so that "throttle-time" metrics are
    // registered after the broker has started.
    val props = new Properties()
    props.put(DynamicConfig.Client.RequestPercentageOverrideProp, "0.1")
    adminZkClient.changeClientIdConfig("<default>", props)
  }

  @After
  override def tearDown() {
    // Reset shared counters so subsequent tests start from a clean slate.
    KafkaMetricReporterExceptionHandlingTest.goodReporterRegistered.set(0)
    KafkaMetricReporterExceptionHandlingTest.badReporterRegistered.set(0)

    super.tearDown()
  }

  @Test
  def testBothReportersAreInvoked() {
    val listener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
    val socket = new Socket("localhost", anySocketServer.boundPort(listener))
    socket.setSoTimeout(10000)

    try {
      TestUtils.retry(10000) {
        // The broker must keep answering requests even though BadReporter throws.
        val struct = requestResponse(socket, "clientId", 0, new ListGroupsRequest.Builder())
        assertEquals(Errors.NONE, new ListGroupsResponse(struct).error())
        // Both reporters must have observed the same metric registrations.
        assertEquals(KafkaMetricReporterExceptionHandlingTest.goodReporterRegistered.get, KafkaMetricReporterExceptionHandlingTest.badReporterRegistered.get)
        assertTrue(KafkaMetricReporterExceptionHandlingTest.goodReporterRegistered.get > 0)
      }
    } finally {
      socket.close()
    }
  }
}

object KafkaMetricReporterExceptionHandlingTest {
  // Counters shared between the test and the reporter instances the broker
  // creates reflectively. `val` instead of `var`: the AtomicInteger itself is
  // mutable and is only ever updated via set/incrementAndGet, never reassigned.
  val goodReporterRegistered = new AtomicInteger
  val badReporterRegistered = new AtomicInteger

  /** Well-behaved reporter that counts every "Request"-group metric registration. */
  class GoodReporter extends MetricsReporter {

    def configure(configs: java.util.Map[String, _]): Unit = {}

    def init(metrics: java.util.List[KafkaMetric]): Unit = {}

    def metricChange(metric: KafkaMetric): Unit = {
      if (metric.metricName.group == "Request")
        goodReporterRegistered.incrementAndGet()
    }

    def metricRemoval(metric: KafkaMetric): Unit = {}

    def close(): Unit = {}
  }

  /**
   * Reporter that records the registration and then throws, to verify that a
   * failing reporter affects neither other reporters nor request handling.
   */
  class BadReporter extends GoodReporter {

    override def metricChange(metric: KafkaMetric): Unit = {
      if (metric.metricName.group == "Request") {
        badReporterRegistered.incrementAndGet()
        throw new RuntimeException(metric.metricName.toString)
      }
    }
  }
}
10 changes: 0 additions & 10 deletions core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
Expand Up @@ -14,7 +14,6 @@

package kafka.server

import java.net.Socket
import java.nio.ByteBuffer
import java.util.{Collections, LinkedHashMap, Properties}
import java.util.concurrent.{Executors, Future, TimeUnit}
Expand Down Expand Up @@ -331,15 +330,6 @@ class RequestQuotaTest extends BaseRequestTest {
}
}

private def requestResponse(socket: Socket, clientId: String, correlationId: Int, requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Struct = {
val apiKey = requestBuilder.apiKey
val request = requestBuilder.build()
val header = new RequestHeader(apiKey, request.version, clientId, correlationId)
val response = requestAndReceive(socket, request.serialize(header).array)
val responseBuffer = skipResponseHeader(response)
apiKey.parseResponse(request.version, responseBuffer)
}

case class Client(clientId: String, apiKey: ApiKeys) {
var correlationId: Int = 0
val builder = requestBuilder(apiKey)
Expand Down

0 comments on commit 5772bc7

Please sign in to comment.