Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
sguiheux committed Jun 26, 2019
1 parent 1cfa6c7 commit 1d6c6b1
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 27 deletions.
1 change: 1 addition & 0 deletions engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ type API struct {
StartupTime time.Time
Maintenance bool
eventsBroker *eventsBroker
websocketBroker *websocketBroker
warnChan chan sdk.Event
Cache cache.Store
Metrics struct {
Expand Down
11 changes: 11 additions & 0 deletions engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ func (api *API) InitRouter() {
}
api.eventsBroker.Init(context.Background(), api.PanicDump())

// Initialize event broker
api.websocketBroker = &websocketBroker{
router: api.Router,
cache: api.Cache,
clients: make(map[string]*websocketBrokerSubscribe),
dbFunc: api.DBConnectionFactory.GetDBMap,
messages: make(chan sdk.Event),
}
api.websocketBroker.Init(context.Background(), api.PanicDump())

// Access token
r.Handle("/accesstoken", r.POST(api.postNewAccessTokenHandler))
r.Handle("/accesstoken/{id}", r.PUT(api.putRegenAccessTokenHandler), r.DELETE(api.deleteAccessTokenHandler))
Expand Down Expand Up @@ -379,6 +389,7 @@ func (api *API) InitRouter() {

// SSE
r.Handle("/events", r.GET(api.eventsBroker.ServeHTTP))
r.Handle("/ws", r.GET(api.websocketBroker.ServeHTTP, WebSocket()))

// Feature
r.Handle("/feature/clean", r.POST(api.cleanFeatureHandler, NeedToken("X-Izanami-Token", api.Config.Features.Izanami.Token), Auth(false)))
Expand Down
7 changes: 7 additions & 0 deletions engine/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,13 @@ func NeedHatchery() HandlerConfigParam {
return f
}

func WebSocket() HandlerConfigParam {
f := func(rc *service.HandlerConfig) {
rc.Options["websocket"] = "true"
}
return f
}

// NeedService set the route for hatchery only
func NeedService() HandlerConfigParam {
f := func(rc *service.HandlerConfig) {
Expand Down
3 changes: 3 additions & 0 deletions engine/api/router_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ func (api *API) authDeprecatedMiddleware(ctx context.Context, w http.ResponseWri
return ctx, false, sdk.WrapError(sdk.ErrUnauthorized, "Router> Authorization denied on %s %s for %s sdk.ServiceAgent agent %s : %s", req.Method, req.URL, req.RemoteAddr, getAgent(req), err)
}
default:
if rc.Options["websocket"] == "true" {
req.Header.Add(sdk.SessionTokenHeader, req.Header.Get("Sec-WebSocket-Protocol"))
}
var err error
ctx, err = api.Router.AuthDriver.CheckAuth(ctx, w, req)
if err != nil {
Expand Down
90 changes: 63 additions & 27 deletions engine/api/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ import (
"github.com/ovh/cds/sdk/log"
)

var upgrader = websocket.Upgrader{} // use default options
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
} // use default options

// websocketBrokerSubscribe is the information needed to subscribe
type websocketBrokerSubscribe struct {
Expand Down Expand Up @@ -166,44 +170,76 @@ func (b *websocketBroker) Start(ctx context.Context, panicCallback func(s string

func (b *websocketBroker) ServeHTTP() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) (err error) {
w.Header().Set("Connection", "keep-alive")

c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Error("websocket: unable to create connection: %v", err)
return
}

defer c.Close()

// Create User
user := deprecatedGetUser(ctx)
if err := loadUserPermissions(b.dbFunc(), b.cache, user); err != nil {
return sdk.WrapError(err, "websocketBroker.Serve Cannot load user permission")
}
/*
log.Warning("Connected")
//c.WriteMessage(websocket.TextMessage, []byte("Connected"))
log.Warning("Msg Send")
uuid := sdk.UUID()
client := &websocketBrokerSubscribe{
UUID: uuid,
User: user,
isAlive: abool.NewBool(true),
conn: c,
}
// Create User
user := deprecatedGetUser(ctx)
if err := loadUserPermissions(b.dbFunc(), b.cache, user); err != nil {
return sdk.WrapError(err, "websocketBroker.Serve Cannot load user permission")
}
// Add this client to the map of those that should receive updates
b.chanAddClient <- client
log.Warning("Load user")
tick := time.NewTicker(time.Second)
defer tick.Stop()
uuid := sdk.UUID()
client := &websocketBrokerSubscribe{
UUID: uuid,
User: user,
isAlive: abool.NewBool(true),
conn: c,
}
leave:
log.Warning("Create client")
// Add this client to the map of those that should receive updates
b.chanAddClient <- client
log.Warning("Client added")
tick := time.NewTicker(time.Second)
defer tick.Stop()
log.Warning("Start loop")
leave:
for {
select {
case <-ctx.Done():
log.Warning("websocket.Http: context done")
b.chanRemoveClient <- client.UUID
break leave
case <-r.Context().Done():
log.Warning("websocket.Http: client disconnected")
b.chanRemoveClient <- client.UUID
break leave
}
}
log.Warning("Bonjour Je ferme tout")
return nil
*/
for {
select {
case <-ctx.Done():
log.Debug("websocket.Http: context done")
b.chanRemoveClient <- client.UUID
break leave
case <-r.Context().Done():
log.Debug("websocket.Http: client disconnected")
b.chanRemoveClient <- client.UUID
break leave
mt, message, err := c.ReadMessage()
if err != nil {
log.Warning("read: %v", err)
//break
}
log.Warning("rcv: %s", message)
err = c.WriteMessage(mt, message)
if err != nil {
log.Warning("write: %v", err)
//break
}
}
return nil
Expand Down
33 changes: 33 additions & 0 deletions ui/src/app/app.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { ToastService } from './shared/toast/ToastService';
import { CDSSharedWorker } from './shared/worker/shared.worker';
import { CDSWebWorker } from './shared/worker/web.worker';
import { CDSWorker } from './shared/worker/worker';
import { WebSocketSubject } from 'rxjs/webSocket';

@Component({
selector: 'app-root',
Expand Down Expand Up @@ -48,6 +49,8 @@ export class AppComponent implements OnInit {
lastPing: number;
currentTheme: string;

private websocket: WebSocketSubject<any>;

constructor(
_translate: TranslateService,
private _language: LanguageStore,
Expand Down Expand Up @@ -95,6 +98,7 @@ export class AppComponent implements OnInit {
} else {
this.isConnected = true;
this.startSSE();
this.startWebSocket2();
}
this.startVersionWorker();
});
Expand Down Expand Up @@ -146,6 +150,35 @@ export class AppComponent implements OnInit {
}
}

startWebSocket2(): void {
let exampleSocket = new WebSocket("ws://127.0.0.1:8081/ws", this._authStore.getUser().token);
exampleSocket.onopen = function (event) {
console.log('Connected', event);
};


}

startWebSocket(): void {
let conf = {
url: 'ws://127.0.0.1:8081/ws',
protocol: this._authStore.getUser().token,
headers: {
"ee": "df"
},
};

this.websocket = new WebSocketSubject(conf);
this.websocket.subscribe((message)=> {
console.log(message);
}, (err) => {
console.error(err)
}, () => {
console.warn('Completed');
});

}

startSSE(): void {
if (this.sseWorker) {
this.stopWorker(this.sseWorker, null);
Expand Down

0 comments on commit 1d6c6b1

Please sign in to comment.