-
Notifications
You must be signed in to change notification settings - Fork 10
Integrates security with asynchronous search #11
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will take a deeper look in a while
build.gradle
Outdated
@@ -56,6 +56,7 @@ dependencies { | |||
testCompile "org.elasticsearch.plugin:reindex-client:${es_version}" | |||
compileOnly "org.elasticsearch.plugin:transport-netty4-client:${es_version}" | |||
compileOnly "org.elasticsearch:elasticsearch:${es_version}" | |||
compile "com.amazon.opendistroforelasticsearch:common-utils:1.11.0.1" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets not hardcode the version
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 Probably like this? https://github.com/opendistro-for-elasticsearch/alerting/blob/master/core/build.gradle#L28
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is new version of common-utils: 1.11.1.0 for 7.9.1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed the version
if(userDetails == null) { | ||
return null; | ||
} | ||
String name = ""; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why ""?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Name is the only part of user object which can't be null. Hence initialised with empty string. Should I use new String() instead ?
|
||
public AsyncSearchActiveContext(AsyncSearchContextId asyncSearchContextId, String nodeId, | ||
TimeValue keepAlive, boolean keepOnCompletion, | ||
ThreadPool threadPool, LongSupplier currentTimeSupplier, | ||
AsyncSearchProgressListener searchProgressActionListener, | ||
AsyncSearchContextListener asyncSearchContextListener) { | ||
AsyncSearchContextListener asyncSearchContextListener, User user) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
public AsyncSearchPersistenceModel(long startTimeMillis, long expirationTimeMillis, String response, String error) { | ||
public AsyncSearchPersistenceModel(long startTimeMillis, long expirationTimeMillis, String response, String error, User user) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we retain the two constructors one for SearchResponse one for Exception
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only one constructor was being used in the main code. This is after we made the state machine changes and this is called from BeginPersistChanges. The two functions that I removed were only being used by test cases.
if(user == null) { | ||
client.delete(new DeleteRequest(ASYNC_SEARCH_RESPONSE_INDEX, id), ActionListener.wrap(deleteResponse -> { | ||
if (deleteResponse.getResult() == DocWriteResponse.Result.DELETED) { | ||
logger.warn("Delete async search {} successful. Returned result {}", id, deleteResponse.getResult()); | ||
listener.onResponse(true); | ||
} else { | ||
logger.debug("Delete async search {} unsuccessful. Returned result {}", id, deleteResponse.getResult()); | ||
listener.onResponse(false); | ||
} | ||
}, e -> { | ||
final Throwable cause = ExceptionsHelper.unwrapCause(e); | ||
if (cause instanceof DocumentMissingException) { | ||
logger.warn(() -> new ParameterizedMessage("Async search response doc already deleted {}", id), e); | ||
listener.onResponse(false); | ||
} else { | ||
logger.warn(() -> new ParameterizedMessage("Failed to delete async search for id {}", id), e); | ||
listener.onFailure(cause instanceof Exception ? (Exception) cause : new NotSerializableExceptionWrapper(cause)); | ||
} | ||
})); | ||
} | ||
else { | ||
UpdateRequest updateRequest = new UpdateRequest(ASYNC_SEARCH_RESPONSE_INDEX, id); | ||
String scriptCode = "if (ctx._source.user == null || ctx._source.user.backend_roles == null || " + | ||
"( params.backend_roles!=null && params.backend_roles.containsAll(ctx._source.user.backend_roles))) " + | ||
"{ ctx.op = 'delete' } else { ctx.op = 'none' }"; | ||
Map<String, Object> params = new HashMap<>(); | ||
params.put("backend_roles", user.getBackendRoles()); | ||
Script deleteConditionallyScript = new Script(ScriptType.INLINE, "painless", scriptCode, params); | ||
updateRequest.script(deleteConditionallyScript); | ||
client.update(updateRequest, ActionListener.wrap(deleteResponse -> { | ||
switch (deleteResponse.getResult()) { | ||
case UPDATED: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Redundant code. Can we abstract out
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Abstracted the exception consumer.
@@ -71,10 +73,12 @@ public TransportSubmitAsyncSearchAction(ThreadPool threadPool, TransportService | |||
@Override | |||
protected void doExecute(Task task, SubmitAsyncSearchRequest request, ActionListener<AsyncSearchResponse> listener) { | |||
AsyncSearchContext asyncSearchContext = null; | |||
String userStr = threadPool.getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_USER_AND_ROLES); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OPENDISTRO_SECURITY_USER_AND_ROLES name got changed in 1.11.1.0 . We renamed it to more generic name and included tenantid also.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to 1.11.1.0
ThreadContext threadContext = threadPool.getThreadContext(); | ||
String userStr = threadContext.getTransient(ConfigConstants.OPENDISTRO_SECURITY_USER_AND_ROLES); | ||
User user = User.parse(userStr); | ||
threadContext.stashContext(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
stashContext
returns an auto-closable. You want want to this way:
try (ThreadContext.StoredContext ctx = threadContext.stashContext()) { ....do my work }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sure to move to common-utils-1.11.1.0
...endistroforelasticsearch/search/async/context/persistence/AsyncSearchPersistenceService.java
Show resolved
Hide resolved
case UPDATED: | ||
listener.onFailure(new IllegalStateException("Document updated when requesting delete for async search id "+ id)); | ||
break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why should this be IllegalStateException?
Concurrent update and deletes are still possible
UpdateRequest updateRequest = new UpdateRequest(ASYNC_SEARCH_RESPONSE_INDEX, id); | ||
updateRequest.doc(source, XContentType.JSON); | ||
updateRequest.retryOnConflict(5); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Concurrent updates can cause a conflict. Lets retry on conflict
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added retries.
listener.onResponse(false); | ||
} | ||
}, e -> { | ||
Consumer<Exception> onFailure = e -> { | ||
final Throwable cause = ExceptionsHelper.unwrapCause(e); | ||
if (cause instanceof DocumentMissingException) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is VersionConflictingEngineException
possible with concurrent updates and deletes. Lets write a test to veify
Map<String, Object> params = new HashMap<>(); | ||
params.put("backend_roles", user.getBackendRoles()); | ||
Script deleteConditionallyScript = new Script(ScriptType.INLINE, "painless", scriptCode, params); | ||
updateRequest.script(deleteConditionallyScript); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to retry on conflicts?
} | ||
} | ||
|
||
private void sendLocalRequest(AsyncSearchId asyncSearchId, Request request, ActionListener<Response> listener) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename to executeLocally
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good minor comments
Integrates security with asynchronous search. Enforce access to search results through user assigned backend roles.
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.