forked from asynkron/protoactor-go
/
endpoint_reader.go
87 lines (74 loc) · 2.11 KB
/
endpoint_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package remote
import (
	"fmt"
	"time"

	"github.com/AsynkronIT/protoactor-go/actor"
	"github.com/AsynkronIT/protoactor-go/log"
	"golang.org/x/net/context"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)
// endpointReader carries the state shared by the Connect and Receive handlers
// defined in this file.
type endpointReader struct {
// suspended, when true, makes Connect reject requests with a Canceled status
// and makes Receive stop reading from its stream. It is written by suspend.
suspended bool
}
// Connect answers a connection request from a remote endpoint.
//
// While the reader is not suspended it replies with a ConnectResponse carrying
// DefaultSerializerID. While suspended it returns no response and a gRPC status
// error with code Canceled and message "Suspended". Neither ctx nor req is
// inspected.
func (s *endpointReader) Connect(ctx context.Context, req *ConnectRequest) (*ConnectResponse, error) {
	if !s.suspended {
		response := &ConnectResponse{DefaultSerializerId: DefaultSerializerID}
		return response, nil
	}
	return nil, status.Error(codes.Canceled, "Suspended")
}
// Receive reads message batches from stream until the stream fails and
// delivers every envelope of each batch to its local target.
//
// Per envelope, the deserialized message is dispatched by type:
//   - *actor.Terminated is forwarded to endpointManager.remoteTerminate,
//   - any other actor.SystemMessage is sent as a system message to the process
//     registered locally under the target's id,
//   - everything else is wrapped in an actor.MessageEnvelope (header, message,
//     sender) and sent to the target with Tell.
//
// While the reader is suspended nothing is read from the stream; the flag is
// re-checked every 500ms, and the stream context's error is returned if the
// stream is cancelled in the meantime.
//
// It returns the error from stream.Recv or Deserialize, or an error describing
// an envelope whose Target or TypeId index does not exist in its own batch.
//
// NOTE(review): s.suspended is read here without synchronization; confirm
// whether suspend can be called from another goroutine.
func (s *endpointReader) Receive(stream Remoting_ReceiveServer) error {
	// Lookup table from target index to PID. It is reused across batches, so
	// entries beyond the current batch's TargetNames may be nil or stale.
	targets := make([]*actor.PID, 100)
	for {
		if s.suspended {
			// Wait for the next poll, but stop as soon as the stream is
			// cancelled so the handler does not spin forever on a dead stream.
			select {
			case <-stream.Context().Done():
				return stream.Context().Err()
			case <-time.After(time.Millisecond * 500):
			}
			continue
		}

		batch, err := stream.Recv()
		if err != nil {
			plog.Debug("EndpointReader failed to read", log.Error(err))
			return err
		}

		// only grow pid lookup if needed
		if len(batch.TargetNames) > len(targets) {
			targets = make([]*actor.PID, len(batch.TargetNames))
		}
		for i, name := range batch.TargetNames {
			targets[i] = actor.NewLocalPID(name)
		}

		for _, envelope := range batch.Envelopes {
			// The indices come from the wire. Validate them against this
			// batch's tables (not the reused lookup slice) before indexing.
			if envelope.Target < 0 || int(envelope.Target) >= len(batch.TargetNames) {
				err := fmt.Errorf("envelope target index %d out of range, batch has %d targets",
					envelope.Target, len(batch.TargetNames))
				plog.Debug("EndpointReader received invalid envelope", log.Error(err))
				return err
			}
			if envelope.TypeId < 0 || int(envelope.TypeId) >= len(batch.TypeNames) {
				err := fmt.Errorf("envelope type index %d out of range, batch has %d type names",
					envelope.TypeId, len(batch.TypeNames))
				plog.Debug("EndpointReader received invalid envelope", log.Error(err))
				return err
			}

			pid := targets[envelope.Target]
			message, err := Deserialize(envelope.MessageData, batch.TypeNames[envelope.TypeId], envelope.SerializerId)
			if err != nil {
				plog.Debug("EndpointReader failed to deserialize", log.Error(err))
				return err
			}

			// if message is system message send it as sysmsg instead of usermsg
			switch msg := message.(type) {
			case *actor.Terminated:
				rt := &remoteTerminate{
					Watchee: msg.Who,
					Watcher: pid,
				}
				endpointManager.remoteTerminate(rt)
			case actor.SystemMessage:
				// The found flag is deliberately ignored, as in the original code.
				// NOTE(review): presumably GetLocal returns a usable fallback
				// process when the id is unknown — confirm.
				ref, _ := actor.ProcessRegistry.GetLocal(pid.Id)
				ref.SendSystemMessage(pid, msg)
			default:
				var header map[string]string
				if envelope.MessageHeader != nil {
					header = envelope.MessageHeader.HeaderData
				}
				localEnvelope := &actor.MessageEnvelope{
					Header:  header,
					Message: message,
					Sender:  envelope.Sender,
				}
				pid.Tell(localEnvelope)
			}
		}
	}
}
// suspend sets the reader's suspended flag to toSuspend. While the flag is
// true, Connect returns a Canceled status error and Receive does not read from
// its stream; passing false lets both proceed again.
func (s *endpointReader) suspend(toSuspend bool) {
s.suspended = toSuspend
}