Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Changes to make automation easier. #6

Merged
merged 4 commits into from

2 participants

@rnewman
Collaborator

I'll be looking to instantiate a Bagheera server within a JUnit test. That led me to refactor out the driver class, and along the way make some improvements: removal of static access to channels, encapsulation of state, etc. etc.

In theory you can now run two Bagheera servers within the same JVM, and you can start one from inside a Java method.

I will probably hit more things as I come to use this in anger; I haven't audited the rest of the code for static accesses.

Test suite passes locally, but given that I don't have Zookeeper running yet, I don't know how much that's worth.

Please do whatever smoketest you would do prior to a production push, as well as a code review!

If you let me know what your smoketest looks like, I'll try to set it up myself for the future.

Thanks!

@xstevens xstevens merged commit c048dff into master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
View
4 .gitignore
@@ -46,3 +46,7 @@ Thumbs.db
# Clover
.clover
report
+
+# Editor files
+.*.sw*
+*~
View
213 src/main/java/com/mozilla/bagheera/http/Bagheera.java
@@ -17,6 +17,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package com.mozilla.bagheera.http;
import java.io.IOException;
@@ -39,87 +40,163 @@
import com.mozilla.bagheera.producer.Producer;
import com.mozilla.bagheera.util.WildcardProperties;
+/**
+ * Front-end class to a Bagheera server instance.
+ *
+ * Either create a server using `startServer`, or allow the main method to do so.
+ */
public class Bagheera {
private static final Logger LOG = Logger.getLogger(Bagheera.class);
-
+
public static final String PROPERTIES_RESOURCE_NAME = "/bagheera.properties";
public static final String KAFKA_PROPERTIES_RESOURCE_NAME = "/kafka.producer.properties";
-
+
private static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;
- static final ChannelGroup allChannels = new DefaultChannelGroup(Bagheera.class.getName());
-
- public static void main(String[] args) throws Exception {
- int port = Integer.parseInt(System.getProperty("server.port", "8080"));
- boolean tcpNoDelay = Boolean.parseBoolean(System.getProperty("server.tcpnodelay", "false"));
-
+ // Ensure that we only do this once.
+ private static boolean metricsManagerInitialized = false;
+
+ public static NioServerSocketChannelFactory getChannelFactory() {
+ return new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newFixedThreadPool(DEFAULT_IO_THREADS));
+ }
+
+ /**
+ * Immutable record of the persistent state around a Bagheera server.
+ */
+ public static class BagheeraServerState {
+ public final int port;
+ public final Producer kafkaProducer;
+ public final NioServerSocketChannelFactory channelFactory;
+ public final Channel channel;
+ public final ChannelGroup channelGroup;
+
+ public BagheeraServerState(final int port,
+ final Producer kafkaProducer,
+ final NioServerSocketChannelFactory channelFactory,
+ final Channel channel,
+ final ChannelGroup channelGroup) {
+ this.port = port;
+ this.kafkaProducer = kafkaProducer;
+ this.channelFactory = channelFactory;
+ this.channel = channel;
+ this.channelGroup = channelGroup;
+ }
+
+ public void close() {
+ // Close our channels.
+ this.channelGroup.close().awaitUninterruptibly();
+ this.channel.close().awaitUninterruptibly();
+
+ // The caller is responsible for releasing resources from the channel factory.
+
+ // Shut down producer.
+ if (this.kafkaProducer != null) {
+ LOG.info("Closing producer resource...");
+ try {
+ this.kafkaProducer.close();
+ } catch (IOException e) {
+ LOG.error("Error closing producer.", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Start a Bagheera server with the provided settings.
+ * Throws if the server could not be started.
+ * The caller is responsible for closing the returned instance, and the
+ * channel factory if desired.
+ */
+ public static BagheeraServerState startServer(final int port,
+ final boolean tcpNoDelay,
+ final WildcardProperties props,
+ final Properties kafkaProps,
+ final NioServerSocketChannelFactory channelFactory,
+ final String channelGroupName)
+ throws Exception {
+
// Initialize metrics collection, reporting, etc.
- MetricsManager.getInstance();
-
- // Initalize properties and producer
- WildcardProperties props = new WildcardProperties();
- Properties kafkaProps = new Properties();
- InputStream in = null;
+ // We assume that this method is thread-safe.
+ // Do this only once.
+ if (!metricsManagerInitialized) {
+ MetricsManager.getInstance();
+ metricsManagerInitialized = true;
+ }
+
+ final Producer producer = new KafkaProducer(kafkaProps);
+
+ // HTTP server setup.
+ final ChannelGroup channelGroup = new DefaultChannelGroup(channelGroupName);
+ final ServerBootstrap server = new ServerBootstrap(channelFactory);
+ final HttpServerPipelineFactory pipeFactory = new HttpServerPipelineFactory(props, producer, channelGroup);
+ server.setPipelineFactory(pipeFactory);
+ server.setOption("tcpNoDelay", tcpNoDelay);
+
+ // Disable keep-alive so client connections don't hang around.
+ server.setOption("keepAlive", false);
+
+ final Channel channel = server.bind(new InetSocketAddress(port));
+ return new BagheeraServerState(port, producer, channelFactory, channel, channelGroup);
+ }
+
+ /**
+ * A simple front-end that configures a new server from properties files,
+ * waiting until runtime shutdown to clean up.
+ */
+ public static void main(String[] args) throws Exception {
+ final int port = Integer.parseInt(System.getProperty("server.port", "8080"));
+ final boolean tcpNoDelay = Boolean.parseBoolean(System.getProperty("server.tcpnodelay", "false"));
+
+ // Initalize properties and producer.
+ final WildcardProperties props = getDefaultProperties();
+ final Properties kafkaProps = getDefaultKafkaProperties();
+
+ final BagheeraServerState server = startServer(port, tcpNoDelay,
+ props, kafkaProps,
+ getChannelFactory(),
+ Bagheera.class.getName());
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ server.close();
+ server.channelFactory.releaseExternalResources();
+ }
+ });
+ }
+
+ protected static Properties getDefaultKafkaProperties() throws Exception {
+ final Properties props = new Properties();
+ final URL propUrl = Bagheera.class.getResource(KAFKA_PROPERTIES_RESOURCE_NAME);
+ if (propUrl == null) {
+ throw new IllegalArgumentException("Could not find the properties file: " + KAFKA_PROPERTIES_RESOURCE_NAME);
+ }
+
+ final InputStream in = propUrl.openStream();
try {
- URL propUrl = Bagheera.class.getResource(PROPERTIES_RESOURCE_NAME);
- if (propUrl == null) {
- throw new IllegalArgumentException("Could not find the properites file: " + PROPERTIES_RESOURCE_NAME);
- }
- in = propUrl.openStream();
props.load(in);
- in.close();
-
- propUrl = Bagheera.class.getResource(KAFKA_PROPERTIES_RESOURCE_NAME);
- if (propUrl == null) {
- throw new IllegalArgumentException("Could not find the properites file: " + KAFKA_PROPERTIES_RESOURCE_NAME);
- }
-
- in = propUrl.openStream();
- kafkaProps.load(in);
} finally {
- if (in != null) {
- in.close();
- }
+ in.close();
}
- final Producer producer = new KafkaProducer(kafkaProps);
-
- // HTTP server setup
- final NioServerSocketChannelFactory channelFactory = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(), Executors.newFixedThreadPool(DEFAULT_IO_THREADS)) {
- @Override
- public void releaseExternalResources() {
- super.releaseExternalResources();
- if (producer != null) {
- LOG.info("Closing producer resource...");
- try {
- producer.close();
- } catch (IOException e) {
- LOG.error("Error closing producer", e);
- }
- }
- }
- };
- ServerBootstrap server = new ServerBootstrap(channelFactory);
- HttpServerPipelineFactory pipeFactory;
+
+ return props;
+ }
+
+ protected static WildcardProperties getDefaultProperties() throws Exception {
+ final WildcardProperties props = new WildcardProperties();
+ final URL propUrl = Bagheera.class.getResource(PROPERTIES_RESOURCE_NAME);
+ if (propUrl == null) {
+ throw new IllegalArgumentException("Could not find the properties file: " + PROPERTIES_RESOURCE_NAME);
+ }
+
+ final InputStream in = propUrl.openStream();
try {
- pipeFactory = new HttpServerPipelineFactory(props, producer);
- server.setPipelineFactory(pipeFactory);
- server.setOption("tcpNoDelay", tcpNoDelay);
- // disable keep-alive so client connections don't hang around
- server.setOption("keepAlive", false);
- Channel ch = server.bind(new InetSocketAddress(port));
- allChannels.add(ch);
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- ChannelGroupFuture future = allChannels.close();
- future.awaitUninterruptibly();
- channelFactory.releaseExternalResources();
- }
- });
- } catch (IOException e) {
- LOG.error("Error initializing pipeline factory", e);
+ props.load(in);
+ } finally {
+ in.close();
}
- }
+ return props;
+ }
}
View
9 src/main/java/com/mozilla/bagheera/http/HttpServerPipelineFactory.java
@@ -24,6 +24,7 @@
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpContentDecompressor;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
@@ -38,8 +39,9 @@
private final int maxContentLength;
private final Validator validator;
private final Producer producer;
+ private final ChannelGroup channelGroup;
- public HttpServerPipelineFactory(WildcardProperties props, Producer producer) throws IOException {
+ public HttpServerPipelineFactory(WildcardProperties props, Producer producer, ChannelGroup channelGroup) throws IOException {
this.props = props;
String validNsStr = props.getProperty("valid.namespaces");
if (validNsStr == null || validNsStr.length() == 0) {
@@ -48,6 +50,7 @@ public HttpServerPipelineFactory(WildcardProperties props, Producer producer) th
this.validator = new Validator(validNsStr.split(","));
this.maxContentLength = Integer.parseInt(props.getProperty("max.content.length","1048576"));
this.producer = producer;
+ this.channelGroup = channelGroup;
}
/* (non-Javadoc)
@@ -64,9 +67,9 @@ public ChannelPipeline getPipeline() throws Exception {
pipeline.addLast("encodingCorrector", new ContentEncodingCorrector());
pipeline.addLast("inflater", new HttpContentDecompressor());
pipeline.addLast("encoder", new HttpResponseEncoder());
- pipeline.addLast("handler", new SubmissionHandler(validator, producer));
+ pipeline.addLast("handler", new SubmissionHandler(validator, producer, this.channelGroup));
return pipeline;
}
-}
+}
View
7 src/main/java/com/mozilla/bagheera/http/SubmissionHandler.java
@@ -45,6 +45,7 @@
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -69,10 +70,12 @@
private final MetricsManager metricsManager;
private final Producer producer;
+ private final ChannelGroup channelGroup;
- public SubmissionHandler(Validator validator, Producer producer) {
+ public SubmissionHandler(Validator validator, Producer producer, ChannelGroup channelGroup) {
this.metricsManager = MetricsManager.getInstance();
this.producer = producer;
+ this.channelGroup = channelGroup;
}
private void updateRequestMetrics(String namespace, String method, int size) {
@@ -151,7 +154,7 @@ private void writeResponse(HttpResponseStatus status, MessageEvent e, String nam
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
- Bagheera.allChannels.add(e.getChannel());
+ this.channelGroup.add(e.getChannel());
}
@Override
Something went wrong with that request. Please try again.