Skip to content

Commit

Permalink
feat: Switch back to websockets for events
Browse files Browse the repository at this point in the history
  • Loading branch information
codablock committed Aug 3, 2023
1 parent 97b0923 commit 539aed9
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 52 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ require (
github.com/tkrajina/typescriptify-golang-structs v0.1.10
go.mozilla.org/sops/v3 v3.7.4-0.20220901181616-9124783930b1
golang.org/x/oauth2 v0.10.0
nhooyr.io/websocket v1.8.7
sigs.k8s.io/cli-utils v0.35.0
sigs.k8s.io/controller-runtime v0.15.0
sigs.k8s.io/kustomize/api v0.13.4
Expand Down
20 changes: 20 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ github.com/gin-contrib/sessions v0.0.5 h1:CATtfHmLMQrMNpJRgzjWXD7worTh7g7ritsQfm
github.com/gin-contrib/sessions v0.0.5/go.mod h1:vYAuaUPqie3WUSsft6HUlCjlwwoJQs97miaG2+7neKY=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg=
github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SUcPTeU=
github.com/gliderlabs/ssh v0.3.5 h1:OcaySEmAQJgyYcArR+gGGTHCyE7nvhEMTlYY+Dp8CpY=
Expand Down Expand Up @@ -322,11 +323,15 @@ github.com/go-openapi/jsonreference v0.20.1 h1:FBLnyygC4/IZZr893oiomc9XaghoveYTr
github.com/go-openapi/jsonreference v0.20.1/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k=
github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g=
github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI=
github.com/go-playground/validator/v10 v10.14.1 h1:9c50NUPC30zyuKprjL3vNZ0m5oG+jU0zvx4AqHGnv4k=
github.com/go-playground/validator/v10 v10.14.1/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU=
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
Expand All @@ -342,6 +347,12 @@ github.com/gobuffalo/packr/v2 v2.8.3 h1:xE1yzvnO56cUC0sTpKR3DIbxZgB54AftTFMhB2XE
github.com/gobuffalo/packr/v2 v2.8.3/go.mod h1:0SahksCVcx4IMnigTjiFuyldmTrdTctXsOdiU5KwbKc=
github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
github.com/gobwas/pool v0.2.0 h1:QEmUOlnSjWtnpRGHF3SauEiOsy82Cup83Vf2LcMlnc8=
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo=
github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand Down Expand Up @@ -459,6 +470,8 @@ github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+
github.com/gorilla/sessions v1.2.1 h1:DHd3rPN5lE3Ts3D8rKkQ8x/0kqfeNmBAaiSi+o7FsgI=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gosuri/uitable v0.0.4 h1:IG2xLKRvErL3uhY6e1BylFzG+aJiwQviDDTfOKeKTpY=
github.com/gosuri/uitable v0.0.4/go.mod h1:tKR86bXuXPZazfOTG1FIzvjIdXzd0mo4Vtn16vt0PJo=
Expand Down Expand Up @@ -544,6 +557,7 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
Expand All @@ -559,6 +573,7 @@ github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI=
github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
Expand Down Expand Up @@ -588,6 +603,7 @@ github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o=
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk=
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q=
github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
Expand Down Expand Up @@ -845,6 +861,8 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/urfave/cli v1.22.12 h1:igJgVw1JdKH+trcLWLeLwZjU9fEfPesQ+9/e4MQ44S8=
Expand Down Expand Up @@ -1412,6 +1430,8 @@ k8s.io/kubectl v0.27.2 h1:sSBM2j94MHBFRWfHIWtEXWCicViQzZsb177rNsKBhZg=
k8s.io/kubectl v0.27.2/go.mod h1:GCOODtxPcrjh+EC611MqREkU8RjYBh10ldQCQ6zpFKw=
k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5 h1:kmDqav+P+/5e1i9tFfHq1qcF3sOrDp+YEkVDAHu7Jwk=
k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g=
nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0=
oras.land/oras-go v1.2.3 h1:v8PJl+gEAntI1pJ/LCrDgsuk+1PKVavVEPsYIHFE5uY=
oras.land/oras-go v1.2.3/go.mod h1:M/uaPdYklze0Vf3AakfarnpoEckvw0ESbRdN8Z1vdJg=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
Expand Down
92 changes: 71 additions & 21 deletions pkg/webui/events.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package webui

import (
"bytes"
"container/list"
"fmt"
"context"
"github.com/gin-gonic/gin"
"github.com/kluctl/kluctl/v2/pkg/results"
"github.com/kluctl/kluctl/v2/pkg/status"
"github.com/kluctl/kluctl/v2/pkg/types"
"github.com/kluctl/kluctl/v2/pkg/types/result"
"github.com/kluctl/kluctl/v2/pkg/yaml"
"net/http"
"nhooyr.io/websocket"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -229,7 +231,6 @@ func (h *eventsHandler) handler(c *gin.Context) {
args := struct {
FilterProject string `form:"filterProject"`
FilterSubDir string `form:"filterSubDir"`
Seq int64 `form:"seq"`
}{}
err := c.BindQuery(&args)
if err != nil {
Expand All @@ -251,14 +252,44 @@ func (h *eventsHandler) handler(c *gin.Context) {
}
}

getNewEvents := func() ([]string, int64) {
acceptOptions := &websocket.AcceptOptions{
InsecureSkipVerify: true,
}

userAgentLower := strings.ToLower(c.GetHeader("User-Agent"))
isSafari := strings.Contains(userAgentLower, "safari") && !strings.Contains(userAgentLower, "chrome") && !strings.Contains(userAgentLower, "android")

if isSafari {
acceptOptions.CompressionMode = websocket.CompressionDisabled
}

conn, err := websocket.Accept(c.Writer, c.Request, acceptOptions)
if err != nil {
return
}
defer conn.Close(websocket.StatusInternalError, "the sky is falling")

err = h.wsHandle(conn, filter)
if err != nil {
cs := websocket.CloseStatus(err)
if cs == websocket.StatusNormalClosure || cs == websocket.StatusGoingAway {
return
}
_ = c.AbortWithError(http.StatusInternalServerError, err)
}
}

func (h *eventsHandler) wsHandle(c *websocket.Conn, filter *result.ProjectKey) error {
ctx := c.CloseRead(h.server.ctx)

getNewEvents := func(seq int64) ([]string, int64) {
h.mutex.Lock()
defer h.mutex.Unlock()
var events []string
nextSeq := args.Seq
nextSeq := seq
for e := h.events.Front(); e != nil; e = e.Next() {
e2 := e.Value.(*eventEntry)
if e2.seq < args.Seq {
if e2.seq < seq {
continue
}
nextSeq = e2.seq + 1
Expand All @@ -271,24 +302,43 @@ func (h *eventsHandler) handler(c *gin.Context) {
return events, nextSeq
}

events, nextSeq := getNewEvents()
timeout := time.After(30 * time.Second)
outer:
for len(events) == 0 {
select {
case <-h.server.ctx.Done():
_ = c.AbortWithError(http.StatusServiceUnavailable, fmt.Errorf("context cancelled"))
return
case <-timeout:
break outer
case <-time.After(100 * time.Millisecond):
var seq int64
for {
events, nextSeq := getNewEvents(seq)
if len(events) != 0 {
buf := bytes.NewBuffer(nil)
buf.Write([]byte("["))
for i, e := range events {
if i != 0 {
buf.Write([]byte(","))
}
buf.Write([]byte(e))
}
buf.Write([]byte("]"))

err := h.wsSendMessage(ctx, c, time.Second*5, buf.String())
if err != nil {
return err
}
}

events, nextSeq = getNewEvents()
seq = nextSeq

select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(1000 * time.Millisecond):
}
}
}

func (s *eventsHandler) wsSendMessage(ctx context.Context, c *websocket.Conn, timeout time.Duration, msg string) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

j := fmt.Sprintf(`{"nextSeq": %d, "events": [%s]}`, nextSeq, strings.Join(events, ","))
c.Writer.Header().Set("Content-Type", "application/json")
c.Status(http.StatusOK)
_, _ = c.Writer.WriteString(j)
err := c.Write(ctx, websocket.MessageText, []byte(msg))
if err != nil {
return err
}
return err
}
2 changes: 1 addition & 1 deletion pkg/webui/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (s *CommandResultsServer) Run(host string, port int) error {
if err != nil {
return err
}
api.GET("/events", s.events.handler)
api.Any("/events", s.events.handler)

address := fmt.Sprintf("%s:%d", host, port)
listener, err := net.Listen("tcp", address)
Expand Down
74 changes: 45 additions & 29 deletions pkg/webui/ui/src/api.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import Tooltip from "@mui/material/Tooltip";
import "./staticbuild.d.ts"
import { loadScript } from "./loadscript";
import { GitRef } from "./models-static";
import { sleep } from "./utils/misc";

console.log(window.location)

Expand Down Expand Up @@ -36,7 +37,7 @@ export interface Api {
getAuthInfo(): Promise<AuthInfo>
getUser(): Promise<User>
getShortNames(): Promise<ShortName[]>
listenUpdates(filterProject: string | undefined, filterSubDir: string | undefined, handle: (msg: any) => void): Promise<() => void>
listenEvents(filterProject: string | undefined, filterSubDir: string | undefined, handle: (msg: any) => void): Promise<() => void>
getCommandResult(resultId: string): Promise<CommandResult>
getCommandResultObject(resultId: string, ref: ObjectRef, objectType: string): Promise<any>
getValidateResult(resultId: string): Promise<ValidateResult>
Expand Down Expand Up @@ -114,50 +115,65 @@ export class RealApi implements Api {
return this.doGet("/api/getShortNames")
}

async listenUpdates(filterProject: string | undefined, filterSubDir: string | undefined, handle: (msg: any) => void): Promise<() => void> {
async listenEvents(filterProject: string | undefined, filterSubDir: string | undefined, handle: (msg: any) => void): Promise<() => void> {
let host = window.location.host
let proto = "wss"
if (process.env.NODE_ENV === 'development') {
host = "localhost:9090"
}
if (window.location.protocol !== "https:") {
proto = "ws"
}
let url = `${proto}://${host}${rootPath}/api/events`

const params = new URLSearchParams()
if (filterProject) {
params.set("filterProject", filterProject)
}
if (filterSubDir) {
params.set("filterSubDir", filterSubDir)
}
url += "?" + params.toString()

let seq = 0
const abort = new AbortController()
let ws: WebSocket | undefined;
let cancelled = false

const doGetEvents = async () => {
if (abort.signal.aborted) {
const connect = async () => {
if (cancelled) {
return
}

params.set("seq", seq + "")
let resp: any
try {
resp = await this.doGet("/api/events", params, abort.signal)
} catch (error) {
console.log("events error", error)
seq = 0
await new Promise(r => setTimeout(r, 5000));
doGetEvents()
return
console.log("ws connect: " + url)
ws = new WebSocket(url);
ws.onopen = function () {
console.log("ws connected")
}
ws.onclose = function (event) {
console.log("ws close")
if (!cancelled) {
sleep(5000).then(connect)
}
}
ws.onerror = function (event) {
console.log("ws error", event)
}
ws.onmessage = function (event: MessageEvent) {
if (cancelled) {
return
}
const msg: any[] = JSON.parse(event.data)
msg.forEach(handle)
}

seq = resp.nextSeq
const events = resp.events

events.forEach((e: any) => {
handle(e)
})

doGetEvents()
}

doGetEvents()
await connect()

return () => {
console.log("events cancel")
abort.abort()
console.log("ws cancel")
cancelled = true
if (ws) {
ws.close()
}
}
}

Expand Down Expand Up @@ -237,7 +253,7 @@ export class StaticApi implements Api {
return staticShortNames
}

async listenUpdates(filterProject: string | undefined, filterSubDir: string | undefined, handle: (msg: any) => void): Promise<() => void> {
async listenEvents(filterProject: string | undefined, filterSubDir: string | undefined, handle: (msg: any) => void): Promise<() => void> {
await loadScript(staticPath + "/summaries.js")

staticSummaries.forEach(rs => {
Expand Down
2 changes: 1 addition & 1 deletion pkg/webui/ui/src/components/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ const LoggedInApp = (props: { onLogout: () => void }) => {

console.log("starting listenResults")
let cancel: Promise<() => void>
cancel = api.listenUpdates(undefined, undefined, msg => {
cancel = api.listenEvents(undefined, undefined, msg => {
switch (msg.type) {
case "update_command_result_summary":
updateCommandResultSummary(msg.summary)
Expand Down

0 comments on commit 539aed9

Please sign in to comment.