Skip to content

Commit

Permalink
digest auth test case
Browse files Browse the repository at this point in the history
  • Loading branch information
afeng committed Feb 15, 2013
1 parent 1e17e82 commit 7def91d
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 156 deletions.
248 changes: 124 additions & 124 deletions src/jvm/backtype/storm/security/auth/ThriftClient.java
Expand Up @@ -27,134 +27,134 @@


public class ThriftClient {
private static final Logger LOG = LoggerFactory.getLogger(ThriftClient.class);
private TTransport _transport;
protected TProtocol _protocol;

static {
java.security.Security.addProvider(new AnonymousAuthenticationProvider());
}

public ThriftClient(String host, int port, String default_service_name) {
this(host, port, default_service_name, null);
private static final Logger LOG = LoggerFactory.getLogger(ThriftClient.class);
private TTransport _transport;
protected TProtocol _protocol;

static {
java.security.Security.addProvider(new AnonymousAuthenticationProvider());
}

public ThriftClient(String host, int port, String default_service_name) {
this(host, port, default_service_name, null);
}

public ThriftClient(String host, int port, String default_service_name, Integer timeout) {
try {
if(host==null) {
throw new IllegalArgumentException("host is not set");
}
if(port<=0) {
throw new IllegalArgumentException("invalid port: "+port);
}

TSocket socket = new TSocket(host, port);
if(timeout!=null) {
socket.setTimeout(timeout);
}
final TTransport underlyingTransport = socket;

String loginConfigurationFile = System.getProperty("java.security.auth.login.config");
if ((loginConfigurationFile==null) || (loginConfigurationFile.length()==0)) {
//apply Storm configuration for JAAS login
Map conf = Utils.readStormConfig();
loginConfigurationFile = (String)conf.get("java.security.auth.login.config");
}
if ((loginConfigurationFile==null) || (loginConfigurationFile.length()==0)) { //ANONYMOUS
LOG.info("SASL ANONYMOUS client transport is being established");
_transport = new TSaslClientTransport(AuthUtils.ANONYMOUS,
null,
AuthUtils.SERVICE,
host,
null,
null,
underlyingTransport);
_transport.open();
} else {
LOG.debug("Use jaas login config:"+loginConfigurationFile);
System.setProperty("java.security.auth.login.config", loginConfigurationFile);
Configuration auth_conf = Configuration.getConfiguration();

//login our user
SaslClientCallbackHandler callback_handler = new SaslClientCallbackHandler(auth_conf);
Login login = new Login(AuthUtils.LoginContextClient, callback_handler);

final Subject subject = login.getSubject();
if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //DIGEST-MD5
LOG.debug("SASL DIGEST-MD5 client transport is being established");
_transport = new TSaslClientTransport(AuthUtils.DIGEST,
null,
AuthUtils.SERVICE,
host,
null,
callback_handler,
underlyingTransport);
_transport.open();
} else { //GSSAPI
final String principal = getPrincipal(subject);
String serviceName = AuthUtils.get(auth_conf, AuthUtils.LoginContextClient, "serviceName");
if (serviceName == null) {
serviceName = default_service_name;
}
Map<String, String> props = new TreeMap<String,String>();
props.put(Sasl.QOP, "auth");
props.put(Sasl.SERVER_AUTH, "false");
LOG.debug("SASL GSSAPI client transport is being established");
_transport = new TSaslClientTransport(AuthUtils.KERBEROS,
principal,
serviceName,
host,
props,
null,
underlyingTransport);

//open Sasl transport with the login credential
try {
Subject.doAs(subject,
new PrivilegedExceptionAction<Void>() {
public Void run() {
try {
LOG.debug("do as:"+ principal);
_transport.open();
}
catch (Exception e) {
LOG.error("Nimbus client failed to open SaslClientTransport to interact with a server during session initiation: " + e);
e.printStackTrace();
}
return null;
}
});
} catch (PrivilegedActionException e) {
LOG.error("Nimbus client experienced a PrivilegedActionException exception while creating a TSaslClientTransport using a JAAS principal context:" + e);
e.printStackTrace();
}
}

}
} catch (Exception e) {
LOG.error(e.getMessage());
throw new RuntimeException(e);
}

public ThriftClient(String host, int port, String default_service_name, Integer timeout) {
try {
if(host==null) {
throw new IllegalArgumentException("host is not set");
}
if(port<=0) {
throw new IllegalArgumentException("invalid port: "+port);
}

TSocket socket = new TSocket(host, port);
if(timeout!=null) {
socket.setTimeout(timeout);
}
final TTransport underlyingTransport = socket;

String loginConfigurationFile = System.getProperty("java.security.auth.login.config");
if ((loginConfigurationFile==null) || (loginConfigurationFile.length()==0)) {
//apply Storm configuration for JAAS login
Map conf = Utils.readStormConfig();
loginConfigurationFile = (String)conf.get("java.security.auth.login.config");
}
if ((loginConfigurationFile==null) || (loginConfigurationFile.length()==0)) { //ANONYMOUS
LOG.debug("SASL ANONYMOUS client transport is being established");
_transport = new TSaslClientTransport(AuthUtils.ANONYMOUS,
null,
AuthUtils.SERVICE,
host,
null,
null,
underlyingTransport);
_transport.open();
} else {
LOG.debug("Use jaas login config:"+loginConfigurationFile);
System.setProperty("java.security.auth.login.config", loginConfigurationFile);
Configuration auth_conf = Configuration.getConfiguration();

//login our user
SaslClientCallbackHandler callback_handler = new SaslClientCallbackHandler(auth_conf);
Login login = new Login(AuthUtils.LoginContextClient, callback_handler);

final Subject subject = login.getSubject();
if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //DIGEST-MD5
LOG.debug("SASL DIGEST-MD5 client transport is being established");
_transport = new TSaslClientTransport(AuthUtils.DIGEST,
null,
AuthUtils.SERVICE,
host,
null,
callback_handler,
underlyingTransport);
_transport.open();
} else { //GSSAPI
final String principal = getPrincipal(subject);
String serviceName = AuthUtils.get(auth_conf, AuthUtils.LoginContextClient, "serviceName");
if (serviceName == null) {
serviceName = default_service_name;
}
Map<String, String> props = new TreeMap<String,String>();
props.put(Sasl.QOP, "auth");
props.put(Sasl.SERVER_AUTH, "false");
LOG.debug("SASL GSSAPI client transport is being established");
_transport = new TSaslClientTransport(AuthUtils.KERBEROS,
principal,
serviceName,
host,
props,
null,
underlyingTransport);

//open Sasl transport with the login credential
try {
Subject.doAs(subject,
new PrivilegedExceptionAction<Void>() {
public Void run() {
try {
LOG.debug("do as:"+ principal);
_transport.open();
}
catch (Exception e) {
LOG.error("Nimbus client failed to open SaslClientTransport to interact with a server during session initiation: " + e);
e.printStackTrace();
}
return null;
}
});
} catch (PrivilegedActionException e) {
LOG.error("Nimbus client experienced a PrivilegedActionException exception while creating a TSaslClientTransport using a JAAS principal context:" + e);
e.printStackTrace();
}
}

}
} catch (Exception e) {
LOG.error(e.getMessage());
throw new RuntimeException(e);
}

_protocol = null;
if (_transport != null)
_protocol = new TBinaryProtocol(_transport);
}
_protocol = null;
if (_transport != null)
_protocol = new TBinaryProtocol(_transport);
}

private String getPrincipal(Subject subject) {
Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
if (principals==null || principals.size()<1) {
LOG.info("No principal found in login subject");
return null;
}
return ((Principal)(principals.toArray()[0])).getName();
private String getPrincipal(Subject subject) {
Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
if (principals==null || principals.size()<1) {
LOG.info("No principal found in login subject");
return null;
}
return ((Principal)(principals.toArray()[0])).getName();
}

public TTransport transport() {
return _transport;
}
public TTransport transport() {
return _transport;
}

public void close() {
_transport.close();
}
public void close() {
_transport.close();
}
}
47 changes: 15 additions & 32 deletions src/jvm/backtype/storm/utils/NimbusClient.java
@@ -1,47 +1,30 @@
package backtype.storm.utils;

import backtype.storm.Config;
import backtype.storm.security.auth.ThriftClient;
import backtype.storm.generated.Nimbus;
import java.util.Map;
import org.apache.thrift7.TException;
import org.apache.thrift7.protocol.TBinaryProtocol;
import org.apache.thrift7.transport.TFramedTransport;
import org.apache.thrift7.transport.TSocket;
import org.apache.thrift7.transport.TTransport;

import java.util.Map;

public class NimbusClient {
public class NimbusClient extends ThriftClient {
private Nimbus.Client _client;

public static NimbusClient getConfiguredClient(Map conf) {
String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
return new NimbusClient(nimbusHost, nimbusPort);
String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
return new NimbusClient(nimbusHost, nimbusPort);
}

private TTransport conn;
private Nimbus.Client client;


public NimbusClient(String host) {
this(host, 6627);
this(host, 6627);
}

public NimbusClient(String host, int port) {
try {
if(host==null) {
throw new IllegalArgumentException("Nimbus host is not set");
}
conn = new TFramedTransport(new TSocket(host, port));
client = new Nimbus.Client(new TBinaryProtocol(conn));
conn.open();
} catch(TException e) {
throw new RuntimeException(e);
}
super(host, port, "nimbus_server");
_client = new Nimbus.Client(_protocol);
}

public Nimbus.Client getClient() {
return client;
}

public void close() {
conn.close();
return _client;
}
}
56 changes: 56 additions & 0 deletions test/clj/backtype/storm/security/auth/digest_auth_test.clj
@@ -0,0 +1,56 @@
(ns backtype.storm.security.auth.digest-auth-test
(:use [clojure test])
(:require [backtype.storm.daemon [nimbus :as nimbus]])
(:import [org.apache.thrift7 TException])
(:import [java.nio.channels Channels WritableByteChannel])
(:import [backtype.storm.utils NimbusClient])
(:import [backtype.storm.security.auth ThriftServer ThriftClient ReqContext ReqContext$OperationType])
(:use [backtype.storm bootstrap util])
(:use [backtype.storm.daemon common])
(:use [backtype.storm bootstrap testing])
(:import [backtype.storm.generated Nimbus Nimbus$Client])
)

(bootstrap)

(def server-port 6627)

; Exceptions are getting wrapped in RuntimeException. This might be due to
; CLJ-855.
(defn- unpack-runtime-exception [expression]
(try (eval expression)
nil
(catch java.lang.RuntimeException gripe
(throw (.getCause gripe)))
)
)

(defn launch-test-server []
(with-inprocess-zookeeper zk-port
(with-local-tmp [nimbus-dir]
(let [conf (merge (read-storm-config)
{STORM-ZOOKEEPER-SERVERS ["localhost"]
STORM-ZOOKEEPER-PORT zk-port
NIMBUS-HOST "localhost"
NIMBUS-THRIFT-PORT server-port
STORM-LOCAL-DIR nimbus-dir})
nimbus (nimbus/standalone-nimbus)
service-handler (nimbus/service-handler conf nimbus)
server (ThriftServer. (Nimbus$Processor. service-handler) (int (conf NIMBUS-THRIFT-PORT)))]
(.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
(log-message "Starting Nimbus server...")
(.serve server)))))

(defn launch-server-w-wait []
(future (launch-test-server))
(log-message "Waiting for Nimbus Server...")
(Thread/sleep 10000))

(deftest digest-auth-test
(System/setProperty "java.security.auth.login.config" "./conf/jaas_digest.conf")
(launch-server-w-wait)
(log-message "Starting Nimbus client w/ connection to localhost:" server-port)
(let [client (NimbusClient. "localhost" server-port)
nimbus_client (.getClient client)]
(is (thrown? backtype.storm.generated.NotAliveException (.activate nimbus_client "bogus_topology")))
(.close client)))

0 comments on commit 7def91d

Please sign in to comment.