Skip to content

Commit

Permalink
Remove the index type from internal watcher indexes (elastic#39761)
Browse files Browse the repository at this point in the history
This commit removes the "doc" type from watcher internal indexes.
The template still carries the "_doc" type since that is needed for
the internal representation.

This impacts the .watches, .triggered-watches, and .watch-history indexes.

External consumers do not need any changes since all external calls
go through the _watcher API, and should not interact with the the .index directly.

Relates elastic#38637
  • Loading branch information
jakelandis committed Mar 8, 2019
1 parent 21d9803 commit 5872ddb
Show file tree
Hide file tree
Showing 42 changed files with 177 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,4 @@
public final class TriggeredWatchStoreField {

public static final String INDEX_NAME = ".triggered_watches";
public static final String DOC_TYPE = "doc";
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public class Watch implements ToXContentObject {

public static final String INCLUDE_STATUS_KEY = "include_status";
public static final String INDEX = ".watches";
public static final String DOC_TYPE = "doc";

private final String id;
private final Trigger trigger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"index.priority": 900
},
"mappings": {
"doc": {
"_doc": {
"dynamic" : "strict",
"properties": {
"trigger_event": {
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugin/core/src/main/resources/watch-history.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"index.format": 6
},
"mappings": {
"doc": {
"_doc": {
"_meta": {
"watcher-history-version": "${xpack.watcher.template.version}"
},
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugin/core/src/main/resources/watches.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"index.priority": 800
},
"mappings": {
"doc": {
"_doc": {
"dynamic" : "strict",
"properties": {
"status": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void setConfiguration(Configuration configuration) {
*/
@Override
public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
if (isWatchDocument(shardId.getIndexName(), operation.type())) {
if (isWatchDocument(shardId.getIndexName())) {
ZonedDateTime now = Instant.ofEpochMilli(clock.millis()).atZone(ZoneOffset.UTC);
try {
Watch watch = parser.parseWithSecrets(operation.id(), true, operation.source(), now, XContentType.JSON,
Expand Down Expand Up @@ -150,7 +150,7 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
*/
@Override
public void postIndex(ShardId shardId, Engine.Index index, Exception ex) {
if (isWatchDocument(shardId.getIndexName(), index.type())) {
if (isWatchDocument(shardId.getIndexName())) {
logger.debug(() -> new ParameterizedMessage("removing watch [{}] from trigger", index.id()), ex);
triggerService.remove(index.id());
}
Expand All @@ -166,7 +166,7 @@ public void postIndex(ShardId shardId, Engine.Index index, Exception ex) {
*/
@Override
public Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) {
if (isWatchDocument(shardId.getIndexName(), delete.type())) {
if (isWatchDocument(shardId.getIndexName())) {
triggerService.remove(delete.id());
}

Expand All @@ -177,11 +177,10 @@ public Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) {
* Check if a supplied index and document matches the current configuration for watcher
*
* @param index The index to check for
* @param docType The document type
* @return true if this is a watch in the active watcher index, false otherwise
*/
private boolean isWatchDocument(String index, String docType) {
return configuration.isIndexAndActive(index) && docType.equals(Watch.DOC_TYPE);
private boolean isWatchDocument(String index) {
return configuration.isIndexAndActive(index);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,18 @@ public static EmailAction.Builder emailAction(EmailTemplate email) {
return EmailAction.builder(email);
}

/**
* Types are deprecated and should not be used. use {@link #indexAction(String)}
*/
@Deprecated
public static IndexAction.Builder indexAction(String index, String type) {
return IndexAction.builder(index, type);
}

public static IndexAction.Builder indexAction(String index) {
return IndexAction.builder(index);
}

public static JiraAction.Builder jiraAction(String account, MapBuilder<String, Object> fields) {
return jiraAction(account, fields.immutableMap());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class IndexAction implements Action {

public static final String TYPE = "index";

@Nullable final String docType;
@Nullable @Deprecated final String docType;
@Nullable final String index;
@Nullable final String docId;
@Nullable final String executionTimeField;
Expand All @@ -40,6 +40,15 @@ public class IndexAction implements Action {
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(IndexAction.class));
public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in a watcher index action is deprecated.";

public IndexAction(@Nullable String index, @Nullable String docId,
@Nullable String executionTimeField,
@Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone, @Nullable RefreshPolicy refreshPolicy) {
this(index, null, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
}
/**
* Document types are deprecated, use constructor without docType
*/
@Deprecated
public IndexAction(@Nullable String index, @Nullable String docType, @Nullable String docId,
@Nullable String executionTimeField,
@Nullable TimeValue timeout, @Nullable ZoneId dynamicNameTimeZone, @Nullable RefreshPolicy refreshPolicy) {
Expand Down Expand Up @@ -188,10 +197,18 @@ public static IndexAction parse(String watchId, String actionId, XContentParser
return new IndexAction(index, docType, docId, executionTimeField, timeout, dynamicNameTimeZone, refreshPolicy);
}

/**
* Document types are deprecated, use {@link #builder(java.lang.String)}
*/
@Deprecated
public static Builder builder(String index, String docType) {
return new Builder(index, docType);
}

public static Builder builder(String index) {
return new Builder(index);
}

public static class Result extends Action.Result {

private final XContentSource response;
Expand Down Expand Up @@ -278,11 +295,20 @@ public static class Builder implements Action.Builder<IndexAction> {
ZoneId dynamicNameTimeZone;
RefreshPolicy refreshPolicy;

/**
* Document types are deprecated and should not be used. Use: {@link Builder#Builder(java.lang.String)}
*/
@Deprecated
private Builder(String index, String docType) {
this.index = index;
this.docType = docType;
}

private Builder(String index) {
this.index = index;
this.docType = null;
}

public Builder setDocId(String docId) {
this.docId = docId;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public void updateWatchStatus(Watch watch) throws IOException {
.field(WatchField.STATUS.getPreferredName(), watch.status(), params)
.endObject();

UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, watch.id());
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, watch.id());
updateRequest.doc(source);
updateRequest.setIfSeqNo(watch.getSourceSeqNo());
updateRequest.setIfPrimaryTerm(watch.getSourcePrimaryTerm());
Expand Down Expand Up @@ -501,7 +501,7 @@ public void executeTriggeredWatches(Collection<TriggeredWatch> triggeredWatches)
*/
private GetResponse getWatch(String id) {
try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN)) {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, id).preference(Preference.LOCAL.type()).realtime(true);
GetRequest getRequest = new GetRequest(Watch.INDEX, id).preference(Preference.LOCAL.type()).realtime(true);
PlainActionFuture<GetResponse> future = PlainActionFuture.newFuture();
client.get(getRequest, future);
return future.actionGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ public BulkResponse putAll(final List<TriggeredWatch> triggeredWatches) throws I
private BulkRequest createBulkRequest(final List<TriggeredWatch> triggeredWatches) throws IOException {
BulkRequest request = new BulkRequest();
for (TriggeredWatch triggeredWatch : triggeredWatches) {
IndexRequest indexRequest = new IndexRequest(TriggeredWatchStoreField.INDEX_NAME, TriggeredWatchStoreField.DOC_TYPE,
triggeredWatch.id().value());
IndexRequest indexRequest = new IndexRequest(TriggeredWatchStoreField.INDEX_NAME).id(triggeredWatch.id().value());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
triggeredWatch.toXContent(builder, ToXContent.EMPTY_PARAMS);
indexRequest.source(builder);
Expand All @@ -115,7 +114,7 @@ private BulkRequest createBulkRequest(final List<TriggeredWatch> triggeredWatche
* @param wid The ID os the triggered watch id
*/
public void delete(Wid wid) {
DeleteRequest request = new DeleteRequest(TriggeredWatchStoreField.INDEX_NAME, TriggeredWatchStoreField.DOC_TYPE, wid.value());
DeleteRequest request = new DeleteRequest(TriggeredWatchStoreField.INDEX_NAME, wid.value());
bulkProcessor.add(request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@

public class HistoryStore {

public static final String DOC_TYPE = "doc";

private static final Logger logger = LogManager.getLogger(HistoryStore.class);

private final BulkProcessor bulkProcessor;
Expand All @@ -47,7 +45,7 @@ public void put(WatchRecord watchRecord) throws Exception {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS);

IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()).source(builder);
IndexRequest request = new IndexRequest(index).id(watchRecord.id().value()).source(builder);
request.opType(IndexRequest.OpType.CREATE);
bulkProcessor.add(request);
} catch (IOException ioe) {
Expand All @@ -64,7 +62,7 @@ public void forcePut(WatchRecord watchRecord) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
watchRecord.toXContent(builder, WatcherParams.HIDE_SECRETS);

IndexRequest request = new IndexRequest(index, DOC_TYPE, watchRecord.id().value()).source(builder);
IndexRequest request = new IndexRequest(index).id(watchRecord.id().value()).source(builder);
bulkProcessor.add(request);
} catch (IOException ioe) {
final WatchRecord wr = watchRecord;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ protected void doExecute(AckWatchRequest request, ActionListener<AckWatchRespons
listener.onFailure(new ElasticsearchStatusException("watch[{}] is running currently, cannot ack until finished",
RestStatus.CONFLICT, request.getWatchId()));
} else {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId())
GetRequest getRequest = new GetRequest(Watch.INDEX, request.getWatchId())
.preference(Preference.LOCAL.type()).realtime(true);

executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest,
Expand All @@ -99,7 +99,7 @@ protected void doExecute(AckWatchRequest request, ActionListener<AckWatchRespons
return;
}

UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId());
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, request.getWatchId());
// this may reject this action, but prevents concurrent updates from a watch execution
updateRequest.setIfSeqNo(getResponse.getSeqNo());
updateRequest.setIfPrimaryTerm(getResponse.getPrimaryTerm());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public TransportActivateWatchAction(TransportService transportService, ActionFil
protected void doExecute(ActivateWatchRequest request, ActionListener<ActivateWatchResponse> listener) {
try {
ZonedDateTime now = clock.instant().atZone(ZoneOffset.UTC);
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId());
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, request.getWatchId());
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
XContentBuilder builder = activateWatchBuilder(request.isActivate(), now);
updateRequest.doc(builder);
Expand All @@ -72,7 +72,7 @@ protected void doExecute(ActivateWatchRequest request, ActionListener<ActivateWa

executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest,
ActionListener.<UpdateResponse>wrap(updateResponse -> {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getWatchId())
GetRequest getRequest = new GetRequest(Watch.INDEX, request.getWatchId())
.preference(Preference.LOCAL.type()).realtime(true);

executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public TransportDeleteWatchAction(TransportService transportService, ActionFilte

@Override
protected void doExecute(Task task, DeleteWatchRequest request, ActionListener<DeleteWatchResponse> listener) {
DeleteRequest deleteRequest = new DeleteRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId());
DeleteRequest deleteRequest = new DeleteRequest(Watch.INDEX, request.getId());
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, deleteRequest,
ActionListener.<DeleteResponse>wrap(deleteResponse -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public TransportExecuteWatchAction(TransportService transportService, ThreadPool
@Override
protected void doExecute(ExecuteWatchRequest request, ActionListener<ExecuteWatchResponse> listener) {
if (request.getId() != null) {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId())
GetRequest getRequest = new GetRequest(Watch.INDEX, request.getId())
.preference(Preference.LOCAL.type()).realtime(true);

executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public TransportGetWatchAction(TransportService transportService, ActionFilters

@Override
protected void doExecute(GetWatchRequest request, ActionListener<GetWatchResponse> listener) {
GetRequest getRequest = new GetRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId())
GetRequest getRequest = new GetRequest(Watch.INDEX, request.getId())
.preference(Preference.LOCAL.type()).realtime(true);

executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, getRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ protected void doExecute(PutWatchRequest request, ActionListener<PutWatchRespons
watch.toXContent(builder, DEFAULT_PARAMS);

if (isUpdate) {
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId());
UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, request.getId());
if (request.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
updateRequest.setIfSeqNo(request.getIfSeqNo());
updateRequest.setIfPrimaryTerm(request.getIfPrimaryTerm());
Expand All @@ -112,7 +112,7 @@ protected void doExecute(PutWatchRequest request, ActionListener<PutWatchRespons
}, listener::onFailure),
client::update);
} else {
IndexRequest indexRequest = new IndexRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId());
IndexRequest indexRequest = new IndexRequest(Watch.INDEX).id(request.getId());
indexRequest.source(builder);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, indexRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void testCanUseAnyConcreteIndexName() throws Exception {
createIndex(watchResultsIndex);

stopWatcher();
replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, newWatcherIndexName, Watch.DOC_TYPE);
replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, newWatcherIndexName);
startWatcher();

PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("mywatch").setSource(watchBuilder()
Expand Down
Loading

0 comments on commit 5872ddb

Please sign in to comment.