diff --git a/engine/api/api.go b/engine/api/api.go index 850cf70fb2..d25969807d 100644 --- a/engine/api/api.go +++ b/engine/api/api.go @@ -247,6 +247,7 @@ type API struct { StartupTime time.Time Maintenance bool eventsBroker *eventsBroker + websocketBroker *websocketBroker warnChan chan sdk.Event Cache cache.Store Metrics struct { diff --git a/engine/api/api_routes.go b/engine/api/api_routes.go index e62fd5f560..64405c9399 100644 --- a/engine/api/api_routes.go +++ b/engine/api/api_routes.go @@ -32,6 +32,16 @@ func (api *API) InitRouter() { } api.eventsBroker.Init(context.Background(), api.PanicDump()) + // Initialize event broker + api.websocketBroker = &websocketBroker{ + router: api.Router, + cache: api.Cache, + clients: make(map[string]*websocketBrokerSubscribe), + dbFunc: api.DBConnectionFactory.GetDBMap, + messages: make(chan sdk.Event), + } + api.websocketBroker.Init(context.Background(), api.PanicDump()) + // Access token r.Handle("/accesstoken", r.POST(api.postNewAccessTokenHandler)) r.Handle("/accesstoken/{id}", r.PUT(api.putRegenAccessTokenHandler), r.DELETE(api.deleteAccessTokenHandler)) @@ -379,6 +389,7 @@ func (api *API) InitRouter() { // SSE r.Handle("/events", r.GET(api.eventsBroker.ServeHTTP)) + r.Handle("/ws", r.GET(api.websocketBroker.ServeHTTP, WebSocket())) // Feature r.Handle("/feature/clean", r.POST(api.cleanFeatureHandler, NeedToken("X-Izanami-Token", api.Config.Features.Izanami.Token), Auth(false))) diff --git a/engine/api/router.go b/engine/api/router.go index aa25754d3c..5567c2ead4 100644 --- a/engine/api/router.go +++ b/engine/api/router.go @@ -462,6 +462,13 @@ func NeedHatchery() HandlerConfigParam { return f } +func WebSocket() HandlerConfigParam { + f := func(rc *service.HandlerConfig) { + rc.Options["websocket"] = "true" + } + return f +} + // NeedService set the route for hatchery only func NeedService() HandlerConfigParam { f := func(rc *service.HandlerConfig) { diff --git a/engine/api/router_auth.go b/engine/api/router_auth.go index c8a71fea9c..033388a0f0 100644 --- a/engine/api/router_auth.go +++ b/engine/api/router_auth.go @@ -77,6 +77,9 @@ func (api *API) authDeprecatedMiddleware(ctx context.Context, w http.ResponseWri return ctx, false, sdk.WrapError(sdk.ErrUnauthorized, "Router> Authorization denied on %s %s for %s sdk.ServiceAgent agent %s : %s", req.Method, req.URL, req.RemoteAddr, getAgent(req), err) } default: + if rc.Options["websocket"] == "true" { + req.Header.Add(sdk.SessionTokenHeader, req.Header.Get("Sec-WebSocket-Protocol")) + } var err error ctx, err = api.Router.AuthDriver.CheckAuth(ctx, w, req) if err != nil { diff --git a/engine/api/websocket.go b/engine/api/websocket.go index 4b51ccbb12..dcaf79b23b 100644 --- a/engine/api/websocket.go +++ b/engine/api/websocket.go @@ -23,7 +23,11 @@ import ( "github.com/ovh/cds/sdk/log" ) -var upgrader = websocket.Upgrader{} // use default options +var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, +} // use default options // websocketBrokerSubscribe is the information needed to subscribe type websocketBrokerSubscribe struct { @@ -166,44 +170,76 @@ func (b *websocketBroker) Start(ctx context.Context, panicCallback func(s string func (b *websocketBroker) ServeHTTP() service.Handler { return func(ctx context.Context, w http.ResponseWriter, r *http.Request) (err error) { + w.Header().Set("Connection", "keep-alive") + c, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Error("websocket: unable to create connection: %v", err) return } + defer c.Close() - // Create User - user := deprecatedGetUser(ctx) - if err := loadUserPermissions(b.dbFunc(), b.cache, user); err != nil { - return sdk.WrapError(err, "websocketBroker.Serve Cannot load user permission") - } + /* + log.Warning("Connected") + //c.WriteMessage(websocket.TextMessage, []byte("Connected")) + log.Warning("Msg Send") - uuid := sdk.UUID() - client := &websocketBrokerSubscribe{ - UUID: uuid, - User: user, - isAlive: abool.NewBool(true), - conn: c, - } + // Create User + user := deprecatedGetUser(ctx) + if err := loadUserPermissions(b.dbFunc(), b.cache, user); err != nil { + return sdk.WrapError(err, "websocketBroker.Serve Cannot load user permission") + } - // Add this client to the map of those that should receive updates - b.chanAddClient <- client + log.Warning("Load user") - tick := time.NewTicker(time.Second) - defer tick.Stop() + uuid := sdk.UUID() + client := &websocketBrokerSubscribe{ + UUID: uuid, + User: user, + isAlive: abool.NewBool(true), + conn: c, + } - leave: + log.Warning("Create client") + + // Add this client to the map of those that should receive updates + b.chanAddClient <- client + + log.Warning("Client added") + tick := time.NewTicker(time.Second) + defer tick.Stop() + + log.Warning("Start loop") + + leave: + for { + select { + case <-ctx.Done(): + log.Warning("websocket.Http: context done") + b.chanRemoveClient <- client.UUID + break leave + case <-r.Context().Done(): + log.Warning("websocket.Http: client disconnected") + b.chanRemoveClient <- client.UUID + break leave + } + } + log.Warning("Bonjour Je ferme tout") + return nil + + */ for { - select { - case <-ctx.Done(): - log.Debug("websocket.Http: context done") - b.chanRemoveClient <- client.UUID - break leave - case <-r.Context().Done(): - log.Debug("websocket.Http: client disconnected") - b.chanRemoveClient <- client.UUID - break leave + mt, message, err := c.ReadMessage() + if err != nil { + log.Warning("read: %v", err) + //break + } + log.Warning("rcv: %s", message) + err = c.WriteMessage(mt, message) + if err != nil { + log.Warning("write: %v", err) + //break } } return nil diff --git a/ui/src/app/app.component.ts b/ui/src/app/app.component.ts index c48320bf03..06fdc6a6dc 100644 --- a/ui/src/app/app.component.ts +++ b/ui/src/app/app.component.ts @@ -21,6 +21,7 @@ import { ToastService } from './shared/toast/ToastService'; import { CDSSharedWorker } from './shared/worker/shared.worker'; import { CDSWebWorker } from './shared/worker/web.worker'; import { CDSWorker } from './shared/worker/worker'; +import { WebSocketSubject } from 'rxjs/webSocket'; @Component({ selector: 'app-root', @@ -48,6 +49,8 @@ export class AppComponent implements OnInit { lastPing: number; currentTheme: string; + private websocket: WebSocketSubject; + constructor( _translate: TranslateService, private _language: LanguageStore, @@ -95,6 +98,7 @@ export class AppComponent implements OnInit { } else { this.isConnected = true; this.startSSE(); + this.startWebSocket2(); } this.startVersionWorker(); }); @@ -146,6 +150,35 @@ export class AppComponent implements OnInit { } } + startWebSocket2(): void { + let exampleSocket = new WebSocket("ws://127.0.0.1:8081/ws", this._authStore.getUser().token); + exampleSocket.onopen = function (event) { + console.log('Connected', event); + }; + + + } + + startWebSocket(): void { + let conf = { + url: 'ws://127.0.0.1:8081/ws', + protocol: this._authStore.getUser().token, + headers: { + "ee": "df" + }, + }; + + this.websocket = new WebSocketSubject(conf); + this.websocket.subscribe((message)=> { + console.log(message); + }, (err) => { + console.error(err) + }, () => { + console.warn('Completed'); + }); + + } + startSSE(): void { if (this.sseWorker) { this.stopWorker(this.sseWorker, null);