diff --git a/com.ibm.streamsx.slack/impl/java/src/com/ibm/streamsx/slack/SendSlackMessage.java b/com.ibm.streamsx.slack/impl/java/src/com/ibm/streamsx/slack/SendSlackMessage.java index f003225..854bc8a 100644 --- a/com.ibm.streamsx.slack/impl/java/src/com/ibm/streamsx/slack/SendSlackMessage.java +++ b/com.ibm.streamsx.slack/impl/java/src/com/ibm/streamsx/slack/SendSlackMessage.java @@ -42,6 +42,8 @@ import com.ibm.streams.operator.model.Parameter; import com.ibm.streams.operator.model.PrimitiveOperator; import com.ibm.streams.operator.samples.patterns.TupleConsumer; +import com.ibm.streams.operator.metrics.Metric; +import com.ibm.streams.operator.model.CustomMetric; @PrimitiveOperator( name="SendSlackMessage", @@ -66,6 +68,13 @@ }) public class SendSlackMessage extends TupleConsumer { + /** + * Metrics + */ + private Metric nFailedRequests; + private Metric nInsertedMessages; + private Metric rateLimitExceeded; + // ------------------------------------------------------------------------ // Documentation. // Attention: To add a newline, use \\n instead of \n. @@ -83,6 +92,34 @@ public class SendSlackMessage extends TupleConsumer { ; + + @CustomMetric(name = "rateLimitExceeded", kind = Metric.Kind.GAUGE, + description = "Describes whether we exceed a rate limit when sending messages to Slack server. " + + "This is set to 1 after response 429 is received. Otherwise the value is 0. " + ) + public void setRateLimitExceeded(Metric rateLimitExceeded) { + this.rateLimitExceeded = rateLimitExceeded; + } + + /** + * nFailedRequests describes the number of failed requests over the lifetime of the operator. + * @param nFailedRequests + */ + @CustomMetric(name = "nFailedRequests", kind = Metric.Kind.COUNTER, + description = "The number of failed messages over the lifetime of the operator.") + public void setnFailedRequests(Metric nFailedRequests) { + this.nFailedRequests = nFailedRequests; + } + + /** + * nInsertedMessages describes the number of successful requests over the lifetime of the operator. + * @param nInsertedMessages + */ + @CustomMetric(name = "nInsertedMessages", kind = Metric.Kind.COUNTER, + description = "The number of successful messages over the lifetime of the operator.") + public void setnInsertedMessages(Metric nInsertedMessages) { + this.nInsertedMessages = nInsertedMessages; + } @ContextCheck(runtime=false) public static void validateSchema(OperatorContextChecker checker) { @@ -137,7 +174,9 @@ private static boolean isStringAttribute(StreamSchema schema) { description="Specifies the Slack incoming web hook URL to send messages to." ) public void setSlackUrl(String slackUrl) throws IOException { - this.slackUrl = slackUrl; + if (!("".equals(slackUrl))) { + this.slackUrl = slackUrl; + } } @Parameter( @@ -268,15 +307,27 @@ protected boolean processBatch(Queue batch) throws Exception { // Send successful - remove message from batch queue. if (responseCode == HttpStatus.SC_OK) { batch.remove(); - + this.nInsertedMessages.increment(); + this.rateLimitExceeded.setValue(0); // Can only send 1 message to Slack, per second. Thread.sleep(1000); } else { + this.nFailedRequests.increment(); _trace.error(responseCode + response.toString()); // With a 404 maybe a URL has been updated in the application config. - if (responseCode == HttpStatus.SC_NOT_FOUND) + if (responseCode == HttpStatus.SC_NOT_FOUND) { fetchApplicationProperties(); + } + else if (responseCode == 429) { + // HTTP 429 Too Many Requests + this.rateLimitExceeded.setValue(1); + // TODO check Retry-After HTTP header + Thread.sleep(10000); + } + else { + this.rateLimitExceeded.setValue(0); + } } return true; @@ -335,3 +386,4 @@ protected void fetchApplicationProperties() { } } + diff --git a/com.ibm.streamsx.slack/info.xml b/com.ibm.streamsx.slack/info.xml index d80a1fd..9a86ebe 100644 --- a/com.ibm.streamsx.slack/info.xml +++ b/com.ibm.streamsx.slack/info.xml @@ -8,7 +8,7 @@ Typical use is to send alerts or informational messages from a Streams application to Slack channel using a incoming web hook. - 0.3.0 + 0.4.0 4.2.0.0