This repository has been archived by the owner on Apr 15, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
process.go
104 lines (88 loc) · 2.42 KB
/
process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package remote
import (
"fmt"
"log"
"reflect"
mbridge "github.com/quorumcontrol/messages/v2/build/go/bridge"
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/gogo/protobuf/proto"
ptypes "github.com/gogo/protobuf/types"
"github.com/quorumcontrol/tupelo-go-sdk/tracing"
)
type process struct {
pid *actor.PID
gateway *actor.PID
}
func newProcess(pid, gateway *actor.PID) actor.Process {
return &process{
pid: pid,
gateway: gateway,
}
}
// Send user message, implements actor.Process.
func (ref *process) SendUserMessage(pid *actor.PID, message interface{}) {
header, msg, sender := actor.UnwrapEnvelope(message)
wireMsg, ok := msg.(proto.Message)
if !ok {
log.Printf("error sending user message, message doesn't implement proto.Message: %s\n",
reflect.TypeOf(msg))
return
}
sendMessage(ref.gateway, pid, header, wireMsg, sender, -1)
}
func sendMessage(gateway, pid *actor.PID, header actor.ReadonlyMessageHeader, message proto.Message, sender *actor.PID, serializerID int32) {
marshaled, err := ptypes.MarshalAny(message)
if err != nil {
panic(fmt.Errorf("could not marshal message: %v", err))
}
wd := &mbridge.WireDelivery{
Message: marshaled,
Target: toActorPid(pid),
Sender: toActorPid(sender),
}
if header != nil {
wd.Header = header.ToMap()
}
wd.Outgoing = true
if tracing.Enabled {
traceable, ok := message.(tracing.Traceable)
if ok && traceable.Started() {
traceable.NewSpan("process-sendMessage").Finish()
}
}
wrapper := &wireDeliveryWrapper{
WireDelivery: wd,
originalMessage: message,
}
actor.EmptyRootContext.Send(gateway, wrapper)
}
func (ref *process) SendSystemMessage(pid *actor.PID, message interface{}) {
//intercept any Watch messages and direct them to the endpoint manager
switch msg := message.(type) {
case *actor.Watch:
panic("remote watching unsupported")
case *actor.Unwatch:
panic("remote unwatching unsupported")
case proto.Message:
sendMessage(ref.gateway, pid, nil, msg, nil, -1)
default:
log.Printf("error sending system message, not convertible to WireMessage: %s\n",
reflect.TypeOf(message))
return
}
}
func (ref *process) Stop(pid *actor.PID) {
panic("remote stop is unsupported")
}
func toActorPid(a *actor.PID) *mbridge.ActorPID {
if a == nil {
return nil
}
return &mbridge.ActorPID{
Address: a.Address,
Id: a.Id,
}
}
func fromActorPid(a *mbridge.ActorPID) *actor.PID {
return actor.NewPID(a.Address, a.Id)
}