/
OrbitServer.kt
223 lines (183 loc) · 6.95 KB
/
OrbitServer.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
/*
Copyright (C) 2015 - 2019 Electronic Arts Inc. All rights reserved.
This file is part of the Orbit Project <https://www.orbit.cloud>.
See license in LICENSE.
*/
package orbit.server
import io.micrometer.core.instrument.MeterRegistry
import kotlinx.coroutines.launch
import mu.KotlinLogging
import orbit.server.auth.AuthSystem
import orbit.server.concurrent.RuntimePools
import orbit.server.concurrent.RuntimeScopes
import orbit.server.mesh.AddressableDirectory
import orbit.server.mesh.AddressableManager
import orbit.server.mesh.ClusterManager
import orbit.server.mesh.LocalNodeInfo
import orbit.server.mesh.NodeDirectory
import orbit.server.net.ConnectionManager
import orbit.server.net.RemoteMeshNodeManager
import orbit.server.pipeline.Pipeline
import orbit.server.pipeline.PipelineSteps
import orbit.server.pipeline.step.AuthStep
import orbit.server.pipeline.step.BlankStep
import orbit.server.pipeline.step.EchoStep
import orbit.server.pipeline.step.IdentityStep
import orbit.server.pipeline.step.PlacementStep
import orbit.server.pipeline.step.RoutingStep
import orbit.server.pipeline.step.TransportStep
import orbit.server.pipeline.step.VerifyStep
import orbit.server.router.Router
import orbit.server.service.AddressableManagementService
import orbit.server.service.ConnectionService
import orbit.server.service.GrpcEndpoint
import orbit.server.service.HealthCheckList
import orbit.server.service.HealthService
import orbit.server.service.NodeManagementService
import orbit.server.service.ServerAuthInterceptor
import orbit.shared.mesh.NodeStatus
import orbit.util.concurrent.ShutdownLatch
import orbit.util.di.ComponentContainer
import orbit.util.time.Clock
import orbit.util.time.ConstantTicker
import orbit.util.time.stopwatch
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.CoroutineContext
class OrbitServer(private val config: OrbitServerConfig) {
constructor() : this(OrbitServerConfig())
private val logger = KotlinLogging.logger {}
private var shutdownLatch = AtomicReference<ShutdownLatch>()
private val runtimePools = RuntimePools(
cpuPool = config.cpuPool,
ioPool = config.ioPool
)
private val runtimeScopes = RuntimeScopes(
runtimePools = runtimePools,
exceptionHandler = this::onUnhandledException
)
private val container = ComponentContainer()
private val clock = Clock()
private val grpcEndpoint by container.inject<GrpcEndpoint>()
private val localNodeInfo by container.inject<LocalNodeInfo>()
private val nodeManager by container.inject<ClusterManager>()
private val nodeDirectory by container.inject<NodeDirectory>()
private val addressableDirectory by container.inject<AddressableDirectory>()
private val pipeline by container.inject<Pipeline>()
private val remoteMeshNodeManager by container.inject<RemoteMeshNodeManager>()
private val ticker = ConstantTicker(
scope = runtimeScopes.cpuScope,
targetTickRate = config.tickRate.toMillis(),
clock = clock,
logger = logger,
exceptionHandler = this::onUnhandledException,
autoStart = false,
onTick = this::tick
)
init {
container.configure {
instance(this@OrbitServer)
instance(config)
instance(runtimePools)
instance(runtimeScopes)
instance(clock)
instance(config.serverInfo)
// Service
definition<GrpcEndpoint>()
definition<ServerAuthInterceptor>()
definition<NodeManagementService>()
definition<AddressableManagementService>()
definition<ConnectionService>()
definition<HealthCheckList>()
definition<HealthService>()
// Net
definition<ConnectionManager>()
// Pipeline
definition<Pipeline>()
definition<PipelineSteps>()
definition<BlankStep>()
definition<PlacementStep>()
definition<IdentityStep>()
definition<RoutingStep>()
definition<EchoStep>()
definition<VerifyStep>()
definition<AuthStep>()
definition<TransportStep>()
// Mesh
definition<LocalNodeInfo>()
definition<ClusterManager>()
definition<AddressableManager>()
definition<RemoteMeshNodeManager>()
externallyConfigured(config.nodeDirectory)
externallyConfigured(config.addressableDirectory)
externallyConfigured(config.meterRegistry)
// Auth
definition<AuthSystem>()
// Router
definition<Router>()
}
}
fun start() = runtimeScopes.cpuScope.launch {
logger.info("Starting Orbit server...")
val (elapsed, _) = stopwatch(clock) {
// Start the pipeline
pipeline.start()
// Setup the local node information
localNodeInfo.start()
// Start tick
ticker.start()
// Start gRPC endpoint
// We shouldn't do this until we're ready to serve traffic
grpcEndpoint.start()
// Flip status to active
localNodeInfo.updateInfo {
it.copy(nodeStatus = NodeStatus.ACTIVE)
}
// Acquire the latch
if (config.acquireShutdownLatch) {
ShutdownLatch().also {
shutdownLatch.set(it)
it.acquire()
}
}
}
logger.info("Orbit server started successfully in {}ms.", elapsed)
}
fun stop() = runtimeScopes.cpuScope.launch {
logger.info("Stopping Orbit server...")
val (elapsed, _) = stopwatch(clock) {
// Flip status to draining
localNodeInfo.updateInfo {
it.copy(nodeStatus = NodeStatus.DRAINING)
}
// Stop gRPC
val grpcEndpoint by container.inject<GrpcEndpoint>()
grpcEndpoint.stop()
// Stop the tick
ticker.stop()
// Stop pipeline
pipeline.stop()
// Flip status to draining
localNodeInfo.updateInfo {
it.copy(nodeStatus = NodeStatus.STOPPED)
}
// Release the latch
shutdownLatch.get()?.release().also {
shutdownLatch.set(null)
}
}
logger.info("Orbit server stopped successfully in {}ms.", elapsed)
}
private suspend fun tick() {
localNodeInfo.tick()
nodeManager.tick()
nodeDirectory.tick()
addressableDirectory.tick()
remoteMeshNodeManager.tick()
}
@Suppress("UNUSED_PARAMETER")
private fun onUnhandledException(coroutineContext: CoroutineContext, throwable: Throwable) =
onUnhandledException(throwable)
private fun onUnhandledException(throwable: Throwable) {
logger.error(throwable) { "Unhandled exception in Orbit." }
}
}