Browse files

Added topic back to Message. Publisher can be shared for publishing o…

…n multiple topics.
  • Loading branch information...
1 parent 00d6221 commit 026ea5f55c25972268a3a3869af5ba5439cd993b Sharad Agarwal committed Apr 2, 2012
View
13 messaging-client-core/src/main/java/com/inmobi/messaging/AbstractMessagePublisher.java
@@ -4,28 +4,23 @@
import java.util.Map;
import com.inmobi.instrumentation.TimingAccumulator;
-import com.inmobi.messaging.Message;
import com.inmobi.stats.EmitterRegistry;
import com.inmobi.stats.StatsEmitter;
import com.inmobi.stats.StatsExposer;
public abstract class AbstractMessagePublisher implements MessagePublisher {
private final TimingAccumulator stats = new TimingAccumulator();
- private String topic;
private StatsEmitter emitter;
private StatsExposer statExposer;
-
- @Override
- public String getTopic() {
- return topic;
- }
+ private static final String HEADER_TOPIC = "topic";
@Override
public void publish(Message m) {
getStats().accumulateInvocation();
//TODO: generate headers
Map<String, String> headers = new HashMap<String, String>();
+ headers.put(HEADER_TOPIC, m.getTopic());
publish(headers, m);
}
@@ -37,13 +32,11 @@ public TimingAccumulator getStats() {
}
@Override
- public void init(String topic, ClientConfig config) {
- this.topic = topic;
+ public void init(ClientConfig config) {
try {
String emitterConfig = null;//TODO; get from the classpath
emitter = EmitterRegistry.lookup(emitterConfig);
final Map<String, String> contexts = new HashMap<String, String>();
- contexts.put("topic", topic);
contexts.put("messaging_type", "application");
statExposer = new StatsExposer() {
View
74 messaging-client-core/src/main/java/com/inmobi/messaging/Message.java
@@ -3,36 +3,46 @@
import java.util.Arrays;
public final class Message {
-
- private final byte message[];
-
- public Message(byte message[]) {
- this.message = message;
- }
-
- public byte[] getMessage() {
- return message;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + Arrays.hashCode(message);
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- Message other = (Message) obj;
- if (!Arrays.equals(message, other.message))
- return false;
- return true;
- }
+
+ private final String topic;
+ private final byte message[];
+
+ public Message(String topic, byte message[]) {
+ this.topic = topic;
+ this.message = message;
+ }
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + Arrays.hashCode(message);
+ result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+ return result;
+ }
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ Message other = (Message) obj;
+ if (!Arrays.equals(message, other.message))
+ return false;
+ if (topic == null) {
+ if (other.topic != null)
+ return false;
+ } else if (!topic.equals(other.topic))
+ return false;
+ return true;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public byte[] getMessage() {
+ return message;
+ }
}
View
3 messaging-client-core/src/main/java/com/inmobi/messaging/MessagePublisher.java
@@ -2,8 +2,7 @@
public interface MessagePublisher {
- void init(String topic, ClientConfig config);
+ void init(ClientConfig config);
void close();
- String getTopic();
void publish(Message m);
}
View
4 messaging-client-flume/src/main/java/com/inmobi/messaging/flume/FlumeMessagePublisher.java
@@ -31,8 +31,8 @@
private Thread senderThread;
@Override
- public void init(String topic, ClientConfig config) {
- super.init(topic, config);
+ public void init(ClientConfig config) {
+ super.init(config);
threadPool = new ThreadPoolExecutor(CONCURRENT_SENDERS,
CONCURRENT_SENDERS, 1, TimeUnit.HOURS,
new LinkedBlockingQueue<Runnable>());
View
23 messaging-client-logappender/src/main/java/com/inmobi/messaging/MessageAppender.java
@@ -3,6 +3,11 @@
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.LoggingEvent;
+/*
+ * Setting of fixed topic is deprecated.
+ * Only com.inmobi.messaging.Message is valid object type in
+ * LoggingEvent.getMessage(). byte[], String and TBase types are deprecated.
+ */
public class MessageAppender extends AppenderSkeleton {
private String topic;
@@ -18,10 +23,15 @@ public void setPublisherClass(String publisherClass) {
this.publisherClass = publisherClass;
}
+ @Deprecated
public String getTopic() {
return topic;
}
+ /*
+ * Setting of fixed topic is deprecated
+ */
+ @Deprecated
public void setTopic(String topic) {
this.topic = topic;
}
@@ -41,10 +51,13 @@ public boolean requiresLayout() {
@Override
protected void append(LoggingEvent event) {
Object o = event.getMessage();
- if (o instanceof byte[]) {
- publisher.publish(new Message((byte[]) o));
- } else if (o instanceof String) {
- publisher.publish(new Message(((String) o).getBytes()));
+ if (o instanceof Message) {
+ publisher.publish((Message) o);
+ }
+ else if (o instanceof byte[]) {//deprecated support
+ publisher.publish(new Message(this.topic, (byte[]) o));
+ } else if (o instanceof String) {//deprecated support
+ publisher.publish(new Message(this.topic, ((String) o).getBytes()));
} /*else
//TBase support only for backward compatibility
//would be deprecated
@@ -65,7 +78,7 @@ public void activateOptions() {
Class clz = Class.forName(publisherClass);
publisher = (AbstractMessagePublisher) clz.newInstance();
ClientConfig config = ClientConfig.load();
- publisher.init(topic, config);
+ publisher.init(config);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
View
6 messaging-client-scribe/src/main/java/com/inmobi/messaging/netty/ScribeMessagePublisher.java
@@ -43,8 +43,8 @@ public void connect() {
}
@Override
- public void init(String topic, ClientConfig config) {
- super.init(topic, config);
+ public void init(ClientConfig config) {
+ super.init(config);
host = config.getString("host", "localhost");
port = config.getInteger("port", 1111);
int backoffSeconds = config.getInteger("backoffSeconds", 5);
@@ -64,7 +64,7 @@ public void init(String topic, ClientConfig config) {
@Override
protected void publish(Map<String, String> headers, Message m) {
if (ch != null) {
- ScribeBites.publish(ch, getTopic(), m);
+ ScribeBites.publish(ch, m.getTopic(), m);
} else {
suggestReconnect();
}

0 comments on commit 026ea5f

Please sign in to comment.