Browse files

revison per Nathan's comments on ReqContext/testing, and use wait-for…

…-condition
  • Loading branch information...
1 parent 9d2356f commit 9def4042b3ce0fdca3d2d410384ff8dedc4d00a4 afeng committed Mar 8, 2013
View
31 src/jvm/backtype/storm/security/auth/AuthUtils.java
@@ -4,43 +4,54 @@
import javax.security.auth.login.AppConfigurationEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-
import java.io.IOException;
-import java.net.URL;
-import java.net.URLClassLoader;
import java.util.Map;
public class AuthUtils {
public static final String LOGIN_CONTEXT_SERVER = "StormServer";
public static final String LOGIN_CONTEXT_CLIENT = "StormClient";
public static final String SERVICE = "storm_thrift_server";
- private static final Logger LOG = LoggerFactory.getLogger(AuthUtils.class);
/**
* Construct a JAAS configuration object per storm configuration file
* @param storm_conf Storm configuration
* @return
*/
public static synchronized Configuration GetConfiguration(Map storm_conf) {
- Configuration.setConfiguration(null);
-
- //exam system property first
+ //retrieve system property
String orig_loginConfigurationFile = System.getProperty("java.security.auth.login.config");
//try to find login file from Storm configuration
String loginConfigurationFile = (String)storm_conf.get("java.security.auth.login.config");
- if (loginConfigurationFile==null)
+ if ((loginConfigurationFile==null) || (loginConfigurationFile.length()==0))
loginConfigurationFile = orig_loginConfigurationFile;
Configuration login_conf = null;
if ((loginConfigurationFile != null) && (loginConfigurationFile.length()>0)) {
+ //We don't allow system property and storm conf have conflicts
+ if (orig_loginConfigurationFile!=null &&
+ orig_loginConfigurationFile.length()>0 &&
+ !loginConfigurationFile.equals(orig_loginConfigurationFile)) {
+ throw new RuntimeException("System property java.security.auth.login.config ("
+ + orig_loginConfigurationFile
+ +") != storm configuration java.security.auth.login.config ("
+ + loginConfigurationFile + ")");
+ }
+
+ //reset login configuration so that javax.security.auth.login will not use cache
+ Configuration.setConfiguration(null);
+
+ //use javax.security.auth.login.Configuration to obtain login configuration object
+ //login.Configuration depends on system property "java.security.auth.login.config"
+ //(see http://docs.oracle.com/javase/6/docs/jre/api/security/jaas/spec/com/sun/security/auth/login/ConfigFile.html)
System.setProperty("java.security.auth.login.config", loginConfigurationFile);
login_conf = Configuration.getConfiguration();
+ //we reset system property to previous value if any
if (orig_loginConfigurationFile!=null)
System.setProperty("java.security.auth.login.config", orig_loginConfigurationFile);
+ else
+ System.setProperty("java.security.auth.login.config", "");
}
return login_conf;
}
View
9 src/jvm/backtype/storm/security/auth/IAuthorizer.java
@@ -21,11 +21,10 @@
/**
* permit() method is invoked for each incoming Thrift request.
- * @param contrext request context includes info about
- * (1) remote address/subject,
- * (2) operation
- * (3) configuration of targeted topology
+ * @param context request context includes info about
+ * @param operation operation name
+ * @param topology_storm configuration of targeted topology
* @return true if the request is authorized, false if reject
*/
- public boolean permit(ReqContext context);
+ public boolean permit(ReqContext context, String operation, Map topology_conf);
}
View
31 src/jvm/backtype/storm/security/auth/ReqContext.java
@@ -12,18 +12,16 @@
/**
* context request context includes info about
- * (1) remote address/subject,
- * (2) operation
- * (3) configuration of targeted topology
+ * (1) remote address,
+ * (2) remote subject and primary principal
+ * (3) request ID
*/
public class ReqContext {
private static final AtomicInteger uniqueId = new AtomicInteger(0);
- public enum OperationType { SUBMIT_TOPOLOGY, KILL_TOPOLOGY, REBALANCE_TOPOLOGY, ACTIVATE_TOPOLOGY, DEACTIVATE_TOPOLOGY };
private Subject _subject;
private InetAddress _remoteAddr;
private Integer _reqID;
private Map _storm_conf;
- private OperationType _operation;
/**
* Get a request context associated with current thread
@@ -83,26 +81,11 @@ public Principal principal() {
if (princs.size()==0) return null;
return (Principal) (princs.toArray()[0]);
}
-
+
/**
- * Topology that this request is against
+ * request ID of this request
*/
- public Map topologyConf() {
- return _storm_conf;
- }
-
- public void setTopologyConf(Map conf) {
- _storm_conf = conf;
- }
-
- /**
- * Operation that this request is performing
- */
- public OperationType operation() {
- return _operation;
- }
-
- public void setOperation(OperationType operation) {
- _operation = operation;
+ public Integer requestID() {
+ return _reqID;
}
}
View
4 src/jvm/backtype/storm/security/auth/SaslTransportPlugin.java
@@ -99,13 +99,13 @@ public boolean process(final TProtocol inProt, final TProtocol outProt) throws T
Subject remoteUser = new Subject();
remoteUser.getPrincipals().add(new User(authId));
req_context.setSubject(remoteUser);
-
+
//invoke service handler
return wrapped.process(inProt, outProt);
}
}
- static class User implements Principal {
+ public static class User implements Principal {
private final String name;
public User(String name) {
View
11 src/jvm/backtype/storm/security/auth/ThriftServer.java
@@ -11,7 +11,7 @@
public class ThriftServer {
private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);
private Map _storm_conf; //storm configuration
- private TProcessor _processor = null;
+ protected TProcessor _processor = null;
private int _port = 0;
private TServer _server = null;
private Configuration _login_conf;
@@ -34,6 +34,15 @@ public void stop() {
_server.stop();
}
+ /**
+ * Is ThriftServer listening to requests?
+ * @return
+ */
+ public boolean isServing() {
+ if (_server == null) return false;
+ return _server.isServing();
+ }
+
public void serve() {
try {
//locate our thrift transport plugin
View
17 src/jvm/backtype/storm/security/auth/authorizer/DenyAuthorizer.java
@@ -17,25 +17,24 @@
/**
* Invoked once immediately after construction
- * @param conf Stom configuration
+ * @param conf Storm configuration
*/
public void prepare(Map conf) {
}
/**
* permit() method is invoked for each incoming Thrift request
- * @param contrext request context includes info about
- * (1) remote address/subject,
- * (2) operation
- * (3) configuration of targeted topology
+ * @param contrext request context
+ * @param operation operation name
+ * @param topology_storm configuration of targeted topology
* @return true if the request is authorized, false if reject
*/
- public boolean permit(ReqContext context) {
- LOG.info("Access "
+ public boolean permit(ReqContext context, String operation, Map topology_conf) {
+ LOG.info("[req "+ context.requestID()+ "] Access "
+ " from: " + (context.remoteAddress() == null? "null" : context.remoteAddress().toString())
+ " principal:"+ (context.principal() == null? "null" : context.principal())
- +" op:"+context.operation()
- + " topoology:"+context.topologyConf().get(Config.TOPOLOGY_NAME));
+ +" op:"+operation
+ + " topoology:"+topology_conf.get(Config.TOPOLOGY_NAME));
return false;
}
}
View
17 src/jvm/backtype/storm/security/auth/authorizer/NoopAuthorizer.java
@@ -17,25 +17,24 @@
/**
* Invoked once immediately after construction
- * @param conf Stom configuration
+ * @param conf Storm configuration
*/
public void prepare(Map conf) {
}
/**
* permit() method is invoked for each incoming Thrift request
- * @param contrext request context includes info about
- * (1) remote address/subject,
- * (2) operation
- * (3) configuration of targeted topology
+ * @param context request context includes info about
+ * @param operation operation name
+ * @param topology_storm configuration of targeted topology
* @return true if the request is authorized, false if reject
*/
- public boolean permit(ReqContext context) {
- LOG.info("Access "
+ public boolean permit(ReqContext context, String operation, Map topology_conf) {
+ LOG.info("[req "+ context.requestID()+ "] Access "
+ " from: " + (context.remoteAddress() == null? "null" : context.remoteAddress().toString())
+ " principal:"+(context.principal() == null? "null" : context.principal())
- +" op:"+context.operation()
- + " topoology:"+ context.topologyConf().get(Config.TOPOLOGY_NAME));
+ +" op:"+ operation
+ + " topoology:"+ topology_conf.get(Config.TOPOLOGY_NAME));
return true;
}
}
View
108 test/clj/backtype/storm/security/auth/auth_test.clj
@@ -7,7 +7,7 @@
(:import [backtype.storm Config])
(:import [backtype.storm.utils NimbusClient])
(:import [backtype.storm.security.auth AuthUtils ThriftServer ThriftClient
- ReqContext ReqContext$OperationType])
+ ReqContext])
(:use [backtype.storm bootstrap util])
(:use [backtype.storm.daemon common])
(:use [backtype.storm bootstrap testing])
@@ -46,44 +46,38 @@
:scheduler nil
}))
-(defn update-req-context! [nimbus storm-name storm-conf operation]
- (let [req (ReqContext/context)]
- (.setOperation req operation)
- (if storm-conf (.setTopologyConf req storm-conf)
- (let [topologyConf { TOPOLOGY-NAME storm-name} ]
- (.setTopologyConf req topologyConf)))
- req))
-
(defn check-authorization! [nimbus storm-name storm-conf operation]
(let [aclHandler (:authorization-handler nimbus)]
(log-debug "check-authorization with handler: " aclHandler)
(if aclHandler
- (let [req (update-req-context! nimbus storm-name storm-conf operation)]
- (if-not (.permit aclHandler req)
+ (if-not (.permit aclHandler
+ (ReqContext/context)
+ operation
+ (if storm-conf storm-conf {TOPOLOGY-NAME storm-name}))
(throw (RuntimeException. (str operation " on topology " storm-name " is not authorized")))
- )))))
+ ))))
(defn dummy-service-handler [conf inimbus]
(let [nimbus (nimbus-data conf inimbus)]
(reify Nimbus$Iface
(^void submitTopologyWithOpts [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
^SubmitOptions submitOptions]
- (check-authorization! nimbus storm-name nil (ReqContext$OperationType/SUBMIT_TOPOLOGY)))
+ (check-authorization! nimbus storm-name nil "submitTopology"))
(^void killTopology [this ^String storm-name]
- (check-authorization! nimbus storm-name nil (ReqContext$OperationType/KILL_TOPOLOGY)))
+ (check-authorization! nimbus storm-name nil "killTopology"))
(^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
- (check-authorization! nimbus storm-name nil (ReqContext$OperationType/KILL_TOPOLOGY)))
+ (check-authorization! nimbus storm-name nil "killTopology"))
(^void rebalance [this ^String storm-name ^RebalanceOptions options]
- (check-authorization! nimbus storm-name nil (ReqContext$OperationType/REBALANCE_TOPOLOGY)))
+ (check-authorization! nimbus storm-name nil "rebalance"))
(activate [this storm-name]
- (check-authorization! nimbus storm-name nil (ReqContext$OperationType/ACTIVATE_TOPOLOGY)))
+ (check-authorization! nimbus storm-name nil "activate"))
(deactivate [this storm-name]
- (check-authorization! nimbus storm-name nil (ReqContext$OperationType/DEACTIVATE_TOPOLOGY)))
+ (check-authorization! nimbus storm-name nil "deactivate"))
(beginFileUpload [this])
@@ -107,7 +101,7 @@
(^TopologyInfo getTopologyInfo [this ^String storm-id]))))
-(defn launch-test-server [server-port login-cfg aznClass transportPluginClass]
+(defn launch-server [server-port login-cfg aznClass transportPluginClass]
(let [conf1 (merge (read-storm-config)
{NIMBUS-AUTHORIZER aznClass
NIMBUS-HOST "localhost"
@@ -118,46 +112,42 @@
service-handler (dummy-service-handler conf nimbus)
server (ThriftServer. conf (Nimbus$Processor. service-handler) (int (conf NIMBUS-THRIFT-PORT)))]
(.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.stop server))))
- (.serve server)))
-
-(defn launch-server-w-wait [server-port ms login-cfg aznClass transportPluginClass]
- (.start (Thread. #(launch-test-server server-port login-cfg aznClass transportPluginClass)))
- (Thread/sleep ms))
+ (.start (Thread. #(.serve server)))
+ (wait-for-condition #(.isServing server))))
(deftest Simple-authentication-test
- (launch-server-w-wait 6627 1000 nil nil "backtype.storm.security.auth.SimpleTransportPlugin")
+ (launch-server 6627 nil nil "backtype.storm.security.auth.SimpleTransportPlugin")
- (log-message "(Positive authentication) Server and Client with simple transport, no authentication")
(let [storm-conf (merge (read-storm-config)
{STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"})
client (NimbusClient. storm-conf "localhost" 6627 nimbus-timeout)
nimbus_client (.getClient client)]
(.activate nimbus_client "security_auth_test_topology")
(.close client))
- (log-message "(Negative authentication) Server: Simple vs. Client: Digest")
(let [storm-conf (merge (read-storm-config)
{STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
"java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"})]
- (is (= "java.net.SocketTimeoutException: Read timed out"
- (try (NimbusClient. storm-conf "localhost" 6627 nimbus-timeout)
- nil
- (catch TTransportException ex (.getMessage ex)))))))
-
+ (testing "(Negative authentication) Server: Simple vs. Client: Digest"
+ (is (= "java.net.SocketTimeoutException: Read timed out"
+ (try (NimbusClient. storm-conf "localhost" 6627 nimbus-timeout)
+ nil
+ (catch TTransportException ex (.getMessage ex))))))))
+
(deftest positive-authorization-test
- (launch-server-w-wait 6628 1000 nil
+ (launch-server 6628 nil
"backtype.storm.security.auth.authorizer.NoopAuthorizer"
"backtype.storm.security.auth.SimpleTransportPlugin")
(let [storm-conf (merge (read-storm-config)
{STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"})
client (NimbusClient. storm-conf "localhost" 6628 nimbus-timeout)
nimbus_client (.getClient client)]
- (log-message "(Positive authorization) Authorization plugin should accept client request")
- (.activate nimbus_client "security_auth_test_topology")
+ (testing "(Positive authorization) Authorization plugin should accept client request"
+ (.activate nimbus_client "security_auth_test_topology"))
(.close client)))
(deftest deny-authorization-test
- (launch-server-w-wait 6629 1000 nil
+ (launch-server 6629 nil
"backtype.storm.security.auth.authorizer.DenyAuthorizer"
"backtype.storm.security.auth.SimpleTransportPlugin")
(let [storm-conf (merge (read-storm-config)
@@ -167,17 +157,17 @@
Config/NIMBUS_TASK_TIMEOUT_SECS nimbus-timeout})
client (NimbusClient/getConfiguredClient storm-conf)
nimbus_client (.getClient client)]
- (log-message "(Negative authorization) Authorization plugin should reject client request")
- (is (thrown? TTransportException
- (.activate nimbus_client "security_auth_test_topology")))
- (.close client)))
+ (testing "(Negative authorization) Authorization plugin should reject client request"
+ (is (thrown? TTransportException
+ (.activate nimbus_client "security_auth_test_topology"))))
+ (.close client)))
(deftest digest-authentication-test
- (launch-server-w-wait 6630 2000
+ (launch-server 6630
"test/clj/backtype/storm/security/auth/jaas_digest.conf"
nil
"backtype.storm.security.auth.digest.DigestSaslTransportPlugin")
- (log-message "(Positive authentication) valid digest authentication")
+ ;(log-message "(Positive authentication) valid digest authentication")
(let [storm-conf (merge (read-storm-config)
{STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
"java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest.conf"})
@@ -186,40 +176,40 @@
(.activate nimbus_client "security_auth_test_topology")
(.close client))
- (log-message "(Negative authentication) Server: Digest vs. Client: Simple")
(let [storm-conf (merge (read-storm-config)
{STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.SimpleTransportPlugin"})
client (NimbusClient. storm-conf "localhost" 6630 nimbus-timeout)
nimbus_client (.getClient client)]
- (is (thrown? TTransportException
- (.activate nimbus_client "security_auth_test_topology")))
+ (testing "(Negative authentication) Server: Digest vs. Client: Simple"
+ (is (thrown? TTransportException
+ (.activate nimbus_client "security_auth_test_topology"))))
(.close client))
- (log-message "(Negative authentication) Invalid password")
(let [storm-conf (merge (read-storm-config)
{STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
"java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_bad_password.conf"})]
- (is (= "Peer indicated failure: DIGEST-MD5: digest response format violation. Mismatched response."
- (try (NimbusClient. storm-conf "localhost" 6630 nimbus-timeout)
- nil
- (catch TTransportException ex (.getMessage ex))))))
+ (testing "(Negative authentication) Invalid password"
+ (is (= "Peer indicated failure: DIGEST-MD5: digest response format violation. Mismatched response."
+ (try (NimbusClient. storm-conf "localhost" 6630 nimbus-timeout)
+ nil
+ (catch TTransportException ex (.getMessage ex)))))))
- (log-message "(Negative authentication) Unknown user")
(let [storm-conf (merge (read-storm-config)
{STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
"java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_unknown_user.conf"})]
- (is (= "Peer indicated failure: DIGEST-MD5: cannot acquire password for unknown_user in realm : localhost"
- (try (NimbusClient. storm-conf "localhost" 6630 nimbus-timeout)
- nil
- (catch TTransportException ex (.getMessage ex)))))))
+ (testing "(Negative authentication) Unknown user"
+ (is (= "Peer indicated failure: DIGEST-MD5: cannot acquire password for unknown_user in realm : localhost"
+ (try (NimbusClient. storm-conf "localhost" 6630 nimbus-timeout)
+ nil
+ (catch TTransportException ex (.getMessage ex))))))))
- (log-message "(Negative authentication) IOException")
(let [storm-conf (merge (read-storm-config)
{STORM-THRIFT-TRANSPORT-PLUGIN "backtype.storm.security.auth.digest.DigestSaslTransportPlugin"
"java.security.auth.login.config" "test/clj/backtype/storm/security/auth/jaas_digest_missing_client.conf"})]
- (is (thrown? RuntimeException
- (NimbusClient. storm-conf "localhost" 6630 nimbus-timeout))))
-
+ (testing "(Negative authentication) IOException"
+ (is (thrown? RuntimeException
+ (NimbusClient. storm-conf "localhost" 6630 nimbus-timeout)))))
+
(deftest test-GetTransportPlugin-throws-RuntimeException
(let [conf (merge (read-storm-config)
{Config/STORM_THRIFT_TRANSPORT_PLUGIN "null.invalid"})]

0 comments on commit 9def404

Please sign in to comment.