Skip to content

Commit

Permalink
Merge pull request #7325 from kobergj/SEEImprovements
Browse files Browse the repository at this point in the history
Improve SSE Notifications
  • Loading branch information
kobergj committed Sep 22, 2023
2 parents 8bf3f2f + 586bae4 commit 0535c62
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 19 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/improve-sses.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Improve SSE format

Improve format of sse notifications

https://github.com/owncloud/ocis/pull/7325
6 changes: 6 additions & 0 deletions services/clientlog/pkg/service/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package service

// FileReadyEvent is emitted when the postprocessing of a file is finished
type FileReadyEvent struct {
ItemID string `json:"itemid"`
}
25 changes: 11 additions & 14 deletions services/clientlog/pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@ import (
"go.opentelemetry.io/otel/trace"
)

// ClientNotification is the event the clientlog service is sending to the client
type ClientNotification struct {
Type string
ItemID string
}

// ClientlogService is the service responsible for user activities
type ClientlogService struct {
log log.Logger
Expand Down Expand Up @@ -93,8 +87,9 @@ func (cl *ClientlogService) processEvent(event events.Event) {
}

var (
users []string
noti ClientNotification
users []string
evType string
data interface{}
)
switch e := event.Event.(type) {
default:
Expand All @@ -106,8 +101,10 @@ func (cl *ClientlogService) processEvent(event events.Event) {
return
}

noti.Type = "postprocessing-finished"
noti.ItemID = storagespace.FormatResourceID(*info.GetId())
evType = "postprocessing-finished"
data = FileReadyEvent{
ItemID: storagespace.FormatResourceID(*info.GetId()),
}

users, err = utils.GetSpaceMembers(ctx, info.GetSpace().GetId().GetOpaqueId(), gwc, utils.ViewerRole)
}
Expand All @@ -119,22 +116,22 @@ func (cl *ClientlogService) processEvent(event events.Event) {

// II) instruct sse service to send the information
for _, id := range users {
if err := cl.sendSSE(id, noti); err != nil {
if err := cl.sendSSE(id, evType, data); err != nil {
cl.log.Error().Err(err).Str("userID", id).Str("eventid", event.ID).Msg("failed to store event for user")
return
}
}
}

func (cl *ClientlogService) sendSSE(userid string, noti ClientNotification) error {
b, err := json.Marshal(noti)
func (cl *ClientlogService) sendSSE(userid string, evType string, data interface{}) error {
b, err := json.Marshal(data)
if err != nil {
return err
}

return events.Publish(context.Background(), cl.publisher, events.SendSSE{
UserID: userid,
Type: "clientlog-notification",
Type: evType,
Message: b,
})
}
22 changes: 17 additions & 5 deletions services/sse/pkg/service/service.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package service

import (
"encoding/json"
"net/http"

revactx "github.com/cs3org/reva/v2/pkg/ctx"
Expand All @@ -12,6 +13,12 @@ import (
"github.com/owncloud/ocis/v2/services/sse/pkg/config"
)

// ServerSentEvent is the data structure sent by the sse service
type ServerSentEvent struct {
Type string `json:"type"`
Data json.RawMessage `json:"data"`
}

// SSE defines implements the business logic for Service.
type SSE struct {
c *config.Config
Expand Down Expand Up @@ -45,20 +52,25 @@ func (s SSE) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

// ListenForEvents listens for events
func (s SSE) ListenForEvents() error {
func (s SSE) ListenForEvents() {
for e := range s.evChannel {
switch ev := e.Event.(type) {
default:
s.l.Error().Interface("event", ev).Msg("unhandled event")
case events.SendSSE:
b, err := json.Marshal(ServerSentEvent{
Type: ev.Type,
Data: ev.Message,
})
if err != nil {
s.l.Error().Interface("event", ev).Msg("cannot marshal event")
continue
}
s.sse.Publish(ev.UserID, &sse.Event{
Event: []byte(ev.Type),
Data: ev.Message,
Data: b,
})
}
}

return nil
}

// HandleSSE is the GET handler for events
Expand Down

0 comments on commit 0535c62

Please sign in to comment.