Skip to content
This repository has been archived by the owner on Aug 9, 2022. It is now read-only.

Async search context and and state management framework #6

Merged
merged 22 commits into from
Nov 23, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
2b8bbfd
Async search response tests
Nov 9, 2020
db91f3b
async search context and context permit. async search stage
Nov 9, 2020
a5f9580
move context related components into a dedicated context directory
Nov 9, 2020
073d4a9
persistence context
Nov 9, 2020
3c692c1
active context
Nov 9, 2020
4e9663e
async search id tests. rename AsyncSearchContextId to AsyncSearchActi…
Nov 9, 2020
5fff307
rename AsyncSearchActiveContextId to AsyncSearchContextId
Nov 9, 2020
8ab0c79
added missing java docs. added assertions to async search stage advan…
Nov 12, 2020
2faec54
minor fixes to async search context Permit
Nov 13, 2020
1fc0a1e
revert releasable try-with-resources
Nov 13, 2020
7a6800d
added asserts in advance stage method. misc. minor fixes
Nov 13, 2020
b11868f
State machine changes
Bukhtawar Nov 16, 2020
ac9674d
added more async search transitions and events. pending fix in AsyncS…
Nov 18, 2020
5863650
async search state machine fix for trigger method. Java docs added fo…
Nov 19, 2020
2212a49
State machine wildcard capture changes
Bukhtawar Nov 19, 2020
e69d4a5
Minor state machine changes
Bukhtawar Nov 19, 2020
57010e4
async search active context tests. asyn search state machine classes …
Nov 20, 2020
c42a15b
refactored state machine transitions to map
Nov 20, 2020
0438193
Releasable Action Listener changes
Bukhtawar Nov 20, 2020
d6358f2
Async search converter and Closeable changes
Bukhtawar Nov 20, 2020
1a16c0a
Async search converter and Closeable changes tests
Bukhtawar Nov 20, 2020
5a3204b
add license on files
eirsep Nov 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@
package com.amazon.opendistroforelasticsearch.search.async;

import com.amazon.opendistroforelasticsearch.search.async.context.AsyncSearchContextId;
import org.apache.lucene.store.ByteArrayDataInput;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;

import java.io.IOException;
import java.util.Base64;
import java.util.Objects;

/**
Expand Down Expand Up @@ -62,47 +56,6 @@ public String toString() {
return "[" + node + "][" + taskId + "][" + asyncSearchContextId + "]";
}

/**
* Encodes the {@linkplain AsyncSearchId} in base64 encoding and returns an identifier for the submitted async search
*
* @param asyncSearchId The object to be encoded
* @return The id which is used to access the submitted async search
*/
public static String buildAsyncId(AsyncSearchId asyncSearchId) {
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeString(asyncSearchId.getNode());
out.writeLong(asyncSearchId.getTaskId());
out.writeString(asyncSearchId.getAsyncSearchContextId().getContextId());
out.writeLong(asyncSearchId.getAsyncSearchContextId().getId());
return Base64.getUrlEncoder().withoutPadding().encodeToString(BytesReference.toBytes(out.bytes()));
} catch (IOException e) {
throw new IllegalArgumentException("Cannot build async search id", e);
}
}

/**
* Attempts to decode a base64 encoded string into an {@linkplain AsyncSearchId} which contains the details pertaining to
* an async search being accessed.
*
* @param asyncSearchId The string to be decoded
* @return The parsed {@linkplain AsyncSearchId}
*/
public static AsyncSearchId parseAsyncId(String asyncSearchId) {
try {
byte[] bytes = Base64.getUrlDecoder().decode(asyncSearchId);
ByteArrayDataInput in = new ByteArrayDataInput(bytes);
String node = in.readString();
long taskId = in.readLong();
String contextId = in.readString();
long id = in.readLong();
if (in.getPosition() != bytes.length) {
throw new IllegalArgumentException("Not all bytes were read");
}
return new AsyncSearchId(node, taskId, new AsyncSearchContextId(contextId, id));
} catch (Exception e) {
throw new IllegalArgumentException("Cannot parse async search id", e);
}
}

@Override
public int hashCode() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.search.async;

import com.amazon.opendistroforelasticsearch.search.async.context.AsyncSearchContextId;
import org.apache.lucene.store.ByteArrayDataInput;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;

import java.io.IOException;
import java.util.Base64;

public class AsyncSearchIdConverter {
/**
* Encodes the {@linkplain AsyncSearchId} in base64 encoding and returns an identifier for the submitted async search
*
* @param asyncSearchId The object to be encoded
* @return The id which is used to access the submitted async search
*/
public static String buildAsyncId(AsyncSearchId asyncSearchId) {
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeString(asyncSearchId.getNode());
out.writeLong(asyncSearchId.getTaskId());
out.writeString(asyncSearchId.getAsyncSearchContextId().getContextId());
out.writeLong(asyncSearchId.getAsyncSearchContextId().getId());
return Base64.getUrlEncoder().withoutPadding().encodeToString(BytesReference.toBytes(out.bytes()));
} catch (IOException e) {
throw new IllegalArgumentException("Cannot build async search id", e);
}
}

/**
* Attempts to decode a base64 encoded string into an {@linkplain AsyncSearchId} which contains the details pertaining to
* an async search being accessed.
*
* @param asyncSearchId The string to be decoded
* @return The parsed {@linkplain AsyncSearchId}
*/
public static AsyncSearchId parseAsyncId(String asyncSearchId) {
try {
byte[] bytes = Base64.getUrlDecoder().decode(asyncSearchId);
ByteArrayDataInput in = new ByteArrayDataInput(bytes);
String node = in.readString();
long taskId = in.readLong();
String contextId = in.readString();
long id = in.readLong();
if (in.getPosition() != bytes.length) {
throw new IllegalArgumentException("Not all bytes were read");
}
return new AsyncSearchId(node, taskId, new AsyncSearchContextId(contextId, id));
} catch (Exception e) {
throw new IllegalArgumentException("Cannot parse async search id", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ AsyncSearchProgressListener getAsyncSearchProgressListener() {
@Nullable
public AsyncSearchContextListener getContextListener() {return asyncSearchContextListener; }

public AsyncSearchState getAsyncSearchStage() {
public AsyncSearchState getAsyncSearchState() {
return currentStage;
}

public boolean isRunning() {
return getAsyncSearchStage() == AsyncSearchState.RUNNING;
return getAsyncSearchState() == AsyncSearchState.RUNNING;
}

public AsyncSearchContextId getContextId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.amazon.opendistroforelasticsearch.search.async.context.active;

import com.amazon.opendistroforelasticsearch.search.async.AsyncSearchId;
import com.amazon.opendistroforelasticsearch.search.async.AsyncSearchIdConverter;
import com.amazon.opendistroforelasticsearch.search.async.context.AsyncSearchContext;
import com.amazon.opendistroforelasticsearch.search.async.context.AsyncSearchContextId;
import com.amazon.opendistroforelasticsearch.search.async.context.permits.AsyncSearchContextPermits;
Expand All @@ -32,6 +33,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
Expand All @@ -43,7 +45,7 @@
* The context representing an ongoing search, keeps track of the underlying {@link SearchTask}
* and {@link SearchProgressActionListener}
*/
public class AsyncSearchActiveContext extends AsyncSearchContext {
public class AsyncSearchActiveContext extends AsyncSearchContext implements Closeable {

private static final Logger logger = LogManager.getLogger(AsyncSearchContext.class);

Expand Down Expand Up @@ -85,7 +87,7 @@ public void setTask(SearchTask searchTask) {
this.searchTask.set(searchTask);
this.startTimeMillis = searchTask.getStartTime();
this.expirationTimeMillis = startTimeMillis + keepAlive.getMillis();
this.asyncSearchId.set(AsyncSearchId.buildAsyncId(new AsyncSearchId(nodeId, searchTask.getId(), getContextId())));
this.asyncSearchId.set(AsyncSearchIdConverter.buildAsyncId(new AsyncSearchId(nodeId, searchTask.getId(), getContextId())));
}

public void processSearchFailure(Exception e) {
Expand Down Expand Up @@ -147,4 +149,9 @@ public void acquireContextPermit(final ActionListener<Releasable> onPermitAcquir
public void acquireAllContextPermits(final ActionListener<Releasable> onPermitAcquired, TimeValue timeout, String reason) {
asyncSearchContextPermits.asyncAcquireAllPermits(onPermitAcquired, timeout, reason);
}

@Override
public void close() {
asyncSearchContextPermits.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -38,15 +39,16 @@
* before it transitions context to the index. Provides fairness to consumers and throws {@linkplain TimeoutException} after
* maximum time has elapsed waiting for the in-flight operations block.
*/
public class AsyncSearchContextPermits {
public class AsyncSearchContextPermits implements Closeable {

private static final int TOTAL_PERMITS = Integer.MAX_VALUE;

//visible for testing
final Semaphore semaphore;
private final AsyncSearchContextId asyncSearchContextId;
private volatile String lockDetails;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will check on this. Will revisit with more tests

private final ThreadPool threadPool;
private volatile boolean closed;

private static final Logger logger = LogManager.getLogger(AsyncSearchContextPermits.class);

public AsyncSearchContextPermits(AsyncSearchContextId asyncSearchContextId, ThreadPool threadPool) {
Expand All @@ -56,24 +58,27 @@ public AsyncSearchContextPermits(AsyncSearchContextId asyncSearchContextId, Thre
}

private Releasable acquirePermits(int permits, TimeValue timeout, final String details) throws TimeoutException {
RunOnce release = new RunOnce(() -> {});
if (closed) {
throw new IllegalStateException("trying to acquire permits on closed context ["+ asyncSearchContextId +"]");
}
try {
if (semaphore.tryAcquire(permits, timeout.getMillis(), TimeUnit.MILLISECONDS)) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the advantage of keeping a timeout here, considering the total permits is integer max. There are definite gains of not using timeout though. One use case I can think is for mutating operations contending for permit while all is acquired by post processor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeout is because there is also an asyncAcquireAllPermits which takes integer max number of permits and hence is a blocking call (think persisting completed async search response in index). We need to timeout mutating operations which are blocked beyond a waiting time.

this.lockDetails = details;
final RunOnce release = new RunOnce(() -> semaphore.release(permits));
release = new RunOnce(() -> semaphore.release(permits));
return release::run;
} else {
throw new TimeoutException(
"obtaining context lock" + asyncSearchContextId + "timed out after " + timeout.getMillis() + "ms, " +
"previous lock details: [" + lockDetails + "] trying to lock for [" + details + "]");
throw new TimeoutException("obtaining context lock" + asyncSearchContextId + "timed out after " +
timeout.getMillis() + "ms, previous lock details: [" + lockDetails + "] trying to lock for [" + details + "]");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about permit release? Do you want to handle the interrupts upstream. That would ensure the release of the permit as well.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly we don't think we need to worry about releasing permit here, as one is not acquired yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public boolean tryAcquire(int permits, long timeout, TimeUnit unit) itself throws the InterruptedExecption. Hence no permits have been acquired if we encounter exception.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added an explicit release on interrupts

release.run();
throw new RuntimeException("thread interrupted while trying to obtain context lock", e);
}
}

private void asyncAcquirePermit(
int permits, final ActionListener<Releasable> onAcquired, final TimeValue timeout, String reason) {
private void asyncAcquirePermit(int permits, final ActionListener<Releasable> onAcquired, final TimeValue timeout, String reason) {
threadPool.executor(AsyncSearchPlugin.OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME).execute(new AbstractRunnable() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we plan to introduce a separate threadpool going forward?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure which thread pool you mean, while we might add few based on our performance tests

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dedicated threadpool for different asyc search operations. Agree that can wait for more concrete results based on perf testing.

@Override
public void onFailure(final Exception e) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handlePermit release?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is invoked for all exception thrown by AbstractRunnable.doRun()
the 2 cases when onFailure is invoked :

  1. permit acquisitions itself fails
  2. Permit is acquired but onAcquired fails, for permit release can be handled within onAcquired logic.

Expand Down Expand Up @@ -118,4 +123,9 @@ public void asyncAcquirePermit(final ActionListener<Releasable> onAcquired, fina
public void asyncAcquireAllPermits(final ActionListener<Releasable> onAcquired, final TimeValue timeout, String reason) {
asyncAcquirePermit(TOTAL_PERMITS, onAcquired, timeout, reason);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Question] I don't really get the logic of Integer.MAX_VALUE.
We are saying that for any mutation operation we will first acquire lock on the permit value 1(asyncAcquirePermit method) and release it at the end, why to we need to consider the MAX_VALUE space?
Shouldn't acquiring a lock on the same permit value(1) will suffice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we acquire a single permit out of Integer.MAX_VALUE number of available permits, we imply that its a non
blocking operation. We support concurrent operations of this sort.(asyncAcquirePermits) For ex. updating expiry time is non blocking.

When we acquire all available permits, we imply that its a blocking operation.(asyncAcquireAllPermits). All other operations trying to acquire 1 or all permits will have to wait on this or timeout. For ex. storing completed async search response in index is blocking operation.

}

@Override
public void close() {
closed = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public Exception getSearchError() {
}

@Override
public AsyncSearchState getAsyncSearchStage() {
public AsyncSearchState getAsyncSearchState() {
return AsyncSearchState.PERSISTED;
}

Expand Down

This file was deleted.

Loading