Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fill in some missing Javadocs.

  • Loading branch information...
commit a653a1e3320283414e66ac822ba5dff593d24bdd 1 parent c9a5544
@samstokes samstokes authored
Showing with 36 additions and 1 deletion.
  1. +36 −1 src/main/java/com/rapportive/storm/spout/AMQPSpout.java
View
37 src/main/java/com/rapportive/storm/spout/AMQPSpout.java
@@ -71,7 +71,12 @@
public static final String CONFIG_PREFETCH_COUNT = "amqp.prefetch.count";
private static final long DEFAULT_PREFETCH_COUNT = 100;
- private static final long WAIT_FOR_NEXT_MESSAGE = 1L;
+ /**
+ * Time in milliseconds to wait for a message from the queue if there is
+ * no message ready when the topology requests a tuple (via
+ * {@link #nextTuple()}).
+ */
+ public static final long WAIT_FOR_NEXT_MESSAGE = 1L;
private final String amqpHost;
private final int amqpPort;
@@ -120,6 +125,9 @@ public AMQPSpout(String host, int port, String username, String password, String
}
+ /**
+ * Acks the message with the AMQP broker.
+ */
@Override
public void ack(Object msgId) {
if (msgId instanceof Long) {
@@ -137,6 +145,9 @@ public void ack(Object msgId) {
}
+ /**
+ * Cancels the queue subscription, and disconnects from the AMQP broker.
+ */
@Override
public void close() {
try {
@@ -161,6 +172,15 @@ public void close() {
}
+ /**
+ * Tells the AMQP broker to drop (Basic.Reject) the message.
+ *
+ * <p><strong>N.B.</strong> this does <em>not</em> requeue the message:
+ * failed messages will simply be dropped. This is to prevent infinite
+ * redelivery in the event of non-transient failures (e.g. malformed
+ * messages). However it means that messages will <em>not</em> be retried
+ * in the event of transient failures.</p>
+ */
@Override
public void fail(Object msgId) {
if (msgId instanceof Long) {
@@ -178,6 +198,13 @@ public void fail(Object msgId) {
}
+ /**
+ * Emits the next message from the queue as a tuple.
+ *
+ * <p>If no message is ready to emit, this will wait a short time
+ * ({@link #WAIT_FOR_NEXT_MESSAGE}) for one to arrive on the queue,
+ * to avoid a tight loop in the spout worker.</p>
+ */
@Override
public void nextTuple() {
if (amqpConsumer != null) {
@@ -199,6 +226,10 @@ public void nextTuple() {
}
+ /**
+ * Connects to the AMQP broker, declares the queue and subscribes to
+ * incoming messages.
+ */
@Override
public void open(@SuppressWarnings("rawtypes") Map config, TopologyContext context, SpoutOutputCollector collector) {
Long prefetchCount = (Long) config.get(CONFIG_PREFETCH_COUNT);
@@ -243,6 +274,10 @@ private void setupAMQP(int prefetchCount) throws IOException {
}
+ /**
+ * Declares the output fields of this spout according to the provided
+ * {@link backtype.storm.spout.Scheme}.
+ */
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(serialisationScheme.getOutputFields());
Please sign in to comment.
Something went wrong with that request. Please try again.