-
Notifications
You must be signed in to change notification settings - Fork 0
/
AbstractWorker.java
131 lines (113 loc) · 4.18 KB
/
AbstractWorker.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
/*
* Copyright 2023 Swift Software Group, Inc.
* (Code and content before December 13, 2023, Copyright Netflix, Inc.)
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.swiftconductor.conductor.client.worker;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.util.EC2MetadataUtils;
import com.swiftconductor.conductor.client.config.PropertyFactory;
import com.swiftconductor.conductor.common.metadata.tasks.Task;
import com.swiftconductor.conductor.common.metadata.tasks.TaskResult;
public interface AbstractWorker {
/**
* Retrieve the name of the task definition the worker is currently working on.
*
* @return the name of the task definition.
*/
String getTaskDefName();
/**
* Executes a task and returns the updated task.
*
* @param task
* Task to be executed.
* @return the {@link TaskResult} object If the task is not completed yet,
* return with the status as IN_PROGRESS.
*/
TaskResult execute(Task task);
/**
* Called when the task coordinator fails to update the task to the server.
* Client should store the task id (in a database) and retry the update later
*
* @param task
* Task which cannot be updated back to the server.
*/
default void onErrorUpdate(Task task) {
}
/**
* Override this method to pause the worker from polling.
*
* @return true if the worker is paused and no more tasks should be polled from
* server.
*/
default boolean paused() {
return PropertyFactory.getBoolean(getTaskDefName(), "paused", false);
}
/**
* Override this method to app specific rules.
*
* @return returns the serverId as the id of the instance that the worker is
* running.
*/
default String getIdentity() {
String serverId;
try {
serverId = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
serverId = System.getenv("HOSTNAME");
}
if (serverId == null) {
serverId = (EC2MetadataUtils.getInstanceId() == null)
? System.getProperty("user.name")
: EC2MetadataUtils.getInstanceId();
}
LoggerHolder.logger.debug("Setting worker id to {}", serverId);
return serverId;
}
/**
* Override this method to change the interval between polls.
*
* @return interval in millisecond at which the server should be polled for
* worker tasks.
*/
default int getPollingInterval() {
return PropertyFactory.getInteger(getTaskDefName(), "pollInterval", 1000);
}
default boolean leaseExtendEnabled() {
return PropertyFactory.getBoolean(getTaskDefName(), "leaseExtendEnabled", false);
}
default int getBatchPollTimeoutInMS() {
return PropertyFactory.getInteger(getTaskDefName(), "batchPollTimeoutInMS", 1000);
}
static AbstractWorker create(String taskType, Function<Task, TaskResult> executor) {
return new AbstractWorker() {
@Override
public String getTaskDefName() {
return taskType;
}
@Override
public TaskResult execute(Task task) {
return executor.apply(task);
}
@Override
public boolean paused() {
return AbstractWorker.super.paused();
}
};
}
}
final class LoggerHolder {
static final Logger logger = LoggerFactory.getLogger(AbstractWorker.class);
}