/
WorkerContext.java
139 lines (112 loc) · 4.02 KB
/
WorkerContext.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package org.ray.runtime;
import com.google.common.base.Preconditions;
import java.nio.ByteBuffer;
import org.ray.api.id.JobId;
import org.ray.api.id.TaskId;
import org.ray.api.id.UniqueId;
import org.ray.runtime.config.RunMode;
import org.ray.runtime.generated.Common.WorkerType;
import org.ray.runtime.raylet.RayletClientImpl;
import org.ray.runtime.task.TaskSpec;
/**
* This is a wrapper class for worker context of core worker.
*/
public class WorkerContext {
/**
* The native pointer of worker context of core worker.
*/
private final long nativeWorkerContextPointer;
private ClassLoader currentClassLoader;
/**
* The ID of main thread which created the worker context.
*/
private long mainThreadId;
/**
* The run-mode of this worker.
*/
private RunMode runMode;
public WorkerContext(WorkerType workerType, JobId jobId, RunMode runMode) {
this.nativeWorkerContextPointer = nativeCreateWorkerContext(workerType.getNumber(), jobId.getBytes());
mainThreadId = Thread.currentThread().getId();
this.runMode = runMode;
currentClassLoader = null;
}
public long getNativeWorkerContext() {
return nativeWorkerContextPointer;
}
/**
* @return For the main thread, this method returns the ID of this worker's current running task;
* for other threads, this method returns a random ID.
*/
public TaskId getCurrentTaskId() {
return new TaskId(nativeGetCurrentTaskId(nativeWorkerContextPointer));
}
/**
* Set the current task which is being executed by the current worker. Note, this method can only
* be called from the main thread.
*/
public void setCurrentTask(TaskSpec task, ClassLoader classLoader) {
if (runMode == RunMode.CLUSTER) {
Preconditions.checkState(
Thread.currentThread().getId() == mainThreadId,
"This method should only be called from the main thread."
);
}
Preconditions.checkNotNull(task);
byte[] taskSpec = RayletClientImpl.convertTaskSpecToProtobuf(task);
nativeSetCurrentTask(nativeWorkerContextPointer, taskSpec);
currentClassLoader = classLoader;
}
/**
* Increment the put index and return the new value.
*/
public int nextPutIndex() {
return nativeGetNextPutIndex(nativeWorkerContextPointer);
}
/**
* Increment the task index and return the new value.
*/
public int nextTaskIndex() {
return nativeGetNextTaskIndex(nativeWorkerContextPointer);
}
/**
* @return The ID of the current worker.
*/
public UniqueId getCurrentWorkerId() {
return new UniqueId(nativeGetCurrentWorkerId(nativeWorkerContextPointer));
}
/**
* The ID of the current job.
*/
public JobId getCurrentJobId() {
return JobId.fromByteBuffer(nativeGetCurrentJobId(nativeWorkerContextPointer));
}
/**
* @return The class loader which is associated with the current job.
*/
public ClassLoader getCurrentClassLoader() {
return currentClassLoader;
}
/**
* Get the current task.
*/
public TaskSpec getCurrentTask() {
byte[] bytes = nativeGetCurrentTask(nativeWorkerContextPointer);
if (bytes == null) {
return null;
}
return RayletClientImpl.parseTaskSpecFromProtobuf(bytes);
}
public void destroy() {
nativeDestroy(nativeWorkerContextPointer);
}
private static native long nativeCreateWorkerContext(int workerType, byte[] jobId);
private static native byte[] nativeGetCurrentTaskId(long nativeWorkerContextPointer);
private static native void nativeSetCurrentTask(long nativeWorkerContextPointer, byte[] taskSpec);
private static native byte[] nativeGetCurrentTask(long nativeWorkerContextPointer);
private static native ByteBuffer nativeGetCurrentJobId(long nativeWorkerContextPointer);
private static native byte[] nativeGetCurrentWorkerId(long nativeWorkerContextPointer);
private static native int nativeGetNextTaskIndex(long nativeWorkerContextPointer);
private static native int nativeGetNextPutIndex(long nativeWorkerContextPointer);
private static native void nativeDestroy(long nativeWorkerContextPointer);
}