/
partition_actor.go
144 lines (127 loc) · 3.88 KB
/
partition_actor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package cluster
import (
"log"
"time"
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/AsynkronIT/protoactor-go/remote"
)
var (
kindPIDMap map[string]*actor.PID
)
func subscribePartitionKindsToEventStream() {
actor.EventStream.Subscribe(func(m interface{}) {
if mse, ok := m.(MemberStatusEvent); ok {
for _, k := range mse.GetKinds() {
kindPID := kindPIDMap[k]
if kindPID != nil {
kindPID.Tell(m)
}
}
}
})
}
func spawnPartitionActor(kind string) *actor.PID {
partitionPid := actor.SpawnNamed(actor.FromProducer(newPartitionActor(kind)), "#partition-"+kind)
return partitionPid
}
func partitionForKind(address, kind string) *actor.PID {
pid := actor.NewPID(address, "#partition-"+kind)
return pid
}
func newPartitionActor(kind string) actor.Producer {
return func() actor.Actor {
return &partitionActor{
partition: make(map[string]*actor.PID),
kind: kind,
}
}
}
type partitionActor struct {
partition map[string]*actor.PID //actor/grain name to PID
kind string
}
func (state *partitionActor) Receive(context actor.Context) {
switch msg := context.Message().(type) {
case *actor.Started:
log.Printf("[CLUSTER] Started %v", context.Self().Id)
case *remote.ActorPidRequest:
state.spawn(msg, context)
case *MemberJoinedEvent:
state.memberJoined(msg)
case *MemberRejoinedEvent:
state.memberRejoined(msg)
case *MemberLeftEvent:
state.memberLeft(msg)
case *MemberAvailableEvent:
log.Printf("[CLUSTER] Node Available %v", msg.Name())
case *MemberUnavailableEvent:
log.Printf("[CLUSTER] Node Unavailable %v", msg.Name())
case *TakeOwnership:
state.takeOwnership(msg)
default:
log.Printf("[CLUSTER] Partition got unknown message %+v", msg)
}
}
func (state *partitionActor) spawn(msg *remote.ActorPidRequest, context actor.Context) {
//TODO: make this async
pid := state.partition[msg.Name]
if pid == nil {
//get a random node
random := getRandomActivator(msg.Kind)
var err error
pid, err = remote.SpawnNamed(random, msg.Name, msg.Kind, 5*time.Second)
if err != nil {
log.Printf("[CLUSTER] Partition failed to spawn '%v' of kind '%v' on address '%v'", msg.Name, msg.Kind, random)
return
}
state.partition[msg.Name] = pid
}
response := &remote.ActorPidResponse{
Pid: pid,
}
context.Respond(response)
}
func (state *partitionActor) memberRejoined(msg *MemberRejoinedEvent) {
log.Printf("[CLUSTER] Node Rejoined %v", msg.Name())
for actorID, pid := range state.partition {
//if the mapped PID is on the address that left, forget it
if pid.Address == msg.Name() {
// log.Printf("[CLUSTER] Forgetting '%v' - '%v'", actorID, msg.Name())
delete(state.partition, actorID)
}
}
}
func (state *partitionActor) memberLeft(msg *MemberLeftEvent) {
log.Printf("[CLUSTER] Node Left %v", msg.Name())
for actorID, pid := range state.partition {
//if the mapped PID is on the address that left, forget it
if pid.Address == msg.Name() {
// log.Printf("[CLUSTER] Forgetting '%v' - '%v'", actorID, msg.Name())
delete(state.partition, actorID)
}
}
}
func (state *partitionActor) memberJoined(msg *MemberJoinedEvent) {
log.Printf("[CLUSTER] Node Joined %v", msg.Name())
for actorID := range state.partition {
address := getNode(actorID, state.kind)
if address != actor.ProcessRegistry.Address {
state.transferOwnership(actorID, address)
}
}
}
func (state *partitionActor) transferOwnership(actorID string, address string) {
// log.Printf("[CLUSTER] Giving ownership of %v to Node %v", actorID, address)
pid := state.partition[actorID]
owner := partitionForKind(address, state.kind)
owner.Tell(&TakeOwnership{
Pid: pid,
Name: actorID,
})
//we can safely delete this entry as the consisntent hash no longer points to us
delete(state.partition, actorID)
}
func (state *partitionActor) takeOwnership(msg *TakeOwnership) {
// log.Printf("[CLUSTER] Took ownerhip of %v", msg.Pid)
state.partition[msg.Name] = msg.Pid
}