Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions controller-runtime/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies {
implementation(libs.clikt)
implementation(libs.spring.crypto)
implementation(libs.spotify.completablefutures)
implementation(libs.envoy.controlplane)
}

application {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package app.simplecloud.controller.runtime

import app.simplecloud.controller.runtime.database.DatabaseFactory
import app.simplecloud.controller.runtime.droplet.ControllerDropletService
import app.simplecloud.controller.runtime.droplet.DropletRepository
import app.simplecloud.controller.runtime.envoy.ControlPlaneServer
import app.simplecloud.controller.runtime.group.GroupRepository
import app.simplecloud.controller.runtime.group.GroupService
import app.simplecloud.controller.runtime.host.ServerHostRepository
Expand Down Expand Up @@ -29,11 +32,13 @@ class ControllerRuntime(
private val database = DatabaseFactory.createDatabase(controllerStartCommand.databaseUrl)
private val authCallCredentials = AuthCallCredentials(controllerStartCommand.authSecret)

private val dropletRepository = DropletRepository()
private val groupRepository = GroupRepository(controllerStartCommand.groupPath)
private val numericalIdRepository = ServerNumericalIdRepository()
private val serverRepository = ServerRepository(database, numericalIdRepository)
private val hostRepository = ServerHostRepository()
private val pubSubService = PubSubService()
private val controlPlaneServer = ControlPlaneServer(controllerStartCommand, dropletRepository)
private val authServer = OAuthServer(controllerStartCommand, database)
private val reconciler = Reconciler(
groupRepository,
Expand All @@ -50,6 +55,7 @@ class ControllerRuntime(
logger.info("Starting controller")
setupDatabase()
startAuthServer()
startControlPlaneServer()
startPubSubGrpcServer()
startGrpcServer()
startReconciler()
Expand Down Expand Up @@ -80,6 +86,11 @@ class ControllerRuntime(

}

private fun startControlPlaneServer() {
logger.info("Starting envoy control plane...")
controlPlaneServer.start()
}

private fun setupDatabase() {
logger.info("Setting up database...")
database.setup()
Expand Down Expand Up @@ -154,6 +165,7 @@ class ControllerRuntime(
)
)
)
.addService(ControllerDropletService(dropletRepository))
.intercept(AuthSecretInterceptor(controllerStartCommand.grpcHost, controllerStartCommand.authorizationPort))
.build()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package app.simplecloud.controller.runtime.droplet

import app.simplecloud.droplet.api.droplet.Droplet
import build.buf.gen.simplecloud.controller.v1.*
import io.grpc.Status
import io.grpc.StatusException

class ControllerDropletService(private val dropletRepository: DropletRepository) :
ControllerDropletServiceGrpcKt.ControllerDropletServiceCoroutineImplBase() {
override suspend fun getDroplet(request: GetDropletRequest): GetDropletResponse {
val droplet = dropletRepository.find(request.type, request.id)
?: throw StatusException(Status.NOT_FOUND.withDescription("This Droplet does not exist"))
return getDropletResponse { this.definition = droplet.toDefinition() }
}

override suspend fun getAllDroplets(request: GetAllDropletsRequest): GetAllDropletsResponse {
val allDroplets = dropletRepository.getAll()
return getAllDropletsResponse {
definition.addAll(allDroplets.map { it.toDefinition() })
}
}

override suspend fun getDropletsByType(request: GetDropletsByTypeRequest): GetDropletsByTypeResponse {
val type = request.type
val typedDroplets = dropletRepository.getAll().filter { it.type == type }
return getDropletsByTypeResponse {
definition.addAll(typedDroplets.map { it.toDefinition() })
}
}

override suspend fun registerDroplet(request: RegisterDropletRequest): RegisterDropletResponse {
dropletRepository.find(request.definition.type, request.definition.id)
?: throw StatusException(Status.NOT_FOUND.withDescription("This Droplet does not exist"))
val droplet = Droplet.fromDefinition(request.definition)

try {
dropletRepository.save(droplet)
} catch (e: Exception) {
throw StatusException(Status.INTERNAL.withDescription("Error whilst updating Droplet").withCause(e))
}
return registerDropletResponse { this.definition = droplet.toDefinition() }
}

override suspend fun unregisterDroplet(request: UnregisterDropletRequest): UnregisterDropletResponse {
val droplet = dropletRepository.find(request.id)
?: throw StatusException(Status.NOT_FOUND.withDescription("This Droplet does not exist"))
val deleted = dropletRepository.delete(droplet)
if (!deleted) throw StatusException(Status.NOT_FOUND.withDescription("Could not delete this Droplet"))
return unregisterDropletResponse { }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package app.simplecloud.controller.runtime.droplet

import app.simplecloud.controller.runtime.Repository
import app.simplecloud.controller.runtime.envoy.DropletCache
import app.simplecloud.droplet.api.droplet.Droplet
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch

class DropletRepository : Repository<Droplet, String> {

private val currentDroplets = mutableListOf<Droplet>()
private val dropletCache = DropletCache(this)

override suspend fun getAll(): List<Droplet> {
return currentDroplets
}

override suspend fun find(identifier: String): Droplet? {
return currentDroplets.firstOrNull { it.id == identifier }
}

fun find(type: String, identifier: String): Droplet? {
return currentDroplets.firstOrNull { it.type == type && it.id == identifier }
}

override fun save(element: Droplet) {
val updated = managePortRange(element)
val droplet = find(element.type, element.id)
if (droplet != null) {
currentDroplets[currentDroplets.indexOf(droplet)] = updated
return
}
currentDroplets.add(updated)
CoroutineScope(Dispatchers.IO).launch {
dropletCache.update()
}
}

private fun managePortRange(element: Droplet): Droplet {
if (!currentDroplets.any { it.envoyPort == element.envoyPort }) return element
return managePortRange(element.copy(envoyPort = element.envoyPort + 1))
}

override suspend fun delete(element: Droplet): Boolean {
val found = find(element.type, element.id) ?: return false
if (!currentDroplets.remove(found)) return false
dropletCache.update()
return true
}

fun getAsDropletCache(): DropletCache {
return dropletCache
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package app.simplecloud.controller.runtime.envoy

import app.simplecloud.controller.runtime.droplet.DropletRepository
import app.simplecloud.controller.runtime.launcher.ControllerStartCommand
import app.simplecloud.droplet.api.droplet.Droplet
import io.envoyproxy.controlplane.server.V3DiscoveryServer
import io.grpc.ServerBuilder
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager

/**
* @see <a href="https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#xds-protocol-ads">ADS Documentation</a>
*/
class ControlPlaneServer(private val args: ControllerStartCommand, private val dropletRepository: DropletRepository) {
private val server = V3DiscoveryServer(dropletRepository.getAsDropletCache().getCache())
private val logger = LogManager.getLogger(ControlPlaneServer::class.java)

fun start() {
val serverBuilder = ServerBuilder.forPort(args.envoyDiscoveryPort)
register(serverBuilder)
val server = serverBuilder.build()
CoroutineScope(Dispatchers.IO).launch {
try {
server.start()
server.awaitTermination()
} catch (e: Exception) {
logger.warn("Error in envoy control server server", e)
throw e
}
}
registerSelf()
}

private fun registerSelf() {
dropletRepository.save(
Droplet(
type = "controller",
id = "internal-controller",
host = args.grpcHost,
port = args.grpcPort,
envoyPort = 8080,
)
)
}

private fun register(builder: ServerBuilder<*>) {
logger.info("Registering envoy ADS...")
builder.addService(server.aggregatedDiscoveryServiceImpl)
}
}
Loading