Skip to content

Commit

Permalink
Merge branch 'main' into yaml-terms-enum-serverless-compat
Browse files Browse the repository at this point in the history
  • Loading branch information
elasticmachine committed Sep 5, 2023
2 parents 106c9ce + c9a2555 commit e76cb00
Show file tree
Hide file tree
Showing 563 changed files with 1,459 additions and 1,193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ A sample response:
----
{
"name": "my-app",
"indices": [ "index1", "index2" ],
"updated_at_millis": 1682105622204,
"template": {
"script": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ GET _tasks/<task_id>
// TEST[skip:TBD]

You can also open the Trained Models UI, select the Pipelines tab under ELSER to
follow the progress. It may take a couple of minutes to complete the process.
follow the progress.


[discrete]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public InternalAutoDateHistogram(StreamInput in) throws IOException {
super(in);
bucketInfo = new BucketInfo(in);
format = in.readNamedWriteable(DocValueFormat.class);
buckets = in.readList(stream -> new Bucket(stream, format));
buckets = in.readCollectionAsList(stream -> new Bucket(stream, format));
this.targetBuckets = in.readVInt();
if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_3_0)) {
bucketInnerInterval = in.readVLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public BucketSortPipelineAggregationBuilder(String name, List<FieldSortBuilder>
*/
public BucketSortPipelineAggregationBuilder(StreamInput in) throws IOException {
super(in, NAME);
sorts = in.readList(FieldSortBuilder::new);
sorts = in.readCollectionAsList(FieldSortBuilder::new);
from = in.readVInt();
size = in.readOptionalVInt();
gapPolicy = GapPolicy.readFrom(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
Expand Down Expand Up @@ -156,7 +157,7 @@ void scheduleTask() {
job = threadPool.scheduleWithFixedDelay(
() -> perform(() -> LOGGER.debug("completed tsdb update task")),
pollInterval,
ThreadPool.Names.SAME
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public Response(List<ExplainIndexDataStreamLifecycle> indices, @Nullable Rollove

public Response(StreamInput in) throws IOException {
super(in);
this.indices = in.readList(ExplainIndexDataStreamLifecycle::new);
this.indices = in.readCollectionAsList(ExplainIndexDataStreamLifecycle::new);
this.rolloverConfiguration = in.readOptionalWriteable(RolloverConfiguration::new);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public Response(List<DataStreamLifecycle> dataStreamLifecycles, @Nullable Rollov
}

public Response(StreamInput in) throws IOException {
this(in.readList(Response.DataStreamLifecycle::new), in.readOptionalWriteable(RolloverConfiguration::new));
this(in.readCollectionAsList(Response.DataStreamLifecycle::new), in.readOptionalWriteable(RolloverConfiguration::new));
}

public List<DataStreamLifecycle> getDataStreamLifecycles() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,7 @@ public void testDownsampling() throws Exception {
Settings.builder()
.put(firstGenMetadata.getSettings())
.put(IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_NAME_KEY, firstGenIndexName)
.put(IndexMetadata.INDEX_DOWNSAMPLE_ORIGIN_NAME_KEY, firstGenIndexName)
.put(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.getKey(), SUCCESS)
)
.numberOfReplicas(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public GeoIpDownloaderStats getStats() {

@Override
protected List<NodeResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readList(NodeResponse::new);
return in.readCollectionAsList(NodeResponse::new);
}

@Override
Expand Down Expand Up @@ -164,10 +164,10 @@ public static class NodeResponse extends BaseNodeResponse {
protected NodeResponse(StreamInput in) throws IOException {
super(in);
stats = in.readBoolean() ? new GeoIpDownloaderStats(in) : null;
databases = in.readImmutableSet(StreamInput::readString);
filesInTemp = in.readImmutableSet(StreamInput::readString);
databases = in.readCollectionAsImmutableSet(StreamInput::readString);
filesInTemp = in.readCollectionAsImmutableSet(StreamInput::readString);
configDatabases = in.getTransportVersion().onOrAfter(TransportVersion.V_8_0_0)
? in.readImmutableSet(StreamInput::readString)
? in.readCollectionAsImmutableSet(StreamInput::readString)
: null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public MultiSearchTemplateRequest() {}
MultiSearchTemplateRequest(StreamInput in) throws IOException {
super(in);
maxConcurrentSearchRequests = in.readVInt();
requests = in.readList(SearchTemplateRequest::new);
requests = in.readCollectionAsList(SearchTemplateRequest::new);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public Response(List<String> scriptContextNames, PainlessContextInfo painlessCon

public Response(StreamInput in) throws IOException {
super(in);
scriptContextNames = in.readStringList();
scriptContextNames = in.readStringCollectionAsList();
painlessContextInfo = in.readOptionalWriteable(PainlessContextInfo::new);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public PainlessContextClassBindingInfo(StreamInput in) throws IOException {
name = in.readString();
rtn = in.readString();
readOnly = in.readInt();
parameters = in.readImmutableList(StreamInput::readString);
parameters = in.readCollectionAsImmutableList(StreamInput::readString);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,11 @@ public PainlessContextClassInfo(
public PainlessContextClassInfo(StreamInput in) throws IOException {
name = in.readString();
imported = in.readBoolean();
constructors = in.readImmutableList(PainlessContextConstructorInfo::new);
staticMethods = in.readImmutableList(PainlessContextMethodInfo::new);
methods = in.readImmutableList(PainlessContextMethodInfo::new);
staticFields = in.readImmutableList(PainlessContextFieldInfo::new);
fields = in.readImmutableList(PainlessContextFieldInfo::new);
constructors = in.readCollectionAsImmutableList(PainlessContextConstructorInfo::new);
staticMethods = in.readCollectionAsImmutableList(PainlessContextMethodInfo::new);
methods = in.readCollectionAsImmutableList(PainlessContextMethodInfo::new);
staticFields = in.readCollectionAsImmutableList(PainlessContextFieldInfo::new);
fields = in.readCollectionAsImmutableList(PainlessContextFieldInfo::new);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public PainlessContextConstructorInfo(String declaring, List<String> parameters)

public PainlessContextConstructorInfo(StreamInput in) throws IOException {
declaring = in.readString();
parameters = in.readImmutableList(StreamInput::readString);
parameters = in.readCollectionAsImmutableList(StreamInput::readString);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ public PainlessContextInfo(

public PainlessContextInfo(StreamInput in) throws IOException {
name = in.readString();
classes = in.readImmutableList(PainlessContextClassInfo::new);
importedMethods = in.readImmutableList(PainlessContextMethodInfo::new);
classBindings = in.readImmutableList(PainlessContextClassBindingInfo::new);
instanceBindings = in.readImmutableList(PainlessContextInstanceBindingInfo::new);
classes = in.readCollectionAsImmutableList(PainlessContextClassInfo::new);
importedMethods = in.readCollectionAsImmutableList(PainlessContextMethodInfo::new);
classBindings = in.readCollectionAsImmutableList(PainlessContextClassBindingInfo::new);
instanceBindings = in.readCollectionAsImmutableList(PainlessContextInstanceBindingInfo::new);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public PainlessContextInstanceBindingInfo(StreamInput in) throws IOException {
declaring = in.readString();
name = in.readString();
rtn = in.readString();
parameters = in.readImmutableList(StreamInput::readString);
parameters = in.readCollectionAsImmutableList(StreamInput::readString);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public PainlessContextMethodInfo(StreamInput in) throws IOException {
declaring = in.readString();
name = in.readString();
rtn = in.readString();
parameters = in.readImmutableList(StreamInput::readString);
parameters = in.readCollectionAsImmutableList(StreamInput::readString);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ protected PercolateQueryBuilder(String field, Supplier<BytesReference> documentS
} else {
indexedDocumentVersion = null;
}
documents = in.readImmutableList(StreamInput::readBytesReference);
documents = in.readCollectionAsImmutableList(StreamInput::readBytesReference);
if (documents.isEmpty() == false) {
documentXContentType = in.readEnum(XContentType.class);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public EvalQueryQuality(String id, double metricScore) {
public EvalQueryQuality(StreamInput in) throws IOException {
this.queryId = in.readString();
this.metricScore = in.readDouble();
this.ratedHits = in.readList(RatedSearchHit::new);
this.ratedHits = in.readCollectionAsList(RatedSearchHit::new);
this.optionalMetricDetails = in.readOptionalNamedWriteable(MetricDetail.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public String toString() {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeDouble(metricScore);
out.writeMap(details, (o, v) -> v.writeTo(o));
out.writeMap(details, StreamOutput::writeWriteable);
out.writeMap(failures, StreamOutput::writeException);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public RankEvalSpec(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(ratedRequests);
out.writeNamedWriteable(metric);
out.writeMap(templates, (o, v) -> v.writeTo(o));
out.writeMap(templates, StreamOutput::writeWriteable);
out.writeVInt(maxConcurrentSearches);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,11 @@
@SuppressForbidden(reason = "It wraps a ThreadPool and delegates all the work")
public class ReactorScheduledExecutorService extends AbstractExecutorService implements ScheduledExecutorService {
private final ThreadPool threadPool;
private final String executorName;
private final ExecutorService delegate;
private final Logger logger = LogManager.getLogger(ReactorScheduledExecutorService.class);

public ReactorScheduledExecutorService(ThreadPool threadPool, String executorName) {
this.threadPool = threadPool;
this.executorName = executorName;
this.delegate = threadPool.executor(executorName);
}

Expand All @@ -54,14 +52,14 @@ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUni
} catch (Exception e) {
throw new RuntimeException(e);
}
}, new TimeValue(delay, unit), executorName);
}, new TimeValue(delay, unit), delegate);

return new ReactorFuture<>(schedule);
}

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
Runnable decoratedCommand = decorateRunnable(command);
Scheduler.ScheduledCancellable schedule = threadPool.schedule(decoratedCommand, new TimeValue(delay, unit), executorName);
Scheduler.ScheduledCancellable schedule = threadPool.schedule(decoratedCommand, new TimeValue(delay, unit), delegate);
return new ReactorFuture<>(schedule);
}

Expand All @@ -75,11 +73,7 @@ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDela
} catch (EsRejectedExecutionException e) {
if (e.isExecutorShutdown()) {
logger.debug(
() -> format(
"could not schedule execution of [%s] on [%s] as executor is shut down",
decoratedCommand,
executorName
),
() -> format("could not schedule execution of [%s] on [%s] as executor is shut down", decoratedCommand, delegate),
e
);
} else {
Expand All @@ -93,7 +87,7 @@ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDela
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
Runnable decorateRunnable = decorateRunnable(command);

Scheduler.Cancellable cancellable = threadPool.scheduleWithFixedDelay(decorateRunnable, new TimeValue(delay, unit), executorName);
Scheduler.Cancellable cancellable = threadPool.scheduleWithFixedDelay(decorateRunnable, new TimeValue(delay, unit), delegate);

return new ReactorFuture<>(cancellable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
for (MultipartUpload currentUpload : currentUploads) {
final var currentUploadId = currentUpload.getUploadId();
if (uploadId.equals(currentUploadId) == false) {
threadPool.executor(ThreadPool.Names.SNAPSHOT)
blobStore.getSnapshotExecutor()
.execute(ActionRunnable.run(listeners.acquire(), () -> safeAbortMultipartUpload(currentUploadId)));
}
}
Expand All @@ -761,7 +761,7 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
if (uploadIndex > 0) {
threadPool.scheduleUnlessShuttingDown(
TimeValue.timeValueMillis(TimeValue.timeValueSeconds(uploadIndex).millis() + Randomness.get().nextInt(50)),
ThreadPool.Names.SNAPSHOT,
blobStore.getSnapshotExecutor(),
cancelConcurrentUpdates
);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -71,6 +72,7 @@ class S3BlobStore implements BlobStore {
private final RepositoryMetadata repositoryMetadata;

private final ThreadPool threadPool;
private final Executor snapshotExecutor;

private final Stats stats = new Stats();

Expand Down Expand Up @@ -101,6 +103,7 @@ class S3BlobStore implements BlobStore {
this.storageClass = initStorageClass(storageClass);
this.repositoryMetadata = repositoryMetadata;
this.threadPool = threadPool;
this.snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
this.getMetricCollector = new IgnoreNoResponseMetricsCollector() {
@Override
public void collectMetrics(Request<?> request) {
Expand Down Expand Up @@ -145,6 +148,10 @@ public void collectMetrics(Request<?> request) {
};
}

public Executor getSnapshotExecutor() {
return snapshotExecutor;
}

public TimeValue getCompareAndExchangeTimeToLive() {
return service.compareAndExchangeTimeToLive;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
Expand Down Expand Up @@ -154,7 +155,7 @@ protected BlobContainer createBlobContainer(
S3Repository.STORAGE_CLASS_SETTING.getDefault(Settings.EMPTY),
repositoryMetadata,
BigArrays.NON_RECYCLING_INSTANCE,
null
new DeterministicTaskQueue().getThreadPool()
)
) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
Expand Down Expand Up @@ -111,7 +112,7 @@ public Collection<Object> createComponents(
if (rc < 0) {
logger.warn("extending startup timeout via sd_notify failed with [{}]", rc);
}
}, TimeValue.timeValueSeconds(15), ThreadPool.Names.SAME));
}, TimeValue.timeValueSeconds(15), EsExecutors.DIRECT_EXECUTOR_SERVICE));
return List.of();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.systemd;

import org.elasticsearch.Build;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.ESTestCase;
Expand All @@ -30,6 +31,7 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -47,8 +49,13 @@ public class SystemdPluginTests extends ESTestCase {

{
when(extender.cancel()).thenReturn(true);
when(threadPool.scheduleWithFixedDelay(any(Runnable.class), eq(TimeValue.timeValueSeconds(15)), eq(ThreadPool.Names.SAME)))
.thenReturn(extender);
when(
threadPool.scheduleWithFixedDelay(
any(Runnable.class),
eq(TimeValue.timeValueSeconds(15)),
same(EsExecutors.DIRECT_EXECUTOR_SERVICE)
)
).thenReturn(extender);
}

public void testIsEnabled() {
Expand Down
Loading

0 comments on commit e76cb00

Please sign in to comment.