Permalink
Browse files

Merge branch 'master' of github.com:aphyr/riemann-java-client

  • Loading branch information...
2 parents 7176fb4 + 17d06eb commit 52c12ac4b6a848b995075e982114b60e00471bf6 @eribeiro eribeiro committed Jul 12, 2012
@@ -7,6 +7,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Arrays;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import com.aphyr.riemann.Proto.Event;
import com.aphyr.riemann.Proto.Query;
@@ -17,6 +19,7 @@
public static final int DEFAULT_PORT = 5555;
protected final InetSocketAddress server;
+ protected RiemannScheduler scheduler = null;
public AbstractRiemannClient(final InetSocketAddress server) {
this.server = server;
@@ -109,6 +112,25 @@ public void sendException(String service, Throwable t) {
public abstract void disconnect() throws IOException;
+ // Returns the scheduler for this client. Creates the scheduler on first use.
+ public synchronized RiemannScheduler scheduler() {
+ if (scheduler == null) {
+ scheduler = new RiemannScheduler(this);
+ }
+ return scheduler;
+ }
+
+ // Set up recurring tasks on this client's scheduler.
+ // This may be the lowest entropy for any code I've ever written.
+ public ScheduledFuture every(long interval, Runnable f) { return scheduler().every(interval, f); }
+ public ScheduledFuture every(long interval, RiemannScheduler.Task f) { return scheduler().every(interval, f); }
+ public ScheduledFuture every(long interval, TimeUnit unit, Runnable f) { return scheduler().every(interval, unit, f); }
+ public ScheduledFuture every(long interval, TimeUnit unit, RiemannScheduler.Task f) { return scheduler().every(interval, unit, f); }
+ public ScheduledFuture every(long interval, long delay, Runnable f) { return scheduler().every(interval, delay, f); }
+ public ScheduledFuture every(long interval, long delay, RiemannScheduler.Task f) { return scheduler().every(interval, delay, f); }
+ public ScheduledFuture every(long interval, long delay, TimeUnit unit, Runnable f) { return scheduler().every(interval, delay, unit, f); }
+ public ScheduledFuture every(long interval, long delay, TimeUnit unit, RiemannScheduler.Task f) { return scheduler().every(interval, delay, unit, f); }
+
// Asserts that the message is OK; if not, throws a ServerError.
public Msg validate(Msg message) throws IOException, ServerError {
if (message.hasOk() && !message.getOk()) {
@@ -49,7 +49,6 @@ public void setMinimumReconnectInterval(long interval) {
// Attempts to reconnect. Can be called many times; will only try reconnecting every few seconds.
// If another thread is reconnecting, or a connection attempt was made too recently, returns immediately.
public void reconnect() throws IOException {
-
synchronized (reconnectionLock) {
long latestAttempt = System.currentTimeMillis() - lastReconnectionAttempt;
if (!reconnecting && latestAttempt > minimumReconnectInterval) {
@@ -63,7 +62,7 @@ public void reconnect() throws IOException {
}
try {
- synchronized (this) {
+ synchronized (socketLock) {
disconnect();
connect();
}
@@ -0,0 +1,81 @@
+package com.aphyr.riemann.client;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+// Supports periodic reporting of events.
+public class RiemannScheduler {
+ public static abstract class Task {
+ public abstract void run(AbstractRiemannClient r);
+ }
+
+ public final ScheduledThreadPoolExecutor pool;
+ public final AbstractRiemannClient client;
+
+ public RiemannScheduler(AbstractRiemannClient client) {
+ this(client, 1);
+ }
+
+ // Finer control over threadpool
+ public RiemannScheduler(AbstractRiemannClient client, int poolSize) {
+ this.client = client;
+ pool = new ScheduledThreadPoolExecutor(poolSize);
+ pool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ pool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ }
+
+ public void shutdown() {
+ pool.shutdown();
+ }
+
+ // Converts a callback to a runnable by closing over this pool's client
+ protected Runnable runnableCallback(final Task c) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ c.run(client);
+ }
+ };
+ }
+
+ // Schedule an arbitrary runnable to be run periodically; useful when
+ // you already have a client handy. Interval is in ms.
+ public ScheduledFuture every(long interval, Runnable f) {
+ return every(interval, 0, f);
+ }
+
+ public ScheduledFuture every(long interval, Task c) {
+ return every(interval, runnableCallback(c));
+ }
+
+ // Schedule an arbitrary runnable to be run periodically. Adjustable
+ // units.
+ public ScheduledFuture every(long interval, TimeUnit unit, Runnable f) {
+ return every(interval, 0, unit, f);
+ }
+
+ public ScheduledFuture every(long interval, TimeUnit unit, Task c) {
+ return every(interval, unit, runnableCallback(c));
+ }
+
+ // Schedule an arbitrary runnable to be run periodically, with initial
+ // delay. Times in ms.
+ public ScheduledFuture every(long interval, long delay, Runnable f) {
+ return every(interval, delay, TimeUnit.MILLISECONDS, f);
+ }
+
+ public ScheduledFuture every(long interval, long delay, Task c) {
+ return every(interval, delay, runnableCallback(c));
+ }
+
+ // Schedule an arbitrary runnable to be run periodically, with initial
+ // delay and adjustable units.
+ public ScheduledFuture every(long interval, long delay, TimeUnit unit, Runnable f) {
+ return pool.scheduleAtFixedRate(f, delay, interval, unit);
+ }
+
+ public ScheduledFuture every(long interval, long delay, TimeUnit unit, Task c) {
+ return every(interval, delay, unit, runnableCallback(c));
+ }
+}

0 comments on commit 52c12ac

Please sign in to comment.