Skip to content

Commit

Permalink
upgraded to thrift 7
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed Oct 19, 2011
1 parent fbe24e6 commit c89270b
Show file tree
Hide file tree
Showing 38 changed files with 4,308 additions and 4,640 deletions.
3 changes: 2 additions & 1 deletion project.clj
Expand Up @@ -10,7 +10,7 @@
[commons-io "1.4"]
[org.apache.commons/commons-exec "1.1"]
[jvyaml "1.0.0"]
[backtype/thriftjava "1.0.0"]
[org.apache.thrift/libthrift "0.7.0"]
[clj-time "0.3.0"]
[log4j/log4j "1.2.16"]
[org.apache.zookeeper/zookeeper "3.3.2"]
Expand All @@ -19,6 +19,7 @@
[compojure "0.6.4"]
[hiccup "0.3.6"]
[ring/ring-jetty-adapter "0.3.11"]
[org.slf4j/slf4j-log4j12 "1.5.8"]
]
:uberjar-exclusions [#"META-INF.*"]
:dev-dependencies [
Expand Down
15 changes: 7 additions & 8 deletions src/clj/backtype/storm/daemon/drpc.clj
@@ -1,5 +1,5 @@
(ns backtype.storm.daemon.drpc
(:import [org.apache.thrift.server THsHaServer THsHaServer$Options])
(:import [org.apache.thrift.server THsHaServer THsHaServer$Args])
(:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory])
(:import [org.apache.thrift TException])
(:import [org.apache.thrift.transport TNonblockingServerTransport TNonblockingServerSocket])
Expand Down Expand Up @@ -64,13 +64,12 @@
([spout-adder]
(launch-server! DEFAULT-PORT spout-adder))
([port spout-adder]
(let [options (THsHaServer$Options.)
_ (set! (. options maxWorkerThreads) 64)
service-handler (service-handler spout-adder port)
server (THsHaServer.
(DistributedRPC$Processor. service-handler)
(TNonblockingServerSocket. port)
(TBinaryProtocol$Factory.) options)]
(let [service-handler (service-handler spout-adder port)
options (THsHaServer$Args. (TNonblockingServerSocket. port))
_ (set! (. options maxWorkerThreads) 64)
_ (set! (. options processor) (DistributedRPC$Processor. service-handler))
_ (set! (. options protocolFactory) (TBinaryProtocol$Factory.))
server (THsHaServer. options)]
(.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop server))))
(log-message "Starting Distributed RPC server...")
(.serve server))))
Expand Down
42 changes: 23 additions & 19 deletions src/clj/backtype/storm/daemon/nimbus.clj
@@ -1,8 +1,10 @@
(ns backtype.storm.daemon.nimbus
(:import [org.apache.thrift.server THsHaServer THsHaServer$Options])
(:import [org.apache.thrift.server THsHaServer THsHaServer$Args])
(:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory])
(:import [org.apache.thrift TException])
(:import [org.apache.thrift.transport TNonblockingServerTransport TNonblockingServerSocket])
(:import [java.nio ByteBuffer])
(:import [java.nio.channels Channels WritableByteChannel])
(:use [backtype.storm bootstrap])
(:use [backtype.storm.daemon common])
(:gen-class))
Expand Down Expand Up @@ -469,26 +471,26 @@

(beginFileUpload [this]
(let [fileloc (str inbox "/stormjar-" (uuid) ".jar")]
(.put uploaders fileloc (FileOutputStream. fileloc))
(.put uploaders fileloc (Channels/newChannel (FileOutputStream. fileloc)))
(log-message "Uploading file from client to " fileloc)
fileloc
))

(^void uploadChunk [this ^String location ^bytes chunk]
(let [^FileOutputStream os (.get uploaders location)]
(when-not os
(^void uploadChunk [this ^String location ^ByteBuffer chunk]
(let [^WritableByteChannel channel (.get uploaders location)]
(when-not channel
(throw (RuntimeException.
"File for that location does not exist (or timed out)")))
(.write os chunk)
(.put uploaders location os)
(.write channel chunk)
(.put uploaders location channel)
))

(^void finishFileUpload [this ^String location]
(let [^FileOutputStream os (.get uploaders location)]
(when-not os
(let [^WritableByteChannel channel (.get uploaders location)]
(when-not channel
(throw (RuntimeException.
"File for that location does not exist (or timed out)")))
(.close os)
(.close channel)
(log-message "Finished uploading file from client: " location)
(.remove uploaders location)
))
Expand All @@ -500,7 +502,7 @@
id
))

(^bytes downloadChunk [this ^String id]
(^ByteBuffer downloadChunk [this ^String id]
(let [^BufferFileInputStream is (.get downloaders id)]
(when-not is
(throw (RuntimeException.
Expand All @@ -509,7 +511,7 @@
(.put downloaders id is)
(when (empty? ret)
(.remove downloaders id))
ret
(ByteBuffer/wrap ret)
)))

(^String getTopologyConf [this ^String id]
Expand Down Expand Up @@ -604,13 +606,15 @@

(defn launch-server! [conf]
(validate-distributed-mode! conf)
(let [options (THsHaServer$Options.)
_ (set! (. options maxWorkerThreads) 64)
service-handler (service-handler conf)
server (THsHaServer.
(Nimbus$Processor. service-handler)
(TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
(TBinaryProtocol$Factory.) options)]
(let [service-handler (service-handler conf)
options (THsHaServer$Args.
(TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT))))
_ (set! (. options workerThreads) 64)
_ (set! (. options processor) (Nimbus$Processor. service-handler))
_ (set! (. options protocolFactory) (TBinaryProtocol$Factory.))
_ (set! (. options maxWorkerThreads) 64)

server (THsHaServer. options)]
(.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
(log-message "Starting Nimbus server...")
(.serve server)))
Expand Down
2 changes: 1 addition & 1 deletion src/genthrift.sh
@@ -1,6 +1,6 @@
rm -rf gen-javabean gen-py py
rm -rf jvm/backtype/storm/generated
thrift --gen java:beans,hashcode,nocamel --gen py:utf8strings storm.thrift
thrift7 --gen java:beans,hashcode,nocamel --gen py:utf8strings storm.thrift
mv gen-javabean/backtype/storm/generated jvm/backtype/storm/generated
mv gen-py py
rm -rf gen-javabean
3 changes: 2 additions & 1 deletion src/jvm/backtype/storm/StormSubmitter.java
Expand Up @@ -7,6 +7,7 @@
import backtype.storm.utils.BufferFileInputStream;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
Expand Down Expand Up @@ -82,7 +83,7 @@ public static String submitJar(Map conf, String localJar) {
while(true) {
byte[] toSubmit = is.read();
if(toSubmit.length==0) break;
client.getClient().uploadChunk(uploadLocation, toSubmit);
client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
}
client.getClient().finishFileUpload(uploadLocation);
LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
Expand Down
96 changes: 52 additions & 44 deletions src/jvm/backtype/storm/generated/AlreadyAliveException.java

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit c89270b

Please sign in to comment.