Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
tsipinakis committed May 23, 2022
1 parent 57a5eba commit ac3f919
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 30 deletions.
3 changes: 3 additions & 0 deletions config/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ func (c KubernetesTimeoutConfig) Validate() error {
type KubernetesExecutionMode string

const (
KubernetesExecutionModeUser KubernetesExecutionMode = "user"
// KubernetesExecutionModeConnection launches one container per SSH connection.
KubernetesExecutionModeConnection KubernetesExecutionMode = "connection"
// KubernetesExecutionModeSession launches one container per SSH session (multiple containers per connection).
Expand All @@ -253,6 +254,8 @@ const (
// Validate validates the execution config.
func (e KubernetesExecutionMode) Validate() error {
switch e {
case KubernetesExecutionModeUser:
fallthrough
case KubernetesExecutionModeConnection:
fallthrough
case KubernetesExecutionModeSession:
Expand Down
2 changes: 2 additions & 0 deletions internal/kubernetes/channelHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ func (c *channelHandler) run(

var err error
switch c.networkHandler.config.Pod.Mode {
case config.KubernetesExecutionModeUser:
fallthrough
case config.KubernetesExecutionModeConnection:
err = c.handleExecModeConnection(ctx, program)
case config.KubernetesExecutionModeSession:
Expand Down
6 changes: 6 additions & 0 deletions internal/kubernetes/kubernetesClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,10 @@ type kubernetesClient interface {
tty *bool,
cmd []string,
) (kubernetesPod, error)

findPod(
ctx context.Context,
filter_labels map[string]string,
connectionId string,
) (kubernetesPod, error)
}
92 changes: 86 additions & 6 deletions internal/kubernetes/kubernetesClientImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kubernetes

import (
"context"
"fmt"
"sync"
"time"

Expand All @@ -12,6 +13,8 @@ import (
"github.com/containerssh/libcontainerssh/message"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
)
Expand Down Expand Up @@ -63,6 +66,83 @@ loop:
return nil, err
}

func (k *kubernetesClientImpl) getPodList(
ctx context.Context,
filter_labels map[string]string,
) ([]core.Pod, error) {
selector := labels.NewSelector()
for k, v := range filter_labels {
label, err := labels.NewRequirement(k, selection.Equals, []string{v})
if err != nil {
return nil, err
}
selector.Add(*label)
}
result, err := k.client.CoreV1().Pods(k.config.Pod.Metadata.Namespace).List(ctx, meta.ListOptions{
LabelSelector: selector.String(),
})
if err != nil {
return nil, err
}
if result.Continue != "" || len(result.Items) > 1 {
return nil, fmt.Errorf("Found more than 1 pod matching the labels")
}
if len(result.Items) == 0 {
return nil, nil
}
return result.Items, nil
}

func (k *kubernetesClientImpl) k8sPodToPodImpl(pod core.Pod) kubernetesPodImpl {
return kubernetesPodImpl{
pod: &pod,
client: k.client,
restClient: k.restClient,
config: k.config,
logger: k.logger.WithLabel("podName", pod.Name),
// tty ?
connectionConfig: k.connectionConfig,
backendRequestsMetric: k.backendRequestsMetric,
backendFailuresMetric: k.backendFailuresMetric,
lock: &sync.Mutex{},
wg: &sync.WaitGroup{},
removeLock: &sync.Mutex{},
}
}

func (k *kubernetesClientImpl) findPod(
ctx context.Context,
filter_labels map[string]string,
connectionId string,
) (kubernetesPod, error) {
tries := 0
for {
if tries > 3 {
return nil, fmt.Errorf("Tries exhausted")
}
podList, err := k.getPodList(ctx, filter_labels)
if err != nil {
return nil, err
}
for _, pod := range podList {
_, ok := pod.Annotations["containerssh.io/coordination"]
if !ok {
// should never happen
k.logger.Debug("No coordination found")
continue
}
podImpl := k.k8sPodToPodImpl(pod)
err := podImpl.markInUse(ctx, connectionId)
if err != nil {
k.logger.Debug(err)
continue
}
return &podImpl, nil
}
tries++
}
}

func (k *kubernetesClientImpl) attemptPodCreate(
ctx context.Context,
podConfig containerSSHConfig.KubernetesPodConfig,
Expand Down Expand Up @@ -150,13 +230,13 @@ func (k *kubernetesClientImpl) getPodConfig(
podConfig.Spec.Containers[k.config.Pod.ConsoleContainerNumber].Command = k.config.Pod.IdleCommand
}

k.addLabelsToPodConfig(podConfig, labels)
k.addAnnotationsToPodConfig(podConfig, annotations)
k.addEnvToPodConfig(env, podConfig)
k.addLabelsToPodConfig(&podConfig, labels)
k.addAnnotationsToPodConfig(&podConfig, annotations)
k.addEnvToPodConfig(env, &podConfig)
return podConfig, nil
}

func (k *kubernetesClientImpl) addLabelsToPodConfig(podConfig containerSSHConfig.KubernetesPodConfig, labels map[string]string) {
func (k *kubernetesClientImpl) addLabelsToPodConfig(podConfig *containerSSHConfig.KubernetesPodConfig, labels map[string]string) {
if podConfig.Metadata.Labels == nil {
podConfig.Metadata.Labels = map[string]string{}
}
Expand All @@ -165,7 +245,7 @@ func (k *kubernetesClientImpl) addLabelsToPodConfig(podConfig containerSSHConfig
}
}

func (k *kubernetesClientImpl) addAnnotationsToPodConfig(podConfig containerSSHConfig.KubernetesPodConfig, annotations map[string]string) {
func (k *kubernetesClientImpl) addAnnotationsToPodConfig(podConfig *containerSSHConfig.KubernetesPodConfig, annotations map[string]string) {
if podConfig.Metadata.Annotations == nil {
podConfig.Metadata.Annotations = map[string]string{}
}
Expand All @@ -174,7 +254,7 @@ func (k *kubernetesClientImpl) addAnnotationsToPodConfig(podConfig containerSSHC
}
}

func (k *kubernetesClientImpl) addEnvToPodConfig(env map[string]string, podConfig containerSSHConfig.KubernetesPodConfig) {
func (k *kubernetesClientImpl) addEnvToPodConfig(env map[string]string, podConfig *containerSSHConfig.KubernetesPodConfig) {
for key, value := range env {
podConfig.Spec.Containers[k.config.Pod.ConsoleContainerNumber].Env = append(
podConfig.Spec.Containers[k.config.Pod.ConsoleContainerNumber].Env,
Expand Down
4 changes: 2 additions & 2 deletions internal/kubernetes/kubernetesExecutionImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (k *kubernetesExecutionImpl) handleStream(
close(k.doneChan)
_ = closeWrite()
k.terminalSizeQueue.Stop()
if k.pod.config.Pod.Mode == config.KubernetesExecutionModeConnection {
if k.pod.config.Pod.Mode == config.KubernetesExecutionModeConnection || k.pod.config.Pod.Mode == config.KubernetesExecutionModeUser {
k.pod.wg.Done()
}
if err != nil {
Expand All @@ -321,7 +321,7 @@ func (k *kubernetesExecutionImpl) handleStream(
} else {
k.sendExitCodeToClient(onExit)
}
} else if k.pod.config.Pod.Mode == config.KubernetesExecutionModeConnection {
} else if k.pod.config.Pod.Mode == config.KubernetesExecutionModeConnection || k.pod.config.Pod.Mode == config.KubernetesExecutionModeUser {
onExit(0)
} else {
k.sendExitCodeToClient(onExit)
Expand Down
2 changes: 2 additions & 0 deletions internal/kubernetes/kubernetesPod.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

// kubernetesPod is the representation of a created Pod.
type kubernetesPod interface {
markNotInUse(ctx context.Context, connectionId string) error

// attach attaches to the Pod on the main console.
attach(ctx context.Context) (kubernetesExecution, error)

Expand Down
139 changes: 139 additions & 0 deletions internal/kubernetes/kubernetesPodImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kubernetes
import (
"bytes"
"context"
"encoding/json"
"fmt"
"sync"
"time"
Expand All @@ -17,6 +18,7 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
Expand All @@ -43,6 +45,143 @@ type kubernetesPodImpl struct {
shutdown bool
}

type cooridationAnnotation struct {
Revision uint64
Connections []string
}

type jsonPatchOperation struct {
Operation string `json:"op"`
Path string `json:"path"`
Value string `json:"value"`
}

func (k *kubernetesPodImpl) markInUse(ctx context.Context, connectionId string) error {
coordination, err := k.fetchCoordination(ctx)
if err != nil {
return err
}
if len(coordination.Connections) == 0 {
return fmt.Errorf("Pod marked for deletion")
}

oldmar, err := json.Marshal(coordination)
if err != nil {
return err
}

coordination.Connections = append(coordination.Connections, connectionId)
coordination.Revision++

mar, err := json.Marshal(coordination)
if err != nil {
return err
}
test := jsonPatchOperation{
Operation: "test",
Path: "/metadata/annotations/containerssh.io~1coordination",
Value: string(oldmar),
}
patch := jsonPatchOperation{
Operation: "add",
Path: "/metadata/annotations/containerssh.io~1coordination",
Value: string(mar),
}
jsonPatch, err := json.Marshal([]jsonPatchOperation{test, patch})
if err != nil {
return err
}

_, err = k.client.CoreV1().Pods(k.pod.Namespace).Patch(ctx, k.pod.Name, types.JSONPatchType, []byte(jsonPatch), meta.PatchOptions{})
if err != nil {
return err
}

return nil
}

func (k *kubernetesPodImpl) fetchCoordination(ctx context.Context) (cooridationAnnotation, error) {
var coordination cooridationAnnotation
result, err := k.client.CoreV1().Pods(k.pod.Namespace).Get(ctx, k.pod.Name, meta.GetOptions{})
if err != nil {
return coordination, err
}

k.logger.Debug("Fetched: ", result.Annotations)

return k.getCoordination(result)
}

func (k *kubernetesPodImpl) getCoordination(pod *core.Pod) (cooridationAnnotation, error) {
var coordination cooridationAnnotation
val, ok := pod.Annotations["containerssh.io/coordination"]
if !ok {
return coordination, fmt.Errorf("Coordination field not found")
}
err := json.Unmarshal([]byte(val), &coordination)
if err != nil {
return coordination, err
}
return coordination, nil
}

func (k *kubernetesPodImpl) markNotInUse(ctx context.Context, connectionId string) error {
coordination, err := k.fetchCoordination(ctx)
if err != nil {
return err
}
if len(coordination.Connections) == 0 {
return fmt.Errorf("Pod already marked for deletion")
}

var i int
for j, val := range coordination.Connections {
if val == connectionId {
i = j
}
}

k.logger.Debug("Coordination before: ", coordination)
coordination.Connections = append(coordination.Connections[:i], coordination.Connections[i+1:]...)
coordination.Revision++

mar, err := json.Marshal(coordination)
if err != nil {
return err
}
test := jsonPatchOperation{
Operation: "test",
Path: "/metadata/annotations/containerssh.io~1coordination",
Value: k.pod.Annotations["containerssh.io/coordination"],
}
patch := jsonPatchOperation{
Operation: "add",
Path: "/metadata/annotations/containerssh.io~1coordination",
Value: string(mar),
}
jsonPatch, err := json.Marshal([]jsonPatchOperation{test, patch})
if err != nil {
return err
}

result, err := k.client.CoreV1().Pods(k.pod.Namespace).Patch(ctx, k.pod.Name, types.JSONPatchType, []byte(jsonPatch), meta.PatchOptions{})
if err != nil {
return err
}

coordination, err = k.getCoordination(result)
if err != nil {
return err
}
k.logger.Debug("Coordination after: ", coordination)
if len(coordination.Connections) == 0 {
k.logger.Debug(message.NewMessage(message.MKubernetesPodRemove, "Removing pod..."))
k.remove(ctx)
}

return nil
}

func (k *kubernetesPodImpl) getExitCode(ctx context.Context) (int32, error) {
var pod *core.Pod
var lastError error
Expand Down

0 comments on commit ac3f919

Please sign in to comment.