/
characterservice.go
173 lines (145 loc) · 4.3 KB
/
characterservice.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package services
import (
"errors"
"io"
"strconv"
"github.com/golang/protobuf/proto"
"github.com/soupstore/coda-world/simulation"
"github.com/soupstore/coda-world/simulation/model"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
var (
	// ErrMalformedCharacterID means that the client connected with a malformed or missing
	// character ID in the metadata.
	ErrMalformedCharacterID = errors.New("character ID in metadata was missing or malformed")
	// ErrUnknownEventType is returned when an unknown type is dispatched to a character's event queue.
	ErrUnknownEventType = errors.New("unknown event type")
	// errConnectionEnded is returned by the command listener when the client closes the
	// stream (io.EOF) or the RPC is canceled. Subscribe treats it as an expected way for
	// a connection to finish and does not log it as an error.
	errConnectionEnded = errors.New("connection ended")
)
// CharacterService is a GRPC service for controlling characters.
type CharacterService struct {
	// controller is used to wake up and put to sleep characters and to carry out
	// the commands (look, say) received from clients.
	controller simulation.CharacterController
	// logger receives connection lifecycle messages and errors.
	logger *zap.Logger
}
// NewCharacterService builds a CharacterService that controls characters
// through the given controller and logs through the given logger, and returns
// a pointer to it.
func NewCharacterService(controller simulation.CharacterController, logger *zap.Logger) *CharacterService {
	service := CharacterService{
		controller: controller,
		logger:     logger,
	}
	return &service
}
// Subscribe is the handler for the bidirectional GRPC stream of commands and events.
//
// It reads the character ID from the stream metadata, wakes that character up
// and then runs two loops concurrently until the connection ends: one reading
// commands from the client and one forwarding the character's events to it.
// The character is put back to sleep when the handler returns.
//
// An expected end of the connection (errConnectionEnded) is not a failure, so
// nil is returned in that case and the RPC completes with an OK status instead
// of an unknown error. Any other error is logged and returned.
func (s *CharacterService) Subscribe(stream Character_SubscribeServer) error {
	s.logger.Info("Player connected")

	// get characterID from metadata
	characterID, err := s.extractCharacterID(stream)
	if err != nil {
		return err
	}

	// wake up the character, and put it back to sleep when the controller disconnects
	events, err := s.controller.WakeUpCharacter(characterID)
	if err != nil {
		return err
	}
	defer s.controller.SleepCharacter(characterID)

	// start listening for commands and events
	//
	// quit has room for one value so that signalling it never blocks, even if
	// the event sender has already returned and nothing is receiving from the
	// channel any more. Without the buffer that situation would leave
	// g.Wait() hanging and the character awake forever.
	quit := make(chan struct{}, 1)
	var g errgroup.Group
	g.Go(s.listenForCommands(stream, characterID, quit))
	g.Go(s.sendEvents(stream, events, quit))

	err = g.Wait()
	if err == errConnectionEnded {
		// the client going away is the normal way for a session to finish
		err = nil
	}
	if err != nil {
		s.logger.Error(err.Error())
	}

	s.logger.Info("Player disconnected")
	return err
}
// extractCharacterID returns the ID of the character the stream client wants
// to control. The ID is read from the "characterid" entry of the incoming
// metadata held in the stream context.
//
// ErrMalformedCharacterID is returned when the context carries no metadata,
// when the entry does not hold exactly one value, or when that value cannot be
// parsed as an integer.
func (s *CharacterService) extractCharacterID(stream Character_SubscribeServer) (model.CharacterID, error) {
	md, found := metadata.FromIncomingContext(stream.Context())
	if !found {
		return 0, ErrMalformedCharacterID
	}

	values := md["characterid"]
	if len(values) != 1 {
		return 0, ErrMalformedCharacterID
	}

	id, err := strconv.Atoi(values[0])
	if err != nil {
		return 0, ErrMalformedCharacterID
	}

	return model.CharacterID(id), nil
}
// listenForCommands returns a function, suitable for errgroup.Group.Go, that
// reads commands from the stream and passes each one to handleCommand until
// the stream ends or a command fails.
//
// The quit channel is closed when the returned function exits, whatever the
// reason, so anything waiting on it is always released - including when
// handleCommand fails. Closing rather than sending means the signal can never
// block, even if nothing is receiving from quit any more. Because a channel
// may only be closed once, the returned function must be run at most once per
// quit channel.
//
// The returned function yields errConnectionEnded when the client closes the
// stream (io.EOF) or the RPC is canceled, and the underlying error in every
// other case. It never returns nil.
func (s *CharacterService) listenForCommands(stream Character_SubscribeServer, characterID model.CharacterID, quit chan<- struct{}) func() error {
	return func() error {
		defer close(quit)

		for {
			command, err := stream.Recv()
			if err != nil {
				// client disconnected
				if err == io.EOF {
					return errConnectionEnded
				}
				if grpcStatus, ok := status.FromError(err); ok && grpcStatus.Code() == codes.Canceled {
					return errConnectionEnded
				}

				// unknown error
				return err
			}

			if err := s.handleCommand(characterID, command); err != nil {
				return err
			}
		}
	}
}
// sendEvents returns a function, suitable for errgroup.Group.Go, that forwards
// events from the events channel to the client over the stream.
//
// The returned function stops with a nil error when a value is received from
// quit (or quit is closed) or when the events channel is closed. It stops with
// an error when an event cannot be converted to a message or when sending on
// the stream fails. Events for which buildEventMessage yields a nil message
// are skipped.
func (s *CharacterService) sendEvents(stream Character_SubscribeServer, events <-chan interface{}, quit <-chan struct{}) func() error {
	// forward converts a single event and posts it over the stream; a nil
	// message with a nil error means there is nothing to send for this event.
	forward := func(event interface{}) error {
		message, err := buildEventMessage(event)
		if err != nil || message == nil {
			return err
		}
		return stream.Send(message)
	}

	return func() error {
		for {
			select {
			case <-quit:
				return nil
			case event, open := <-events:
				if !open {
					// TODO: log warning
					return nil
				}
				if err := forward(event); err != nil {
					return err
				}
			}
		}
	}
}
// handleCommand carries out a single command on behalf of the given character
// by calling the matching controller method. A say command has its payload
// decoded into a SayCommand first, and a decoding failure is returned as the
// error. Command types other than look and say are ignored and yield nil.
func (s *CharacterService) handleCommand(characterID model.CharacterID, cmd *CommandMessage) error {
	// TODO: address the concurrency issues with this approach
	switch cmd.Type {
	case CommandType_CmdSay:
		var say SayCommand
		if err := proto.Unmarshal(cmd.Payload, &say); err != nil {
			return err
		}
		s.controller.Say(characterID, say.Content)

	case CommandType_CmdLook:
		s.controller.Look(characterID)
	}

	return nil
}