-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
JobScheduler.java
247 lines (207 loc) · 8.79 KB
/
JobScheduler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.util;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.neo4j.kernel.lifecycle.Lifecycle;
import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.NEW_THREAD;
import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.POOLED;
/**
* To be expanded. The idea here is to have a database-global service for running jobs, handling jobs crashing and so on.
*/
public interface JobScheduler extends Lifecycle
{
/**
* The ways in which the jobs of a {@link Group} can be assigned to threads. Every {@link Group} is created with
* exactly one of these strategies.
*/
enum SchedulingStrategy
{
/** Create a new thread each time a job is scheduled */
NEW_THREAD,
/** Run the job from a pool of threads, shared among all groups with this strategy */
POOLED
}
/**
 * A named set of related jobs, paired with the {@link SchedulingStrategy} that decides how those jobs are run.
 */
class Group
{
    /** Metadata key whose value, when present, is used as the suffix of the generated thread name. */
    public static final String THREAD_ID = "thread-id";
    /** Shared empty metadata map, for callers that have no metadata to pass. */
    public static final Map<String, String> NO_METADATA = Collections.emptyMap();

    private final String name;
    private final SchedulingStrategy strategy;
    /** Supplies the numeric thread-name suffix when the metadata has no {@link #THREAD_ID} entry. */
    private final AtomicInteger threadCounter = new AtomicInteger();

    /**
     * @param name name of the group; becomes part of every thread name produced by {@link #threadName(Map)}
     * @param strategy how jobs of this group are to be scheduled
     */
    public Group( String name, SchedulingStrategy strategy )
    {
        this.name = name;
        this.strategy = strategy;
    }

    /**
     * @return the name this group was created with
     */
    public String name()
    {
        return name;
    }

    /**
     * @return the scheduling strategy this group was created with
     */
    public SchedulingStrategy strategy()
    {
        return strategy;
    }

    /**
     * Builds the name for a new thread of this group, in the form {@code neo4j.<group name>-<suffix>}.
     * The suffix is the {@link #THREAD_ID} entry of the metadata if that key is present; otherwise it is the
     * next value of a per-group counter, the first such value being 1.
     * Whether the returned name is actually used is up to the scheduling strategy.
     *
     * @param metadata comes from {@link JobScheduler#schedule(Group, Runnable, Map)}
     * @return the thread name
     */
    public String threadName( Map<String, String> metadata )
    {
        // Look the key up before building the prefix so that the calls happen in the established order.
        final boolean hasExplicitId = metadata.containsKey( THREAD_ID );
        final String prefix = "neo4j." + name() + "-";
        return hasExplicitId
                ? prefix + metadata.get( THREAD_ID )
                : prefix + threadCounter.incrementAndGet();
    }
}
/**
 * This is an exhaustive list of job types that run in the database. It should be expanded as needed for new groups
 * of jobs.
 *
 * For now, this does minimal configuration, but opens up for things like common
 * failure handling, shared threads and affinity strategies.
 *
 * Every group is a {@code final} constant: the instances are shared by the whole database, so none of them may be
 * replaced at runtime.
 */
class Groups
{
    /** Session workers, these perform the work of actually executing client queries. */
    public static final Group sessionWorker = new Group( "Session", NEW_THREAD );

    /** Background index population */
    public static final Group indexPopulation = new Group( "IndexPopulation", POOLED );

    /** Push transactions from master to slaves */
    public static final Group masterTransactionPushing = new Group( "TransactionPushing", POOLED );

    /**
     * Rolls back idle transactions on the server.
     */
    public static final Group serverTransactionTimeout = new Group( "ServerTransactionTimeout", POOLED );

    /**
     * Aborts idle slave lock sessions on the master.
     */
    public static final Group slaveLocksTimeout = new Group( "SlaveLocksTimeout", POOLED );

    /**
     * Pulls updates from the master.
     */
    public static final Group pullUpdates = new Group( "PullUpdates", POOLED );

    /**
     * Gathers approximated data about the underlying data store.
     */
    public static final Group indexSamplingController = new Group( "IndexSamplingController", POOLED );

    /**
     * Index sampling. NOTE(review): presumably the individual sampling jobs, as opposed to the controller
     * above - confirm against the callers.
     */
    public static final Group indexSampling = new Group( "IndexSampling", POOLED );

    /**
     * Rotates internal diagnostic logs
     */
    public static final Group internalLogRotation = new Group( "InternalLogRotation", POOLED );

    /**
     * Rotates query logs
     */
    public static final Group queryLogRotation = new Group( "queryLogRotation", POOLED );

    /**
     * Checkpoint and store flush
     */
    public static final Group checkPoint = new Group( "CheckPoint", POOLED );

    /**
     * Raft Log pruning
     */
    public static final Group raftLogPruning = new Group( "RaftLogPruning", POOLED );

    /**
     * Network IO threads for the Bolt protocol.
     */
    public static final Group boltNetworkIO = new Group( "BoltNetworkIO", NEW_THREAD );

    /**
     * Reporting thread for Metrics events
     */
    public static final Group metricsEvent = new Group( "MetricsEvent", POOLED );

    /**
     * Snapshot downloader
     */
    public static final Group downloadSnapshot = new Group( "DownloadSnapshot", POOLED );

    /**
     * UDC timed events.
     */
    public static final Group udc = new Group( "UsageDataCollection", POOLED );

    /**
     * Storage maintenance.
     */
    public static final Group storageMaintenance = new Group( "StorageMaintenance", POOLED );

    /**
     * Native security.
     */
    public static final Group nativeSecurity = new Group( "NativeSecurity", POOLED );

    /**
     * File watch service group
     */
    public static final Group fileWatch = new Group( "FileWatcher", NEW_THREAD );

    /**
     * Kernel transaction timeout monitor.
     */
    public static final Group transactionTimeoutMonitor = new Group( "TransactionTimeoutMonitor", POOLED );
}
/**
* Handle to a job that has been handed to the scheduler. It is the return type of the {@code schedule} and
* {@code scheduleRecurring} methods of {@link JobScheduler}.
*/
interface JobHandle
{
/**
* Cancels the job this handle refers to.
*
* @param mayInterruptIfRunning NOTE(review): presumably the same meaning as the argument of
* {@link java.util.concurrent.Future#cancel(boolean)} - confirm against the implementations.
*/
void cancel( boolean mayInterruptIfRunning );
/**
* Waits for the job to terminate.
*
* @throws InterruptedException NOTE(review): presumably when the waiting thread is interrupted - confirm.
* @throws ExecutionException NOTE(review): presumably when the job itself failed - confirm.
*/
void waitTermination() throws InterruptedException, ExecutionException;
/**
* Registers a {@link CancelListener} on this handle. This default implementation does not support
* listeners and always throws; implementations that support them override it.
*
* @param listener the listener to register
* @throws UnsupportedOperationException always, in this default implementation
*/
default void registerCancelListener( CancelListener listener )
{
throw new UnsupportedOperationException( "Unsupported in this implementation" );
}
}
/**
* Gets notified about calls to {@link JobHandle#cancel(boolean)}.
* Listeners are registered through {@link JobHandle#registerCancelListener(CancelListener)}.
*/
interface CancelListener
{
/**
* Notification that {@link JobHandle#cancel(boolean)} was called.
*
* @param mayInterruptIfRunning argument from {@link JobHandle#cancel(boolean)} call.
*/
void cancelled( boolean mayInterruptIfRunning );
}
/**
* Expose a group scheduler as an {@link Executor}.
*
* @param group the group whose scheduler is exposed
* @return an {@link Executor} for the given group
*/
Executor executor( Group group );
/**
* Expose a group scheduler as a {@link java.util.concurrent.ThreadFactory}.
* This is a lower-level alternative to {@link #executor(Group)}, where you are in control of when to spin
* up new threads for your jobs.
*
* The lifecycle of the threads you get out of here is not managed by the JobScheduler: you own the lifecycle and
* must start the thread before it can be used.
*
* This mechanism is strongly preferred over manually creating threads, as it allows a central place for record
* keeping of thread creation, a central place for customizing the threads based on their groups, and lays a
* foundation for controlling things like thread affinity and priorities in a coordinated manner in the future.
*
* @param group the group the created threads belong to
* @return a {@link ThreadFactory} for the given group
*/
ThreadFactory threadFactory( Group group );
/**
* Schedule a new job in the specified group.
*
* @param group the group to schedule the job in
* @param job the job to run
* @return a {@link JobHandle} for the scheduled job
*/
JobHandle schedule( Group group, Runnable job );
/**
* Schedule a new job in the specified group, passing in metadata for the scheduling strategy to use.
*
* @param group the group to schedule the job in
* @param job the job to run
* @param metadata metadata for the scheduling strategy; {@link Group#threadName(Map)} reads the
* {@link Group#THREAD_ID} entry of such a map when naming a thread
* @return a {@link JobHandle} for the scheduled job
*/
JobHandle schedule( Group group, Runnable job, Map<String, String> metadata );
/**
* Schedule a new job in the specified group with the given delay.
*
* @param group the group to schedule the job in
* @param runnable the job to run
* @param initialDelay the delay, expressed in {@code timeUnit}
* @param timeUnit the unit of {@code initialDelay}
* @return a {@link JobHandle} for the scheduled job
*/
JobHandle schedule( Group group, Runnable runnable, long initialDelay, TimeUnit timeUnit );
/**
* Schedule a recurring job.
*
* @param group the group to schedule the job in
* @param runnable the job to run repeatedly
* @param period the recurrence period, expressed in {@code timeUnit}. NOTE(review): whether this is measured
* start-to-start or end-to-start is not visible here - confirm against the implementation.
* @param timeUnit the unit of {@code period}
* @return a {@link JobHandle} for the scheduled job
*/
JobHandle scheduleRecurring( Group group, Runnable runnable, long period, TimeUnit timeUnit );
/**
* Schedule a recurring job where the first invocation is delayed the specified time.
*
* @param group the group to schedule the job in
* @param runnable the job to run repeatedly
* @param initialDelay the delay before the first invocation, expressed in {@code timeUnit}
* @param period the recurrence period, expressed in {@code timeUnit}. NOTE(review): whether this is measured
* start-to-start or end-to-start is not visible here - confirm against the implementation.
* @param timeUnit the unit of {@code initialDelay} and {@code period}
* @return a {@link JobHandle} for the scheduled job
*/
JobHandle scheduleRecurring( Group group, Runnable runnable, long initialDelay, long period, TimeUnit timeUnit );
}