Skip to content
This repository has been archived by the owner on Aug 9, 2022. It is now read-only.

Service layer and Transport Handlers #8

Merged
merged 51 commits into from
Dec 14, 2020
Merged

Conversation

eirsep
Copy link
Contributor

@eirsep eirsep commented Nov 23, 2020

Issue #3

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@Bukhtawar Bukhtawar changed the title async search service. async search submit, get,delete transport actions. Service layer and Transport Handlers Nov 24, 2020
Copy link
Member

@Bukhtawar Bukhtawar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Surya few comments from the first pass, will take a detailed look

Comment on lines 88 to 98
BytesReference bytesReference =
BytesReference.fromByteBuffer(ByteBuffer.wrap(Base64.getUrlDecoder().decode(asyncSearchPersistenceModel.getResponse())));
try {
NamedWriteableAwareStreamInput wrapperStreamInput = new NamedWriteableAwareStreamInput(bytesReference.streamInput(),
namedWriteableRegistry);
SearchResponse asyncSearchResponse = new SearchResponse(wrapperStreamInput);
wrapperStreamInput.close();
return asyncSearchResponse;
} catch (IOException e) {
logger.error("Failed to parse search response " + asyncSearchPersistenceModel.getResponse(), e);
return null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use try-with-resource to avoid resource leaks

Comment on lines 109 to 117
try {
NamedWriteableAwareStreamInput wrapperStreamInput = new NamedWriteableAwareStreamInput(bytesReference.streamInput(),
namedWriteableRegistry);
ElasticsearchException exception =wrapperStreamInput.readException();
wrapperStreamInput.close();
return exception;
} catch (IOException e) {
logger.error("Failed to parse search error " + asyncSearchPersistenceModel.getError(), e);
return null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try with resource here as well

Comment on lines 118 to 157
(s, e) -> {}, (contextId, listener) -> listener.onContextPersisted(contextId), SearchResponsePersistedEvent.class));
(s, e) -> {
}, (contextId, listener) -> listener.onContextPersisted(contextId), SearchResponsePersistedEvent.class));

stateMachine.registerTransition(new AsyncSearchTransition<>(FAILED, PERSISTED,
(s, e) -> {}, (contextId, listener) -> listener.onContextPersisted(contextId), SearchResponsePersistedEvent.class));
(s, e) -> {
}, (contextId, listener) -> listener.onContextPersisted(contextId), SearchResponsePersistedEvent.class));

//persist failed
stateMachine.registerTransition(new AsyncSearchTransition<>(SUCCEEDED, PERSIST_FAILED,
(s, e) -> {}, (contextId, listener) -> listener.onContextPersistFailed(contextId), SearchResponsePersistFailedEvent.class));
(s, e) -> {
}, (contextId, listener) -> listener.onContextPersistFailed(contextId), SearchResponsePersistFailedEvent.class));

stateMachine.registerTransition(new AsyncSearchTransition<>(FAILED, PERSIST_FAILED,
(s, e) -> {}, (contextId, listener) -> listener.onContextPersistFailed(contextId), SearchResponsePersistFailedEvent.class));
(s, e) -> {
}, (contextId, listener) -> listener.onContextPersistFailed(contextId), SearchResponsePersistFailedEvent.class));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets addd a TODO to populate respective EVents here

Comment on lines 91 to 95
if (asyncSearchContext.getAsyncSearchState() == AsyncSearchState.DELETED) {
logger.debug("Async search context {} has been moved to DELETED while waiting to acquire permits for post " +
"processing", asyncSearchContext.getAsyncSearchId());
return;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets release a permit here

return;
}
asyncSearchPersistenceService.storeResponse(asyncSearchContext.getAsyncSearchId(),
persistenceModel, ActionListener.wrap(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ActionListener.runafter(ActionListener.wrap(..., releasable::close) instead

protected void doRun() {
//should we look at the task status and retry on forwarding the request if the search is still RUNNING based
//on task status
ClusterState state = observer.setAndGetObservedState();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets skip this for now

//import static org.mockito.Mockito.mock;
//import static org.mockito.Mockito.when;
//
//public class AsyncSearchServiceTests extends ESTestCase {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it commented?

Copy link
Member

@Bukhtawar Bukhtawar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please address some comments i have left


@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return Collections.singletonList(new SystemIndexDescriptor(".opendistro_asynchronous_search_response",
return Collections.singletonList(new SystemIndexDescriptor(".opendistro_asynchroAnous_search_response",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo

Comment on lines 260 to 262
}
//free async search context if one exists
groupedDeletionListener.onResponse(asyncSearchActiveStore.freeContext(asyncSearchContextId));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets add an else case to avoid doubly cleaning up

for (AsyncSearchActiveContext asyncSearchActiveContext : asyncSearchActiveStore.getAllContexts().values()) {
AsyncSearchState stage = asyncSearchActiveContext.getAsyncSearchState();
if (stage != null && (
!asyncSearchActiveContext.retainedStages().contains(stage) || asyncSearchActiveContext.isExpired())) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace ! with ==false

}
}
} catch (Exception e) {
logger.debug("Exception while reaping async search active contexts", e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets improve logging, try adding async search id for every log we have for better debuggability

Comment on lines 83 to 91
public boolean freeContext(AsyncSearchContextId asyncSearchContextId) {
AsyncSearchActiveContext asyncSearchContext = activeContexts.get(asyncSearchContextId.getId());
if (asyncSearchContext != null) {
logger.debug("Removing {} from context map", asyncSearchContextId);
activeContexts.remove(asyncSearchContextId.getId());
return true;
}
return false;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part needs to be sync ed up with the lastest from context isAlive

Comment on lines 47 to 49
protected void metadataToXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("limit", limit);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is it being used

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines 122 to 130
listener.onFailure(exp);
}
// handle request locally if we were not able to forward the request
handleRequest(asyncSearchId, request, listener);
}
});
} else {
handleRequest(asyncSearchId, request, listener);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets handle the exception only if we are unable to forward


protected FetchAsyncSearchRequest(StreamInput in) throws IOException {
super(in);
connectionTimeout = in.readTimeValue();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a required field? I think you should have a default timeout and make this optional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Making it optional.

/**
* A request to delete an async search response based on its id
*/
public class DeleteAsyncSearchRequest extends FetchAsyncSearchRequest<DeleteAsyncSearchRequest> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a wierd extension name- maybe have a AsyncSearchRequest extended by both Delete and Fetch- e.g. what if the fetch request includes more params in future like pagination etc. How would that fit in with Delete?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renaming it to AsyncSearchRoutingRequest.

/**
* A request to fetch an async search response by id.
*/
public class GetAsyncSearchRequest extends FetchAsyncSearchRequest<GetAsyncSearchRequest> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not clear how fetch and get are different just by looking at the class names?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renaming it to AsyncSearchRoutingRequest.

Comment on lines +78 to +81
this.searchRequest.setCcsMinimizeRoundtrips(CCR_MINIMIZE_ROUNDTRIPS);
this.searchRequest.setBatchedReduceSize(DEFAULT_BATCHED_REDUCE_SIZE);
this.searchRequest.setPreFilterShardSize(DEFAULT_PRE_FILTER_SHARD_SIZE);
this.searchRequest.requestCache(DEFAULT_REQUEST_CACHE);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if these params are already set in SearchRequest? Do you intend to override these params for async search?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not all cases. Yes this needs a fix, we need to check values and only selectively override

public SubmitAsyncSearchRequest(StreamInput in) throws IOException {
super(in);
this.searchRequest = new SearchRequest(in);
this.waitForCompletionTimeout = in.readTimeValue();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this optional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Making it optional.

Comment on lines 237 to 257
asyncSearchContext.getAsyncSearchId(), response);
//Intent of the lock here is to disallow ongoing migration to system index
// as if that is underway we might end up creating a new document post a DELETE was executed
asyncSearchContext.acquireContextPermit(ActionListener.wrap(
releasable -> {
//free context marks the context as DELETED
groupedDeletionListener.onResponse(
asyncSearchActiveStore.freeContext(asyncSearchContextId));
releasable.close();
}, listener::onFailure), TimeValue.timeValueSeconds(5), "free context");
},
(e) -> {
asyncSearchContext.acquireContextPermit(ActionListener.wrap(
releasable -> {
groupedDeletionListener.onResponse(
asyncSearchActiveStore.freeContext(asyncSearchContextId));
releasable.close();
//TODO introduce request timeouts to make the permit wait transparent to the client
}, listener::onFailure), TimeValue.timeValueSeconds(5), "free context");
logger.debug(() -> new ParameterizedMessage("Unable to cancel async search task {}",
asyncSearchContext.getTask()), e);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lot of duplicate code?

asyncSearchStateMachine.trigger(new SearchSuccessfulEvent(asyncSearchContext, searchResponse));
if (asyncSearchContext.shouldPersist()) {
asyncSearchStateMachine.trigger(new BeginPersistEvent(asyncSearchContext));
asyncSearchActiveStore.freeContext(asyncSearchContext.getContextId());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

persist event execution frees the context. Why do you need to do this twice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. this has been removed.

asyncSearchContext.acquireAllContextPermits(ActionListener.wrap(releasable -> {
// check again after acquiring permit if the context has been deleted mean while
if (asyncSearchContext.shouldPersist() == false) {
logger.debug("Async search context {} has been moved to DELETED while waiting to acquire permits for post " +
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no DELETED state as per your new logic?

asyncSearchStateMachine.trigger(new SearchFailureEvent(asyncSearchContext, exception));
if (asyncSearchContext.shouldPersist()) {
asyncSearchStateMachine.trigger(new BeginPersistEvent(asyncSearchContext));
asyncSearchActiveStore.freeContext(asyncSearchContext.getContextId());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why freeing the context here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. this has been removed.

asyncSearchPersistenceService.storeResponse(asyncSearchContext.getAsyncSearchId(),
persistenceModel, ActionListener.runAfter(ActionListener.wrap(
(indexResponse) -> {
//Mark any dangling reference as PERSISTED and cleaning it up from the IN_MEMORY context
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you call it dangling?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concurrent get or delete requests may be holding reference to the active context.

}
return true;
}

@Override
public void close() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

synchronized?

@@ -59,6 +61,7 @@
private final AtomicBoolean completed;
private final SetOnce<Exception> error;
private final SetOnce<SearchResponse> searchResponse;
private final AtomicBoolean closed;
private final AsyncSearchContextPermits asyncSearchContextPermits;

public AsyncSearchActiveContext(AsyncSearchContextId asyncSearchContextId, String nodeId,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be executing even after close is called? If not, then you need to check for closed flag in every method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Have added this check in every method.

}

private AsyncSearchStateMachine getAsyncSearchStateMachineDefinition() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where has this gone now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AsyncSearchStateMachine is being initialized in the AsyncSearchService

Copy link

@itiyamas itiyamas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, please be careful about ResourceNotFound exception. It might break some consistency guarantees of your API for concurrent DELETE and GET calls. You might want to throw NotAvailable instead of NotFound to preserve strong consistency.

Copy link

@getsaurabh02 getsaurabh02 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome job on the tests! looks pretty much in the good shape.

@Bukhtawar Bukhtawar merged commit 24440c7 into master Dec 14, 2020
@Bukhtawar Bukhtawar deleted the async_search_service branch January 2, 2021 18:34
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants