From 65848efe8f443de0016b7c04b128434a8c613403 Mon Sep 17 00:00:00 2001 From: afeng Date: Fri, 15 Feb 2013 15:09:55 -0800 Subject: [PATCH] integration test for authentication and authorization --- src/jvm/backtype/storm/Config.java | 5 + .../storm/security/auth/NoopAuthorizer.java | 12 +- .../storm/security/auth/auth_test.clj | 147 ++++++++++++++++++ .../storm/security/auth/digest_auth_test.clj | 56 ------- 4 files changed, 158 insertions(+), 62 deletions(-) create mode 100644 test/clj/backtype/storm/security/auth/auth_test.clj delete mode 100644 test/clj/backtype/storm/security/auth/digest_auth_test.clj diff --git a/src/jvm/backtype/storm/Config.java b/src/jvm/backtype/storm/Config.java index a2b0d8414..741789ddd 100644 --- a/src/jvm/backtype/storm/Config.java +++ b/src/jvm/backtype/storm/Config.java @@ -206,6 +206,11 @@ public class Config extends HashMap { */ public static String NIMBUS_TOPOLOGY_VALIDATOR = "nimbus.topology.validator"; + /** + * Class name for authorization plugin for Nimbus + */ + public static String NIMBUS_AUTHORIZATION_CLASSNAME = "nimbus.authorization.classname"; + /** * Storm UI binds to this port. */ diff --git a/src/jvm/backtype/storm/security/auth/NoopAuthorizer.java b/src/jvm/backtype/storm/security/auth/NoopAuthorizer.java index 4b75a340f..cba94d141 100644 --- a/src/jvm/backtype/storm/security/auth/NoopAuthorizer.java +++ b/src/jvm/backtype/storm/security/auth/NoopAuthorizer.java @@ -21,12 +21,12 @@ public class NoopAuthorizer implements IAuthorization { */ public boolean permit(ReqContext context) { LOG.info("Access " - + " from: " + context.remoteAddress() == null - ? "null" : context.remoteAddress().toString() - + " principal:"+context.principal() == null - ? "null" : context.principal() - +" op:"+context.operation() - + " topoology:"+context.topologyConf().get(Config.TOPOLOGY_NAME)); + + " from: " + context.remoteAddress() == null + ? "null" : context.remoteAddress().toString() + + " principal:"+context.principal() == null + ? "null" : context.principal() + +" op:"+context.operation() + + " topoology:"+ context.topologyConf().get(Config.TOPOLOGY_NAME)); return true; } } diff --git a/test/clj/backtype/storm/security/auth/auth_test.clj b/test/clj/backtype/storm/security/auth/auth_test.clj new file mode 100644 index 000000000..e6aa3a046 --- /dev/null +++ b/test/clj/backtype/storm/security/auth/auth_test.clj @@ -0,0 +1,147 @@ +(ns backtype.storm.security.auth.auth-test + (:use [clojure test]) + (:require [backtype.storm.daemon [nimbus :as nimbus]]) + (:import [org.apache.thrift7 TException]) + (:import [org.apache.thrift7.transport TTransportException]) + (:import [java.nio ByteBuffer]) + (:import [backtype.storm.utils NimbusClient]) + (:import [backtype.storm.security.auth ThriftServer ThriftClient ReqContext ReqContext$OperationType]) + (:use [backtype.storm bootstrap util]) + (:use [backtype.storm.daemon common]) + (:use [backtype.storm bootstrap testing]) + (:import [backtype.storm.generated Nimbus Nimbus$Client]) + ) + +(bootstrap) + +(defn mk-authorization-handler [conf] + (let [klassname (conf NIMBUS-AUTHORIZATION-CLASSNAME) + aznClass (if klassname (Class/forName klassname)) + aznHandler (if aznClass (.newInstance aznClass))] + (log-debug "authorization class name:" klassname + " class:" aznClass + " handler:" aznHandler) + aznHandler + )) + +(defn nimbus-data [conf inimbus] + (let [forced-scheduler (.getForcedScheduler inimbus)] + {:conf conf + :inimbus inimbus + :authorization-handler (mk-authorization-handler conf) + :submitted-count (atom 0) + :storm-cluster-state nil + :submit-lock (Object.) + :heartbeats-cache (atom {}) + :downloaders nil + :uploaders nil + :uptime (uptime-computer) + :validator nil + :timer nil + :scheduler nil + })) + +(defn update-req-context! [nimbus storm-name storm-conf operation] + (let [req (ReqContext/context)] + (.setOperation req operation) + (if storm-conf (.setTopologyConf req storm-conf) + (let [topologyConf { TOPOLOGY-NAME storm-name} ] + (.setTopologyConf req topologyConf))) + req)) + +(defn check-authorization! [nimbus storm-name storm-conf operation] + (let [aclHandler (:authorization-handler nimbus)] + (log-debug "check-authorization with handler: " aclHandler) + (if aclHandler + (let [req (update-req-context! nimbus storm-name storm-conf operation)] + (if-not (.permit aclHandler req) + (throw (RuntimeException. (str operation " on topology " storm-name " is not authorized"))) + ))))) + +(defn dummy-service-handler [conf inimbus] + (let [nimbus (nimbus-data conf inimbus)] + (reify Nimbus$Iface + (^void submitTopologyWithOpts [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology + ^SubmitOptions submitOptions] + (check-authorization! nimbus storm-name nil (ReqContext$OperationType/SUBMIT_TOPOLOGY))) + + (^void killTopology [this ^String storm-name] + (check-authorization! nimbus storm-name nil (ReqContext$OperationType/KILL_TOPOLOGY))) + + (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options] + (check-authorization! nimbus storm-name nil (ReqContext$OperationType/KILL_TOPOLOGY))) + + (^void rebalance [this ^String storm-name ^RebalanceOptions options] + (check-authorization! nimbus storm-name nil (ReqContext$OperationType/REBALANCE_TOPOLOGY))) + + (activate [this storm-name] + (check-authorization! nimbus storm-name nil (ReqContext$OperationType/ACTIVATE_TOPOLOGY))) + + (deactivate [this storm-name] + (check-authorization! nimbus storm-name nil (ReqContext$OperationType/DEACTIVATE_TOPOLOGY))) + + (beginFileUpload [this]) + + (^void uploadChunk [this ^String location ^ByteBuffer chunk]) + + (^void finishFileUpload [this ^String location]) + + (^String beginFileDownload [this ^String file]) + + (^ByteBuffer downloadChunk [this ^String id]) + + (^String getNimbusConf [this]) + + (^String getTopologyConf [this ^String id]) + + (^StormTopology getTopology [this ^String id]) + + (^StormTopology getUserTopology [this ^String id]) + + (^ClusterSummary getClusterInfo [this]) + + (^TopologyInfo getTopologyInfo [this ^String storm-id])))) + +(defn launch-test-server [server-port login-cfg aznClass] + (System/setProperty "java.security.auth.login.config" login-cfg) + (let [conf (merge (read-storm-config) + {NIMBUS-AUTHORIZATION-CLASSNAME aznClass + NIMBUS-HOST "localhost" + NIMBUS-THRIFT-PORT server-port}) + nimbus (nimbus/standalone-nimbus) + service-handler (dummy-service-handler conf nimbus) + server (ThriftServer. (Nimbus$Processor. service-handler) (int (conf NIMBUS-THRIFT-PORT)))] + (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop server)))) + (log-message "Starting Nimbus server...") + (.serve server))) + +(defn launch-server-w-wait [server-port ms login-cfg aznClass] + (future (launch-test-server server-port login-cfg aznClass)) + (log-message "Waiting for Nimbus Server...") + (Thread/sleep ms)) + +(deftest authorization-test + (launch-server-w-wait 6627 1000 "" "backtype.storm.security.auth.DenyAuthorizer") + (log-message "Starting Nimbus client w/ anonymous authentication") + (let [client (NimbusClient. "localhost" 6627) + nimbus_client (.getClient client)] + (is (thrown? TTransportException + (.activate nimbus_client "bogus_topology"))) + (.close client))) + +(deftest authentication-test + (launch-server-w-wait 6628 1000 "./conf/jaas_digest.conf" "backtype.storm.security.auth.NoopAuthorizer") + (System/setProperty "java.security.auth.login.config" "") + (log-message "Starting Nimbus client w/ anonymous authentication (expect authentication failure") + (is (= "Peer indicated failure: Unsupported mechanism type ANONYMOUS" + (try (NimbusClient. "localhost" 6628) + nil + (catch java.lang.RuntimeException ex + (.getMessage (.getCause ex)))))) + (log-message "Starting Nimbus client w/ digest authentication (expect authentication success)") + (System/setProperty "java.security.auth.login.config" "./conf/jaas_digest.conf") + (let [client (NimbusClient. "localhost" 6628) + nimbus_client (.getClient client)] + (.activate nimbus_client "bogus_topology") + (.close client))) + diff --git a/test/clj/backtype/storm/security/auth/digest_auth_test.clj b/test/clj/backtype/storm/security/auth/digest_auth_test.clj deleted file mode 100644 index 8c4a91f31..000000000 --- a/test/clj/backtype/storm/security/auth/digest_auth_test.clj +++ /dev/null @@ -1,56 +0,0 @@ -(ns backtype.storm.security.auth.digest-auth-test - (:use [clojure test]) - (:require [backtype.storm.daemon [nimbus :as nimbus]]) - (:import [org.apache.thrift7 TException]) - (:import [java.nio.channels Channels WritableByteChannel]) - (:import [backtype.storm.utils NimbusClient]) - (:import [backtype.storm.security.auth ThriftServer ThriftClient ReqContext ReqContext$OperationType]) - (:use [backtype.storm bootstrap util]) - (:use [backtype.storm.daemon common]) - (:use [backtype.storm bootstrap testing]) - (:import [backtype.storm.generated Nimbus Nimbus$Client]) - ) - -(bootstrap) - -(def server-port 6627) - -; Exceptions are getting wrapped in RuntimeException. This might be due to -; CLJ-855. -(defn- unpack-runtime-exception [expression] - (try (eval expression) - nil - (catch java.lang.RuntimeException gripe - (throw (.getCause gripe))) - ) -) - -(defn launch-test-server [] - (with-inprocess-zookeeper zk-port - (with-local-tmp [nimbus-dir] - (let [conf (merge (read-storm-config) - {STORM-ZOOKEEPER-SERVERS ["localhost"] - STORM-ZOOKEEPER-PORT zk-port - NIMBUS-HOST "localhost" - NIMBUS-THRIFT-PORT server-port - STORM-LOCAL-DIR nimbus-dir}) - nimbus (nimbus/standalone-nimbus) - service-handler (nimbus/service-handler conf nimbus) - server (ThriftServer. (Nimbus$Processor. service-handler) (int (conf NIMBUS-THRIFT-PORT)))] - (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server)))) - (log-message "Starting Nimbus server...") - (.serve server))))) - -(defn launch-server-w-wait [] - (future (launch-test-server)) - (log-message "Waiting for Nimbus Server...") - (Thread/sleep 10000)) - -(deftest digest-auth-test - (System/setProperty "java.security.auth.login.config" "./conf/jaas_digest.conf") - (launch-server-w-wait) - (log-message "Starting Nimbus client w/ connection to localhost:" server-port) - (let [client (NimbusClient. "localhost" server-port) - nimbus_client (.getClient client)] - (is (thrown? backtype.storm.generated.NotAliveException (.activate nimbus_client "bogus_topology"))) - (.close client)))