Skip to content

Commit

Permalink
feat: Add simple logs viewer for targets/results
Browse files Browse the repository at this point in the history
  • Loading branch information
codablock committed Aug 11, 2023
1 parent 694df3b commit 0e4f50e
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 24 deletions.
8 changes: 5 additions & 3 deletions cmd/kluctl/commands/cmd_webui.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,20 @@ func (cmd *webuiCmd) Run(ctx context.Context) error {

var authConfig webui.AuthConfig

var inClusterConfig *rest.Config
var inClusterClient client.Client
if cmd.InCluster {
configOverrides := &clientcmd.ConfigOverrides{
CurrentContext: cmd.InClusterContext,
}
config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
var err error
inClusterConfig, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
clientcmd.NewDefaultClientConfigLoadingRules(),
configOverrides).ClientConfig()
if err != nil {
return err
}
inClusterClient, err = client.NewWithWatch(config, client.Options{})
inClusterClient, err = client.NewWithWatch(inClusterConfig, client.Options{})
if err != nil {
return err
}
Expand Down Expand Up @@ -154,7 +156,7 @@ func (cmd *webuiCmd) Run(ctx context.Context) error {
sbw := webui.NewStaticWebuiBuilder(collector)
return sbw.Build(cmd.StaticPath)
} else {
server, err := webui.NewCommandResultsServer(ctx, collector, configs, inClusterClient, authConfig, cmd.OnlyApi)
server, err := webui.NewCommandResultsServer(ctx, collector, configs, inClusterConfig, inClusterClient, authConfig, cmd.OnlyApi)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions install/webui/webui/webui-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ rules:
- apiGroups: ["gitops.kluctl.io"]
resources: ["kluctldeployments"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["pods", "pods/log"]
verbs: ["get", "list", "watch"]
# allow to impersonate other users, groups and serviceaccounts
- apiGroups: [""]
resources: ["users", "groups", "serviceaccounts"]
Expand Down
22 changes: 21 additions & 1 deletion pkg/webui/clusteraccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/discovery"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sync"
Expand Down Expand Up @@ -114,11 +115,21 @@ func (ca *clusterAccessor) getClient(asUser string, asGroups []string) (client.C
return ca.getClientLocked(asUser, asGroups)
}

func (ca *clusterAccessor) getClientLocked(asUser string, asGroups []string) (client.Client, error) {
func (ca *clusterAccessor) getCoreV1Client(asUser string, asGroups []string) (*v1.CoreV1Client, error) {
ca.mutex.Lock()
defer ca.mutex.Unlock()
return ca.getCoreV1ClientLocked(asUser, asGroups)
}

func (ca *clusterAccessor) getImpersonatedConfig(asUser string, asGroups []string) *rest.Config {
config := rest.CopyConfig(ca.config)
config.Impersonate.UserName = asUser
config.Impersonate.Groups = asGroups
return config
}

func (ca *clusterAccessor) getClientLocked(asUser string, asGroups []string) (client.Client, error) {
config := ca.getImpersonatedConfig(asUser, asGroups)
c, err := client.NewWithWatch(config, client.Options{
Scheme: ca.scheme,
Mapper: ca.mapper,
Expand All @@ -129,6 +140,15 @@ func (ca *clusterAccessor) getClientLocked(asUser string, asGroups []string) (cl
return c, nil
}

func (ca *clusterAccessor) getCoreV1ClientLocked(asUser string, asGroups []string) (*v1.CoreV1Client, error) {
config := ca.getImpersonatedConfig(asUser, asGroups)
c, err := v1.NewForConfig(config)
if err != nil {
return nil, err
}
return c, nil
}

func (ca *clusterAccessor) getK(ctx context.Context, asUser string, asGroups []string) (*k8s2.K8sCluster, error) {
ca.mutex.Lock()
defer ca.mutex.Unlock()
Expand Down
25 changes: 18 additions & 7 deletions pkg/webui/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"io/fs"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"net"
"net/http"
Expand All @@ -33,7 +34,8 @@ type CommandResultsServer struct {
cam *clusterAccessorManager

// this is the client for the k8s cluster where the server runs on
serverClient client.Client
serverClient client.Client
serverCoreV1Client *corev1.CoreV1Client

auth *authHandler
events *eventsHandler
Expand All @@ -45,22 +47,29 @@ func NewCommandResultsServer(
ctx context.Context,
store *results.ResultsCollector,
configs []*rest.Config,
serverConfig *rest.Config,
serverClient client.Client,
authConfig AuthConfig,
onlyApi bool) (*CommandResultsServer, error) {

coreV1Client, err := corev1.NewForConfig(serverConfig)
if err != nil {
return nil, err
}

ret := &CommandResultsServer{
ctx: ctx,
store: store,
cam: &clusterAccessorManager{
ctx: ctx,
},
serverClient: serverClient,
onlyApi: onlyApi,
serverClient: serverClient,
serverCoreV1Client: coreV1Client,
onlyApi: onlyApi,
}

ret.events = newEventsHandler(ret)

var err error
ret.auth, err = newAuthHandler(ctx, serverClient, authConfig)
if err != nil {
return nil, err
Expand Down Expand Up @@ -122,6 +131,8 @@ func (s *CommandResultsServer) Run(host string, port int) error {
}
api.Any("/events", s.events.handler)

api.Any("/logs", s.logsHandler)

address := fmt.Sprintf("%s:%d", host, port)
listener, err := net.Listen("tcp", address)
if err != nil {
Expand Down Expand Up @@ -405,9 +416,9 @@ func (s *CommandResultsServer) doModifyKluctlDeployment(c *gin.Context, clusterI
}

type KluctlDeploymentParam struct {
Cluster string `json:"cluster"`
Name string `json:"name"`
Namespace string `json:"namespace"`
Cluster string `json:"cluster" form:"cluster"`
Name string `json:"name" form:"name"`
Namespace string `json:"namespace" form:"namespace"`
}

func (s *CommandResultsServer) doSetAnnotation(c *gin.Context, aname string, avalue string) {
Expand Down
106 changes: 106 additions & 0 deletions pkg/webui/server_logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package webui

import (
"bufio"
"encoding/json"
"github.com/gin-gonic/gin"
"io"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"net/http"
"nhooyr.io/websocket"
)

func (s *CommandResultsServer) logsHandler(gctx *gin.Context) {
args := struct {
KluctlDeploymentParam
ReconcileId string `form:"reconcileId"`
}{}
err := gctx.BindQuery(&args)
if err != nil {
_ = gctx.AbortWithError(http.StatusBadRequest, err)
return
}

conn, err := acceptWebsocket(gctx)
if err != nil {
return
}
defer conn.Close(websocket.StatusInternalError, "the sky is falling")

ctx := conn.CloseRead(gctx)

controllerNamespace := "kluctl-system"
pods, err := s.serverCoreV1Client.Pods(controllerNamespace).List(s.ctx, metav1.ListOptions{
LabelSelector: "control-plane=kluctl-controller",
})
if err != nil {
_ = gctx.AbortWithError(http.StatusBadRequest, err)
return
}

var streams []io.ReadCloser
defer func() {
for _, s := range streams {
_ = s.Close()
}
}()

lineCh := make(chan string)

for _, pod := range pods.Items {
since := int64(60 * 5)
rc, err := s.serverCoreV1Client.Pods("kluctl-system").GetLogs(pod.Name, &corev1.PodLogOptions{
Follow: true,
SinceSeconds: &since,
}).Stream(s.ctx)
if err != nil {
if errors.IsNotFound(err) {
continue
}
_ = gctx.AbortWithError(http.StatusBadRequest, err)
return
}
streams = append(streams, rc)
}

for _, s := range streams {
s := s
go func() {
sc := bufio.NewScanner(s)
for sc.Scan() {
l := sc.Text()
lineCh <- l
}
}()
}

for l := range lineCh {
var j map[string]any
err = json.Unmarshal([]byte(l), &j)
if err != nil {
continue
}

name := j["name"]
namespace := j["namespace"]

if args.Name != "" && name != args.Name {
continue
}
if args.Namespace != "" && namespace != args.Namespace {
continue
}

rid := j["reconcileID"]
if args.ReconcileId != "" && rid != args.ReconcileId {
continue
}

err = conn.Write(ctx, websocket.MessageText, []byte(l))
if err != nil {
return
}
}
}
64 changes: 51 additions & 13 deletions pkg/webui/ui/src/api.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export interface Api {
reconcileNow(cluster: string, name: string, namespace: string): Promise<Response>
deployNow(cluster: string, name: string, namespace: string): Promise<Response>
setSuspended(cluster: string, name: string, namespace: string, suspend: boolean): Promise<Response>
watchLogs(cluster: string | undefined, name: string | undefined, namespace: string | undefined, reconcileId: string | undefined, handle: (lines: any[]) => void): () => void
}

export async function checkStaticBuild() {
Expand Down Expand Up @@ -103,6 +104,23 @@ export class RealApi implements Api {
return resp
}

doWebsocket(path: string, params: URLSearchParams) {
let host = window.location.host
let proto = "wss"
if (process.env.NODE_ENV === 'development') {
host = "localhost:9090"
}
if (window.location.protocol !== "https:") {
proto = "ws"
}
let url = `${proto}://${host}${rootPath}${path}`
url += "?" + params.toString()

console.log("ws connect: " + url)
const ws = new WebSocket(url)
return ws
}

async getAuthInfo(): Promise<AuthInfo> {
return this.doGet("/auth/info")
}
Expand All @@ -116,24 +134,13 @@ export class RealApi implements Api {
}

async listenEvents(filterProject: string | undefined, filterSubDir: string | undefined, handle: (msg: any) => void): Promise<() => void> {
let host = window.location.host
let proto = "wss"
if (process.env.NODE_ENV === 'development') {
host = "localhost:9090"
}
if (window.location.protocol !== "https:") {
proto = "ws"
}
let url = `${proto}://${host}${rootPath}/api/events`

const params = new URLSearchParams()
if (filterProject) {
params.set("filterProject", filterProject)
}
if (filterSubDir) {
params.set("filterSubDir", filterSubDir)
}
url += "?" + params.toString()

let ws: WebSocket | undefined;
let cancelled = false
Expand All @@ -143,8 +150,7 @@ export class RealApi implements Api {
return
}

console.log("ws connect: " + url)
ws = new WebSocket(url);
ws = this.doWebsocket("/api/events", params)
ws.onopen = function () {
console.log("ws connected")
}
Expand Down Expand Up @@ -231,6 +237,34 @@ export class RealApi implements Api {
"suspend": suspend,
})
}

watchLogs(cluster: string | undefined, name: string | undefined, namespace: string | undefined, reconcileId: string | undefined, handle: (lines: any[]) => void): () => void {
const params = new URLSearchParams()
if (cluster) params.set("cluster", cluster)
if (name) params.set("name", name)
if (namespace) params.set("namespace", namespace)
if (reconcileId) params.set("reconcileId", reconcileId)

const ws = this.doWebsocket("/api/logs", params)
let cancelled = false

ws.onmessage = function (event: MessageEvent) {
if (cancelled) {
return
}
try {
const line: any = JSON.parse(event.data)
handle([line])
} catch (e) {
}
}

return () => {
console.log("cancel logs", params.toString())
cancelled = true
ws.close()
}
}
}

export class StaticApi implements Api {
Expand Down Expand Up @@ -314,6 +348,10 @@ export class StaticApi implements Api {
setSuspended(cluster: string, name: string, namespace: string, suspend: boolean): Promise<Response> {
throw new Error("not implemented")
}

watchLogs(cluster: string, name: string, namespace: string, reconcileId: string, handle: (lines: any[]) => void): () => void {
return () => {}
}
}

function buildRefParams(ref: ObjectRef, params: URLSearchParams) {
Expand Down

0 comments on commit 0e4f50e

Please sign in to comment.