-
Notifications
You must be signed in to change notification settings - Fork 0
/
translator.go
199 lines (168 loc) · 5.22 KB
/
translator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
package translator
import (
"context"
"io"
"time"
pb "github.com/krixlion/dev_forum-auth/pkg/grpc/v1"
"github.com/krixlion/dev_forum-lib/logging"
"github.com/krixlion/dev_forum-lib/nulls"
sync "github.com/sasha-s/go-deadlock"
)
type Translator struct {
grpcClient pb.AuthServiceClient
mu *sync.RWMutex // Protects the stream.
stream pb.AuthService_TranslateAccessTokenClient
// Receives signals when a stream is aborted and needs to be renewed.
streamAborted chan struct{}
jobs chan job
logger logging.Logger
config Config
}
type Config struct {
// Duration between stream renewal attempts.
StreamRenewalInterval time.Duration
JobQueueSize int
}
// NewTranslator returns a new, initialized instance of the Translator.
// Run() has to be invoked before use. Logging is disabled by default
// unless a logger option is given.
func NewTranslator(grpcClient pb.AuthServiceClient, config Config, opts ...Option) *Translator {
t := &Translator{
grpcClient: grpcClient,
mu: &sync.RWMutex{},
stream: nil,
streamAborted: make(chan struct{}),
jobs: make(chan job, config.JobQueueSize),
logger: nulls.NullLogger{},
config: config,
}
for _, opt := range opts {
opt.apply(t)
}
return t
}
// Run starts up necessary goroutines for automatic stream renewals
// and job handling. Blocks until given context is cancelled.
// It is intended to be invoked in a separate goroutine.
func (t *Translator) Run(ctx context.Context) {
// Init stream on start.
t.renewStream(ctx)
go t.handleStreamRenewals(ctx)
t.handleJobs(ctx)
}
// TranslateAccessToken takes in an opaqueAccessToken and translates it to an
// encoded JWT token or returns a non-nil error.
func (t *Translator) TranslateAccessToken(opaqueAccessToken string) (string, error) {
job := job{
OpaqueAccessToken: opaqueAccessToken,
ResultC: make(chan result),
}
t.jobs <- job
res := <-job.ResultC
return res.TranslatedAccessToken, res.Err
}
// job contains the request to be made and a channel to which the
// translated token or an error will be sent. Channel should be initialized
// by the caller. Only one result is sent through it, so
// no need for a buffer. Translator will automatically close
// the channel once it sends the result.
type job struct {
OpaqueAccessToken string
ResultC chan result
}
// result contains either a translated token or a non-nil error.
// Always check if the Err is nil and if it is then discard
// the response and handle the error.
type result struct {
TranslatedAccessToken string
Err error
}
// handleJobs blocks until given context is cancelled.
// It reads incoming jobs and executes them, optionally triggering a stream
// renewal on error. It is intended to be invoked in a separate goroutine.
func (t *Translator) handleJobs(ctx context.Context) {
for {
select {
case job := <-t.jobs:
func() {
t.mu.RLock()
defer t.mu.RUnlock()
if err := t.stream.Send(&pb.TranslateAccessTokenRequest{OpaqueAccessToken: job.OpaqueAccessToken}); err != nil {
t.maybeSendRenewStreamSig(err)
job.ResultC <- result{Err: err}
close(job.ResultC)
return
}
resp, err := t.stream.Recv()
t.maybeSendRenewStreamSig(err)
job.ResultC <- result{TranslatedAccessToken: resp.AccessToken, Err: err}
close(job.ResultC)
}()
case <-ctx.Done():
return
}
}
}
// maybeSendRenewStreamSig sends a signal to Translator if
// the following conditions are met:
// - given error is not nil,
// - given error is not io.EOF,
// - Translator is currently not renewing the stream.
//
// Use this func to determine whether the error returned by grpc.ClientStream
// methods indicates that the stream was aborted and needs to be renewed.
func (t *Translator) maybeSendRenewStreamSig(err error) {
if isStreamRenewable(err) {
select {
case t.streamAborted <- struct{}{}:
default:
// Stream is being renewed or is going to be renewed shortly.
// No need to bloat the buffer.
return
}
}
}
// isStreamRenewable returns true if given error is non-nil and not io.EOF.
func isStreamRenewable(err error) bool {
if err == nil {
return false
}
if err == io.EOF {
// Stream was closed naturally and does not need to be renewed.
return false
}
return true
}
// handleStreamRenewals listens for Translator signals
// and renews the stream once a signal is received.
// It blocks until given context is cancelled.
// It is intended to be invoked in a separate goroutine.
func (t *Translator) handleStreamRenewals(ctx context.Context) {
for {
select {
case <-t.streamAborted:
t.renewStream(ctx)
case <-ctx.Done():
return
}
}
}
// renewStream attempts to renew the stream until it succeeds or the context is cancelled.
// Mutex protecting the stream remains locked until this func returns.
func (t *Translator) renewStream(ctx context.Context) {
t.mu.Lock()
defer t.mu.Unlock()
for {
if err := ctx.Err(); err != nil {
return
}
t.logger.Log(ctx, "Renewing the token translation stream")
var err error
t.stream, err = t.grpcClient.TranslateAccessToken(ctx)
if err == nil {
return
}
t.logger.Log(ctx, "Failed to renew the token translation stream", "err", err)
time.Sleep(t.config.StreamRenewalInterval)
}
}