Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: xstevens/baloo
base: bdc7e138ba
...
head fork: xstevens/baloo
compare: fef703bdb7
  • 2 commits
  • 8 files changed
  • 0 commit comments
  • 1 contributor
View
4 README.md
@@ -23,13 +23,13 @@ Make sure your Kafka and Zookeeper servers are running first (see Kafka document
In order to run baloo on another machine you will probably want to use the dist assembly like so:
-mvn assembly:assembly
+`mvn assembly:assembly`
The zip file now under the target directory should be deployed to BALOO_HOME on the remote server.
To run Baloo you can use bin/baloo or copy the init.d script by the same name from bin/init.d to /etc/init.d. The init script assumes an installation of baloo at /usr/lib/baloo, but this can be modified by changing the BALOO_HOME variable near the top of that script. Here is an example of using the regular baloo script:
-bin/baloo 8080
+`bin/baloo 8080`
### REST Request Format ###
View
8 pom.xml
@@ -58,7 +58,7 @@
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
- <version>2.8.0</version>
+ <version>2.9.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
@@ -76,20 +76,20 @@
<dependency>
<groupId>com.twitter</groupId>
<artifactId>finagle-core</artifactId>
- <version>5.3.8</version>
+ <version>5.3.23</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>finagle-http</artifactId>
- <version>5.3.8</version>
+ <version>5.3.23</version>
<scope>compile</scope>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-core</artifactId>
- <version>0.7.1-incubating</version>
+ <version>0.7.2-incubating</version>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
View
56 src/main/java/com/mozilla/baloo/http/ContentEncodingCorrector.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2012 Mozilla Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mozilla.baloo.http;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMessage;
+
+/**
+ * If Content-Encoding isn't specified this checks the Content-Type to see
+ * if there is any indication that content is compressed even though encoding
+ * isn't set.
+ */
+public class ContentEncodingCorrector extends SimpleChannelUpstreamHandler {
+
+ public static final String MIME_TYPE_JSON_ZLIB = "application/json+zlib";
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ Object msg = e.getMessage();
+ if (msg instanceof HttpMessage) {
+ HttpMessage m = (HttpMessage) msg;
+ String contentEncoding = m.getHeader(HttpHeaders.Names.CONTENT_ENCODING);
+ if (contentEncoding == null) {
+ String contentType = m.getHeader(HttpHeaders.Names.CONTENT_TYPE);
+ if (contentType != null && contentType.startsWith(MIME_TYPE_JSON_ZLIB)) {
+ m.setHeader(HttpHeaders.Names.CONTENT_ENCODING,"deflate");
+ }
+ }
+ Channels.fireMessageReceived(ctx, m, e.getRemoteAddress());
+ } else {
+ ctx.sendUpstream(e);
+ }
+ }
+
+}
View
8 .../com/mozilla/baloo/SecurityException.java → ...lla/baloo/http/HttpSecurityException.java
@@ -17,17 +17,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.mozilla.baloo;
+package com.mozilla.baloo.http;
-public class SecurityException extends Exception {
+public class HttpSecurityException extends Exception {
private static final long serialVersionUID = 480106127340879943L;
- public SecurityException(String message) {
+ public HttpSecurityException(String message) {
super(message);
}
- public SecurityException(String message, Throwable cause) {
+ public HttpSecurityException(String message, Throwable cause) {
super(message, cause);
}
}
View
34 src/main/java/com/mozilla/baloo/http/InvalidPathException.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2012 Mozilla Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mozilla.baloo.http;
+
+public class InvalidPathException extends Exception {
+
+ private static final long serialVersionUID = -2753373949363309083L;
+
+ public InvalidPathException(String message) {
+ super(message);
+ }
+
+ public InvalidPathException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
View
81 src/main/scala/com/mozilla/baloo/Baloo.scala
@@ -1,10 +1,28 @@
+/*
+ * Copyright 2012 Mozilla Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package com.mozilla.baloo
import com.twitter.finagle.{Service, SimpleFilter}
import com.twitter.finagle.builder.{Server, ServerBuilder}
import com.twitter.finagle.http.{Request, Response, CompressedHttp, CompressedRichHttp}
import com.twitter.util.Future
-import com.mozilla.baloo.validator.Validator
import java.net.InetSocketAddress
import java.util.Properties
import java.util.UUID
@@ -15,6 +33,8 @@ import org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1
import org.jboss.netty.buffer.ChannelBuffers.copiedBuffer
import org.jboss.netty.util.CharsetUtil.UTF_8
import scala.collection.JavaConversions
+import com.mozilla.baloo.validator.Validator
+import com.mozilla.baloo.http._
import com.mozilla.bagheera.BagheeraProto.BagheeraMessage
import com.mozilla.bagheera.BagheeraProto.BagheeraMessage.Operation
import com.google.protobuf.ByteString
@@ -25,10 +45,12 @@ object Baloo {
def apply(request: Request, service: Service[Request, Response]) = {
service(request) handle { case error =>
val statusCode = error match {
+ case _: InvalidPathException =>
+ NOT_FOUND
+ case _: HttpSecurityException =>
+ FORBIDDEN
case _: IllegalArgumentException =>
NOT_ACCEPTABLE
- case _: SecurityException =>
- FORBIDDEN
case _ =>
INTERNAL_SERVER_ERROR
}
@@ -41,11 +63,11 @@ object Baloo {
class AccessFilter(validator: Validator) extends SimpleFilter[Request, Response] {
def apply(request: Request, service: Service[Request, Response]) = {
- val pathElements = PathDecoder.getPathElements(request.getUri())
- if (pathElements.size < 1 || !validator.isValidNamespace(pathElements(0))) {
- Future.exception(new SecurityException("Tried to access invalid resource"))
+ val (version,endpoint,ns,id) = PathDecoder.getPathElements(request.getUri())
+ if (!validator.isValidNamespace(ns)) {
+ Future.exception(new HttpSecurityException("Tried to access invalid resource"))
} else if (!(request.getMethod() == HttpMethod.POST || request.getMethod() == HttpMethod.PUT)) {
- Future.exception(new SecurityException("Tried to access " + request.getMethod() + " resource"))
+ Future.exception(new HttpSecurityException("Tried to access " + request.getMethod() + " resource"))
} else {
service(request)
}
@@ -54,30 +76,31 @@ object Baloo {
class KafkaService(producer: Producer[String,BagheeraMessage]) extends Service[Request, Response] {
def apply(request: Request) = {
- val pathElements = PathDecoder.getPathElements(request.getUri())
- var ns: String = {
- pathElements.size match {
- case 2 => pathElements(0)
- case _ => ""
- }
- }
- var id: String = {
- pathElements(1).length match {
- case 0 => UUID.randomUUID().toString()
- case _ => pathElements(1)
- }
- }
-
+ val (version,endpoint,ns,id) = PathDecoder.getPathElements(request.getUri())
var content = request.getContent();
- if (content.readable() && content.readableBytes() > 0) {
- var bmsgBuilder = BagheeraMessage.newBuilder()
- bmsgBuilder.setNamespace(ns)
- bmsgBuilder.setId(id)
- bmsgBuilder.setPayload(ByteString.copyFrom(content.toByteBuffer()))
- bmsgBuilder.setTimestamp(System.currentTimeMillis())
- producer.send(new ProducerData[String,BagheeraMessage](ns, bmsgBuilder.build()))
+ val respCode = content.readable() && content.readableBytes() > 0 match {
+ case true =>
+ var bmsgBuilder = BagheeraMessage.newBuilder()
+ bmsgBuilder.setNamespace(ns)
+ bmsgBuilder.setId(id)
+ bmsgBuilder.setPayload(ByteString.copyFrom(content.toByteBuffer()))
+ bmsgBuilder.setTimestamp(System.currentTimeMillis())
+ producer.send(new ProducerData[String,BagheeraMessage](ns, bmsgBuilder.build()))
+ if (request.containsHeader("X-Obsolete-Document")) {
+ val obsoleteId = request.getHeader("X-Obsolete-Document");
+ var obsBuilder = BagheeraMessage.newBuilder();
+ obsBuilder.setOperation(Operation.DELETE);
+ obsBuilder.setNamespace(ns);
+ obsBuilder.setId(obsoleteId);
+ obsBuilder.setIpAddr(bmsgBuilder.getIpAddr());
+ obsBuilder.setTimestamp(bmsgBuilder.getTimestamp());
+ producer.send(new ProducerData[String,BagheeraMessage](ns, obsBuilder.build()));
+ }
+ CREATED
+ case _ =>
+ BAD_REQUEST
}
- val response = new DefaultHttpResponse(HTTP_1_1, CREATED)
+ val response = new DefaultHttpResponse(HTTP_1_1, respCode)
response.setContent(copiedBuffer(id, UTF_8))
Future.value(Response(response))
}
View
31 src/main/scala/com/mozilla/baloo/PathDecoder.scala
@@ -1,15 +1,36 @@
package com.mozilla.baloo
import scala.util.matching._
+import java.util.UUID
object PathDecoder {
- val uriPattern = new Regex("^/([^/]+)/*([^/]*)$", "ns", "id")
- def getPathElements(uri: String): List[String] = {
- var m = uriPattern.findFirstMatchIn(uri)
+
+ val uriPattern = new Regex("^/([^/]+)/([^/]+)/*([^/]*)$", "endpoint", "ns", "id")
+ val uriPatternWithVersion = new Regex("^/([0-9]\\.*[0.9]*)/([^/]+)/([^/]+)/*([^/]*)$", "version", "endpoint", "ns", "id")
+
+ def getPathElements(uri: String): (String,String,String,String) = {
+ // check to see if this is a versioned URI first
+ var m = uriPatternWithVersion.findFirstMatchIn(uri)
if (m != None) {
- m.get.subgroups
+ var r = m.get
+ val id = r.group("id").length() > 0 match {
+ case true => r.group("id")
+ case _ => UUID.randomUUID().toString()
+ }
+ (r.group("version"), r.group("endpoint"), r.group("ns"), id)
} else {
- List[String]()
+ // default non-versioned URI
+ m = uriPattern.findFirstMatchIn(uri)
+ if (m != None) {
+ var r = m.get
+ val id = r.group("id").length() > 0 match {
+ case true => r.group("id")
+ case _ => UUID.randomUUID().toString()
+ }
+ ("", r.group("endpoint"), r.group("ns"), id)
+ } else {
+ ("", "", "", "")
+ }
}
}
}
View
4 src/main/scala/com/twitter/finagle/http/Codec.scala
@@ -2,7 +2,7 @@ package com.twitter.finagle.http
/**
* This is a near exact copy of the standard HTTP codec in finagle with the exception of adding
- * the HttpContentDecompressor to the server. This allows decompression of GZIP POST body.
+ * the HttpContentDecompressor to the server. This allows decompression of GZIP/Deflate POST body.
*
* This code is required to live in the finagle http package for now due to some private classes
* in the actual finagle-http code base.
@@ -23,6 +23,7 @@ import org.jboss.netty.channel.{
ChannelEvent, ChannelHandlerContext, SimpleChannelDownstreamHandler, MessageEvent}
import org.jboss.netty.handler.codec.http._
import com.twitter.finagle.transport.TransportFactory
+import com.mozilla.baloo.http.ContentEncodingCorrector
case class CompressedHttp(
_compressionLevel: Int = 0,
@@ -104,6 +105,7 @@ case class CompressedHttp(
new HttpChunkAggregator(maxRequestSizeInBytes))
if (_decompressionEnabled)
+ pipeline.addLast("encodingCorrector", new ContentEncodingCorrector)
pipeline.addLast("httpDecompressor", new HttpContentDecompressor)
_annotateCipherHeader foreach { headerName: String =>

No commit comments for this range

Something went wrong with that request. Please try again.