Permalink
Browse files

Sync'ing latest changes

  • Loading branch information...
1 parent da135af commit fef703bdb724e0ec366385ef2acd0383b94b4d28 @xstevens committed Feb 19, 2013
View
@@ -76,20 +76,20 @@
<dependency>
<groupId>com.twitter</groupId>
<artifactId>finagle-core</artifactId>
- <version>5.3.19</version>
+ <version>5.3.23</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>finagle-http</artifactId>
- <version>5.3.19</version>
+ <version>5.3.23</version>
<scope>compile</scope>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-core</artifactId>
- <version>0.7.1-incubating</version>
+ <version>0.7.2-incubating</version>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2012 Mozilla Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mozilla.baloo.http;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMessage;
+
+/**
+ * If Content-Encoding isn't specified this checks the Content-Type to see
+ * if there is any indication that content is compressed even though encoding
+ * isn't set.
+ */
+public class ContentEncodingCorrector extends SimpleChannelUpstreamHandler {
+
+ public static final String MIME_TYPE_JSON_ZLIB = "application/json+zlib";
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ Object msg = e.getMessage();
+ if (msg instanceof HttpMessage) {
+ HttpMessage m = (HttpMessage) msg;
+ String contentEncoding = m.getHeader(HttpHeaders.Names.CONTENT_ENCODING);
+ if (contentEncoding == null) {
+ String contentType = m.getHeader(HttpHeaders.Names.CONTENT_TYPE);
+ if (contentType != null && contentType.startsWith(MIME_TYPE_JSON_ZLIB)) {
+ m.setHeader(HttpHeaders.Names.CONTENT_ENCODING,"deflate");
+ }
+ }
+ Channels.fireMessageReceived(ctx, m, e.getRemoteAddress());
+ } else {
+ ctx.sendUpstream(e);
+ }
+ }
+
+}
@@ -17,17 +17,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.mozilla.baloo;
+package com.mozilla.baloo.http;
-public class SecurityException extends Exception {
+public class HttpSecurityException extends Exception {
private static final long serialVersionUID = 480106127340879943L;
- public SecurityException(String message) {
+ public HttpSecurityException(String message) {
super(message);
}
- public SecurityException(String message, Throwable cause) {
+ public HttpSecurityException(String message, Throwable cause) {
super(message, cause);
}
}
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2012 Mozilla Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mozilla.baloo.http;
+
+public class InvalidPathException extends Exception {
+
+ private static final long serialVersionUID = -2753373949363309083L;
+
+ public InvalidPathException(String message) {
+ super(message);
+ }
+
+ public InvalidPathException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
@@ -1,10 +1,28 @@
+/*
+ * Copyright 2012 Mozilla Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package com.mozilla.baloo
import com.twitter.finagle.{Service, SimpleFilter}
import com.twitter.finagle.builder.{Server, ServerBuilder}
import com.twitter.finagle.http.{Request, Response, CompressedHttp, CompressedRichHttp}
import com.twitter.util.Future
-import com.mozilla.baloo.validator.Validator
import java.net.InetSocketAddress
import java.util.Properties
import java.util.UUID
@@ -15,6 +33,8 @@ import org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1
import org.jboss.netty.buffer.ChannelBuffers.copiedBuffer
import org.jboss.netty.util.CharsetUtil.UTF_8
import scala.collection.JavaConversions
+import com.mozilla.baloo.validator.Validator
+import com.mozilla.baloo.http._
import com.mozilla.bagheera.BagheeraProto.BagheeraMessage
import com.mozilla.bagheera.BagheeraProto.BagheeraMessage.Operation
import com.google.protobuf.ByteString
@@ -25,10 +45,12 @@ object Baloo {
def apply(request: Request, service: Service[Request, Response]) = {
service(request) handle { case error =>
val statusCode = error match {
+ case _: InvalidPathException =>
+ NOT_FOUND
+ case _: HttpSecurityException =>
+ FORBIDDEN
case _: IllegalArgumentException =>
NOT_ACCEPTABLE
- case _: SecurityException =>
- FORBIDDEN
case _ =>
INTERNAL_SERVER_ERROR
}
@@ -41,11 +63,11 @@ object Baloo {
class AccessFilter(validator: Validator) extends SimpleFilter[Request, Response] {
def apply(request: Request, service: Service[Request, Response]) = {
- val pathElements = PathDecoder.getPathElements(request.getUri())
- if (pathElements.size < 1 || !validator.isValidNamespace(pathElements(0))) {
- Future.exception(new SecurityException("Tried to access invalid resource"))
+ val (version,endpoint,ns,id) = PathDecoder.getPathElements(request.getUri())
+ if (!validator.isValidNamespace(ns)) {
+ Future.exception(new HttpSecurityException("Tried to access invalid resource"))
} else if (!(request.getMethod() == HttpMethod.POST || request.getMethod() == HttpMethod.PUT)) {
- Future.exception(new SecurityException("Tried to access " + request.getMethod() + " resource"))
+ Future.exception(new HttpSecurityException("Tried to access " + request.getMethod() + " resource"))
} else {
service(request)
}
@@ -54,30 +76,31 @@ object Baloo {
class KafkaService(producer: Producer[String,BagheeraMessage]) extends Service[Request, Response] {
def apply(request: Request) = {
- val pathElements = PathDecoder.getPathElements(request.getUri())
- var ns: String = {
- pathElements.size match {
- case 2 => pathElements(0)
- case _ => ""
- }
- }
- var id: String = {
- pathElements(1).length match {
- case 0 => UUID.randomUUID().toString()
- case _ => pathElements(1)
- }
- }
-
+ val (version,endpoint,ns,id) = PathDecoder.getPathElements(request.getUri())
var content = request.getContent();
- if (content.readable() && content.readableBytes() > 0) {
- var bmsgBuilder = BagheeraMessage.newBuilder()
- bmsgBuilder.setNamespace(ns)
- bmsgBuilder.setId(id)
- bmsgBuilder.setPayload(ByteString.copyFrom(content.toByteBuffer()))
- bmsgBuilder.setTimestamp(System.currentTimeMillis())
- producer.send(new ProducerData[String,BagheeraMessage](ns, bmsgBuilder.build()))
+ val respCode = content.readable() && content.readableBytes() > 0 match {
+ case true =>
+ var bmsgBuilder = BagheeraMessage.newBuilder()
+ bmsgBuilder.setNamespace(ns)
+ bmsgBuilder.setId(id)
+ bmsgBuilder.setPayload(ByteString.copyFrom(content.toByteBuffer()))
+ bmsgBuilder.setTimestamp(System.currentTimeMillis())
+ producer.send(new ProducerData[String,BagheeraMessage](ns, bmsgBuilder.build()))
+ if (request.containsHeader("X-Obsolete-Document")) {
+ val obsoleteId = request.getHeader("X-Obsolete-Document");
+ var obsBuilder = BagheeraMessage.newBuilder();
+ obsBuilder.setOperation(Operation.DELETE);
+ obsBuilder.setNamespace(ns);
+ obsBuilder.setId(obsoleteId);
+ obsBuilder.setIpAddr(bmsgBuilder.getIpAddr());
+ obsBuilder.setTimestamp(bmsgBuilder.getTimestamp());
+ producer.send(new ProducerData[String,BagheeraMessage](ns, obsBuilder.build()));
+ }
+ CREATED
+ case _ =>
+ BAD_REQUEST
}
- val response = new DefaultHttpResponse(HTTP_1_1, CREATED)
+ val response = new DefaultHttpResponse(HTTP_1_1, respCode)
response.setContent(copiedBuffer(id, UTF_8))
Future.value(Response(response))
}
@@ -1,15 +1,36 @@
package com.mozilla.baloo
import scala.util.matching._
+import java.util.UUID
object PathDecoder {
- val uriPattern = new Regex("^/([^/]+)/*([^/]*)$", "ns", "id")
- def getPathElements(uri: String): List[String] = {
- var m = uriPattern.findFirstMatchIn(uri)
+
+ val uriPattern = new Regex("^/([^/]+)/([^/]+)/*([^/]*)$", "endpoint", "ns", "id")
+ val uriPatternWithVersion = new Regex("^/([0-9]\\.*[0.9]*)/([^/]+)/([^/]+)/*([^/]*)$", "version", "endpoint", "ns", "id")
+
+ def getPathElements(uri: String): (String,String,String,String) = {
+ // check to see if this is a versioned URI first
+ var m = uriPatternWithVersion.findFirstMatchIn(uri)
if (m != None) {
- m.get.subgroups
+ var r = m.get
+ val id = r.group("id").length() > 0 match {
+ case true => r.group("id")
+ case _ => UUID.randomUUID().toString()
+ }
+ (r.group("version"), r.group("endpoint"), r.group("ns"), id)
} else {
- List[String]()
+ // default non-versioned URI
+ m = uriPattern.findFirstMatchIn(uri)
+ if (m != None) {
+ var r = m.get
+ val id = r.group("id").length() > 0 match {
+ case true => r.group("id")
+ case _ => UUID.randomUUID().toString()
+ }
+ ("", r.group("endpoint"), r.group("ns"), id)
+ } else {
+ ("", "", "", "")
+ }
}
}
}
@@ -2,7 +2,7 @@ package com.twitter.finagle.http
/**
* This is a near exact copy of the standard HTTP codec in finagle with the exception of adding
- * the HttpContentDecompressor to the server. This allows decompression of GZIP POST body.
+ * the HttpContentDecompressor to the server. This allows decompression of GZIP/Deflate POST body.
*
* This code is required to live in the finagle http package for now due to some private classes
* in the actual finagle-http code base.
@@ -23,6 +23,7 @@ import org.jboss.netty.channel.{
ChannelEvent, ChannelHandlerContext, SimpleChannelDownstreamHandler, MessageEvent}
import org.jboss.netty.handler.codec.http._
import com.twitter.finagle.transport.TransportFactory
+import com.mozilla.baloo.http.ContentEncodingCorrector
case class CompressedHttp(
_compressionLevel: Int = 0,
@@ -104,6 +105,7 @@ case class CompressedHttp(
new HttpChunkAggregator(maxRequestSizeInBytes))
if (_decompressionEnabled)
+ pipeline.addLast("encodingCorrector", new ContentEncodingCorrector)
pipeline.addLast("httpDecompressor", new HttpContentDecompressor)
_annotateCipherHeader foreach { headerName: String =>

0 comments on commit fef703b

Please sign in to comment.