Skip to content

Commit

Permalink
Basic handling of stream disconnections and node renewal fail on clie…
Browse files Browse the repository at this point in the history
…nt for automated recovery
  • Loading branch information
JoeHegarty committed Nov 29, 2019
1 parent 30faf9c commit d7ce053
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 10 deletions.
38 changes: 37 additions & 1 deletion src/orbit-client/src/main/kotlin/orbit/client/OrbitClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ import orbit.client.addressable.InvocationSystem
import orbit.client.execution.ExecutionLeases
import orbit.client.execution.ExecutionSystem
import orbit.client.mesh.AddressableLeaser
import orbit.client.mesh.NodeLeaseRenewalFailed
import orbit.client.mesh.NodeLeaseRenewalFailedHandler
import orbit.client.mesh.NodeLeaser
import orbit.client.net.ClientAuthInterceptor
import orbit.client.net.ClientState
import orbit.client.net.ConnectionHandler
import orbit.client.net.GrpcClient
import orbit.client.net.LocalNode
Expand Down Expand Up @@ -66,6 +69,7 @@ class OrbitClient(val config: OrbitClientConfig = OrbitClientConfig()) {

definition<NodeLeaser>()
definition<AddressableLeaser>()
externallyConfigured(config.nodeLeaseRenewalFailedHandler)

definition<Serializer>()

Expand All @@ -91,12 +95,18 @@ class OrbitClient(val config: OrbitClientConfig = OrbitClientConfig()) {
private val localNode by container.inject<LocalNode>()
private val definitionDirectory by container.inject<AddressableDefinitionDirectory>()
private val executionSystem by container.inject<ExecutionSystem>()
private val nodeLeaseRenewalFailedHandler by container.inject<NodeLeaseRenewalFailedHandler>()

val actorFactory by container.inject<ActorProxyFactory>()

fun start() = scope.launch {
logger.info("Starting Orbit client...")
val (elapsed, _) = stopwatch(clock) {
// Flip state
localNode.manipulate {
it.copy(clientState = ClientState.CONNECTING)
}

// Scan for capabilities
capabilitiesScanner.scan()
definitionDirectory.setupDefinition(
Expand All @@ -113,6 +123,10 @@ class OrbitClient(val config: OrbitClientConfig = OrbitClientConfig()) {
// Open message channel
connectionHandler.connect()

localNode.manipulate {
it.copy(clientState = ClientState.CONNECTED)
}

// Start tick
ticker.start()
}
Expand All @@ -121,6 +135,9 @@ class OrbitClient(val config: OrbitClientConfig = OrbitClientConfig()) {
}

private suspend fun tick() {
// Keep stream open
connectionHandler.tick()

// See if lease needs renewing
nodeLeaser.tick()

Expand All @@ -134,11 +151,24 @@ class OrbitClient(val config: OrbitClientConfig = OrbitClientConfig()) {
fun stop() = scope.launch {
logger.info("Stopping Orbit...")
val (elapsed, _) = stopwatch(clock) {
localNode.manipulate {
it.copy(clientState = ClientState.STOPPING)
}

// TODO: Wait until placements will stop

// Stop all addressables
executionSystem.stop()

// Stop messaging
connectionHandler.disconnect()

// Stop the tick
ticker.stop()

localNode.manipulate {
it.copy(clientState = ClientState.IDLE)
}
}

logger.info("Orbit stopped successfully in {}ms.", elapsed)
Expand All @@ -149,7 +179,13 @@ class OrbitClient(val config: OrbitClientConfig = OrbitClientConfig()) {
onUnhandledException(throwable)

/**
 * Handles a throwable that was reported to the client.
 *
 * A [NodeLeaseRenewalFailed] is treated as a recoverable condition: it is logged and
 * passed on to the configured [nodeLeaseRenewalFailedHandler]. Any other throwable is
 * logged as an unhandled error together with its stack trace.
 *
 * Each throwable is logged exactly once; there is no unconditional "unhandled" log
 * ahead of the dispatch, so lease renewal failures are not reported as unhandled.
 *
 * @param throwable the failure to handle.
 */
private fun onUnhandledException(throwable: Throwable) {
    when (throwable) {
        is NodeLeaseRenewalFailed -> {
            logger.error { "Node lease renewal failed..." }
            nodeLeaseRenewalFailedHandler.onLeaseRenewalFailed()
        }
        else -> logger.error(throwable) { "Unhandled exception in Orbit Client." }
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ package orbit.client
import kotlinx.coroutines.CoroutineDispatcher
import orbit.client.addressable.AddressableConstructor
import orbit.client.addressable.DefaultAddressableConstructor
import orbit.client.mesh.NodeLeaseRenewalFailedHandler
import orbit.client.mesh.RestartOnNodeRenewalFailure
import orbit.client.net.OrbitServiceLocator
import orbit.util.concurrent.Pools
import orbit.util.di.ExternallyConfigured
Expand Down Expand Up @@ -75,5 +77,10 @@ data class OrbitClientConfig(
/**
* The amount of time Orbit should wait for the initial join cluster to succeed before failing.
*/
val joinClusterTimeout: Duration = Duration.ofSeconds(30)
val joinClusterTimeout: Duration = Duration.ofSeconds(30),

/**
* How to handle node lease renewal failure
*/
val nodeLeaseRenewalFailedHandler: ExternallyConfigured<NodeLeaseRenewalFailedHandler> = RestartOnNodeRenewalFailure.RestartOnNodeRenewalFailureSingleton
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ package orbit.client.addressable

import kotlinx.coroutines.CompletableDeferred
import orbit.client.execution.ExecutionSystem
import orbit.client.net.ClientState
import orbit.client.net.Completion
import orbit.client.net.LocalNode
import orbit.client.net.MessageHandler
import orbit.client.serializer.Serializer
import orbit.shared.addressable.AddressableInvocation
Expand All @@ -21,6 +23,7 @@ import orbit.util.di.ComponentContainer
internal class InvocationSystem(
private val serializer: Serializer,
private val executionSystem: ExecutionSystem,
private val localNode: LocalNode,
componentContainer: ComponentContainer
) {
private val messageHandler by componentContainer.inject<MessageHandler>()
Expand Down Expand Up @@ -59,6 +62,8 @@ internal class InvocationSystem(
}

fun sendInvocation(invocation: AddressableInvocation): Completion {
check(localNode.status.clientState == ClientState.CONNECTED) { "The Orbit client is not connected"}

val arguments = serializer.serialize(invocation.args)
val msg = Message(
MessageContent.InvocationRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ internal class ExecutionSystem(
}
}

/**
 * Deactivates every entry currently held in [activeAddressables], one after another.
 */
suspend fun stop() {
    for (entry in activeAddressables) {
        deactivate(entry.value)
    }
}

private suspend fun activate(
reference: AddressableReference
): ExecutionHandle? =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
Copyright (C) 2015 - 2019 Electronic Arts Inc. All rights reserved.
This file is part of the Orbit Project <https://www.orbit.cloud>.
See license in LICENSE.
*/

package orbit.client.mesh

import kotlinx.coroutines.launch
import mu.KotlinLogging
import orbit.client.OrbitClient
import orbit.util.concurrent.SupervisorScope
import orbit.util.di.ExternallyConfigured

/**
 * Strategy invoked by the client when renewing the node lease has failed.
 *
 * The implementation to use is supplied through the client configuration.
 */
interface NodeLeaseRenewalFailedHandler {
/**
 * Called after a node lease renewal failure has been detected and logged.
 */
fun onLeaseRenewalFailed()
}

/**
 * [NodeLeaseRenewalFailedHandler] that responds to a failed lease renewal by stopping
 * the [OrbitClient] and then starting it again.
 *
 * @param orbitClient the client to restart.
 * @param supervisorScope the scope in which the restart coroutine is launched.
 */
class RestartOnNodeRenewalFailure(
    private val orbitClient: OrbitClient,
    private val supervisorScope: SupervisorScope
) : NodeLeaseRenewalFailedHandler {
    val logger = KotlinLogging.logger { }

    /**
     * Configuration entry that names [RestartOnNodeRenewalFailure] as the handler
     * implementation type.
     */
    object RestartOnNodeRenewalFailureSingleton : ExternallyConfigured<NodeLeaseRenewalFailedHandler> {
        override val instanceType = RestartOnNodeRenewalFailure::class.java
    }

    /**
     * Launches the restart in [supervisorScope] and returns without waiting for it.
     */
    override fun onLeaseRenewalFailed() {
        supervisorScope.launch { restart() }
    }

    /**
     * Stops the client, waits for that job to finish, then starts it and waits again.
     */
    private suspend fun restart() {
        logger.info { "Beginning Orbit restart..." }

        val stopJob = orbitClient.stop()
        stopJob.join()

        val startJob = orbitClient.start()
        startJob.join()

        logger.info { "Restart complete" }
    }
}

/**
 * Signals that a node lease renewal did not succeed.
 *
 * Note that this extends [Throwable] directly rather than [Exception], so a
 * `catch (e: Exception)` clause will not intercept it.
 *
 * @param msg human-readable description of the failure.
 */
class NodeLeaseRenewalFailed(msg: String) : Throwable(msg)
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ internal class NodeLeaser(private val localNode: LocalNode, grpcClient: GrpcClie
}
logger.info("Joined cluster as node '${nodeInfo.id}'.")
}

}
}

Expand All @@ -56,10 +55,14 @@ internal class NodeLeaser(private val localNode: LocalNode, grpcClient: GrpcClie
.build()
).await()

check(renewalResult.status == NodeManagementOuterClass.NodeLeaseResponseProto.Status.OK) { "Node renewal failed" }
if(renewalResult.status != NodeManagementOuterClass.NodeLeaseResponseProto.Status.OK) {
throw NodeLeaseRenewalFailed("Node renewal failed")
}

localNode.manipulate {
it.copy(nodeInfo = renewalResult.info.toNodeInfo())
}

logger.debug("Lease renewed.")
}
}
Expand Down
14 changes: 14 additions & 0 deletions src/orbit-client/src/main/kotlin/orbit/client/net/ClientState.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
Copyright (C) 2015 - 2019 Electronic Arts Inc. All rights reserved.
This file is part of the Orbit Project <https://www.orbit.cloud>.
See license in LICENSE.
*/

package orbit.client.net

/**
 * Lifecycle state of the Orbit client, stored on the local node's data.
 */
enum class ClientState {
// Default state of the node data; also set by OrbitClient.stop() once shutdown has finished.
IDLE,
// Set at the very beginning of OrbitClient.start(), before the message channel is opened.
CONNECTING,
// Set by OrbitClient.start() after the connection handler has connected; invocations may only be sent in this state.
CONNECTED,
// Set at the very beginning of OrbitClient.stop(), before addressables and messaging are shut down.
STOPPING
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,37 @@ internal class ConnectionHandler(
}
}

/**
 * Periodic connection check: delegates to [testConnection], which reopens the
 * stream if it has been closed.
 */
fun tick() {
    testConnection()
}

/**
 * Cancels the connection channel and stops the message rail workers.
 *
 * Does nothing when the connection channel has not been initialized yet.
 */
fun disconnect() {
    if (!::connectionChannel.isInitialized) return

    connectionChannel.cancel()
    messageRails.stopWorkers()
}

/**
 * Sends [msg] over the connection channel.
 *
 * The connection is checked first via [testConnection]. The send itself is
 * performed while holding the monitor of the connection channel.
 *
 * @param msg the message to convert to its proto form and send.
 */
fun send(msg: Message) {
    testConnection()

    synchronized(connectionChannel) {
        connectionChannel.send(msg.toMessageProto())
    }
}

/**
 * Forwards an incoming [message] to the [messageHandler].
 *
 * @param message the message received on the connection.
 */
private suspend fun onMessage(message: Message) {
    messageHandler.onMessage(message)
}

/**
 * Reopens the stream when the connection channel exists but is closed for receive.
 *
 * Nothing happens if the channel has never been initialized or is still open.
 */
private fun testConnection() {
    val streamClosed = ::connectionChannel.isInitialized && connectionChannel.isClosedForReceive
    if (!streamClosed) return

    logger.warn { "The stream connection is closed. Reopening..." }
    disconnect()
    connect()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ internal class LocalNode(config: OrbitClientConfig) {
internal data class NodeData(
val serviceLocator: OrbitServiceLocator,
val nodeInfo: NodeInfo? = null,
val capabilities: NodeCapabilities? = null
val capabilities: NodeCapabilities? = null,
val clientState: ClientState = ClientState.IDLE
)
39 changes: 39 additions & 0 deletions src/orbit-client/src/test/kotlin/orbit/client/LoopTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
Copyright (C) 2015 - 2019 Electronic Arts Inc. All rights reserved.
This file is part of the Orbit Project <https://www.orbit.cloud>.
See license in LICENSE.
*/

package orbit.client

import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import orbit.client.actor.GreeterActor
import orbit.client.actor.createProxy
import orbit.client.net.OrbitServiceLocator
/**
 * Manual loop test: starts a client against a local Orbit server and calls the
 * greeter actor every ten seconds, forever.
 *
 * Failures of an individual call are logged and the loop carries on.
 */
fun main() {
    val logger = KotlinLogging.logger { }
    val targetUri = "orbit://localhost:50056/test"

    val client = OrbitClient(
        OrbitClientConfig(
            serviceLocator = OrbitServiceLocator(targetUri),
            packages = listOf("orbit.client.actor")
        )
    )

    runBlocking {
        client.start().join()
        val greeter = client.actorFactory.createProxy<GreeterActor>()
        while (true) {
            try {
                val result = greeter.greetAsync("Joe").await()
                logger.info { result }
            } catch (e: Throwable) {
                // Deliberately broad so the loop keeps running whatever the call throws.
                // The throwable is passed to the logger so its stack trace is recorded,
                // not just its toString().
                logger.error(e) { "Greeter invocation failed" }
            }
            delay(10000)
        }
    }
}

0 comments on commit d7ce053

Please sign in to comment.