Skip to content

Commit

Permalink
Enhance notification centre
Browse files Browse the repository at this point in the history
Signed-off-by: ashish <ashishjaitiwari15112000@gmail.com>
  • Loading branch information
Revolyssup committed Aug 21, 2022
1 parent 0f14a53 commit 68b3a9e
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 26 deletions.
15 changes: 5 additions & 10 deletions adapter/adapter.go
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"sync"

"github.com/layer5io/meshery-adapter-library/events"
meshkitCfg "github.com/layer5io/meshkit/config"
"github.com/layer5io/meshkit/logger"
)
Expand All @@ -30,9 +31,9 @@ type Handler interface {
GetName() string // Returns the name of the adapter.
GetComponentInfo(interface{}) error // Returns the component info.
// CreateInstance(*chan interface{}) error // Instantiates clients used in deploying and managing mesh instances, e.g. Kubernetes clients.
ApplyOperation(context.Context, OperationRequest, *chan interface{}) error // Applies an adapter operation. This is adapter specific and needs to be implemented by each adapter.
ListOperations() (Operations, error) // List all operations an adapter supports.
ProcessOAM(ctx context.Context, srv OAMRequest, hchan *chan interface{}) (string, error)
ApplyOperation(context.Context, OperationRequest) error // Applies an adapter operation. This is adapter specific and needs to be implemented by each adapter.
ListOperations() (Operations, error) // List all operations an adapter supports.
ProcessOAM(ctx context.Context, srv OAMRequest) (string, error)

// Need not implement this method and can be reused
StreamErr(*Event, error) // Streams an error event, e.g. to a channel
Expand All @@ -45,12 +46,6 @@ type Adapter struct {
Config meshkitCfg.Handler
KubeconfigHandler meshkitCfg.Handler
Log logger.Handler
Channel *chan interface{}
EventsBuffer *events.EventBuffer
mx sync.Mutex
}

func (h *Adapter) SetChannel(hchan *chan interface{}) {
h.mx.Lock()
defer h.mx.Unlock()
h.Channel = hchan
}
8 changes: 4 additions & 4 deletions adapter/logger.go
Expand Up @@ -56,19 +56,19 @@ func (s *adapterLogger) GetComponentInfo(svc interface{}) error {
// return err
// }

func (s *adapterLogger) ApplyOperation(ctx context.Context, op OperationRequest, hchan *chan interface{}) error {
func (s *adapterLogger) ApplyOperation(ctx context.Context, op OperationRequest) error {
s.log.Info("Applying operation ", op.OperationName)
err := s.next.ApplyOperation(ctx, op, hchan)
err := s.next.ApplyOperation(ctx, op)
if err != nil {
s.log.Error(err)
}
return err
}

// ProcessOAM wraps the Handler's ProcessOAM method along with relevant logging
func (s *adapterLogger) ProcessOAM(ctx context.Context, oamRequest OAMRequest, hchan *chan interface{}) (string, error) {
func (s *adapterLogger) ProcessOAM(ctx context.Context, oamRequest OAMRequest) (string, error) {
s.log.Info("Process OAM components")
msg, err := s.next.ProcessOAM(ctx, oamRequest, hchan)
msg, err := s.next.ProcessOAM(ctx, oamRequest)
if err != nil {
s.log.Error(err)
}
Expand Down
15 changes: 12 additions & 3 deletions adapter/stream.go
Expand Up @@ -24,11 +24,20 @@ type Event struct {
func (h *Adapter) StreamErr(e *Event, err error) {
h.Log.Error(err)
e.EType = 2
*h.Channel <- e
//Putting this under a go routine so that this function is never blocking. If this push is performed synchronously then the call will be blocking in case
//when the channel is full with no client to recieve the events. This blocking may cause many operations to not return.
go func() {
h.EventsBuffer.Enqueue(e)
h.Log.Info("Event stored and sent successfully")
}()
}

func (h *Adapter) StreamInfo(e *Event) {
h.Log.Info("Sending event")
e.EType = 0
*h.Channel <- e
//Putting this under a go routine so that this function is never blocking. If this push is performed synchronously then the call will be blocking in case
//when the channel is full with no client to recieve the events. This blocking may cause many operations to not return.
go func() {
h.EventsBuffer.Enqueue(e)
h.Log.Info("Event stored and sent successfully")
}()
}
5 changes: 3 additions & 2 deletions api/grpc/grpc.go
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/layer5io/meshery-adapter-library/adapter"
"github.com/layer5io/meshery-adapter-library/api/tracing"
"github.com/layer5io/meshery-adapter-library/events"
"github.com/layer5io/meshery-adapter-library/meshes"

"fmt"
Expand All @@ -50,8 +51,8 @@ type Service struct {
StartedAt time.Time `json:"startedat"`
TraceURL string `json:"traceurl"`

Handler adapter.Handler
Channel chan interface{}
Handler adapter.Handler
EventBuffer *events.EventBuffer

meshes.UnimplementedMeshServiceServer
}
Expand Down
14 changes: 7 additions & 7 deletions api/grpc/handlers.go
Expand Up @@ -59,7 +59,7 @@ func (s *Service) ApplyOperation(ctx context.Context, req *meshes.ApplyRuleReque
OperationID: req.OperationId,
K8sConfigs: req.KubeConfigs,
}
err := s.Handler.ApplyOperation(ctx, operation, &s.Channel)
err := s.Handler.ApplyOperation(ctx, operation)
if err != nil {
return &meshes.ApplyRuleResponse{
Error: err.Error(),
Expand Down Expand Up @@ -97,19 +97,19 @@ func (s *Service) SupportedOperations(ctx context.Context, req *meshes.Supported

// StreamEvents is the handler function for the method StreamEvents.
func (s *Service) StreamEvents(ctx *meshes.EventsRequest, srv meshes.MeshService_StreamEventsServer) error {
clientchan := make(chan interface{}, 10)
s.EventBuffer.Copy(clientchan)
go s.EventBuffer.Subscribe(clientchan)
for {
data := <-s.Channel
data := <-clientchan
event := &meshes.EventsResponse{
OperationId: data.(*adapter.Event).Operationid,
EventType: meshes.EventType(data.(*adapter.Event).EType),
Summary: data.(*adapter.Event).Summary,
Details: data.(*adapter.Event).Details,
}

if err := srv.Send(event); err != nil {
// to prevent loosing the event, will re-add to the channel
go func() {
s.Channel <- data
}()
return err
}
time.Sleep(500 * time.Millisecond)
Expand All @@ -126,7 +126,7 @@ func (s *Service) ProcessOAM(ctx context.Context, srv *meshes.ProcessOAMRequest)
K8sConfigs: srv.KubeConfigs,
}

msg, err := s.Handler.ProcessOAM(ctx, operation, &s.Channel)
msg, err := s.Handler.ProcessOAM(ctx, operation)
return &meshes.ProcessOAMResponse{Message: msg}, err
}

Expand Down
63 changes: 63 additions & 0 deletions events/event.go
@@ -0,0 +1,63 @@
package events

import (
"fmt"
"sync"
)

type EventBuffer struct {
circularqueue []interface{}
clientChannels []chan interface{}
last int //last=-1 means queue is empty
mx sync.Mutex
clmx sync.Mutex
}

//TODO: Persist older than 10 events in non-volatile db for obtaining history of operations
// This is the maximum number of events that will be stored in-memory in each adapter.

var maxElements = 10

func NewEventBuffer() *EventBuffer {
return &EventBuffer{
circularqueue: make([]interface{}, maxElements),
last: 0,
}
}

// Each event will be first stored in a circular queue to return the last 10 operations whenever a new client connects.
// After that, further events will be pushed to the client.
func (e *EventBuffer) Enqueue(i interface{}) {
e.mx.Lock()
defer e.mx.Unlock()
pos := e.last % len(e.circularqueue)
fmt.Println("pushing data to queue at pos", pos)
e.circularqueue[pos] = i
e.last++
go func() {
for _, ch := range e.clientChannels {
ch <- i
}
}()
}
func (e *EventBuffer) Copy(client chan interface{}) {
e.mx.Lock()
defer e.mx.Unlock()
var events []interface{}
client = make(chan interface{}, 10)
for i := 0; i < e.last%len(e.circularqueue); i++ {
ev := e.circularqueue[i]
events = append(events, ev)
go func(ev interface{}) {
fmt.Println("copying data to new client ", ev)
client <- ev
}(ev)
}
return
}

func (e *EventBuffer) Subscribe(ch chan interface{}) {
e.clmx.Lock()
defer e.clmx.Unlock()
e.clientChannels = append(e.clientChannels, ch)
}

0 comments on commit 68b3a9e

Please sign in to comment.