Skip to content

Commit

Permalink
started kubernetes-client backend
Browse files Browse the repository at this point in the history
  • Loading branch information
timbertson committed Jan 7, 2022
1 parent ba190bf commit a4ba2ba
Show file tree
Hide file tree
Showing 27 changed files with 399 additions and 60 deletions.
@@ -0,0 +1,39 @@
package com.goyeau.kubernetes.client.foperatorext

import com.goyeau.kubernetes.client.operation._
import io.k8s.apimachinery.pkg.apis.meta.v1.{ListMeta, ObjectMeta}
import org.http4s.Uri

// Kubernetes-client doesn't have any shared hierarchy, so we use structural types.
// We also cheekily put this in their namespace so we can use package-private traits.
// This surely breaks semtantic versioning, hopefully the traits become public in the future.
object Types {
type HasMetadata = {
def metadata: Option[ObjectMeta]
}

type ResourceGetters[St] = {
def metadata: Option[ObjectMeta]
def status: Option[St]
}

type ListOf[T] = {
def metadata: Option[ListMeta]
def items: Seq[T]
}

// toplevel API, e.g. PodsApi
type ResourceAPI[IO[_], T<:HasMetadata, TList<:ListOf[T]] = {
def resourceUri: Uri
def namespace(namespace: String): NamespacedResourceAPI[IO, T, TList]
}

// nested API, e.g. NamespacedPodsApi
type NamespacedResourceAPI[IO[_], T<:HasMetadata, TList<:ListOf[T]] =
Replaceable[IO, T]
with Gettable[IO, T]
with Listable[IO, TList]
with Deletable[IO]
with GroupDeletable[IO]
with Watchable[IO, T]
}
@@ -0,0 +1,14 @@
package net.gfxmonk.foperator.backend.kubernetesclientoperator

import com.goyeau.kubernetes.client.crd.{CrdContext, CustomResource}

// a typeclass for CrdContext so we don't have to pass them around manually
trait CrdContextFor[T] {
def ctx: CrdContext
}

object CrdContextFor {
def apply[Sp, St](crdCtx: CrdContext): CrdContextFor[CustomResource[Sp, St]] = new CrdContextFor[CustomResource[Sp, St]] {
override def ctx: CrdContext = crdCtx
}
}
@@ -0,0 +1,216 @@
package net.gfxmonk.foperator.backend.kubernetesclientoperator

import cats.Eq
import cats.effect.concurrent.Deferred
import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, Resource, Sync, Timer}
import cats.implicits._
import com.goyeau.kubernetes.client.foperatorext.Types.{HasMetadata, ListOf, ResourceAPI, ResourceGetters}
import com.goyeau.kubernetes.client.{EventType, KubeConfig, KubernetesClient}
import fs2.{Chunk, Stream}
import io.k8s.apimachinery.pkg.apis.meta.v1.{ObjectMeta, Time}
import monix.eval.Task
import monix.eval.instances.CatsConcurrentEffectForTask
import monix.execution.Scheduler
import net.gfxmonk.foperator.internal.{BackendCompanion, Logging}
import net.gfxmonk.foperator.types._
import net.gfxmonk.foperator.{Event, Id, ListOptions, Operations}
import org.http4s.{Status, Uri}

import java.time.format.DateTimeFormatter
import java.time.{Instant, ZoneOffset}
import scala.concurrent.duration._
import scala.language.reflectiveCalls

class KClient[IO[_] : Concurrent : ContextShift](val underlying: KubernetesClient[IO]) {
val ops = Operations[IO, KClient[IO]](this)
}

object KClient {
def apply[IO[_]](implicit cs: ContextShift[IO], ce: ConcurrentEffect[IO]) = new KClientCompanion[IO]

implicit def engine[IO[_] : Concurrent : ContextShift : Timer, T<:HasMetadata, TList<:ListOf[T]]
(implicit api: HasResourceApi[IO, T, TList], res: ObjectResource[T], io: Sync[IO])
: Engine[IO, KClient[IO], T]
= new EngineImpl[IO, T, TList]

// Internal traits used to tie resources to clients
// (i.e you can only build a KClient engine for types which have
// an instance of HasResourceApi
private [kubernetesclientoperator] trait HasResourceApi[IO[_], T<:HasMetadata, TList<:ListOf[T]] {
def resourceAPI(client: KubernetesClient[IO]): ResourceAPI[IO, T, TList]

// NOTE: not all resources can have their status updated. This will fail
// at runtime if you try to use it on the wrong type, because
// encoding this in the type system sounds like too much hard work ;)
def updateStatus(client: KubernetesClient[IO], t: T): IO[Status]
}

// we need a fake client instance just to get api endpoint strings.
// This is definitely using it wrong, but we only need a few strings,
// and they're only used in the TestClient backend :shrug:
// TODO feature request: expose kind somewhere more accessible
private val dummyClient = {
implicit val io: CatsConcurrentEffectForTask = Task.catsEffect(Scheduler.global)
KubernetesClient(KubeConfig.of[Task](
Uri.fromString("https://127.0.0.1/fake-kubernetes").toOption.get
)).use(Task.pure).runSyncUnsafe(10.seconds)(Scheduler.global, implicitly)
}

class ResourceImpl[IO[_], St, T<:ResourceGetters[St], TList<:ListOf[T]](
getApi: KubernetesClient[IO] => ResourceAPI[IO, T, TList],
withMeta: (T, ObjectMeta) => T,
withStatusFn: (T, St) => T,
updateStatusFn: (KubernetesClient[IO], Id[T], T) => IO[Status],
eq: Eq[T] = Eq.fromUniversalEquals[T],
eqSt: Eq[St] = Eq.fromUniversalEquals[St],
) extends ObjectResource[T] with HasStatus[T, St] with HasResourceApi[IO, T, TList] with Eq[T] {

private def meta(t: T) = t.metadata.getOrElse(ObjectMeta())

override val eqStatus: Eq[St] = eqSt

override def status(obj: T): Option[St] = obj.status

override def withStatus(obj: T, status: St): T = withStatusFn(obj, status)

override def kind: String = getApi(dummyClient.asInstanceOf[KubernetesClient[IO]]).resourceUri.path.segments.lastOption.map(_.toString).getOrElse("UNKNOWN")

override def finalizers(t: T): List[String] = t.metadata.flatMap(_.finalizers).map(_.toList).getOrElse(Nil)

override def replaceFinalizers(t: T, f: List[String]): T = withMeta(t, meta(t).copy(finalizers = Some(f)))

override def version(t: T): String = t.metadata.flatMap(_.resourceVersion).getOrElse("")

override def id(t: T): Id[T] = t.metadata.map { m =>
Id[T](m.namespace.getOrElse(""), m.name.getOrElse(""))
}.getOrElse(Id[T]("", ""))

override def isSoftDeleted(t: T): Boolean = t.metadata.flatMap(_.deletionTimestamp).isDefined

override def eqv(x: T, y: T): Boolean = eq.eqv(x, y)

override def withVersion(t: T, newVersion: String): T =
withMeta(t, meta(t).copy(resourceVersion = Some(newVersion)))

override def softDeletedAt(t: T, time: Instant): T =
withMeta(t, meta(t).copy(deletionTimestamp = Some(Time(time.atZone(ZoneOffset.UTC).format(DateTimeFormatter.ISO_DATE_TIME)))))

override def resourceAPI(client: KubernetesClient[IO]): ResourceAPI[IO, T, TList] = getApi(client)

override def updateStatus(client: KubernetesClient[IO], t: T): IO[Status] = updateStatusFn(client, id(t), t)
}

class StatusError(val status: Status) extends RuntimeException(status.toString)

private class EngineImpl[IO[_], T<:HasMetadata, TList<:ListOf[T]]
(implicit api: HasResourceApi[IO, T, TList], res: ObjectResource[T], io: Concurrent[IO], timer: Timer[IO])
extends Engine[IO, KClient[IO], T] with Logging
{
private def ns(c: KClient[IO], id: Id[T]) =
api.resourceAPI(c.underlying).namespace(id.namespace)

// TODO does this happen by default? feels liks it should, and if not the kclient readme is missing error handling
private def handleResponse(status: IO[Status]): IO[Unit] = status.flatMap { st =>
if (st.isSuccess) {
io.unit
} else {
io.raiseError(new StatusError(st))
}
}

override def classifyError(e: Throwable): ClientError = e match {
case s: StatusError => s.status match {
case Status.NotFound => ClientError.NotFound(e)
case Status.Conflict => ClientError.VersionConflict(e)
case _ => ClientError.Unknown(e)
}
// TODO does kclient throw its own errors?
case other => ClientError.Unknown(e)
}

// TODO this doesn't report 404 correctly
override def read(c: KClient[IO], t: Id[T]): IO[Option[T]] = ns(c, t).get(t.name).map(Some.apply)

override def write(c: KClient[IO], t: T): IO[Unit] = handleResponse(ns(c, res.id(t)).replace(t))

override def writeStatus[St](c: KClient[IO], t: T, st: St)(implicit sub: HasStatus[T, St]): IO[Unit] =
handleResponse(api.updateStatus(c.underlying, sub.withStatus(t, st)))

override def delete(c: KClient[IO], id: Id[T]): IO[Unit] = handleResponse(ns(c, id).delete(id.name))

override def listAndWatch(c: KClient[IO], opts: ListOptions): IO[(List[T], fs2.Stream[IO, Event[T]])] = {
val ns = api.resourceAPI(c.underlying).namespace(opts.namespace)
if (opts.fieldSelector.nonEmpty) {
// TODO: feature request
io.raiseError(new RuntimeException(s"kubernetes-client backend does not support fieldSelector in opts: ${opts}"))
} else {
val validateLabels = opts.labelSelector.traverse[IO, (String, String)] { l =>
l.split("=", 2) match {
case Array(k,v) => io.pure((k,v))
case _ =>
// TODO: feature-request
io.raiseError(new RuntimeException(s"kubernetes-client backend only supports equality-based labels, you provided: ${l}"))
}
}.map(_.toMap)

for {
labels <- validateLabels
initial <- ns.list(labels)
// TODO: feature request: accept resourceVersion in watch request
// (the below reconcile code could then be removed)
updates = ns.watch(labels)
startReconcile <- Deferred[IO, Unit]
} yield {
val triggerReconcile = startReconcile.complete(()).attempt.void
val events = updates.zipWithIndex.evalMap[IO, Event[T]] {
case (Left(err), _) => io.raiseError(new RuntimeException(s"Error watching ${res.kind} resources: $err"))
case (Right(event), idx) => {
(if (idx === 0) triggerReconcile else io.unit) >> (event.`type` match {
case EventType.ADDED | EventType.MODIFIED => io.pure(Event.Updated(event.`object`))
case EventType.DELETED => io.pure(Event.Deleted(event.`object`))
case EventType.ERROR => io.raiseError(new RuntimeException(s"Error watching ${res.kind} resources: ${event}"))
})
}
}

// we do a reconcile on the first update event,
val reconcile: IO[List[Event[T]]] = startReconcile.get.flatMap(_ => ns.list(labels)).map { secondary =>
val initialMap = initial.items.map(r => (res.id(r), r)).toMap
val secondaryMap = secondary.items.map(r => (res.id(r), r)).toMap
val allIds = (initialMap.keys ++ secondaryMap.keys).toSet
allIds.toList.flatMap[Event[T]] { id =>
(initialMap.get(id), secondaryMap.get(id)) match {
case (None, Some(current)) => Some(Event.Updated(current))
case (Some(prev), None) => Some(Event.Deleted(prev))
case (Some(prev), Some(current)) => {
if (res.version(prev) === res.version(current)) {
None
} else {
Some(Event.Updated(current))
}
}
}
}
}
val reconcileStream = Stream.evalUnChunk(reconcile.map(Chunk.apply))
val reconcileAfterDelay = Stream.evalUnChunk(timer.sleep(20.seconds) >> triggerReconcile.as(Chunk.empty[Event[T]]))
(initial.items.toList, events.merge(reconcileStream).merge(reconcileAfterDelay))
}
}
}
}
}

class KClientCompanion[IO[_]](implicit cs: ContextShift[IO], ce: ConcurrentEffect[IO]) extends BackendCompanion[IO, KClient[IO]] {

def wrap(underlying: KubernetesClient[IO]) = new KClient(underlying)

def apply(config: KubeConfig) = KubernetesClient[IO](config)

def apply[F[_]: ConcurrentEffect: ContextShift](config: F[KubeConfig]): Resource[F, KubernetesClient[F]] = KubernetesClient(config)

type Ops[T] = Operations[IO, KClient[IO], T]

}


@@ -0,0 +1,50 @@
package net.gfxmonk.foperator.backend.kubernetesclientoperator

import cats.Eq
import cats.effect.Async
import com.goyeau.kubernetes.client.KubernetesClient
import com.goyeau.kubernetes.client.crd.{CustomResource, CustomResourceList}
import io.circe.{Decoder, Encoder}
import io.k8s.api.core.v1.{Pod, PodList, PodStatus}
import io.k8s.apimachinery.pkg.apis.meta.v1.ObjectMeta
import net.gfxmonk.foperator
import net.gfxmonk.foperator.Id
import org.http4s.Status

import scala.reflect.ClassTag

package object implicits extends foperator.CommonImplicits {
import KClient._

// implicits that don't have a better place
implicit val metadataEq: Eq[ObjectMeta] = Eq.fromUniversalEquals

private def cantUpdateStatus[IO[_], T]
(implicit io: Async[IO], ct: ClassTag[T]): (KubernetesClient[IO], Id[T], T) => IO[Status] =
(_, _, _) => io.raiseError(new RuntimeException(
s"resource ${ct.getClass.getSimpleName} does not have a status sub-resource. " +
"(if it should, please update kubernetesclientoperator.implicits to expose it)")
)

implicit def implicitPodResource[IO[_]: Async]: ResourceImpl[IO, PodStatus, Pod, PodList] =
new ResourceImpl[IO, PodStatus, Pod, PodList](
_.pods,
(o, m) => o.copy(metadata=Some(m)),
(o, s) => o.copy(status=Some(s)),
cantUpdateStatus[IO, Pod]
)

implicit def implicitCustomResource[IO[_], Sp : Encoder: Decoder, St: Encoder: Decoder]
(implicit crd: CrdContextFor[CustomResource[Sp, St]])
: ResourceImpl[IO, St, CustomResource[Sp,St], CustomResourceList[Sp, St]]
= new ResourceImpl[IO, St, CustomResource[Sp, St], CustomResourceList[Sp, St]](
_.customResources[Sp, St](crd.ctx),
(o, m) => o.copy(metadata=Some(m)),
(o, s) => o.copy(status=Some(s)),
(client: KubernetesClient[IO], id: Id[CustomResource[Sp, St]], t: CustomResource[Sp, St]) => {
client.customResources[Sp,St](crd.ctx).namespace(id.namespace).updateStatus(id.name, t)
}
)

// TODO remaining builtin types
}
Expand Up @@ -82,13 +82,13 @@ object Skuber extends BackendCompanion[Task, Skuber] {
override def read(c: Skuber, t: Id[T]): Task[Option[T]] =
Task.deferFuture(c.underlying.usingNamespace(t.namespace).getOption(t.name))

override def write(c: Skuber, t: T): Task[T] =
Task.deferFuture(c.underlying.update(t))
override def write(c: Skuber, t: T): Task[Unit] =
Task.deferFuture(c.underlying.update(t)).void

override def writeStatus[St](c: Skuber, t: T, st: St)(implicit sub: HasStatus[T, St]): Task[T] = {
override def writeStatus[St](c: Skuber, t: T, st: St)(implicit sub: HasStatus[T, St]): Task[Unit] = {
// we assume that HasStatus corresponds to substatus
implicit val skuberStatus: HasStatusSubresource[T] = new skuber.HasStatusSubresource[T] {}
Task.deferFuture(c.underlying.updateStatus(sub.withStatus(t, st)))
Task.deferFuture(c.underlying.updateStatus(sub.withStatus(t, st))).void
}

override def delete(c: Skuber, id: Id[T]): Task[Unit] =
Expand All @@ -111,8 +111,10 @@ object Skuber extends BackendCompanion[Task, Skuber] {
fieldSelector = fieldSelector,
)

Task.deferFuture(c.underlying.listWithOptions[skuber.ListResource[T]](listOptions)).map { listResource =>
val source = c.underlying.watchWithOptions[T](listOptions.copy(
val namespaced = c.underlying.usingNamespace(opts.namespace)

Task.deferFuture(namespaced.listWithOptions[skuber.ListResource[T]](listOptions)).map { listResource =>
val source = namespaced.watchWithOptions[T](listOptions.copy(
resourceVersion = Some(listResource.resourceVersion),
timeoutSeconds = Some(30) // TODO configurable?
))
Expand All @@ -122,7 +124,7 @@ object Skuber extends BackendCompanion[Task, Skuber] {
e._type match {
case EventType.ADDED | EventType.MODIFIED => io.pure(Event.Updated(e._object))
case EventType.DELETED => io.pure(Event.Deleted(e._object))
case EventType.ERROR | _ => io.raiseError(new RuntimeException("Error watching resources (but skuber doesn't say what it was)"))
case EventType.ERROR | _ => io.raiseError(new RuntimeException(s"Error watching resources: $e"))
}
}
(listResource.items, updates)
Expand Down
Expand Up @@ -8,7 +8,7 @@ import net.gfxmonk.foperator.types._
import skuber.apiextensions.CustomResourceDefinition
import skuber.{CustomResource, ObjectMeta, ResourceDefinition}

import java.time.ZonedDateTime
import java.time.{Instant, ZoneOffset, ZonedDateTime}

package object implicits extends foperator.CommonImplicits {
// implicits that don't have a better place
Expand Down Expand Up @@ -40,22 +40,17 @@ package object implicits extends foperator.CommonImplicits {

override def kind: String = rd.spec.names.kind

override def apiPrefix: String = rd.spec.apiPathPrefix

override def finalizers(t: T): List[String] = t.metadata.finalizers.getOrElse(Nil)

override def replaceFinalizers(t: T, f: List[String]): T = ed.updateMetadata(t, t.metadata.copy(finalizers = Some(f)))

override def version(t: T): Option[Int] = t.metadata.resourceVersion match {
case "" => None
case nonempty => Some(nonempty.toInt)
}
override def version(t: T): String = t.metadata.resourceVersion

override def withVersion(t: T, newVersion: Int): T = ed.updateMetadata(t, t.metadata.copy(resourceVersion = newVersion.toString))
override def withVersion(t: T, newVersion: String): T = ed.updateMetadata(t, t.metadata.copy(resourceVersion = newVersion))

override def deletionTimestamp(t: T): Option[ZonedDateTime] = t.metadata.deletionTimestamp
override def isSoftDeleted(t: T): Boolean = t.metadata.deletionTimestamp.isDefined

override def withDeleted(t: T, timestamp: ZonedDateTime): T = ed.updateMetadata(t, t.metadata.copy(deletionTimestamp = Some(timestamp)))
override def softDeletedAt(t: T, timestamp: Instant): T = ed.updateMetadata(t, t.metadata.copy(deletionTimestamp = Some(timestamp.atZone(ZoneOffset.UTC))))
}

// TODO shouldn't skuber provide this?
Expand Down

0 comments on commit a4ba2ba

Please sign in to comment.