Permalink
Browse files

Adding sleepTime parameter to SqsQueueSpout.

This sets the amount of time a Task will sleep between calls to SQS
when the previous call was empty. Since Amazon charges per total
amount of calls to SQS, this is a way to control costs for lower-volume
queues.
  • Loading branch information...
1 parent 25c3348 commit 934c04279f5a7925584ff0e23d6a4eb062afc320 @apetresc apetresc committed Jan 24, 2012
Showing with 31 additions and 1 deletion.
  1. +31 −1 storm-contrib-sqs/src/main/java/storm/contrib/sqs/SqsQueueSpout.java
@@ -58,6 +58,8 @@
private final String queueUrl;
private final boolean reliable;
private LinkedBlockingQueue<Message> queue;
+
+ private int sleepTime;
/**
* @param queueUrl the URL for the Amazon SQS queue to consume from
@@ -66,6 +68,7 @@
public SqsQueueSpout(String queueUrl, boolean reliable) {
this.queueUrl = queueUrl;
this.reliable = reliable;
+ this.sleepTime = 100;
}
@Override
@@ -102,7 +105,7 @@ public void nextTuple() {
}
} else {
// Still empty, go to sleep.
- Utils.sleep(100);
+ Utils.sleep(sleepTime);
}
}
@@ -120,6 +123,33 @@ public String getStreamId(Message message) {
return Utils.DEFAULT_STREAM_ID;
}
+ /**
+ * Returns the number of milliseconds the spout will wait before making
+ * another call to SQS when the previous call came back empty. Defaults to
+ * {@code 100}.
+ *
+ * Since Amazon charges per SQS request, you can use this parameter to
+ * control costs for lower-volume queues.
+ *
+ * @return the number of milliseconds the spout will wait between SQS calls.
+ */
+ public int getSleepTime() {
+ return sleepTime;
+ }
+
+ /**
+ * Sets the number of milliseconds the spout will wait before making
+ * another call to SQS when the previous call came back empty.
+ *
+ * Since Amazon charges per SQS request, you can use this parameter to
+ * control costs for lower-volume queues.
+ *
+ * @param sleepTime the number of milliseconds the spout will wait between SQS calls.
+ */
+ public void setSleepTime(int sleepTime) {
+ this.sleepTime = sleepTime;
+ }
+
@Override
public void ack(Object msgId) {
// Only called in reliable mode.

0 comments on commit 934c042

Please sign in to comment.