Skip to content

Commit

Permalink
configure worker with a config file.
Browse files Browse the repository at this point in the history
  • Loading branch information
mattwigway committed Jul 15, 2015
1 parent 33e6409 commit 3a1fac9
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 24 deletions.
Expand Up @@ -44,6 +44,7 @@
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.zip.GZIPOutputStream;
Expand All @@ -59,6 +60,9 @@ public class AnalystWorker implements Runnable {

public static final int POLL_TIMEOUT = 10 * 1000;

/** should this worker shut down automatically */
public final boolean autoShutdown;

public static final Random random = new Random();

private TaskStatisticsStore statsStore;
Expand Down Expand Up @@ -117,7 +121,32 @@ public class AnalystWorker implements Runnable {

boolean isSinglePoint = false;

public AnalystWorker(TaskStatisticsStore statsStore) {
public AnalystWorker(Properties config) {

// parse the configuration
// set up the stats store
String statsQueue = config.getProperty("statistics-queue");
if (statsQueue != null)
this.statsStore = new SQSTaskStatisticsStore(statsQueue);
else
// a stats store that does nothing.
this.statsStore = s -> {};

String addr = config.getProperty("broker-address");
String port = config.getProperty("broker-port");

if (addr != null) {
if (port != null)
this.BROKER_BASE_URL = String.format("http://%s:%s", addr, port);
else
this.BROKER_BASE_URL = String.format("http://%s", addr);
}

this.pointSetDatastore = new PointSetDatastore(10, null, false, config.getProperty("pointsets-bucket"));
this.clusterGraphBuilder = new ClusterGraphBuilder(config.getProperty("graphs-bucket"));

Boolean autoShutdown = Boolean.parseBoolean(config.getProperty("auto-shutdown"));
this.autoShutdown = autoShutdown == null ? false : autoShutdown;

// Consider shutting this worker down once per hour, starting 55 minutes after it started up.
startupTime = System.currentTimeMillis();
Expand Down Expand Up @@ -150,12 +179,6 @@ public AnalystWorker(TaskStatisticsStore statsStore) {

objectMapper.registerModule(new GeoJsonModule());

/* These serve as lazy-loading caches for graphs and point sets. */
clusterGraphBuilder = new ClusterGraphBuilder(s3Prefix + "-graphs");
pointSetDatastore = new PointSetDatastore(10, null, false, s3Prefix + "-pointsets");

this.statsStore = statsStore;

instanceType = getInstanceType();
}

Expand All @@ -165,7 +188,7 @@ public void run() {
boolean idle = false;
while (true) {
// Consider shutting down if enough time has passed
if (System.currentTimeMillis() > nextShutdownCheckTime) {
if (System.currentTimeMillis() > nextShutdownCheckTime && autoShutdown) {
if (idle) {
try {
Process process = new ProcessBuilder("sudo", "/sbin/shutdown", "-h", "now").start();
Expand Down Expand Up @@ -432,23 +455,35 @@ public String getInstanceType () {
}
}

/**
* Requires a worker configuration, which is a Java Properties file with the following
* attributes.
*
* broker-address address of the broker, without protocol or port
* broker port port broker is running on, default 80.
* graphs-bucket S3 bucket in which graphs are stored.
* pointsets-bucket S3 bucket in which pointsets are stored
* auto-shutdown Should this worker shut down its machine if it is idle (e.g. on throwaway cloud instances)
* statistics-queue SQS queue to which to send statistics (optional)
*/
public static void main(String[] args) {
TaskStatisticsStore tss;
if (args.length == 2)
tss = new SQSTaskStatisticsStore(args[1]);
else
tss = new TaskStatisticsStore() {
@Override public void store(TaskStatistics ts) {
/** do nothing, quietly */
}
};

AnalystWorker w = new AnalystWorker(tss);
Properties config = new Properties();

if (args.length > 0)
w.BROKER_BASE_URL = args[0];
try {
File cfg;
if (args.length > 0)
cfg = new File(args[0]);
else
cfg = new File("worker.conf");

InputStream cfgis = new FileInputStream(cfg);
config.load(cfgis);
cfgis.close();
} catch (Exception e) {
LOG.info("Error loading worker configuration", e);
return;
}

w.run();
new AnalystWorker(config).run();
}

}
Expand Up @@ -24,7 +24,11 @@ public class SQSTaskStatisticsStore implements TaskStatisticsStore {

/** create a task statistics store for the given queue name */
public SQSTaskStatisticsStore(String queueName) {
queueUrl = sqs.getQueueUrl(queueName).getQueueUrl();
try {
queueUrl = sqs.getQueueUrl(queueName).getQueueUrl();
} catch (Exception e) {
LOG.error("Unable to initialize statistics store", e);
}
LOG.info("Sending statistics to SQS queue {}", queueName);
}

Expand Down

0 comments on commit 3a1fac9

Please sign in to comment.