Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spnego and trusted proxy authentication #1159

Merged
merged 5 commits into from
May 15, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ project(':cruise-control') {
testCompile "org.apache.kafka:kafka-clients:$kafkaVersion:test"
testCompile 'commons-io:commons-io:2.6'
testCompile 'org.bouncycastle:bcpkix-jdk15on:1.64'
testCompile 'org.apache.kerby:kerb-simplekdc:2.0.0'

}

publishing {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,41 @@ private WebServerConfig() {
"Audience is a way for the issuer to indicate what entities the token is intended for. The default value is null, " +
"which means all audiences are accepted.";

/**
* <code>spnego.keytab.file</code>
*/
public static final String SPNEGO_KEYTAB_FILE_CONFIG = "spnego.keytab.file";
public static final String DEFAULT_SPNEGO_KEYTAB_FILE = null;
private static final String SPNEGO_KEYTAB_FILE_DOC = "Specifies the path to the keytab which contains the spnego " +
"principal that is used for SPNEGO based authentication methods.";

/**
* <code>spnego.principal</code>
*/
public static final String SPNEGO_PRINCIPAL_CONFIG = "spnego.principal";
public static final String DEFAULT_SPNEGO_PRINCIPAL = null;
private static final String SPNEGO_PRINCIPAL_DOC = "Specifies the spnego service principal that is used by Cruise Control " +
"to authenticate clients. This principal is stored in spnego.keytab.file. This must be a fully qualified principal " +
"in the service/host@REALM format (service is usually HTTP).";

/**
* <code>trusted.proxy.services</code>
*/
public static final String TRUSTED_PROXY_SERVICES_CONFIG = "trusted.proxy.services";
public static final String DEFAULT_TRUSTED_PROXY_SERVICES = null;
private static final String TRUSTED_PROXY_SERVICES_DOC = "A list of trusted proxies who can delegate user commands " +
"with the doAs query parameter.";

/**
* <code>trusted.proxy.services.ip.regex</code>
*/
public static final String TRUSTED_PROXY_SERVICES_IP_REGEX_CONFIG = "trusted.proxy.services.ip.regex";
public static final String DEFAULT_TRUSTED_PROXY_SERVICES_IP_REGEX = null;
private static final String TRUSTED_PROXY_SERVICES_IP_REGEX_DOC = "A Java regular expression that defines the whitelist of " +
"IP addresses of the trusted proxy services. If a request arrives from these addresses authenticated as one of the specified " +
"trusted.proxy.services then the operation will be delegated as the user in the doAs parameter. This is an optional " +
"parameter. Not specifying this means that the IP of the trusted proxy won't be validated.";

/**
* Define configs for Web Server.
*
Expand Down Expand Up @@ -435,6 +470,26 @@ public static ConfigDef define(ConfigDef configDef) {
ConfigDef.Type.LIST,
DEFAULT_JWT_EXPECTED_AUDIENCES,
ConfigDef.Importance.MEDIUM,
JWT_EXPECTED_AUDIENCES_DOC);
JWT_EXPECTED_AUDIENCES_DOC)
.define(SPNEGO_KEYTAB_FILE_CONFIG,
ConfigDef.Type.STRING,
DEFAULT_SPNEGO_KEYTAB_FILE,
ConfigDef.Importance.MEDIUM,
SPNEGO_KEYTAB_FILE_DOC)
.define(SPNEGO_PRINCIPAL_CONFIG,
ConfigDef.Type.STRING,
DEFAULT_SPNEGO_PRINCIPAL,
ConfigDef.Importance.MEDIUM,
SPNEGO_PRINCIPAL_DOC)
.define(TRUSTED_PROXY_SERVICES_CONFIG,
ConfigDef.Type.LIST,
DEFAULT_TRUSTED_PROXY_SERVICES,
ConfigDef.Importance.MEDIUM,
TRUSTED_PROXY_SERVICES_DOC)
.define(TRUSTED_PROXY_SERVICES_IP_REGEX_CONFIG,
ConfigDef.Type.STRING,
DEFAULT_TRUSTED_PROXY_SERVICES_IP_REGEX,
ConfigDef.Importance.MEDIUM,
TRUSTED_PROXY_SERVICES_IP_REGEX_DOC);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.linkedin.kafka.cruisecontrol.servlet.KafkaCruiseControlServletUtils.KAFKA_CRUISE_CONTROL_CONFIG_OBJECT_CONFIG;
import static com.linkedin.kafka.cruisecontrol.servlet.KafkaCruiseControlServletUtils.KAFKA_CRUISE_CONTROL_HTTP_SERVLET_REQUEST_OBJECT_CONFIG;
import static com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils.DO_AS;
import static com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils.handleParameterParseException;
import static com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils.JSON_PARAM;
import static com.linkedin.kafka.cruisecontrol.servlet.parameters.ParameterUtils.GET_RESPONSE_SCHEMA;
Expand All @@ -36,6 +37,7 @@ public abstract class AbstractParameters implements CruiseControlParameters {
SortedSet<String> validParameterNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
validParameterNames.add(JSON_PARAM);
validParameterNames.add(GET_RESPONSE_SCHEMA);
validParameterNames.add(DO_AS);
CASE_INSENSITIVE_PARAMETER_NAMES = Collections.unmodifiableSortedSet(validParameterNames);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

import com.google.gson.Gson;
import com.linkedin.cruisecontrol.detector.AnomalyType;
import com.linkedin.cruisecontrol.servlet.EndPoint;
import com.linkedin.cruisecontrol.servlet.parameters.CruiseControlParameters;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.IntraBrokerDiskCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.IntraBrokerDiskUsageDistributionGoal;
import com.linkedin.cruisecontrol.servlet.EndPoint;
import com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerEvenRackAwareGoal;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;
import com.linkedin.kafka.cruisecontrol.detector.notifier.KafkaAnomalyType;
Expand All @@ -20,8 +22,9 @@
import com.linkedin.kafka.cruisecontrol.servlet.UserTaskManager;
import com.linkedin.kafka.cruisecontrol.servlet.purgatory.ReviewStatus;
import com.linkedin.kafka.cruisecontrol.servlet.response.CruiseControlState;
import com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerEvenRackAwareGoal;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
Expand All @@ -40,12 +43,13 @@
import java.util.UUID;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.currentUtcDate;
import static com.linkedin.kafka.cruisecontrol.executor.Executor.MIN_EXECUTION_PROGRESS_CHECK_INTERVAL_MS;
import static com.linkedin.kafka.cruisecontrol.servlet.CruiseControlEndPoint.*;
import static com.linkedin.kafka.cruisecontrol.servlet.CruiseControlEndPoint.ADD_BROKER;
import static com.linkedin.kafka.cruisecontrol.servlet.CruiseControlEndPoint.DEMOTE_BROKER;
import static com.linkedin.kafka.cruisecontrol.servlet.CruiseControlEndPoint.FIX_OFFLINE_REPLICAS;
import static com.linkedin.kafka.cruisecontrol.servlet.CruiseControlEndPoint.REVIEW;
import static com.linkedin.kafka.cruisecontrol.servlet.KafkaCruiseControlServletUtils.GET_METHOD;
import static com.linkedin.kafka.cruisecontrol.servlet.KafkaCruiseControlServletUtils.POST_METHOD;
import static com.linkedin.kafka.cruisecontrol.servlet.KafkaCruiseControlServletUtils.REQUEST_URI;
Expand Down Expand Up @@ -127,6 +131,7 @@ public class ParameterUtils {
public static final long DEFAULT_START_TIME_FOR_CLUSTER_MODEL = -1L;
public static final String TOPIC_BY_REPLICATION_FACTOR = "topic_by_replication_factor";
public static final String NO_REASON_PROVIDED = "No reason provided";
public static final String DO_AS = "doAs";

public static final String STOP_PROPOSAL_PARAMETER_OBJECT_CONFIG = "stop.proposal.parameter.object";
public static final String BOOTSTRAP_PARAMETER_OBJECT_CONFIG = "bootstrap.parameter.object";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ public UserStoreAuthorizationService(UserStore userStore) {

@Override
public UserIdentity getUserIdentity(HttpServletRequest request, String name) {
return _userStore.getUserIdentity(name);
int hostSeparator = name.indexOf('/');
String shortName = hostSeparator > 0 ? name.substring(0, hostSeparator) : name;
int realmSeparator = shortName.indexOf('@');
shortName = realmSeparator > 0 ? shortName.substring(0, realmSeparator) : shortName;
return _userStore.getUserIdentity(shortName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2020 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.servlet.security.spnego;

import com.linkedin.kafka.cruisecontrol.servlet.security.UserStoreAuthorizationService;
import org.eclipse.jetty.security.ConfigurableSpnegoLoginService;
import org.eclipse.jetty.security.PropertyUserStore;
import org.eclipse.jetty.security.authentication.AuthorizationService;
import org.eclipse.jetty.util.component.LifeCycle;

/**
* This class is purely needed in order to manage the {@link AuthorizationService} if it is a {@link LifeCycle} bean.
* For instance if the AuthorizationService holds a {@link org.eclipse.jetty.security.PropertyUserStore} then it would load
* users from the store during the {@link PropertyUserStore#start()} method.
*
* @see UserStoreAuthorizationService
*/
public class SpnegoLoginServiceWithAuthServiceLifecycle extends ConfigurableSpnegoLoginService {

private final AuthorizationService _authorizationService;

public SpnegoLoginServiceWithAuthServiceLifecycle(String realm, AuthorizationService authorizationService) {
super(realm, authorizationService);
_authorizationService = authorizationService;
}

@Override
protected void doStart() throws Exception {
if (_authorizationService instanceof LifeCycle) {
((LifeCycle) _authorizationService).start();
}
super.doStart();
}

@Override
protected void doStop() throws Exception {
super.doStop();
if (_authorizationService instanceof LifeCycle) {
((LifeCycle) _authorizationService).stop();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2020 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.servlet.security.spnego;

import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.constants.WebServerConfig;
import com.linkedin.kafka.cruisecontrol.servlet.security.DefaultRoleSecurityProvider;
import org.apache.kafka.common.security.kerberos.KerberosName;
import org.eclipse.jetty.security.Authenticator;
import org.eclipse.jetty.security.ConfigurableSpnegoLoginService;
import org.eclipse.jetty.security.LoginService;
import org.eclipse.jetty.security.authentication.AuthorizationService;
import org.eclipse.jetty.security.authentication.ConfigurableSpnegoAuthenticator;

import java.nio.file.Paths;

/**
* Defines an SPNEGO capable login service using the HTTP Negotiate authentication mechanism.
*/
public class SpnegoSecurityProvider extends DefaultRoleSecurityProvider {

protected String _privilegesFilePath;
protected String _keyTabPath;
protected KerberosName _spnegoPrincipal;

@Override
public void init(KafkaCruiseControlConfig config) {
super.init(config);
_privilegesFilePath = config.getString(WebServerConfig.WEBSERVER_AUTH_CREDENTIALS_FILE_CONFIG);
_keyTabPath = config.getString(WebServerConfig.SPNEGO_KEYTAB_FILE_CONFIG);
_spnegoPrincipal = KerberosName.parse(config.getString(WebServerConfig.SPNEGO_PRINCIPAL_CONFIG));
}

@Override
public LoginService loginService() {
ConfigurableSpnegoLoginService loginService = new SpnegoLoginServiceWithAuthServiceLifecycle(_spnegoPrincipal.realm(), authorizationService());
loginService.setServiceName(_spnegoPrincipal.serviceName());
loginService.setHostName(_spnegoPrincipal.hostName());
loginService.setKeyTabPath(Paths.get(_keyTabPath));
return loginService;
}

@Override
public Authenticator authenticator() {
return new ConfigurableSpnegoAuthenticator();
}

public AuthorizationService authorizationService() {
return new SpnegoUserStoreAuthorizationService(_privilegesFilePath);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2020 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.servlet.security.spnego;

import com.linkedin.kafka.cruisecontrol.servlet.security.UserStoreAuthorizationService;
import org.eclipse.jetty.security.UserStore;
import org.eclipse.jetty.server.UserIdentity;

import javax.servlet.http.HttpServletRequest;

public class SpnegoUserStoreAuthorizationService extends UserStoreAuthorizationService {

public SpnegoUserStoreAuthorizationService(String privilegesFilePath) {
super(privilegesFilePath);
}

public SpnegoUserStoreAuthorizationService(UserStore userStore) {
super(userStore);
}

@Override
public UserIdentity getUserIdentity(HttpServletRequest request, String name) {
viktorsomogyi marked this conversation as resolved.
Show resolved Hide resolved
int hostSeparator = name.indexOf('/');
String shortName = hostSeparator > 0 ? name.substring(0, hostSeparator) : name;
int realmSeparator = shortName.indexOf('@');
shortName = realmSeparator > 0 ? shortName.substring(0, realmSeparator) : shortName;
return super.getUserIdentity(request, shortName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2020 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.servlet.security.trustedproxy;

import com.linkedin.kafka.cruisecontrol.servlet.security.DefaultRoleSecurityProvider;
import org.eclipse.jetty.security.UserStore;
import org.eclipse.jetty.security.authentication.AuthorizationService;
import org.eclipse.jetty.server.UserIdentity;
import org.eclipse.jetty.util.component.AbstractLifeCycle;

import javax.servlet.http.HttpServletRequest;
import java.util.List;
import java.util.regex.Pattern;

/**
* This authorization service simply checks the incoming user against a list of configured service names maintained in
* a {@link UserStore} and if specified, the remote IP address against a configured pattern.
*/
public class TrustedProxyAuthorizationService extends AbstractLifeCycle implements AuthorizationService {

private final UserStore _adminUserStore;
private final Pattern _trustedProxyIpPattern;

TrustedProxyAuthorizationService(List<String> userNames, String trustedProxyIpPattern) {
_adminUserStore = new UserStore();
userNames.forEach(u -> _adminUserStore.addUser(u, null, new String[] { DefaultRoleSecurityProvider.ADMIN }));
if (trustedProxyIpPattern != null) {
_trustedProxyIpPattern = Pattern.compile(trustedProxyIpPattern);
} else {
_trustedProxyIpPattern = null;
}
}

@Override
public UserIdentity getUserIdentity(HttpServletRequest request, String name) {
// ConfigurableSpnegoAuthenticator may pass names in servicename/host format but we only store the servicename
int nameHostSeparatorIndex = name.indexOf('/');
String serviceName = nameHostSeparatorIndex > 0 ? name.substring(0, nameHostSeparatorIndex) : name;
UserIdentity serviceIdentity = _adminUserStore.getUserIdentity(serviceName);
if (_trustedProxyIpPattern != null) {
return _trustedProxyIpPattern.matcher(request.getRemoteAddr()).matches() ? serviceIdentity : null;
} else {
return serviceIdentity;
}
}

@Override
protected void doStart() throws Exception {
_adminUserStore.start();
super.doStart();
}

@Override
protected void doStop() throws Exception {
super.doStop();
_adminUserStore.stop();
}
}
Loading