Skip to content
This repository has been archived by the owner on Nov 16, 2022. It is now read-only.

Commit

Permalink
import from vamp-kubernetes repo, dialect support; closes #1014
Browse files Browse the repository at this point in the history
  • Loading branch information
dragoslav authored and luciangabor committed Dec 8, 2017
1 parent 00f3ec1 commit e6453d8
Show file tree
Hide file tree
Showing 7 changed files with 453 additions and 68 deletions.
@@ -1,6 +1,8 @@
package io.vamp.container_driver.kubernetes

import io.vamp.container_driver.Docker
import org.json4s.native.Serialization._
import org.json4s.{ DefaultFormats, Formats }

case class KubernetesApp(
name: String,
Expand All @@ -12,46 +14,57 @@ case class KubernetesApp(
env: Map[String, String],
cmd: List[String],
args: List[String],
labels: Map[String, String]
labels: Map[String, String],
dialect: Map[String, Any] = Map()
) extends KubernetesArtifact {

override def toString =
s"""
|{
| "apiVersion": "extensions/v1beta1",
| "kind": "Deployment",
| "metadata": {
| "name": "$name"
| },
| "spec": {
| "replicas": $replicas,
| "template": {
| "metadata": {
| ${labels2json(labels)}
| },
| "spec": {
| "containers": [{
| "image": "${docker.image}",
| "name": "$name",
| "env": [${env.map({ case (n, v) s"""{"name": "$n", "value": "$v"}""" }).mkString(", ")}],
| "ports": [${docker.portMappings.map(pm s"""{"name": "p${pm.containerPort}", "containerPort": ${pm.containerPort}, "protocol": "${pm.protocol.toUpperCase}"}""").mkString(", ")}],
| "args": [${args.map(str s""""$str"""").mkString(", ")}],
| "command": [${cmd.map(str s""""$str"""").mkString(", ")}],
| "resources": {
| "requests": {
| "cpu": $cpu,
| "memory": "${mem}Mi"
| }
| },
| "securityContext": {
| "privileged": $privileged
| }
| }]
| }
| }
| }
|}
""".stripMargin
override def toString: String = {

val container: Map[String, Any] = Map[String, Any](
"image" docker.image,
"name" name,
"env" env.map({ case (n, v) Map[String, Any]("name" n, "value" v) }),
"ports" docker.portMappings.map(pm Map[String, Any](
"name" s"p${pm.containerPort}", "containerPort" pm.containerPort, "protocol" pm.protocol.toUpperCase
)),
"args" args,
"command" cmd,
"resources" Map[String, Any](
"requests" Map[String, Any](
"cpu" cpu,
"memory" s"${mem}Mi"
)
),
"securityContext" Map[String, Any]("privileged" privileged)
)

val containerDialect: Map[String, Any] = (dialect.getOrElse("containers", List()) match {
case l: List[_] l.headOption.getOrElse(Map()).asInstanceOf[Map[String, Any]]
case _ Map[String, Any]()
}).filterNot { case (k, _) container.contains(k) }

val deployment = Map(
"apiVersion" "extensions/v1beta1",
"kind" "Deployment",
"metadata" Map("name" name),
"spec" Map(
"replicas" replicas,
"template" Map(
"metadata" labels2map(labels),
"spec" (
dialect ++ Map(
"containers" List(
containerDialect ++ container
)
)
)
)
)
)

implicit val formats: Formats = DefaultFormats
write(deployment)
}
}

case class KubernetesApiResponse(items: List[KubernetesItem] = Nil)
Expand Down
Expand Up @@ -4,21 +4,26 @@ import java.net.URLEncoder

trait KubernetesArtifact {

protected def labels2json(labels: Map[String, String]) = {
val l = labels.filter {
protected def labels2map(labels: Map[String, String]): Map[String, Map[String, String]] = {
val l: Map[String, String] = labels.filter {
case (k, _) k.matches("(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?") && k.length < 64
} filter {
case (_, v) v.isEmpty || (v.matches("^[a-zA-Z0-9].*[a-zA-Z0-9]$") && v.length < 64)
} map {
case (k, v) k v.replaceAll("[^a-zA-Z0-9\\._-]", "_")
} map {
}
Map("labels" l)
}

protected def labels2json(labels: Map[String, String]): String = {
val l = labels2map(labels).getOrElse("labels", Map()).map {
case (k, v) s""""$k": "$v""""
} mkString ", "

s""""labels": {$l}"""
}

protected def labelSelector(labels: Map[String, String]) = {
protected def labelSelector(labels: Map[String, String]): String = {
s"labelSelector=${URLEncoder.encode(labels.map { case (k, v) s"$k=$v" } mkString ",", "UTF-8")}"
}
}
Expand Up @@ -6,6 +6,7 @@ import io.vamp.container_driver.ContainerDriver
import io.vamp.model.artifact._

import scala.concurrent.Future
import scala.util.matching.Regex

trait KubernetesContainerDriver extends ContainerDriver {

Expand All @@ -15,7 +16,7 @@ trait KubernetesContainerDriver extends ContainerDriver {

protected val nameDelimiter = "-"

protected val idMatcher = """^[a-z0-9][a-z0-9-]*$""".r
protected val idMatcher: Regex = """^[a-z0-9][a-z0-9-]*$""".r

protected def workflowNamePrefix: String

Expand Down
Expand Up @@ -9,6 +9,10 @@ import io.vamp.model.reader.{ MegaByte, Quantity }

import scala.concurrent.Future

object KubernetesDeployment {
val dialect = "kubernetes"
}

trait KubernetesDeployment extends KubernetesArtifact {
this: KubernetesContainerDriver with CommonActorLogging

Expand All @@ -22,7 +26,7 @@ trait KubernetesDeployment extends KubernetesArtifact {

private lazy val deploymentUrl = s"$apiUrl/apis/extensions/v1beta1/namespaces/${namespace.name}/deployments"

override protected def supportedDeployableTypes = RktDeployableType :: DockerDeployableType :: Nil
override protected def supportedDeployableTypes: List[DeployableType] = RktDeployableType :: DockerDeployableType :: Nil

protected def schema: Enumeration

Expand Down Expand Up @@ -90,7 +94,22 @@ trait KubernetesDeployment extends KubernetesArtifact {
val id = appId(deployment, service.breed)
if (update) log.info(s"kubernetes update app: $id") else log.info(s"kubernetes create app: $id")

deploy(id, docker(deployment, cluster, service), service.scale.get, environment(deployment, cluster, service), labels(deployment, cluster, service) ++ labels(id, deploymentServiceIdLabel), update)
val (local, dialect) = (deployment.dialects.get(KubernetesDeployment.dialect), cluster.dialects.get(KubernetesDeployment.dialect), service.dialects.get(KubernetesDeployment.dialect)) match {
case (_, _, Some(d)) Some(service) d
case (_, Some(d), None) None d
case (Some(d), None, None) None d
case _ None Map()
}

deploy(
id = id,
docker = docker(deployment, cluster, service),
scale = service.scale.get,
environmentVariables = environment(deployment, cluster, service),
labels = labels(deployment, cluster, service) ++ labels(id, deploymentServiceIdLabel),
update = update,
dialect = interpolate(deployment, local, dialect.asInstanceOf[Map[String, Any]])
)
}

protected def undeploy(deployment: Deployment, service: DeploymentService): Future[Any] = {
Expand Down Expand Up @@ -118,8 +137,17 @@ trait KubernetesDeployment extends KubernetesArtifact {
if (update) log.info(s"kubernetes update workflow: ${workflow.name}") else log.info(s"kubernetes create workflow: ${workflow.name}")

val scale = workflow.scale.get.asInstanceOf[DefaultScale]

deploy(id, docker(workflow), scale, environment(workflow), labels(workflow) ++ labels(id, workflowIdLabel), update)
val dialect = workflow.dialects.getOrElse(KubernetesDeployment.dialect, Map())

deploy(
id = id,
docker = docker(workflow),
scale = scale,
environmentVariables = environment(workflow),
labels = labels(workflow) ++ labels(id, workflowIdLabel),
update = update,
dialect = dialect.asInstanceOf[Map[String, Any]]
)
}

protected def undeploy(workflow: Workflow): Future[Any] = {
Expand All @@ -128,8 +156,7 @@ trait KubernetesDeployment extends KubernetesArtifact {
undeploy(id, workflowIdLabel)
}

private def deploy(id: String, docker: Docker, scale: DefaultScale, environmentVariables: Map[String, String], labels: Map[String, String], update: Boolean): Future[Any] = {

private def deploy(id: String, docker: Docker, scale: DefaultScale, environmentVariables: Map[String, String], labels: Map[String, String], update: Boolean, dialect: Map[String, Any]): Future[Any] = {
val app = KubernetesApp(
name = id,
docker = docker,
Expand All @@ -140,7 +167,8 @@ trait KubernetesDeployment extends KubernetesArtifact {
env = environmentVariables,
cmd = Nil,
args = Nil,
labels = labels
labels = labels,
dialect = dialect
)

if (update) httpClient.put[Any](s"$deploymentUrl/$id", app.toString, apiHeaders) else httpClient.post[Any](deploymentUrl, app.toString, apiHeaders)
Expand Down
@@ -1,7 +1,7 @@
package io.vamp.container_driver.kubernetes

import akka.actor.ActorRef
import io.vamp.common.{ ClassMapper, Config, Lookup, Namespace }
import akka.actor.{ Actor, ActorRef }
import io.vamp.common._
import io.vamp.common.http.HttpClient
import io.vamp.common.vitals.InfoRequest
import io.vamp.container_driver.ContainerDriverActor._
Expand All @@ -22,24 +22,24 @@ class KubernetesDriverActorMapper extends ClassMapper {
object KubernetesDriverActor {

object Schema extends Enumeration {
val Docker = Value
val Docker: Schema.Value = Value
}

private val config = "vamp.container-driver.kubernetes"

val url = Config.string(s"$config.url")
val url: ConfigMagnet[String] = Config.string(s"$config.url")

val workflowNamePrefix = Config.string(s"$config.workflow-name-prefix")
val workflowNamePrefix: ConfigMagnet[String] = Config.string(s"$config.workflow-name-prefix")

val token = Config.string(s"$config.token")
val token: ConfigMagnet[String] = Config.string(s"$config.token")

val bearer = Config.string(s"$config.bearer")
val bearer: ConfigMagnet[String] = Config.string(s"$config.bearer")

val createServices = Config.boolean(s"$config.create-services")
val createServices: ConfigMagnet[Boolean] = Config.boolean(s"$config.create-services")

val vampGatewayAgentId = Config.string(s"$config.vamp-gateway-agent-id")
val vampGatewayAgentId: ConfigMagnet[String] = Config.string(s"$config.vamp-gateway-agent-id")

def serviceType()(implicit namespace: Namespace) = KubernetesServiceType.withName(Config.string(s"$config.service-type")())
def serviceType()(implicit namespace: Namespace): KubernetesServiceType.Value = KubernetesServiceType.withName(Config.string(s"$config.service-type")())
}

case class KubernetesDriverInfo(version: Any, paths: Any, api: Any, apis: Any)
Expand All @@ -54,11 +54,11 @@ class KubernetesDriverActor

import KubernetesDriverActor._

protected val schema = KubernetesDriverActor.Schema
protected val schema: Enumeration = KubernetesDriverActor.Schema

protected val apiUrl = KubernetesDriverActor.url()

protected val apiHeaders = {
protected val apiHeaders: List[(String, String)] = {
def headers(bearer: String) = ("Authorization" s"Bearer $bearer") :: HttpClient.jsonHeaders

if (bearer().nonEmpty) headers(bearer())
Expand All @@ -71,7 +71,7 @@ class KubernetesDriverActor

protected val workflowNamePrefix = KubernetesDriverActor.workflowNamePrefix()

def receive = {
def receive: Actor.Receive = {

case InfoRequest reply(info)

Expand Down Expand Up @@ -102,16 +102,16 @@ class KubernetesDriverActor
ContainerInfo("kubernetes", KubernetesDriverInfo(version, paths, api, apis))
}

protected def get(deploymentServices: List[DeploymentServices]) = {
protected def get(deploymentServices: List[DeploymentServices]): Unit = {
val replyTo = sender()
allContainerServices(deploymentServices).map(_.foreach {
replyTo ! _
})
}

protected def get(workflow: Workflow, replyTo: ActorRef) = containerWorkflow(workflow).map(replyTo ! _)
protected def get(workflow: Workflow, replyTo: ActorRef): Unit = containerWorkflow(workflow).map(replyTo ! _)

protected override def deployedGateways(gateways: List[Gateway]) = {
protected override def deployedGateways(gateways: List[Gateway]): Future[Any] = {
if (createServices()) {
services(gatewayService).map { response
// update service ports
Expand Down
Expand Up @@ -5,7 +5,7 @@ import akka.pattern.ask
import io.vamp.common.ClassMapper
import io.vamp.common.akka.IoC
import io.vamp.container_driver.kubernetes.{ Job, KubernetesDriverActor }
import io.vamp.container_driver.{ ContainerDriverValidation, ContainerDriverMapping, DockerDeployableType }
import io.vamp.container_driver.{ ContainerDriverMapping, ContainerDriverValidation, DeployableType, DockerDeployableType }
import io.vamp.model.artifact._
import io.vamp.model.event.Event
import io.vamp.persistence.PersistenceActor
Expand All @@ -21,7 +21,7 @@ class KubernetesWorkflowActorMapper extends ClassMapper {

class KubernetesWorkflowActor extends DaemonWorkflowDriver with ContainerDriverMapping with ContainerDriverValidation {

override protected lazy val supportedDeployableTypes = DockerDeployableType :: Nil
override protected lazy val supportedDeployableTypes: List[DeployableType] = DockerDeployableType :: Nil

override protected lazy val info: Future[Map[_, _]] = Future.successful(Map("kubernetes" Map("url" KubernetesDriverActor.url())))

Expand Down

0 comments on commit e6453d8

Please sign in to comment.