Skip to content

Commit

Permalink
fix(api): write on sse (#3457)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin authored and yesnault committed Oct 12, 2018
1 parent c9398dd commit e66d70c
Showing 1 changed file with 89 additions and 56 deletions.
145 changes: 89 additions & 56 deletions engine/api/events.go
Expand Up @@ -27,9 +27,9 @@ import (
type eventsBrokerSubscribe struct {
UUID string
User *sdk.User
Queue chan sdk.Event
Mutex *sync.Mutex
IsAlive *abool.AtomicBool
isAlive *abool.AtomicBool
w http.ResponseWriter
mutex sync.Mutex
}

// lastUpdateBroker keeps connected client of the current route,
Expand Down Expand Up @@ -93,7 +93,7 @@ func (b *eventsBroker) Start(ctx context.Context) {
b.chanAddClient = make(chan (*eventsBrokerSubscribe))
b.chanRemoveClient = make(chan (string))

tickerMetrics := time.NewTicker(30 * time.Second)
tickerMetrics := time.NewTicker(10 * time.Second)
defer tickerMetrics.Stop()

for {
Expand All @@ -103,40 +103,50 @@ func (b *eventsBroker) Start(ctx context.Context) {

case <-ctx.Done():
if b.clients != nil {
for c, v := range b.clients {
close(v.Queue)
delete(b.clients, c)
for uuid := range b.clients {
delete(b.clients, uuid)
}
observability.Record(b.router.Background, b.router.Stats.SSEClients, 0)

}
if ctx.Err() != nil {
log.Error("eventsBroker.Start> Exiting: %v", ctx.Err())
return
}

case receivedEvent := <-b.messages:
for i := range b.clients {
go func(c *eventsBrokerSubscribe) {
c.Mutex.Lock()
defer c.Mutex.Unlock()
if c.IsAlive.IsSet() {
log.Debug("send data to %s", c.UUID)
c.Queue <- receivedEvent
}
}(b.clients[i])
c := b.clients[i]
if c == nil {
delete(b.clients, i)
continue
}

// Send the event to the client sse within a goroutine
s := "sse-" + b.clients[i].UUID
sdk.GoRoutine(ctx, s,
func(ctx context.Context) {
if c.isAlive.IsSet() {
log.Debug("send data to %s", c.UUID)
if err := c.Send(receivedEvent); err != nil {
log.Error("eventsBroker> unable to send event to %s: %v", c.UUID, err)
b.chanRemoveClient <- c.UUID
}
}
},
)
}

case client := <-b.chanAddClient:
b.clients[client.UUID] = client

case uuid := <-b.chanRemoveClient:
client, has := b.clients[uuid]
if !has {
return
continue
}
go func(c *eventsBrokerSubscribe) {
c.Mutex.Lock()
close(c.Queue)
c.IsAlive.UnSet()
c.Mutex.Unlock()
}(client)

client.isAlive.UnSet()
delete(b.clients, uuid)
}
}
Expand All @@ -155,9 +165,8 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
client := &eventsBrokerSubscribe{
UUID: uuid,
User: getUser(ctx),
Queue: make(chan sdk.Event, 10), // chan buffered, to avoid goroutine Start() wait on push in queue
Mutex: new(sync.Mutex),
IsAlive: abool.NewBool(true),
isAlive: abool.NewBool(true),
w: w,
}

// Add this client to the map of those that should receive updates
Expand Down Expand Up @@ -188,29 +197,6 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
log.Info("events.Http: client disconnected")
b.chanRemoveClient <- client.UUID
break leave
case event := <-client.Queue:
if ok := client.manageEvent(event); !ok {
continue
}

msg, errJ := json.Marshal(event)
if errJ != nil {
log.Warning("sendevent> Unavble to marshall event: %v", errJ)
continue
}

var buffer bytes.Buffer
buffer.WriteString("data: ")
buffer.Write(msg)
buffer.WriteString("\n\n")

if !client.IsAlive.IsSet() {
break leave
}
if _, err := w.Write(buffer.Bytes()); err != nil {
return sdk.WrapError(err, "events.write> Unable to write to client")
}
f.Flush()
case <-tick.C:
if _, err := w.Write([]byte("")); err != nil {
return sdk.WrapError(err, "events.write> Unable to ping client")
Expand All @@ -223,50 +209,97 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
}
}

func (s *eventsBrokerSubscribe) manageEvent(event sdk.Event) bool {
func (client *eventsBrokerSubscribe) manageEvent(event sdk.Event) bool {
var isSharedInfra bool
for _, g := range s.User.Groups {
for _, g := range client.User.Groups {
if g.ID == group.SharedInfraGroup.ID {
isSharedInfra = true
break
}
}

if strings.HasPrefix(event.EventType, "sdk.EventProject") {
if s.User.Admin || isSharedInfra || permission.ProjectPermission(event.ProjectKey, s.User) >= permission.PermissionRead {
if client.User.Admin || isSharedInfra || permission.ProjectPermission(event.ProjectKey, client.User) >= permission.PermissionRead {
return true
}
return false
}
if strings.HasPrefix(event.EventType, "sdk.EventWorkflow") || strings.HasPrefix(event.EventType, "sdk.EventRunWorkflow") {
if s.User.Admin || isSharedInfra || permission.WorkflowPermission(event.ProjectKey, event.WorkflowName, s.User) >= permission.PermissionRead {
if client.User.Admin || isSharedInfra || permission.WorkflowPermission(event.ProjectKey, event.WorkflowName, client.User) >= permission.PermissionRead {
return true
}
return false
}
if strings.HasPrefix(event.EventType, "sdk.EventApplication") {
if s.User.Admin || isSharedInfra || permission.ApplicationPermission(event.ProjectKey, event.ApplicationName, s.User) >= permission.PermissionRead {
if client.User.Admin || isSharedInfra || permission.ApplicationPermission(event.ProjectKey, event.ApplicationName, client.User) >= permission.PermissionRead {
return true
}
return false
}
if strings.HasPrefix(event.EventType, "sdk.EventPipeline") {
if s.User.Admin || isSharedInfra || permission.PipelinePermission(event.ProjectKey, event.PipelineName, s.User) >= permission.PermissionRead {
if client.User.Admin || isSharedInfra || permission.PipelinePermission(event.ProjectKey, event.PipelineName, client.User) >= permission.PermissionRead {
return true
}
return false
}
if strings.HasPrefix(event.EventType, "sdk.EventEnvironment") {
if s.User.Admin || isSharedInfra || permission.EnvironmentPermission(event.ProjectKey, event.EnvironmentName, s.User) >= permission.PermissionRead {
if client.User.Admin || isSharedInfra || permission.EnvironmentPermission(event.ProjectKey, event.EnvironmentName, client.User) >= permission.PermissionRead {
return true
}
return false
}
if strings.HasPrefix(event.EventType, "sdk.EventBroadcast") {
if s.User.Admin || isSharedInfra || event.ProjectKey == "" || permission.AccessToProject(event.ProjectKey, s.User, permission.PermissionRead) {
if client.User.Admin || isSharedInfra || event.ProjectKey == "" || permission.AccessToProject(event.ProjectKey, client.User, permission.PermissionRead) {
return true
}
return false
}
return false
}

// Send an event to a client
func (client *eventsBrokerSubscribe) Send(event sdk.Event) (err error) {
client.mutex.Lock()
defer client.mutex.Unlock()

if client == nil || client.w == nil {
return nil
}

// Make sure that the writer supports flushing.
f, ok := client.w.(http.Flusher)
if !ok {
return sdk.WrapError(fmt.Errorf("streaming unsupported"), "")
}

if ok := client.manageEvent(event); !ok {
return nil
}

msg, err := json.Marshal(event)
if err != nil {
return sdk.WrapError(err, "Unable to marshall event")
}

var buffer bytes.Buffer
buffer.WriteString("data: ")
buffer.Write(msg)
buffer.WriteString("\n\n")

if !client.isAlive.IsSet() {
return nil
}

defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("%v", r)
}
}()

if _, err := client.w.Write(buffer.Bytes()); err != nil {
return sdk.WrapError(err, "unable to write to client")
}
f.Flush()

return nil
}

0 comments on commit e66d70c

Please sign in to comment.