Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
  • 2 commits
  • 4 files changed
  • 0 commit comments
  • 1 contributor
View
5 example-config.yml
@@ -9,7 +9,10 @@ Octobot:
workers: 1,
username: cilantro,
password: burrito,
- heartbeat: 10
+ heartbeat: 10,
+ arguments: {
+ x-ha-policy: all
+ }
}
metrics_port: 1228
View
5 src/main/java/com/urbanairship/octobot/Metrics.java
@@ -5,14 +5,11 @@
import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.core.Timer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
public class Metrics {
- protected static final MetricsRegistry registry = new MetricsRegistry();
+ protected static final MetricsRegistry registry = com.yammer.metrics.Metrics.defaultRegistry();
// Updates internal metrics following task execution.
public static void update(String task, long time, boolean status, int retries) {
View
3  src/main/java/com/urbanairship/octobot/Queue.java
@@ -13,6 +13,7 @@
public String password;
public String vhost;
public Integer heartbeat;
+ public HashMap<String, Object> arguments;
public Queue(String queueType, String queueName, String host, Integer port,
String username, String password) {
@@ -44,6 +45,8 @@ public Queue(HashMap<String, Object> config) {
this.port = Integer.parseInt(((Long) config.get("port")).toString());
if (config.get("heartbeat") != null)
this.heartbeat = Integer.parseInt(((Long) config.get("heartbeat")).toString());
+ if (config.get("arguments") != null)
+ this.arguments = (HashMap<String, Object>)config.get("arguments");
}
View
142 src/main/java/com/urbanairship/octobot/QueueConsumer.java
@@ -92,7 +92,7 @@ private void consumeFromBeanstalk() {
try { job = beanstalkClient.reserve(1); }
catch (BeanstalkException e) {
logger.error("Beanstalk connection error.", e);
- beanstalkClient = Beanstalk.getBeanstalkChannel(queue.host,
+ beanstalkClient = Beanstalk.getBeanstalkChannel(queue.host,
queue.port, queue.queueName);
continue;
}
@@ -106,7 +106,7 @@ private void consumeFromBeanstalk() {
try { beanstalkClient.delete(job.getJobId()); }
catch (BeanstalkException e) {
logger.error("Error sending message receipt.", e);
- beanstalkClient = Beanstalk.getBeanstalkChannel(queue.host,
+ beanstalkClient = Beanstalk.getBeanstalkChannel(queue.host,
queue.port, queue.queueName);
}
}
@@ -122,14 +122,14 @@ private void consumeFromRedis() {
} catch (JedisConnectionException e) {
logger.error("Unable to connect to Redis.", e);
}
-
+
logger.info("Connected to Redis.");
jedis.subscribe(new JedisPubSub() {
@Override
- public void onMessage(String channel, String message) {
- invokeTask(message);
- }
+ public void onMessage(String channel, String message) {
+ invokeTask(message);
+ }
@Override
public void onPMessage(String string, String string1, String string2) {
@@ -155,85 +155,85 @@ public void onPUnsubscribe(String string, int i) {
public void onPSubscribe(String string, int i) {
logger.info("onPSubscribe Triggered - Not implemented.");
}
- }, queue.queueName);
+ }, queue.queueName);
}
-// Invokes a task based on the name of the task passed in the message via
-// reflection, accounting for non-existent tasks and errors while running.
-public boolean invokeTask(String rawMessage) {
- String taskName = "";
- JSONObject message;
- int retryCount = 0;
- long retryTimes = 0;
+ // Invokes a task based on the name of the task passed in the message via
+ // reflection, accounting for non-existent tasks and errors while running.
+ public boolean invokeTask(String rawMessage) {
+ String taskName = "";
+ JSONObject message;
+ int retryCount = 0;
+ long retryTimes = 0;
- long startedAt = System.nanoTime();
- String errorMessage = null;
- Throwable lastException = null;
- boolean executedSuccessfully = false;
+ long startedAt = System.nanoTime();
+ String errorMessage = null;
+ Throwable lastException = null;
+ boolean executedSuccessfully = false;
- while (retryCount < retryTimes + 1) {
- if (retryCount > 0)
- logger.info("Retrying task. Attempt " + retryCount + " of " + retryTimes);
+ while (retryCount < retryTimes + 1) {
+ if (retryCount > 0)
+ logger.info("Retrying task. Attempt " + retryCount + " of " + retryTimes);
- try {
- message = (JSONObject) JSONValue.parse(rawMessage);
- taskName = (String) message.get("task");
- if (message.containsKey("retries"))
- retryTimes = (Long) message.get("retries");
- } catch (Exception e) {
- logger.error("Error: Invalid message received: " + rawMessage);
- return executedSuccessfully;
+ try {
+ message = (JSONObject) JSONValue.parse(rawMessage);
+ taskName = (String) message.get("task");
+ if (message.containsKey("retries"))
+ retryTimes = (Long) message.get("retries");
+ } catch (Exception e) {
+ logger.error("Error: Invalid message received: " + rawMessage);
+ return executedSuccessfully;
+ }
+
+ // Locate the task, then invoke it, supplying our message.
+ // Cache methods after lookup to avoid unnecessary reflection lookups.
+ try {
+
+ TaskExecutor.execute(taskName, message);
+ executedSuccessfully = true;
+
+ } catch (ClassNotFoundException e) {
+ lastException = e;
+ errorMessage = "Error: Task requested not found: " + taskName;
+ logger.error(errorMessage);
+ } catch (NoClassDefFoundError e) {
+ lastException = e;
+ errorMessage = "Error: Task requested not found: " + taskName;
+ logger.error(errorMessage, e);
+ } catch (NoSuchMethodException e) {
+ lastException = e;
+ errorMessage = "Error: Task requested does not have a static run method.";
+ logger.error(errorMessage);
+ } catch (Throwable e) {
+ lastException = e;
+ errorMessage = "An error occurred while running the task.";
+ logger.error(errorMessage, e);
+ }
+
+ if (executedSuccessfully) break;
+ else retryCount++;
}
- // Locate the task, then invoke it, supplying our message.
- // Cache methods after lookup to avoid unnecessary reflection lookups.
- try {
+ // Deliver an e-mail error notification if enabled.
+ if (enableEmailErrors && !executedSuccessfully) {
+ String email = "Error running task: " + taskName + ".\n\n"
+ + "Attempted executing " + retryCount + " times as specified.\n\n"
+ + "The original input was: \n\n" + rawMessage + "\n\n"
+ + "Here's the error that resulted while running the task:\n\n"
+ + stackToString(lastException);
- TaskExecutor.execute(taskName, message);
- executedSuccessfully = true;
-
- } catch (ClassNotFoundException e) {
- lastException = e;
- errorMessage = "Error: Task requested not found: " + taskName;
- logger.error(errorMessage);
- } catch (NoClassDefFoundError e) {
- lastException = e;
- errorMessage = "Error: Task requested not found: " + taskName;
- logger.error(errorMessage, e);
- } catch (NoSuchMethodException e) {
- lastException = e;
- errorMessage = "Error: Task requested does not have a static run method.";
- logger.error(errorMessage);
- } catch (Throwable e) {
- lastException = e;
- errorMessage = "An error occurred while running the task.";
- logger.error(errorMessage, e);
+ try { MailQueue.put(email); }
+ catch (InterruptedException e) { }
}
-
- if (executedSuccessfully) break;
- else retryCount++;
- }
- // Deliver an e-mail error notification if enabled.
- if (enableEmailErrors && !executedSuccessfully) {
- String email = "Error running task: " + taskName + ".\n\n"
- + "Attempted executing " + retryCount + " times as specified.\n\n"
- + "The original input was: \n\n" + rawMessage + "\n\n"
- + "Here's the error that resulted while running the task:\n\n"
- + stackToString(lastException);
+ long finishedAt = System.nanoTime();
+ Metrics.update(taskName, finishedAt - startedAt, executedSuccessfully, retryCount);
- try { MailQueue.put(email); }
- catch (InterruptedException e) { }
+ return executedSuccessfully;
}
- long finishedAt = System.nanoTime();
- Metrics.update(taskName, finishedAt - startedAt, executedSuccessfully, retryCount);
-
- return executedSuccessfully;
-}
-
// Opens up a connection to RabbitMQ, retrying every five seconds
// if the queue server is unavailable.
private Channel getAMQPChannel(Queue queue) {
@@ -248,7 +248,7 @@ private Channel getAMQPChannel(Queue queue) {
channel = connection.createChannel();
consumer = new QueueingConsumer(channel);
channel.exchangeDeclare(queue.queueName, "direct", true);
- channel.queueDeclare(queue.queueName, true, false, false, null);
+ channel.queueDeclare(queue.queueName, true, false, false, queue.arguments);
channel.queueBind(queue.queueName, queue.queueName, queue.queueName);
channel.basicConsume(queue.queueName, false, consumer);
logger.info("Connected to RabbitMQ");

No commit comments for this range

Something went wrong with that request. Please try again.