Skip to content

Commit

Permalink
add a kafka-overall-time metric
Browse files Browse the repository at this point in the history
  • Loading branch information
Ben Osheroff authored and smferguson committed Mar 2, 2017
1 parent 2e3a080 commit 77494ef
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 2 deletions.
1 change: 1 addition & 0 deletions .dockerignore
@@ -1 +1,2 @@
.git
target
Expand Up @@ -10,12 +10,15 @@ public class CallbackCompleter {
private final MaxwellContext context;
private final BinlogPosition position;
private final boolean isTXCommit;
private final long sendTimeMS;
private Long completeTimeMS;

public CallbackCompleter(InflightMessageList inflightMessages, BinlogPosition position, boolean isTXCommit, MaxwellContext context) {
this.inflightMessages = inflightMessages;
this.context = context;
this.position = position;
this.isTXCommit = isTXCommit;
this.sendTimeMS = System.currentTimeMillis();
}

public void markCompleted() {
Expand All @@ -26,6 +29,12 @@ public void markCompleted() {
context.setPosition(newPosition);
}
}
completeTimeMS = System.currentTimeMillis();
}

public Long timeToSendMS() {
if ( completeTimeMS == null ) return null;
return completeTimeMS - sendTimeMS;
}
}

Expand Down
@@ -1,5 +1,8 @@
package com.zendesk.maxwell.producer;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.zendesk.maxwell.MaxwellMetrics;
import com.zendesk.maxwell.replication.BinlogPosition;
import com.zendesk.maxwell.schema.ddl.DDLMap;
import com.zendesk.maxwell.MaxwellContext;
Expand All @@ -19,6 +22,7 @@
import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;


class KafkaCallback implements Callback {
Expand All @@ -27,12 +31,14 @@ class KafkaCallback implements Callback {
private final BinlogPosition position;
private final String json;
private final String key;
private final Timer timer;

public KafkaCallback(AbstractAsyncProducer.CallbackCompleter cc, BinlogPosition position, String key, String json) {
public KafkaCallback(AbstractAsyncProducer.CallbackCompleter cc, BinlogPosition position, String key, String json, Timer timer) {
this.cc = cc;
this.position = position;
this.key = key;
this.json = json;
this.timer = timer;
}

@Override
Expand All @@ -52,6 +58,7 @@ public void onCompletion(RecordMetadata md, Exception e) {
}
}
cc.markCompleted();
timer.update(cc.timeToSendMS(), TimeUnit.MILLISECONDS);
}
}

Expand Down Expand Up @@ -84,6 +91,7 @@ class MaxwellKafkaProducerWorker extends AbstractAsyncProducer implements Runnab
private final MaxwellKafkaPartitioner ddlPartitioner;
private final KeyFormat keyFormat;
private final boolean interpolateTopic;
private final Timer metricsTimer;
private final ArrayBlockingQueue<RowMap> queue;

public MaxwellKafkaProducerWorker(MaxwellContext context, Properties kafkaProperties, String kafkaTopic, ArrayBlockingQueue<RowMap> queue) {
Expand All @@ -110,6 +118,7 @@ public MaxwellKafkaProducerWorker(MaxwellContext context, Properties kafkaProper
else
keyFormat = KeyFormat.ARRAY;

this.metricsTimer = MaxwellMetrics.registry.timer(MetricRegistry.name(MaxwellKafkaProducer.class, "kafka-overall-time"));
this.queue = queue;
}

Expand Down Expand Up @@ -158,7 +167,7 @@ record = new ProducerRecord<>(topic, this.partitioner.kafkaPartition(r, getNumPa
if ( !KafkaCallback.LOGGER.isDebugEnabled() )
value = null;

KafkaCallback callback = new KafkaCallback(cc, r.getPosition(), key, value);
KafkaCallback callback = new KafkaCallback(cc, r.getPosition(), key, value, metricsTimer);

kafka.send(record, callback);
}
Expand Down

0 comments on commit 77494ef

Please sign in to comment.