Skip to content

Commit

Permalink
add metrics IBMStreams#25
Browse files Browse the repository at this point in the history
  • Loading branch information
markheger committed Apr 12, 2019
1 parent 6c61315 commit 28f6aa8
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import com.ibm.streams.operator.model.Parameter;
import com.ibm.streams.operator.model.PrimitiveOperator;
import com.ibm.streams.operator.samples.patterns.TupleConsumer;
import com.ibm.streams.operator.metrics.Metric;
import com.ibm.streams.operator.model.CustomMetric;

@PrimitiveOperator(
name="SendSlackMessage",
Expand All @@ -66,6 +68,13 @@
})
public class SendSlackMessage extends TupleConsumer {

/**
* Metrics
*/
private Metric nFailedRequests;
private Metric nInsertedMessages;
private Metric rateLimitExceeded;

// ------------------------------------------------------------------------
// Documentation.
// Attention: To add a newline, use \\n instead of \n.
Expand All @@ -83,6 +92,34 @@ public class SendSlackMessage extends TupleConsumer {


;

@CustomMetric(name = "rateLimitExceeded", kind = Metric.Kind.GAUGE,
description = "Describes whether we exceed a rate limit when sending messages to Slack server. "
+ "This is set to 1 after response 429 is received. Otherwise the value is 0. "
)
public void setRateLimitExceeded(Metric rateLimitExceeded) {
this.rateLimitExceeded = rateLimitExceeded;
}

/**
* nFailedRequests describes the number of failed requests over the lifetime of the operator.
* @param nFailedRequests
*/
@CustomMetric(name = "nFailedRequests", kind = Metric.Kind.COUNTER,
description = "The number of failed messages over the lifetime of the operator.")
public void setnFailedRequests(Metric nFailedRequests) {
this.nFailedRequests = nFailedRequests;
}

/**
* nInsertedMessages describes the number of successful requests over the lifetime of the operator.
* @param nInsertedMessages
*/
@CustomMetric(name = "nInsertedMessages", kind = Metric.Kind.COUNTER,
description = "The number of successful messages over the lifetime of the operator.")
public void setnInsertedMessages(Metric nInsertedMessages) {
this.nInsertedMessages = nInsertedMessages;
}

@ContextCheck(runtime=false)
public static void validateSchema(OperatorContextChecker checker) {
Expand Down Expand Up @@ -137,7 +174,9 @@ private static boolean isStringAttribute(StreamSchema schema) {
description="Specifies the Slack incoming web hook URL to send messages to."
)
public void setSlackUrl(String slackUrl) throws IOException {
this.slackUrl = slackUrl;
if (!("".equals(slackUrl))) {
this.slackUrl = slackUrl;
}
}

@Parameter(
Expand Down Expand Up @@ -268,15 +307,27 @@ protected boolean processBatch(Queue<BatchedTuple> batch) throws Exception {
// Send successful - remove message from batch queue.
if (responseCode == HttpStatus.SC_OK) {
batch.remove();

this.nInsertedMessages.increment();
this.rateLimitExceeded.setValue(0);
// Can only send 1 message to Slack, per second.
Thread.sleep(1000);
} else {
this.nFailedRequests.increment();
_trace.error(responseCode + response.toString());

// With a 404 maybe a URL has been updated in the application config.
if (responseCode == HttpStatus.SC_NOT_FOUND)
if (responseCode == HttpStatus.SC_NOT_FOUND) {
fetchApplicationProperties();
}
else if (responseCode == 429) {
// HTTP 429 Too Many Requests
this.rateLimitExceeded.setValue(1);
// TODO check Retry-After HTTP header
Thread.sleep(10000);
}
else {
this.rateLimitExceeded.setValue(0);
}
}

return true;
Expand Down Expand Up @@ -335,3 +386,4 @@ protected void fetchApplicationProperties() {
}
}


2 changes: 1 addition & 1 deletion com.ibm.streamsx.slack/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
Typical use is to send alerts or informational messages from
a Streams application to Slack channel using a incoming web hook.
</info:description>
<info:version>0.3.0</info:version>
<info:version>0.4.0</info:version>
<info:requiredProductVersion>4.2.0.0</info:requiredProductVersion>
</info:identity>
<info:dependencies/>
Expand Down

0 comments on commit 28f6aa8

Please sign in to comment.