Skip to content

Commit

Permalink
Merge 0c56a1c into 9677a80
Browse files Browse the repository at this point in the history
  • Loading branch information
lftakakura committed Aug 29, 2019
2 parents 9677a80 + 0c56a1c commit 0f6c820
Showing 1 changed file with 18 additions and 4 deletions.
22 changes: 18 additions & 4 deletions plugins/grpc/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type GRPCForwarder struct {
client pb.GRPCForwarderClient
logger log.FieldLogger
serverAddress string
metadata map[string]interface{}
}

// ForwarderFunc is the type of functions in GRPCForwarder
Expand Down Expand Up @@ -345,26 +346,38 @@ func (g *GRPCForwarder) SchedulerEvent(ctx context.Context, infos, fwdMetadata m

//Forward send room or player status to specified server
func (g *GRPCForwarder) Forward(ctx context.Context, event string, infos, fwdMetadata map[string]interface{}) (status int32, message string, err error) {

// Add forwarder metadata (from maestro config) to request
// Client metadata should take priority
metadata := g.metadata
if metadata != nil {
for k := range fwdMetadata {
metadata[k] = fwdMetadata[k]
}
} else {
metadata = fwdMetadata
}

l := g.logger.WithFields(log.Fields{
"op": "Forward",
"source": "plugin/grpc",
"event": event,
"infos": fmt.Sprintf("%v", infos),
"fwdMetadata": fmt.Sprintf("%v", fwdMetadata),
"fwdMetadata": fmt.Sprintf("%v", metadata),
"serverAddr": g.serverAddress,
})
l.Info("forwarding event")
l.Debug("forwarding event")
f := reflect.ValueOf(g).MethodByName(strings.Title(event))
if !f.IsValid() {
return 500, "", fmt.Errorf("error calling method %s in plugin", event)
}
ret := f.Call([]reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(infos), reflect.ValueOf(fwdMetadata)})
ret := f.Call([]reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(infos), reflect.ValueOf(metadata)})
err, ok := ret[2].Interface().(error)
if ok {
l.WithError(err).Error("forward event failed")
return ret[0].Interface().(int32), ret[1].Interface().(string), ret[2].Interface().(error)
}
l.Info("successfully forwarded event")
l.Debug("successfully forwarded event")
return ret[0].Interface().(int32), ret[1].Interface().(string), nil
}

Expand All @@ -387,6 +400,7 @@ func (g *GRPCForwarder) configure() error {
return err
}
g.client = pb.NewGRPCForwarderClient(conn)
g.metadata = g.config.GetStringMap("metadata")
return nil
}

Expand Down

0 comments on commit 0f6c820

Please sign in to comment.