Skip to content
This repository has been archived by the owner on Apr 30, 2019. It is now read-only.

Commit

Permalink
changes for logging , id based partition selection
Browse files Browse the repository at this point in the history
  • Loading branch information
Harsha committed Jan 24, 2014
1 parent 2701fbc commit b319e4e
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
16 changes: 11 additions & 5 deletions src/main/java/com/mozilla/bagheera/http/SubmissionHandler.java
Expand Up @@ -99,7 +99,7 @@ private void updateResponseMetrics(String namespace, int status) {
private void handlePost(MessageEvent e, BagheeraHttpRequest request) {
HttpResponseStatus status = BAD_REQUEST;
ChannelBuffer content = request.getContent();

String remoteIpAddress = HttpUtil.getRemoteAddr(request, ((InetSocketAddress)e.getChannel().getRemoteAddress()).getAddress().getHostAddress());
if (content.readable() && content.readableBytes() > 0) {
BagheeraMessage.Builder templateBuilder = BagheeraMessage.newBuilder();
setMessageFields(request, e, templateBuilder, System.currentTimeMillis(), false);
Expand All @@ -109,9 +109,11 @@ private void handlePost(MessageEvent e, BagheeraHttpRequest request) {
storeBuilder.setPayload(ByteString.copyFrom(content.toByteBuffer()));
storeBuilder.setId(request.getId());
producer.send(storeBuilder.build());
LOG.info(request.getNamespace()+" HTTP_PUT "+request.getId());

if (request.containsHeader(HEADER_OBSOLETE_DOCUMENT)) {
handleObsoleteDocuments(request.getNamespace(),request.getHeaders(HEADER_OBSOLETE_DOCUMENT), template);
handleObsoleteDocuments(request,remoteIpAddress,request.getHeaders(HEADER_OBSOLETE_DOCUMENT), template);
} else {
LOG.info("IP "+remoteIpAddress+" "+request.getNamespace()+" HTTP_PUT "+request.getId());
}

status = CREATED;
Expand Down Expand Up @@ -140,7 +142,7 @@ protected void setMessageFields(BagheeraHttpRequest request, MessageEvent event,
}
}

private void handleObsoleteDocuments(String namespace, List<String> headers, BagheeraMessage template) {
private void handleObsoleteDocuments(BagheeraHttpRequest request, String remoteIpAddress,List<String> headers, BagheeraMessage template) {
// According to RFC 2616, the standard for multi-valued document headers is
// a comma-separated list:
// http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2
Expand All @@ -157,13 +159,14 @@ private void handleObsoleteDocuments(String namespace, List<String> headers, Bag
// combined field value, and thus a proxy MUST NOT change the order
// of these field values when a message is forwarded.
// ------------------------------------------------------------------
String deleteIDs = "";
for (String header : headers) {
// Split on comma, delete each one.
// The performance penalty for supporting multiple values is
// tested in BagheeraHttpRequestTest.testSplitPerformance().
if (header != null) {
for (String obsoleteIdRaw : header.split(",")) {
LOG.info(namespace+" HTTP_DELETE "+obsoleteIdRaw.trim());
deleteIDs += obsoleteIdRaw.trim()+",";
// Use the given message as a base for creating each delete message.
BagheeraMessage.Builder deleteBuilder = BagheeraMessage.newBuilder(template);
deleteBuilder.setOperation(Operation.DELETE);
Expand All @@ -172,11 +175,14 @@ private void handleObsoleteDocuments(String namespace, List<String> headers, Bag
}
}
}
LOG.info("IP "+remoteIpAddress+" "+request.getNamespace()+" HTTP_PUT "+request.getId()+" HTTP_DELETE "+deleteIDs);
}

private void handleDelete(MessageEvent e, BagheeraHttpRequest request) {
BagheeraMessage.Builder bmsgBuilder = BagheeraMessage.newBuilder();
setMessageFields(request, e, bmsgBuilder, System.currentTimeMillis(), true);
String remoteIpAddress = HttpUtil.getRemoteAddr(request, ((InetSocketAddress)e.getChannel().getRemoteAddress()).getAddress().getHostAddress());
LOG.info("IP "+remoteIpAddress+" "+request.getNamespace()+" HTTP_DELETE "+request.getId());
bmsgBuilder.setOperation(Operation.DELETE);
producer.send(bmsgBuilder.build());
updateRequestMetrics(request.getNamespace(), request.getMethod().getName(), 0);
Expand Down
Expand Up @@ -20,7 +20,8 @@
package com.mozilla.bagheera.producer;

import java.util.Properties;

import java.util.List;
import java.util.ArrayList;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;
Expand Down Expand Up @@ -50,7 +51,9 @@ public void close() {
*/
@Override
public void send(BagheeraMessage msg) {
producer.send(new ProducerData<String,BagheeraMessage>(msg.getNamespace(), msg));
List<BagheeraMessage> list = new ArrayList<BagheeraMessage>();
list.add(msg);
producer.send(new ProducerData<String,BagheeraMessage>(msg.getNamespace(), msg.getId(),list));
}

}

0 comments on commit b319e4e

Please sign in to comment.