Skip to content

Commit

Permalink
Add lease checking pipeline step, currently just rejects messages aft…
Browse files Browse the repository at this point in the history
…er lease expiration. Rename NodeManagement to NodeLeases.
  • Loading branch information
brettmorien committed Sep 10, 2019
1 parent d987ce1 commit 7ce50a4
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 15 deletions.
12 changes: 7 additions & 5 deletions src/orbit-server/src/main/kotlin/orbit/server/OrbitServer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ import orbit.server.net.GrpcEndpoint
import orbit.server.net.OutgoingConnections
import orbit.server.net.NodeCollection
import orbit.server.net.NodeId
import orbit.server.net.NodeManagement
import orbit.server.net.NodeLeases
import orbit.server.pipeline.Pipeline
import orbit.server.pipeline.steps.AddressablePipelineStep
import orbit.server.pipeline.steps.BlankPipelineStep
import orbit.server.pipeline.steps.LeasePipelineStep
import orbit.server.pipeline.steps.PipelineSteps
import orbit.server.pipeline.steps.RoutingPipelineStep
import orbit.server.routing.AddressableDirectory
Expand Down Expand Up @@ -65,7 +66,7 @@ class OrbitServer(private val config: OrbitServerConfig) {
init {
container.configure {
instance(config.localNode)
instance(NodeManagement.LeaseExpiration(config.leaseExpiration, config.leaseRenewal))
instance(NodeLeases.LeaseExpiration(config.leaseExpiration, config.leaseRenewal))
instance(NodeInfo.LocalServerNodeInfo(NodeId(config.localNode.nodeId.value), host = "0.0.0.0", port = config.grpcPort))
instance(this@OrbitServer)
instance(config)
Expand All @@ -81,12 +82,13 @@ class OrbitServer(private val config: OrbitServerConfig) {
definition<OutgoingConnections>()
definition<IncomingConnections>()
definition<NodeCollection>()
definition<NodeManagement>()
definition<NodeLeases>()

definition<GrpcEndpoint>()

definition<Pipeline>()
definition<BlankPipelineStep>()
definition<LeasePipelineStep>()
definition<AddressablePipelineStep>()
definition<RoutingPipelineStep>()
definition<PipelineSteps>()
Expand Down Expand Up @@ -140,8 +142,8 @@ class OrbitServer(private val config: OrbitServerConfig) {
val outgoingConnections: OutgoingConnections by container.inject()
outgoingConnections.refreshConnections()

val nodeManagement: NodeManagement by container.inject()
nodeManagement.cullLeases()
val nodeLeases: NodeLeases by container.inject()
nodeLeases.cullLeases()
}

private suspend fun onStop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ data class OrbitServerConfig(
/**
* The node's identity.
*/
val localNode: LocalNodeId = LocalNodeId.generate("router"),
val localNode: LocalNodeId = LocalNodeId.generate("router:"),

/**
* The gRPC endpoint port.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ internal class GrpcClient(
type = value.invocationRequest.reference.type,
id = value.invocationRequest.reference.id)
),
source = id,
target = MessageTarget.Unicast(NodeId("target"))
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import io.grpc.ServerBuilder
import orbit.common.logging.logger
import orbit.server.OrbitServerConfig

internal class GrpcEndpoint(private val config: OrbitServerConfig, private val incomingConnections: IncomingConnections, private val nodeManagement: NodeManagement) {
internal class GrpcEndpoint(private val config: OrbitServerConfig, private val incomingConnections: IncomingConnections, private val nodeLeases: NodeLeases) {
private lateinit var server: Server

private val logger by logger()
Expand All @@ -21,7 +21,7 @@ internal class GrpcEndpoint(private val config: OrbitServerConfig, private val i

server = ServerBuilder.forPort(config.grpcPort)
.addService(incomingConnections)
.addService(nodeManagement)
.addService(nodeLeases)
.intercept(NodeIdServerInterceptor())
.build()
.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

package orbit.server.net

import io.grpc.Status
import io.grpc.StatusException
import io.grpc.stub.StreamObserver
import kotlinx.coroutines.launch
import orbit.common.di.ComponentProvider
Expand All @@ -19,6 +21,7 @@ import orbit.shared.proto.Messages
internal class IncomingConnections(
private val localNode: LocalNodeId,
private val nodeDirectory: NodeDirectory,
private val leases: NodeLeases,
private val container: ComponentProvider
) :
ConnectionGrpc.ConnectionImplBase() {
Expand All @@ -29,8 +32,13 @@ internal class IncomingConnections(
return clients[nodeId]
}

override fun messages(responseObserver: StreamObserver<Messages.Message>): StreamObserver<Messages.Message> {
override fun messages(responseObserver: StreamObserver<Messages.Message>): StreamObserver<Messages.Message>? {
val nodeId = NodeId(NodeIdServerInterceptor.NODE_ID.get())

if (!nodeId.value.startsWith("router") && !leases.checkLease(nodeId)) {
responseObserver.onError(StatusException(Status.UNAUTHENTICATED))
return null
}
val runtimeScopes by container.inject<RuntimeScopes>()

val connection = clients[nodeId] ?: GrpcClient(nodeId, responseObserver, container) {
Expand All @@ -43,6 +51,7 @@ internal class IncomingConnections(
nodeDirectory.join(NodeInfo.ClientNodeInfo(connection.id, listOf(localNode.nodeId)))
}
return connection

}
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import java.util.concurrent.ConcurrentHashMap

typealias ChallengeToken = String

internal class NodeManagement(private val expiration: LeaseExpiration) : NodeManagementGrpc.NodeManagementImplBase() {
internal class NodeLeases(private val expiration: LeaseExpiration) : NodeManagementGrpc.NodeManagementImplBase() {

private val leases = ConcurrentHashMap<NodeId, NodeLease>()

Expand All @@ -43,10 +43,7 @@ internal class NodeManagement(private val expiration: LeaseExpiration) : NodeMan
request: NodeManagementOuterClass.RenewLeaseRequest,
responseObserver: StreamObserver<NodeManagementOuterClass.RenewLeaseResponse>
) {
val nodeId = NodeId(request.nodeIdentity)
if (!leases.containsKey(nodeId) ||
leases[nodeId]!!.challengeToken != request.challengeToken
|| leases[nodeId]!!.expiresAt < ZonedDateTime.now(ZoneOffset.UTC)
if (!checkLease(NodeId(request.nodeIdentity), request.challengeToken)
) {
responseObserver.onError(StatusException(Status.UNAUTHENTICATED))
return
Expand All @@ -70,6 +67,13 @@ internal class NodeManagement(private val expiration: LeaseExpiration) : NodeMan
responseObserver.onCompleted()
}

fun checkLease(nodeId: NodeId, challengeToken: ChallengeToken? = null): Boolean {
val lease = leases[nodeId]
return lease != null &&
lease.expiresAt > ZonedDateTime.now(ZoneOffset.UTC) &&
(challengeToken == null || lease.challengeToken == challengeToken)
}

fun cullLeases() {
val now = ZonedDateTime.now(ZoneOffset.UTC)
val leaseCount = leases.count()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright (C) 2015 - 2019 Electronic Arts Inc. All rights reserved.
This file is part of the Orbit Project <https://www.orbit.cloud>.
See license in LICENSE.
*/

package orbit.server.pipeline.steps

import orbit.server.net.Message
import orbit.server.net.NodeLeases
import orbit.server.pipeline.PipelineContext

internal class LeasePipelineStep(private val nodeLeases: NodeLeases) : PipelineStep {
override suspend fun onInbound(context: PipelineContext, msg: Message) {
if (msg.source != null && this.nodeLeases.checkLease(msg.source)) {
super.onInbound(context, msg)
}
}

override suspend fun onOutbound(context: PipelineContext, msg: Message) {
super.onOutbound(context, msg)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@
package orbit.server.pipeline.steps

internal class PipelineSteps(
leasePipelineStep: LeasePipelineStep,
routingPipelineStep: RoutingPipelineStep,
addressablePipelineStep: AddressablePipelineStep
) {
val steps: Array<PipelineStep> = arrayOf(routingPipelineStep, addressablePipelineStep)
val steps: Array<PipelineStep> = arrayOf(
routingPipelineStep,
addressablePipelineStep,
leasePipelineStep
)
}

0 comments on commit 7ce50a4

Please sign in to comment.