Skip to content

Commit

Permalink
Introduce authorization for enrich in ESQL (elastic#99646)
Browse files Browse the repository at this point in the history
This change introduces a new privilege monitor_enrich. Users are 
required to have this privilege in order to use the enrich functionality
in ESQL. Additionally, it eliminates the need to use the enrich_origin
when executing enrich lookups. The enrich_origin will only be used when
resolving enrich policies to prevent warnings when accessing system
indices directly.

Closes elastic#98482
  • Loading branch information
dnhatn committed Sep 27, 2023
1 parent d7eff41 commit ae17505
Show file tree
Hide file tree
Showing 13 changed files with 186 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ A successful call returns an object with "cluster" and "index" fields.
"manage_watcher",
"monitor",
"monitor_data_frame_transforms",
"monitor_enrich",
"monitor_ml",
"monitor_rollup",
"monitor_snapshot",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ public class ClusterPrivilegeResolver {
private static final Set<String> MONITOR_TRANSFORM_PATTERN = Set.of("cluster:monitor/data_frame/*", "cluster:monitor/transform/*");
private static final Set<String> MONITOR_WATCHER_PATTERN = Set.of("cluster:monitor/xpack/watcher/*");
private static final Set<String> MONITOR_ROLLUP_PATTERN = Set.of("cluster:monitor/xpack/rollup/*");
private static final Set<String> MONITOR_ENRICH_PATTERN = Set.of("cluster:monitor/xpack/enrich/*", "cluster:admin/xpack/enrich/get");

private static final Set<String> ALL_CLUSTER_PATTERN = Set.of(
"cluster:*",
"indices:admin/template/*",
Expand Down Expand Up @@ -172,7 +174,7 @@ public class ClusterPrivilegeResolver {
XPackInfoAction.NAME,
ClusterStateAction.NAME
);
private static final Set<String> MANAGE_ENRICH_AUTOMATON = Set.of("cluster:admin/xpack/enrich/*");
private static final Set<String> MANAGE_ENRICH_AUTOMATON = Set.of("cluster:admin/xpack/enrich/*", "cluster:monitor/xpack/enrich/*");

public static final NamedClusterPrivilege NONE = new ActionClusterPrivilege("none", Set.of(), Set.of());
public static final NamedClusterPrivilege ALL = new ActionClusterPrivilege("all", ALL_CLUSTER_PATTERN);
Expand All @@ -192,6 +194,7 @@ public class ClusterPrivilegeResolver {
);
public static final NamedClusterPrivilege MONITOR_WATCHER = new ActionClusterPrivilege("monitor_watcher", MONITOR_WATCHER_PATTERN);
public static final NamedClusterPrivilege MONITOR_ROLLUP = new ActionClusterPrivilege("monitor_rollup", MONITOR_ROLLUP_PATTERN);
public static final NamedClusterPrivilege MONITOR_ENRICH = new ActionClusterPrivilege("monitor_enrich", MONITOR_ENRICH_PATTERN);
public static final NamedClusterPrivilege MANAGE = new ActionClusterPrivilege("manage", ALL_CLUSTER_PATTERN, ALL_SECURITY_PATTERN);
public static final NamedClusterPrivilege MANAGE_ML = new ActionClusterPrivilege("manage_ml", MANAGE_ML_PATTERN);
public static final NamedClusterPrivilege MANAGE_TRANSFORM_DEPRECATED = new ActionClusterPrivilege(
Expand Down Expand Up @@ -333,6 +336,7 @@ public class ClusterPrivilegeResolver {
MONITOR_TRANSFORM,
MONITOR_WATCHER,
MONITOR_ROLLUP,
MONITOR_ENRICH,
MANAGE,
MANAGE_ML,
MANAGE_TRANSFORM_DEPRECATED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,20 @@ public void testManageEnrichPrivilege() {
verifyClusterActionAllowed(ClusterPrivilegeResolver.MANAGE_ENRICH, GetEnrichPolicyAction.NAME);
verifyClusterActionAllowed(ClusterPrivilegeResolver.MANAGE_ENRICH, PutEnrichPolicyAction.NAME);
verifyClusterActionAllowed(ClusterPrivilegeResolver.MANAGE_ENRICH, "cluster:admin/xpack/enrich/brand_new_api");
verifyClusterActionAllowed(ClusterPrivilegeResolver.MANAGE_ENRICH, "cluster:monitor/xpack/enrich/esql/resolve_policy");
verifyClusterActionDenied(ClusterPrivilegeResolver.MANAGE_ENRICH, "cluster:admin/xpack/whatever");
}

public void testMonitorEnrichPrivilege() {
verifyClusterActionAllowed(ClusterPrivilegeResolver.MONITOR_ENRICH, "cluster:monitor/xpack/enrich/esql/resolve_policy");
verifyClusterActionAllowed(ClusterPrivilegeResolver.MONITOR_ENRICH, GetEnrichPolicyAction.NAME);
verifyClusterActionAllowed(ClusterPrivilegeResolver.MONITOR_ENRICH, "cluster:monitor/xpack/enrich/brand_new_api");
verifyClusterActionDenied(ClusterPrivilegeResolver.MONITOR_ENRICH, PutEnrichPolicyAction.NAME);
verifyClusterActionDenied(ClusterPrivilegeResolver.MONITOR_ENRICH, ExecuteEnrichPolicyAction.NAME);
verifyClusterActionDenied(ClusterPrivilegeResolver.MONITOR_ENRICH, DeleteEnrichPolicyAction.NAME);
verifyClusterActionDenied(ClusterPrivilegeResolver.MONITOR_ENRICH, "cluster:admin/xpack/whatever");
}

public void testIlmPrivileges() {
{
verifyClusterActionAllowed(
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugin/esql/qa/security/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ testClusters.configureEach {
user username: "user1", password: 'x-pack-test-password', role: "user1"
user username: "user2", password: 'x-pack-test-password', role: "user2"
user username: "user3", password: 'x-pack-test-password', role: "user3"
user username: "user4", password: 'x-pack-test-password', role: "user4"
user username: "user5", password: 'x-pack-test-password', role: "user5"
}
21 changes: 17 additions & 4 deletions x-pack/plugin/esql/qa/security/roles.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ test-admin:
user1:
cluster:
- cluster:monitor/main
- manage_enrich
indices:
- names: ['index-user1', 'index', "test-enrich" ]
privileges:
Expand All @@ -22,8 +23,7 @@ user1:
- indices:admin/refresh

user2:
cluster:
- cluster:monitor/main
cluster: []
indices:
- names: [ 'index-user2', 'index' ]
privileges:
Expand All @@ -33,8 +33,7 @@ user2:
- indices:admin/refresh

user3:
cluster:
- cluster:monitor/main
cluster: []
indices:
- names: [ 'index' ]
privileges: [ 'read' ]
Expand All @@ -44,3 +43,17 @@ user3:
"org": "sales"
}
}
user4:
cluster:
- monitor_enrich
indices:
- names: ['index-user1', 'index', "test-enrich" ]
privileges:
- read
user5:
cluster: []
indices:
- names: ['index-user1', 'index', "test-enrich" ]
privileges:
- read
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql;

import org.apache.http.HttpStatus;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
Expand Down Expand Up @@ -152,15 +153,26 @@ record Listen(long timestamp, String songId, double duration) {
client().performRequest(indexDoc);
}
refresh("test-enrich");
Response resp = runESQLCommand(
"user1",
"FROM test-enrich | ENRICH songs ON song_id | stats total_duration = sum(duration) by artist | sort artist"
);
Map<String, Object> respMap = entityAsMap(resp);
assertThat(
respMap.get("values"),
equalTo(List.of(List.of(2.75, "Disturbed"), List.of(10.5, "Eagles"), List.of(8.25, "Linkin Park")))
for (String user : List.of("user1", "user4")) {
Response resp = runESQLCommand(
user,
"FROM test-enrich | ENRICH songs ON song_id | stats total_duration = sum(duration) by artist | sort artist"
);
Map<String, Object> respMap = entityAsMap(resp);
assertThat(
respMap.get("values"),
equalTo(List.of(List.of(2.75, "Disturbed"), List.of(10.5, "Eagles"), List.of(8.25, "Linkin Park")))
);
}

ResponseException resp = expectThrows(
ResponseException.class,
() -> runESQLCommand(
"user5",
"FROM test-enrich | ENRICH songs ON song_id | stats total_duration = sum(duration) by artist | sort artist"
)
);
assertThat(resp.getResponse().getStatusLine().getStatusCode(), equalTo(HttpStatus.SC_FORBIDDEN));
} finally {
removeEnrichPolicy();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -23,7 +22,6 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.ElementType;
Expand Down Expand Up @@ -52,7 +50,6 @@
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
import org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry;
Expand Down Expand Up @@ -141,19 +138,15 @@ public void lookupAsync(
}
DiscoveryNode targetNode = clusterState.nodes().get(shardRouting.currentNodeId());
LookupRequest lookupRequest = new LookupRequest(sessionId, shardIt.shardId(), matchType, matchField, inputPage, extractFields);
ThreadContext threadContext = transportService.getThreadPool().getThreadContext();
listener = ContextPreservingActionListener.wrapPreservingContext(listener, threadContext);
try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
// TODO: handle retry and avoid forking for the local lookup
transportService.sendChildRequest(
targetNode,
LOOKUP_ACTION_NAME,
lookupRequest,
parentTask,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(listener.map(r -> r.page), LookupResponse::new, executor)
);
}
// TODO: handle retry and avoid forking for the local lookup
transportService.sendChildRequest(
targetNode,
LOOKUP_ACTION_NAME,
lookupRequest,
parentTask,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(listener.map(r -> r.page), LookupResponse::new, executor)
);
}

private void doLookup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,46 +8,123 @@
package org.elasticsearch.xpack.esql.enrich;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.enrich.EnrichMetadata;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.elasticsearch.xpack.ql.index.IndexResolver;

import java.util.Map;
import java.util.Set;

public class EnrichPolicyResolver {
private static final String RESOLVE_ACTION_NAME = "cluster:monitor/xpack/enrich/esql/resolve_policy";

private final ClusterService clusterService;
private final IndexResolver indexResolver;
private final TransportService transportService;
private final ThreadPool threadPool;

public EnrichPolicyResolver(ClusterService clusterService, IndexResolver indexResolver, ThreadPool threadPool) {
public EnrichPolicyResolver(ClusterService clusterService, TransportService transportService, IndexResolver indexResolver) {
this.clusterService = clusterService;
this.transportService = transportService;
this.indexResolver = indexResolver;
this.threadPool = threadPool;
this.threadPool = transportService.getThreadPool();
transportService.registerRequestHandler(
RESOLVE_ACTION_NAME,
threadPool.executor(EsqlPlugin.ESQL_THREAD_POOL_NAME),
ResolveRequest::new,
new RequestHandler()
);
}

public void resolvePolicy(String policyName, ActionListener<EnrichPolicyResolution> listener) {
EnrichPolicy policy = policies().get(policyName);
ThreadContext threadContext = threadPool.getThreadContext();
listener = ContextPreservingActionListener.wrapPreservingContext(listener, threadContext);
try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
indexResolver.resolveAsMergedMapping(
EnrichPolicy.getBaseName(policyName),
IndexResolver.ALL_FIELDS,
false,
Map.of(),
listener.map(indexResult -> new EnrichPolicyResolution(policyName, policy, indexResult))
);
transportService.sendRequest(
clusterService.localNode(),
RESOLVE_ACTION_NAME,
new ResolveRequest(policyName),
new ActionListenerResponseHandler<>(
listener.map(r -> r.resolution),
ResolveResponse::new,
threadPool.executor(EsqlPlugin.ESQL_THREAD_POOL_NAME)
)
);
}

private static UnsupportedOperationException unsupported() {
return new UnsupportedOperationException("local node transport action");
}

private static class ResolveRequest extends TransportRequest {
private final String policyName;

ResolveRequest(String policyName) {
this.policyName = policyName;
}

ResolveRequest(StreamInput in) {
throw unsupported();
}

@Override
public void writeTo(StreamOutput out) {
throw unsupported();
}
}

private static class ResolveResponse extends TransportResponse {
private final EnrichPolicyResolution resolution;

ResolveResponse(EnrichPolicyResolution resolution) {
this.resolution = resolution;
}

ResolveResponse(StreamInput in) {
throw unsupported();
}

@Override
public void writeTo(StreamOutput out) {
throw unsupported();
}
}

private class RequestHandler implements TransportRequestHandler<ResolveRequest> {
@Override
public void messageReceived(ResolveRequest request, TransportChannel channel, Task task) throws Exception {
String policyName = request.policyName;
EnrichPolicy policy = policies().get(policyName);
ThreadContext threadContext = threadPool.getThreadContext();
ActionListener<ResolveResponse> listener = new ChannelActionListener<>(channel);
listener = ContextPreservingActionListener.wrapPreservingContext(listener, threadContext);
try (ThreadContext.StoredContext ignored = threadContext.stashWithOrigin(ClientHelper.ENRICH_ORIGIN)) {
indexResolver.resolveAsMergedMapping(
EnrichPolicy.getBaseName(policyName),
IndexResolver.ALL_FIELDS,
false,
Map.of(),
listener.map(indexResult -> new ResolveResponse(new EnrichPolicyResolution(policyName, policy, indexResult)))
);
}
}
}

public Set<String> allPolicyNames() {
// TODO: remove this suggestion as it exposes policy names without the right permission
return policies().keySet();
}

Expand Down
Loading

0 comments on commit ae17505

Please sign in to comment.