Permalink
Browse files

Rework Bagheera class to make it easier to automate.

  • Loading branch information...
1 parent 68bfb64 commit 3368c804975a8777846ccd2bc0389fde8b75497b @rnewman rnewman committed Feb 20, 2013
Showing with 127 additions and 60 deletions.
  1. +127 −60 src/main/java/com/mozilla/bagheera/http/Bagheera.java
@@ -40,6 +40,11 @@
import com.mozilla.bagheera.producer.Producer;
import com.mozilla.bagheera.util.WildcardProperties;
+/**
+ * Front-end class to a Bagheera server instance.
+ *
+ * Either create a server using `startServer`, or allow the main method to do so.
+ */
public class Bagheera {
private static final Logger LOG = Logger.getLogger(Bagheera.class);
@@ -49,78 +54,140 @@
private static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;
- static final ChannelGroup allChannels = new DefaultChannelGroup(Bagheera.class.getName());
+ // Ensure that we only do this once.
+ private static boolean metricsManagerInitialized = false;
- public static void main(String[] args) throws Exception {
- int port = Integer.parseInt(System.getProperty("server.port", "8080"));
- boolean tcpNoDelay = Boolean.parseBoolean(System.getProperty("server.tcpnodelay", "false"));
+ public static NioServerSocketChannelFactory getChannelFactory() {
+ return new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newFixedThreadPool(DEFAULT_IO_THREADS));
+ }
+
+ /**
+ * Immutable record of the persistent state around a Bagheera server.
+ */
+ public static class BagheeraServerState {
+ public final int port;
+ public final Producer kafkaProducer;
+ public final NioServerSocketChannelFactory channelFactory;
+ public final Channel channel;
+
+ public BagheeraServerState(final int port,
+ final Producer kafkaProducer,
+ final NioServerSocketChannelFactory channelFactory,
+ final Channel channel) {
+ this.port = port;
+ this.kafkaProducer = kafkaProducer;
+ this.channelFactory = channelFactory;
+ this.channel = channel;
+ }
+
+ public void close() {
+ // Close our channel.
+ this.channel.close().awaitUninterruptibly();
+
+ // The caller is responsible for releasing resources from the channel factory.
+
+ // Shut down producer.
+ if (this.kafkaProducer != null) {
+ LOG.info("Closing producer resource...");
+ try {
+ this.kafkaProducer.close();
+ } catch (IOException e) {
+ LOG.error("Error closing producer.", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Start a Bagheera server with the provided settings.
+ * Throws if the server could not be started.
+ * The caller is responsible for closing the returned instance, and the
+ * channel factory if desired.
+ */
+ public static BagheeraServerState startServer(final int port,
+ final boolean tcpNoDelay,
+ final WildcardProperties props,
+ final Properties kafkaProps,
+ final NioServerSocketChannelFactory channelFactory)
+ throws Exception {
// Initialize metrics collection, reporting, etc.
- MetricsManager.getInstance();
+ // We assume that this method is thread-safe.
+ // Do this only once.
+ if (!metricsManagerInitialized) {
+ MetricsManager.getInstance();
+ metricsManagerInitialized = true;
+ }
+
+ final Producer producer = new KafkaProducer(kafkaProps);
+
+ // HTTP server setup.
+ final ServerBootstrap server = new ServerBootstrap(channelFactory);
+ final HttpServerPipelineFactory pipeFactory = new HttpServerPipelineFactory(props, producer);
+ server.setPipelineFactory(pipeFactory);
+ server.setOption("tcpNoDelay", tcpNoDelay);
+
+ // Disable keep-alive so client connections don't hang around.
+ server.setOption("keepAlive", false);
+
+ final Channel channel = server.bind(new InetSocketAddress(port));
+ return new BagheeraServerState(port, producer, channelFactory, channel);
+ }
+
+ /**
+ * A simple front-end that configures a new server from properties files,
+ * waiting until runtime shutdown to clean up.
+ */
+ public static void main(String[] args) throws Exception {
+ final int port = Integer.parseInt(System.getProperty("server.port", "8080"));
+ final boolean tcpNoDelay = Boolean.parseBoolean(System.getProperty("server.tcpnodelay", "false"));
+
+ // Initalize properties and producer.
+ final WildcardProperties props = getDefaultProperties();
+ final Properties kafkaProps = getDefaultKafkaProperties();
+
+ final BagheeraServerState server = startServer(port, tcpNoDelay, props, kafkaProps, getChannelFactory());
- // Initalize properties and producer
- WildcardProperties props = new WildcardProperties();
- Properties kafkaProps = new Properties();
- InputStream in = null;
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ server.close();
+ server.channelFactory.releaseExternalResources();
+ }
+ });
+ }
+
+ protected static Properties getDefaultKafkaProperties() throws Exception {
+ final Properties props = new Properties();
+ final URL propUrl = Bagheera.class.getResource(KAFKA_PROPERTIES_RESOURCE_NAME);
+ if (propUrl == null) {
+ throw new IllegalArgumentException("Could not find the properties file: " + KAFKA_PROPERTIES_RESOURCE_NAME);
+ }
+
+ final InputStream in = propUrl.openStream();
try {
- URL propUrl = Bagheera.class.getResource(PROPERTIES_RESOURCE_NAME);
- if (propUrl == null) {
- throw new IllegalArgumentException("Could not find the properties file: " + PROPERTIES_RESOURCE_NAME);
- }
- in = propUrl.openStream();
props.load(in);
+ } finally {
in.close();
+ }
- propUrl = Bagheera.class.getResource(KAFKA_PROPERTIES_RESOURCE_NAME);
- if (propUrl == null) {
- throw new IllegalArgumentException("Could not find the properties file: " + KAFKA_PROPERTIES_RESOURCE_NAME);
- }
+ return props;
+ }
- in = propUrl.openStream();
- kafkaProps.load(in);
- } finally {
- if (in != null) {
- in.close();
- }
+ protected static WildcardProperties getDefaultProperties() throws Exception {
+ final WildcardProperties props = new WildcardProperties();
+ final URL propUrl = Bagheera.class.getResource(PROPERTIES_RESOURCE_NAME);
+ if (propUrl == null) {
+ throw new IllegalArgumentException("Could not find the properties file: " + PROPERTIES_RESOURCE_NAME);
}
- final Producer producer = new KafkaProducer(kafkaProps);
- // HTTP server setup
- final NioServerSocketChannelFactory channelFactory = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(), Executors.newFixedThreadPool(DEFAULT_IO_THREADS)) {
- @Override
- public void releaseExternalResources() {
- super.releaseExternalResources();
- if (producer != null) {
- LOG.info("Closing producer resource...");
- try {
- producer.close();
- } catch (IOException e) {
- LOG.error("Error closing producer", e);
- }
- }
- }
- };
- ServerBootstrap server = new ServerBootstrap(channelFactory);
- HttpServerPipelineFactory pipeFactory;
+ final InputStream in = propUrl.openStream();
try {
- pipeFactory = new HttpServerPipelineFactory(props, producer);
- server.setPipelineFactory(pipeFactory);
- server.setOption("tcpNoDelay", tcpNoDelay);
- // disable keep-alive so client connections don't hang around
- server.setOption("keepAlive", false);
- Channel ch = server.bind(new InetSocketAddress(port));
- allChannels.add(ch);
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- ChannelGroupFuture future = allChannels.close();
- future.awaitUninterruptibly();
- channelFactory.releaseExternalResources();
- }
- });
- } catch (IOException e) {
- LOG.error("Error initializing pipeline factory", e);
+ props.load(in);
+ } finally {
+ in.close();
}
- }
+ return props;
+ }
}

0 comments on commit 3368c80

Please sign in to comment.