Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions conf/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>

<configuration scan="true" scanPeriod="30 seconds">
<appender name="A1" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%-4r [%t] %-5p %c - %m%n</pattern>
</encoder>
</appender>
<logger name="org.apache.zookeeper" level="WARN"/>
<root level="INFO">
<appender-ref ref="A1"/>
</root>
</configuration>
16 changes: 9 additions & 7 deletions project.clj
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
(defproject storm/storm-benchmark "0.0.1-SNAPSHOT"
:source-path "src/clj"
:java-source-path "src/jvm"
:javac-options {:debug "true" :fork "true"}
:dependencies []
:dev-dependencies [[storm "0.8.1-wip5"]
[org.clojure/clojure "1.4.0"]]
)
:source-paths ["src/clj"]
:java-source-paths ["src/jvm"]
:test-paths ["test/clj"]
:resource-paths ["./conf"]
:javac-options {:debug "true"}
:dependencies [[storm/storm-core "0.9.0-wip19"]
[storm/storm-netty "0.9.0-wip19"]
[org.clojure/clojure "1.4.0"]]
:jvm-opts ["-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib"])
94 changes: 94 additions & 0 deletions src/jvm/storm/benchmark/MessagingTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package storm.benchmark;

import java.util.Map;

import backtype.storm.messaging.IContext;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TransportFactory;
import backtype.storm.messaging.netty.StormClientHandler;
import backtype.storm.utils.Utils;
import backtype.storm.Config;

public class MessagingTest {
private static final int PORT = 6800;
private static final int TASK = 1;
private static final int MSG_COUNT = 100000;
private static final String STORM_ID = "abc";
private static final int BUFFER_SIZE = 102400;
private static final int REPEATS = 1000;

private static void batch_bench(IConnection client, IConnection server) {
long startTime = System.currentTimeMillis();

for (int ind = 1; ind <= MSG_COUNT; ind ++) {
String req_msg = new Integer(ind).toString();
client.send(TASK, req_msg.getBytes());
}

for (int ind = 1; ind <= MSG_COUNT; ind ++) {
server.recv(0);
}

long endTime = System.currentTimeMillis();
System.out.println("Total execution time: " + (endTime-startTime) + "ms");
}

private static void one_by_one_bench(IConnection client, IConnection server) {
long startTime = System.currentTimeMillis();

for (int ind = 1; ind <= MSG_COUNT; ind ++) {
String req_msg = new Integer(ind).toString();
client.send(TASK, req_msg.getBytes());
server.recv(0);
}

long endTime = System.currentTimeMillis();
System.out.println("Total execution time: " + (endTime-startTime) + "ms");
}

private static void benchmark(Map conf, String plugin_name) {
IContext context = TransportFactory.makeContext(conf);
IConnection server = context.bind(null, PORT);
IConnection client = context.connect(null, "localhost", PORT);
System.out.println("("+plugin_name+") "+REPEATS+" messages of payload size "+ BUFFER_SIZE + " sent one by one");
one_by_one_bench(client, server);
System.out.println("("+plugin_name+") "+MSG_COUNT+" short msgs in batches" );
batch_bench(client,server);
client.close();
server.close();
context.term();
}

@SuppressWarnings("unchecked")
public static void benchmark_netty() {
Map conf = new Config();
conf.put(Config.STORM_MESSAGING_TRANSPORT, "backtype.storm.messaging.netty.Context");
conf.put(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE, BUFFER_SIZE);
conf.put(Config.STORM_MESSAGING_NETTY_MAX_RETRIES, 10);
conf.put(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS, 1000);
conf.put(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS, 5000);

benchmark(conf, "Netty");
}

@SuppressWarnings("unchecked")
public static void benchmark_zmq() {
Map conf = Utils.readDefaultConfig();
conf.put(Config.STORM_MESSAGING_TRANSPORT, "backtype.storm.messaging.zmq");
conf.put("topology.executor.receive.buffer.size", BUFFER_SIZE);
conf.put("topology.executor.send.buffer.size", BUFFER_SIZE);
conf.put("topology.receiver.buffer.size", BUFFER_SIZE);
conf.put("topology.transfer.buffer.size", BUFFER_SIZE);

benchmark(conf, "ZMQ");
}

/**
* @param args
*/
public static void main(String[] args) {
benchmark_netty();

benchmark_zmq();
}
}
8 changes: 8 additions & 0 deletions test/clj/storm/benchmark/messaging_benchmark_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
(ns storm.benchmark.messaging-benchmark-test
(:use [clojure test])
(:import [storm.benchmark MessagingTest]))

(deftest test-messaging
(MessagingTest/benchmark_netty)
(MessagingTest/benchmark_zmq)
)