Skip to content

Commit

Permalink
Merge pull request #5998 from kobergj/AddSSEToUserlog
Browse files Browse the repository at this point in the history
Add SSE to userlog
  • Loading branch information
kobergj committed Jun 29, 2023
2 parents ac15ea9 + 47d04d6 commit 5284820
Show file tree
Hide file tree
Showing 37 changed files with 2,410 additions and 37 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/add-sse-endpoint.md
@@ -0,0 +1,5 @@
Enhancement: Add SSE Endpoint

Add a server-sent events (sse) endpoint for the userlog service

https://github.com/owncloud/ocis/pull/5998
2 changes: 2 additions & 0 deletions go.mod
Expand Up @@ -280,6 +280,7 @@ require (
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/prometheus/statsd_exporter v0.22.8 // indirect
github.com/r3labs/sse/v2 v2.10.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect
github.com/rivo/uniseg v0.4.2 // indirect
github.com/rs/cors v1.9.0 // indirect
Expand Down Expand Up @@ -325,6 +326,7 @@ require (
golang.org/x/tools v0.7.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Expand Up @@ -1469,6 +1469,8 @@ github.com/prometheus/statsd_exporter v0.22.7/go.mod h1:N/TevpjkIh9ccs6nuzY3jQn9
github.com/prometheus/statsd_exporter v0.22.8 h1:Qo2D9ZzaQG+id9i5NYNGmbf1aa/KxKbB9aKfMS+Yib0=
github.com/prometheus/statsd_exporter v0.22.8/go.mod h1:/DzwbTEaFTE0Ojz5PqcSk6+PFHOPWGxdXVr6yC8eFOM=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0=
github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I=
github.com/rainycape/memcache v0.0.0-20150622160815-1031fa0ce2f2/go.mod h1:7tZKcyumwBO6qip7RNQ5r77yrssm9bfCowcLEBcU5IA=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 h1:MkV+77GLUNo5oJ0jf870itWm3D0Sjh7+Za9gazKc5LQ=
Expand Down Expand Up @@ -1808,6 +1810,7 @@ golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
Expand Down Expand Up @@ -2383,6 +2386,8 @@ google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cn
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/Acconut/lockfile.v1 v1.1.0/go.mod h1:6UCz3wJ8tSFUsPR6uP/j8uegEtDuEEqFxlpi0JI4Umw=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
4 changes: 4 additions & 0 deletions services/userlog/README.md
Expand Up @@ -30,6 +30,10 @@ For the time being, the configuration which user related events are of interest

The `userlog` service provides an API to retrieve configured events. For now, this API is mostly following the [oc10 notification GET API](https://doc.owncloud.com/server/next/developer_manual/core/apis/ocs-notification-endpoint-v1.html#get-user-notifications).

## Subscribing

Additionaly to the oc10 API, the `userlog` service also provides an `/sse` (Server-Sent Events) endpoint to be informed by the server when an event happens. See [What is Server-Sent Events](https://medium.com/yemeksepeti-teknoloji/what-is-server-sent-events-sse-and-how-to-implement-it-904938bffd73) for a simple introduction and examples to server sent events. The `sse` endpoint will respect language changes of the user without needing to reconnect.

## Deleting

To delete events for an user, use a `DELETE` request to `ocs/v2.php/apps/notifications/api/v1/notifications` containing the IDs to delete.
Expand Down
3 changes: 3 additions & 0 deletions services/userlog/pkg/command/server.go
Expand Up @@ -16,6 +16,7 @@ import (
ogrpc "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
"github.com/owncloud/ocis/v2/ocis-pkg/version"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config/parser"
"github.com/owncloud/ocis/v2/services/userlog/pkg/logging"
Expand Down Expand Up @@ -102,6 +103,7 @@ func Server(cfg *config.Config) *cli.Command {
}

hClient := ehsvc.NewEventHistoryService("com.owncloud.api.eventhistory", ogrpc.DefaultClient())
vClient := settingssvc.NewValueService("com.owncloud.api.settings", ogrpc.DefaultClient())

{
server, err := http.Server(
Expand All @@ -113,6 +115,7 @@ func Server(cfg *config.Config) *cli.Command {
http.Consumer(consumer),
http.GatewaySelector(gatewaySelector),
http.History(hClient),
http.Value(vClient),
http.RegisteredEvents(_registeredEvents),
)

Expand Down
2 changes: 2 additions & 0 deletions services/userlog/pkg/config/config.go
Expand Up @@ -28,6 +28,8 @@ type Config struct {
Events Events `yaml:"events"`
Persistence Persistence `yaml:"persistence"`

DisableSSE bool `yaml:"disable_sse" env:"USERLOG_DISABLE_SSE" desc:"Disables server-sent events (sse). When disabled, clients will no longer be able to connect to the sse endpoint."`

Context context.Context `yaml:"-"`
}

Expand Down
9 changes: 9 additions & 0 deletions services/userlog/pkg/server/http/option.go
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
"github.com/owncloud/ocis/v2/services/userlog/pkg/metrics"
"github.com/urfave/cli/v2"
Expand All @@ -29,6 +30,7 @@ type Options struct {
Consumer events.Consumer
GatewaySelector pool.Selectable[gateway.GatewayAPIClient]
HistoryClient ehsvc.EventHistoryService
ValueClient settingssvc.ValueService
RegisteredEvents []events.Unmarshaller
}

Expand Down Expand Up @@ -119,3 +121,10 @@ func RegisteredEvents(evs []events.Unmarshaller) Option {
o.RegisteredEvents = evs
}
}

// Value provides a function to configure the value service client
func Value(vs settingssvc.ValueService) Option {
return func(o *Options) {
o.ValueClient = vs
}
}
1 change: 1 addition & 0 deletions services/userlog/pkg/server/http/server.go
Expand Up @@ -76,6 +76,7 @@ func Server(opts ...Option) (http.Service, error) {
svc.Config(options.Config),
svc.HistoryClient(options.HistoryClient),
svc.GatewaySelector(options.GatewaySelector),
svc.ValueClient(options.ValueClient),
svc.RegisteredEvents(options.RegisteredEvents),
)
if err != nil {
Expand Down
42 changes: 13 additions & 29 deletions services/userlog/pkg/service/conversion.go
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"embed"
"errors"
"fmt"
"io/fs"
"strings"
Expand All @@ -20,7 +19,6 @@ import (
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/leonelquinteros/gotext"
ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0"
)

//go:embed l10n/locale
Expand Down Expand Up @@ -56,7 +54,6 @@ type Converter struct {
gatewaySelector pool.Selectable[gateway.GatewayAPIClient]
machineAuthAPIKey string
serviceName string
registeredEvents map[string]events.Unmarshaller
translationPath string

// cached within one request not to query other service too much
Expand All @@ -67,13 +64,12 @@ type Converter struct {
}

// NewConverter returns a new Converter
func NewConverter(loc string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], machineAuthAPIKey string, name string, translationPath string, registeredEvents map[string]events.Unmarshaller) *Converter {
func NewConverter(loc string, gatewaySelector pool.Selectable[gateway.GatewayAPIClient], machineAuthAPIKey string, name string, translationPath string) *Converter {
return &Converter{
locale: loc,
gatewaySelector: gatewaySelector,
machineAuthAPIKey: machineAuthAPIKey,
serviceName: name,
registeredEvents: registeredEvents,
translationPath: translationPath,
spaces: make(map[string]*storageprovider.StorageSpace),
users: make(map[string]*user.User),
Expand All @@ -83,52 +79,40 @@ func NewConverter(loc string, gatewaySelector pool.Selectable[gateway.GatewayAPI
}

// ConvertEvent converts an eventhistory event to an OC10Notification
func (c *Converter) ConvertEvent(event *ehmsg.Event) (OC10Notification, error) {
etype, ok := c.registeredEvents[event.Type]
if !ok {
// this should not happen
return OC10Notification{}, errors.New("eventtype not registered")
}

einterface, err := etype.Unmarshal(event.Event)
if err != nil {
// this shouldn't happen either
return OC10Notification{}, errors.New("cant unmarshal event")
}

switch ev := einterface.(type) {
func (c *Converter) ConvertEvent(eventid string, event interface{}) (OC10Notification, error) {
switch ev := event.(type) {
default:
return OC10Notification{}, fmt.Errorf("unknown event type: %T", ev)
// file related
case events.PostprocessingStepFinished:
switch ev.FinishedStep {
case events.PPStepAntivirus:
res := ev.Result.(events.VirusscanResult)
return c.virusMessage(event.Id, VirusFound, ev.ExecutingUser, res.ResourceID, ev.Filename, res.Description, res.Scandate)
return c.virusMessage(eventid, VirusFound, ev.ExecutingUser, res.ResourceID, ev.Filename, res.Description, res.Scandate)
case events.PPStepPolicies:
return c.policiesMessage(event.Id, PoliciesEnforced, ev.ExecutingUser, ev.Filename, time.Now())
return c.policiesMessage(eventid, PoliciesEnforced, ev.ExecutingUser, ev.Filename, time.Now())
default:
return OC10Notification{}, fmt.Errorf("unknown postprocessing step: %s", ev.FinishedStep)
}
// space related
case events.SpaceDisabled:
return c.spaceMessage(event.Id, SpaceDisabled, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
return c.spaceMessage(eventid, SpaceDisabled, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
case events.SpaceDeleted:
return c.spaceDeletedMessage(event.Id, ev.Executant, ev.ID.GetOpaqueId(), ev.SpaceName, ev.Timestamp)
return c.spaceDeletedMessage(eventid, ev.Executant, ev.ID.GetOpaqueId(), ev.SpaceName, ev.Timestamp)
case events.SpaceShared:
return c.spaceMessage(event.Id, SpaceShared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
return c.spaceMessage(eventid, SpaceShared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
case events.SpaceUnshared:
return c.spaceMessage(event.Id, SpaceUnshared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
return c.spaceMessage(eventid, SpaceUnshared, ev.Executant, ev.ID.GetOpaqueId(), ev.Timestamp)
case events.SpaceMembershipExpired:
return c.spaceMessage(event.Id, SpaceMembershipExpired, ev.SpaceOwner, ev.SpaceID.GetOpaqueId(), ev.ExpiredAt)
return c.spaceMessage(eventid, SpaceMembershipExpired, ev.SpaceOwner, ev.SpaceID.GetOpaqueId(), ev.ExpiredAt)

// share related
case events.ShareCreated:
return c.shareMessage(event.Id, ShareCreated, ev.Executant, ev.ItemID, ev.ShareID, utils.TSToTime(ev.CTime))
return c.shareMessage(eventid, ShareCreated, ev.Executant, ev.ItemID, ev.ShareID, utils.TSToTime(ev.CTime))
case events.ShareExpired:
return c.shareMessage(event.Id, ShareExpired, ev.ShareOwner, ev.ItemID, ev.ShareID, ev.ExpiredAt)
return c.shareMessage(eventid, ShareExpired, ev.ShareOwner, ev.ItemID, ev.ShareID, ev.ExpiredAt)
case events.ShareRemoved:
return c.shareMessage(event.Id, ShareRemoved, ev.Executant, ev.ItemID, ev.ShareID, ev.Timestamp)
return c.shareMessage(eventid, ShareRemoved, ev.Executant, ev.ItemID, ev.ShareID, ev.Timestamp)
}
}

Expand Down
44 changes: 42 additions & 2 deletions services/userlog/pkg/service/http.go
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"net/http"

"github.com/cs3org/reva/v2/pkg/ctx"
revactx "github.com/cs3org/reva/v2/pkg/ctx"
)

Expand Down Expand Up @@ -31,11 +32,23 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request
return
}

conv := NewConverter(r.Header.Get(HeaderAcceptLanguage), ul.gatewaySelector, ul.cfg.MachineAuthAPIKey, ul.cfg.Service.Name, ul.cfg.TranslationPath, ul.registeredEvents)
conv := ul.getConverter(r.Header.Get(HeaderAcceptLanguage))

resp := GetEventResponseOC10{}
for _, e := range evs {
noti, err := conv.ConvertEvent(e)
etype, ok := ul.registeredEvents[e.Type]
if !ok {
ul.log.Error().Str("eventid", e.Id).Str("eventtype", e.Type).Msg("event not registered")
continue
}

einterface, err := etype.Unmarshal(e.Event)
if err != nil {
ul.log.Error().Str("eventid", e.Id).Str("eventtype", e.Type).Msg("failed to umarshal event")
continue
}

noti, err := conv.ConvertEvent(e.Id, einterface)
if err != nil {
ul.log.Error().Err(err).Str("eventid", e.Id).Str("eventtype", e.Type).Msg("failed to convert event")
continue
Expand All @@ -49,6 +62,33 @@ func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request
w.Write(b)
}

// HandleSSE is the GET handler for events
func (ul *UserlogService) HandleSSE(w http.ResponseWriter, r *http.Request) {
u, ok := ctx.ContextGetUser(r.Context())
if !ok {
ul.log.Error().Msg("sse: no user in context")
w.WriteHeader(http.StatusInternalServerError)
return
}

uid := u.GetId().GetOpaqueId()
if uid == "" {
ul.log.Error().Msg("sse: user in context is broken")
w.WriteHeader(http.StatusInternalServerError)
return
}

stream := ul.sse.CreateStream(uid)
stream.AutoReplay = false

// add stream to URL
q := r.URL.Query()
q.Set("stream", uid)
r.URL.RawQuery = q.Encode()

ul.sse.ServeHTTP(w, r)
}

// HandleDeleteEvents is the DELETE handler for events
func (ul *UserlogService) HandleDeleteEvents(w http.ResponseWriter, r *http.Request) {
u, ok := revactx.ContextGetUser(r.Context())
Expand Down
8 changes: 8 additions & 0 deletions services/userlog/pkg/service/options.go
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/go-chi/chi/v5"
"github.com/owncloud/ocis/v2/ocis-pkg/log"
ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0"
settingssvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/settings/v0"
"github.com/owncloud/ocis/v2/services/userlog/pkg/config"
"go-micro.dev/v4/store"
)
Expand All @@ -23,6 +24,7 @@ type Options struct {
Config *config.Config
HistoryClient ehsvc.EventHistoryService
GatewaySelector pool.Selectable[gateway.GatewayAPIClient]
ValueClient settingssvc.ValueService
RegisteredEvents []events.Unmarshaller
}

Expand Down Expand Up @@ -81,3 +83,9 @@ func RegisteredEvents(e []events.Unmarshaller) Option {
o.RegisteredEvents = e
}
}

func ValueClient(vs settingssvc.ValueService) Option {
return func(o *Options) {
o.ValueClient = vs
}
}

0 comments on commit 5284820

Please sign in to comment.