Permalink
Browse files

implemented gracefull configuration reloading

  • Loading branch information...
1 parent 6e5cd16 commit bc58f13b85159782631be958234931e47c5425c4 r1j0 committed May 13, 2012
@@ -19,53 +19,91 @@
private final Logger log = LoggerFactory.getLogger(getClass());
+ private StatsdConfiguration configuration;
private final LinkedBlockingQueue<String> queue;
- private final List<Backend> backends;
- private final int flushInterval;
+ private List<Backend> backends;
+ private int flushInterval;
+ private ExecutorService executor;
public FlushThread(StatsdConfiguration configuration, LinkedBlockingQueue<String> linkedBlockingQueue) {
super();
- queue = linkedBlockingQueue;
- backends = configuration.getBackends();
- flushInterval = configuration.getFlushInterval();
+ this.configuration = configuration;
+ this.queue = linkedBlockingQueue;
+ this.backends = configuration.getBackends();
+ this.flushInterval = configuration.getFlushInterval();
}
@Override
public void run() {
log.info("FlushThread started.");
- List<String> messages = new ArrayList<String>();
+
final int backendsSize = backends.size();
- final ExecutorService executor = Executors.newFixedThreadPool(backendsSize);
+ executor = Executors.newFixedThreadPool(backendsSize);
while (true) {
- messages.clear();
queueInformation();
- String message = "";
+ List<String> messages = pollQueue();
- while ((message = queue.poll()) != null) {
- log.info("Message taken from the queue: " + message);
- messages.add(message);
+ if (!messages.isEmpty()) {
+ notifyBackends(executor, messages);
}
- if (!messages.isEmpty()) {
- final List<String> unmodifiableMessages = Collections.unmodifiableList(messages);
+ ThreadUtility.doSleep(flushInterval);
+ }
+ }
- for (Backend backend : backends) {
- log.info("Notifying backend: " + backend.getClass().getSimpleName());
- final Runnable backendWorker = new BackendWorker(backend, unmodifiableMessages);
- executor.execute(backendWorker);
- }
+ public void forceFlush(StatsdConfiguration statsdConfiguration) {
+ log.info("Forced flushing of messages.");
- log.info("Finished backend threads");
- }
+ ExecutorService oldExecutor = executor;
+ executor = Executors.newFixedThreadPool(backends.size());
- ThreadUtility.doSleep(flushInterval);
+ queueInformation();
+
+ List<String> messages = pollQueue();
+
+ if (!messages.isEmpty()) {
+ notifyBackends(oldExecutor, messages);
}
+
+ configuration = statsdConfiguration;
+ backends = configuration.getBackends();
+ flushInterval = configuration.getFlushInterval();
+ oldExecutor.shutdown();
+
+ log.info("Finished flushing.");
+ }
+
+
+ private List<String> pollQueue() {
+ final List<String> messages = new ArrayList<String>();
+ String message = "";
+
+ while ((message = queue.poll()) != null) {
+ log.info("Message taken from the queue: " + message);
+ messages.add(message);
+ }
+
+ return messages;
+ }
+
+
+ private void notifyBackends(ExecutorService executor, List<String> messages) {
+ final List<String> unmodifiableMessages = Collections.unmodifiableList(messages);
+
+ for (Backend backend : backends) {
+ log.info("Notifying backend: " + backend.getClass().getSimpleName());
+
+ final Runnable backendWorker = new BackendWorker(backend, unmodifiableMessages);
+ executor.execute(backendWorker);
+ }
+
+ log.info("Finished backend threads");
}
@@ -17,6 +17,7 @@
private static LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<String>();;
private static StatsdConfiguration configuration = null;
+ private static FlushThread flushThread = null;
public static void main(String[] args) throws IOException {
@@ -27,12 +28,15 @@ public static void main(String[] args) throws IOException {
public void handle(Signal signal) {
log.info("SIGHUP received. Reloading configuration.");
configuration = new StatsdConfiguration(null);
+
+ flushThread.forceFlush(configuration);
}
});
ServerThread serverThread = ServerFactory.getInstance(configuration.getNetworkFramework(), configuration, linkedBlockingQueue);
serverThread.start();
- new FlushThread(configuration, linkedBlockingQueue).start();
-
+
+ flushThread = new FlushThread(configuration, linkedBlockingQueue);
+ flushThread.start();
}
}

0 comments on commit bc58f13

Please sign in to comment.