/
gateway.go
3324 lines (3075 loc) · 97.3 KB
/
gateway.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2018-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"bytes"
"crypto/sha256"
"crypto/tls"
"encoding/json"
"fmt"
"math/rand"
"net"
"net/url"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
)
// Default timings and limits for gateway connections, and the layout of
// the reply subject prefixes used to route replies back across gateways.
const (
// Initial delay before soliciting the configured remote gateways
// (see startGateways).
defaultSolicitGatewaysDelay = time.Second
// Delay between two rounds of connect attempts to a remote gateway
// (see solicitGateway).
defaultGatewayConnectDelay = time.Second
// Extra delay, added to a small random jitter, before reconnecting to an
// explicitly configured remote gateway (see reconnectGateway).
defaultGatewayReconnectDelay = time.Second
// Default for srvGateway.recSubExp: for how long we check if there is a
// subscription match for a message with a reply.
defaultGatewayRecentSubExpiration = 2 * time.Second
// NOTE(review): presumably the number of RS- (no-interest) protocols sent
// for an account before switching it to interest-only mode - confirm.
defaultGatewayMaxRUnsubBeforeSwitch = 1000
// Legacy reply prefix "$GR.<4 characters hash>.", kept for servers that
// only know about $GR.
oldGWReplyPrefix = "$GR."
oldGWReplyPrefixLen = len(oldGWReplyPrefix)
oldGWReplyStart = oldGWReplyPrefixLen + 5 // len of prefix above + len of hash (4) + "."
// The new prefix is "_GR_.<cluster>.<server>." where <cluster> is 6 characters
// hash of origin cluster name and <server> is 6 characters hash of origin server pub key.
gwReplyPrefix = "_GR_."
gwReplyPrefixLen = len(gwReplyPrefix)
gwHashLen = 6
// Byte offsets, in a "_GR_.<cluster>.<server>.<subject>" reply, of the
// cluster hash, of the server hash and of what follows the prefix.
gwClusterOffset = gwReplyPrefixLen
gwServerOffset = gwClusterOffset + gwHashLen + 1
gwSubjectOffset = gwServerOffset + gwHashLen + 1
// Gateway connections send PINGs regardless of traffic. The interval is
// either Options.PingInterval or this value, whichever is the smallest.
gwMaxPingInterval = 15 * time.Second
)
// Mutable counterparts of the defaults above, presumably so that tests can
// tune them. gatewaySolicitDelay holds a duration in nanoseconds and is
// accessed with sync/atomic (see SetGatewaysSolicitDelay).
var (
gatewayConnectDelay = defaultGatewayConnectDelay
gatewayReconnectDelay = defaultGatewayReconnectDelay
gatewayMaxRUnsubBeforeSwitch = defaultGatewayMaxRUnsubBeforeSwitch
gatewaySolicitDelay = int64(defaultSolicitGatewaysDelay)
gatewayMaxPingInterval = gwMaxPingInterval
)
// Warning logged when the user configures gateway TLS as insecure, i.e. when
// InsecureSkipVerify is set on the main gateway TLS config or on the TLS
// config of any remote gateway (checked in startGatewayAcceptLoop).
const gatewayTLSInsecureWarning = "TLS certificate chain and hostname of solicited gateways will not be verified. DO NOT USE IN PRODUCTION!"
// SetGatewaysSolicitDelay overrides the initial delay the server waits
// before it starts soliciting the configured remote gateways. The value is
// stored atomically and is only used when the server options do not carry
// their own non-zero solicit delay.
// Used by tests.
func SetGatewaysSolicitDelay(delay time.Duration) {
	nanos := delay.Nanoseconds()
	atomic.StoreInt64(&gatewaySolicitDelay, nanos)
}
// ResetGatewaysSolicitDelay restores the initial delay before gateway
// connections are solicited to its default (defaultSolicitGatewaysDelay).
// Used by tests.
func ResetGatewaysSolicitDelay() {
	atomic.StoreInt64(&gatewaySolicitDelay, defaultSolicitGatewaysDelay.Nanoseconds())
}
// Gateway command codes. NOTE(review): judging by their names, these mark
// the gossip of gateway information and the start/end of the sending of
// all subscriptions of an account (see the outsie.mode comment) - confirm
// against the code that emits/handles them.
const (
gatewayCmdGossip byte = 1
gatewayCmdAllSubsStart byte = 2
gatewayCmdAllSubsComplete byte = 3
)
// GatewayInterestMode represents an account interest mode for a gateway connection.
type GatewayInterestMode byte

// GatewayInterestMode values.
const (
	// Optimistic is the default mode where a cluster will send
	// to a gateway unless it has been told that there is no interest
	// (this is for plain subscribers only).
	Optimistic GatewayInterestMode = iota
	// Transitioning is when a gateway has to send too many
	// no-interest notifications on subjects to the remote and decides
	// that it is now time to move to InterestOnly (this is on a per
	// account basis).
	Transitioning
	// InterestOnly means that a cluster sends all its subscriptions
	// interest to the gateway, which in return does not send a message
	// unless it knows that there is explicit interest.
	InterestOnly
)

// String returns a human readable name for the interest mode, or
// "Unknown" for values outside of the defined constants.
func (im GatewayInterestMode) String() string {
	name := "Unknown"
	switch im {
	case Optimistic:
		name = "Optimistic"
	case Transitioning:
		name = "Transitioning"
	case InterestOnly:
		name = "Interest-Only"
	}
	return name
}
// gwDoNotForceInterestOnlyMode, when true, makes the gateway accept loop
// keep the pre-v2.9.0 behavior: the gateway INFO does not advertise
// GatewayIOM (switching all accounts to interest-only mode).
var gwDoNotForceInterestOnlyMode bool

// GatewayDoNotForceInterestOnlyMode is used ONLY in tests.
// DO NOT USE in normal code or if you embed the NATS Server.
// Passing true disables advertising of the forced interest-only mode for
// gateway INFO built afterwards; passing false restores the default.
func GatewayDoNotForceInterestOnlyMode(doNotForce bool) {
	gwDoNotForceInterestOnlyMode = doNotForce
}
// srvGateway holds the server-wide gateway state (stored in s.gateway): the
// inbound/outbound gateway connections, the configuration of the remote
// gateways, the gateway URLs of this cluster and the INFO protocol sent to
// remotes. The embedded RWMutex guards most fields (e.g. remotes, URLs, URL,
// info/infoJSON); pasi, rsubs, routesIDByHash and totalQSubs have their own
// synchronization.
type srvGateway struct {
// Total number of queue subs in all remote gateways (used with atomic
// operations). Presumably kept as the first field so that it is 64-bit
// aligned for atomic access on 32-bit platforms - do not move.
totalQSubs int64
sync.RWMutex
enabled bool // Immutable, true if both a name and port are configured
name string // Name of the Gateway on this server
out map[string]*client // outbound gateways
outo []*client // outbound gateways maintained in an order suitable for sending msgs (currently based on RTT)
in map[uint64]*client // inbound gateways
remotes map[string]*gatewayCfg // Config of remote gateways
URLs refCountedUrlSet // Set of all Gateway URLs in the cluster
URL string // This server gateway URL (after possible random port is resolved)
info *Info // Gateway Info protocol
infoJSON []byte // Marshal'ed Info protocol
runknown bool // Rejects unknown (not configured) gateway connections
replyPfx []byte // "_GR_.<6:cluster hash>.<6:server hash>." (built in newGateway)
// For backward compatibility: "$GR.<4:old cluster hash>." and the old
// (4 characters) hash of this gateway's name.
oldReplyPfx []byte
oldHash []byte
// We maintain the interest of subjects and queues per account.
// For a given account, entries in the map could be something like this:
// foo.bar {n: 3} // 3 subs on foo.bar
// foo.> {n: 6} // 6 subs on foo.>
// foo bar {n: 1, q: true} // 1 qsub on foo, queue bar
// foo baz {n: 3, q: true} // 3 qsubs on foo, queue baz
pasi struct {
// Protect the map since it is accessed from different go-routines, and
// avoid a possible race resulting in RS+ being sent before RS-, which
// would result in incorrect interest suppression.
// Used while sending QSubs (on GW connection accept) and when
// switching to the send-all-subs mode.
sync.Mutex
m map[string]map[string]*sitally
}
// This is to track recent subscriptions for a given account
rsubs sync.Map
resolver netResolver // Used to resolve host name before calling net.Dial()
sqbsz int // Max buffer size to send queue subs protocol. Used for testing.
recSubExp time.Duration // For how long do we check if there is a subscription match for a message with reply
// These are used for routing of mapped replies.
sIDHash []byte // Server ID hash (6 bytes)
routesIDByHash sync.Map // Route's server ID is hashed (6 bytes) and stored in this map.
// If a server has its own configuration in the "Gateways" remotes configuration
// we will keep track of the URLs that are defined in the config so they can
// be reported in monitoring.
ownCfgURLs []string
}
// Subject interest tally. Also indicates if the key in the map is a
// queue or not. Values of srvGateway.pasi.m entries.
type sitally struct {
n int32 // number of subscriptions directly matching
q bool // indicate that this is a queue
}
// gatewayCfg is the configuration of a remote gateway, either explicitly
// configured or implicitly discovered. The embedded RWMutex protects the
// fields, including those of the embedded (cloned) RemoteGatewayOpts.
type gatewayCfg struct {
sync.RWMutex
*RemoteGatewayOpts
hash []byte // 6 characters hash of the remote gateway name (getGWHash)
oldHash []byte // 4 characters legacy hash of the remote gateway name (getOldHash)
urls map[string]*url.URL // Known URLs for this remote, keyed by host:port (url.Host)
connAttempts int // NOTE(review): presumably incremented by bumpConnAttempts - confirm
tlsName string // Hostname used for TLS verification; cleared when a handshake asks for a reset
implicit bool // True if this remote was discovered rather than configured
varzUpdateURLs bool // Tells monitoring code to update URLs when varz is inspected.
}
// Struct for the client's gateway related fields (c.gw). Set up in
// createGateway for both inbound (accepted) and outbound (solicited)
// gateway connections.
type gateway struct {
name string
cfg *gatewayCfg
connectURL *url.URL // Needed when sending CONNECT after receiving INFO from remote (cleared once CONNECT is sent)
outsim *sync.Map // Per-account subject interest (or no-interest) (outbound conn)
insim map[string]*insie // Per-account subject no-interest sent or InterestOnly mode (inbound conn)
// This is an outbound GW connection
outbound bool
// Set/check in readLoop without lock. This is to know that an inbound has sent the CONNECT protocol first
connected bool
// Set to true if outbound is to a server that only knows about $GR, not $GNR
useOldPrefix bool
// If true, it indicates that the inbound side will switch any account to
// interest-only mode "immediately", so the outbound should disregard
// the optimistic mode when checking for interest.
interestOnlyMode bool
}
// Outbound subject interest entry (per account, stored in gateway.outsim).
type outsie struct {
sync.RWMutex
// Interest mode of this account for the remote. It tells whether all subs
// should be stored, and is changed when receiving the command from the
// remote that we are about to receive all its subs (presumably moving away
// from Optimistic - confirm against the command handling code).
mode GatewayInterestMode
// If not nil, used for no-interest for plain subs.
// If a subject is present in this map, it means that
// the remote is not interested in that subject.
// When we have received the command that says that
// the remote has sent all its subs, this is set to nil.
ni map[string]struct{}
// Contains queue subscriptions when in optimistic mode, and all subs once
// the remote sends all its subs for this account (NOTE(review): the
// previous comment said "when pk is > 0"; presumably this means
// InterestOnly mode - confirm).
sl *Sublist
// Number of queue subs
qsubs int
}
// Inbound subject interest entry (per account, stored in gateway.insim).
// If `ni` is not nil, it stores the subjects for which an
// RS- was sent to the remote gateway. When a subscription
// is created, this is used to know if we need to send
// an RS+ to clear the no-interest in the remote.
// When an account is switched to InterestOnly (we send
// all subs of an account to the remote), then `ni` is nil and
// when all subs have been sent, mode is set to InterestOnly.
type insie struct {
ni map[string]struct{} // Record if RS- was sent for given subject
mode GatewayInterestMode
}
// gwReplyMap is a single entry of a gateway reply mapping: the mapped
// subject and an expiration value.
type gwReplyMap struct {
	ms  string
	exp int64
}

// gwReplyMapping keeps track of gateway reply subjects mappings.
type gwReplyMapping struct {
	// Tells whether the map needs to be checked at all. Checking it happens
	// while processing inbound messages and requires the lock, so it is
	// only done when needed. Accessed with atomic operations, hence it must
	// stay properly memory aligned (first field).
	check int32
	// To keep track of gateway replies mapping
	mapping map[string]*gwReplyMap
}

// get looks up subject in the mapping. When an entry exists, it returns the
// corresponding gw routed subject and true. Otherwise the passed subject is
// returned unchanged, together with false.
// Caller is responsible to ensure the locking.
func (g *gwReplyMapping) get(subject []byte) ([]byte, bool) {
	if entry, found := g.mapping[string(subject)]; found {
		return []byte(entry.ms), true
	}
	return subject, false
}
// clone returns a copy of the RemoteGatewayOpts object, or nil for a nil
// receiver. The name is copied and the URLs are duplicated with
// deepCopyURLs. When a TLSConfig is present it is cloned, and only in that
// case is TLSTimeout carried over as well.
func (r *RemoteGatewayOpts) clone() *RemoteGatewayOpts {
	if r == nil {
		return nil
	}
	dup := &RemoteGatewayOpts{Name: r.Name, URLs: deepCopyURLs(r.URLs)}
	if tc := r.TLSConfig; tc != nil {
		dup.TLSConfig, dup.TLSTimeout = tc.Clone(), r.TLSTimeout
	}
	return dup
}
// validateGatewayOptions ensures that the gateway is properly configured.
// A configuration with neither a name nor a port means gateways are not
// used and is accepted. Otherwise both must be set, every remote gateway
// must have a name and at least one URL, and the pinned certificates must
// be valid.
func validateGatewayOptions(o *Options) error {
	name, port := o.Gateway.Name, o.Gateway.Port
	switch {
	case name == "" && port == 0:
		// Gateways are not configured at all.
		return nil
	case name == "":
		return fmt.Errorf("gateway has no name")
	case port == 0:
		return fmt.Errorf("gateway %q has no port specified (select -1 for random port)", name)
	}
	for idx, remote := range o.Gateway.Gateways {
		if remote.Name == "" {
			return fmt.Errorf("gateway in the list %d has no name", idx)
		}
		if len(remote.URLs) == 0 {
			return fmt.Errorf("gateway %q has no URL", remote.Name)
		}
	}
	if err := validatePinnedCerts(o.Gateway.TLSPinnedCerts); err != nil {
		return fmt.Errorf("gateway %q: %v", name, err)
	}
	return nil
}
// getGWHash computes a hash of gwHashLen (6) characters for the given name
// via getHashSize. It is used for routing of replies.
func getGWHash(name string) []byte {
	h := getHashSize(name, gwHashLen)
	return []byte(h)
}
// getOldHash returns the legacy 4 characters hash of name: the first four
// lowercase hex digits of its SHA-256 digest. It is used to build the old
// "$GR." reply prefix for backward compatibility.
func getOldHash(name string) []byte {
	digest := sha256.Sum256([]byte(name))
	hexDigest := []byte(fmt.Sprintf("%x", digest[:]))
	return hexDigest[:4]
}
// Initialize the s.gateway structure. We do this even if the server
// does not have a gateway configured. In some parts of the code, the
// server will check the number of outbound gateways, etc., and so
// we don't have to check if s.gateway is nil or not.
//
// It builds the reply prefixes used to route replies through gateways
// (the new "_GR_.<cluster hash>.<server hash>." and the legacy
// "$GR.<old hash>."), registers one gatewayCfg per remote listed in
// opts.Gateway.Gateways (an entry with this gateway's own name is not
// registered; only its URLs are kept for monitoring), and stores the
// result in s.gateway. The returned error is currently always nil.
func (s *Server) newGateway(opts *Options) error {
gateway := &srvGateway{
name: opts.Gateway.Name,
out: make(map[string]*client),
outo: make([]*client, 0, 4),
in: make(map[uint64]*client),
remotes: make(map[string]*gatewayCfg),
URLs: make(refCountedUrlSet),
resolver: opts.Gateway.resolver,
runknown: opts.Gateway.RejectUnknown,
oldHash: getOldHash(opts.Gateway.Name),
}
gateway.Lock()
defer gateway.Unlock()
gateway.sIDHash = getGWHash(s.info.ID)
clusterHash := getGWHash(opts.Gateway.Name)
// New style reply prefix: "_GR_." + cluster hash + "." + server hash + "."
prefix := make([]byte, 0, gwSubjectOffset)
prefix = append(prefix, gwReplyPrefix...)
prefix = append(prefix, clusterHash...)
prefix = append(prefix, '.')
prefix = append(prefix, gateway.sIDHash...)
prefix = append(prefix, '.')
gateway.replyPfx = prefix
// Legacy reply prefix: "$GR." + old (4 characters) hash + "."
prefix = make([]byte, 0, oldGWReplyStart)
prefix = append(prefix, oldGWReplyPrefix...)
prefix = append(prefix, gateway.oldHash...)
prefix = append(prefix, '.')
gateway.oldReplyPfx = prefix
gateway.pasi.m = make(map[string]map[string]*sitally)
// Fall back to the default resolver if none was provided in the options.
if gateway.resolver == nil {
gateway.resolver = netResolver(net.DefaultResolver)
}
// Create remote gateways
for _, rgo := range opts.Gateway.Gateways {
// Ignore if there is a remote gateway with our name, but keep its
// URLs so they can be reported in monitoring.
if rgo.Name == gateway.name {
gateway.ownCfgURLs = getURLsAsString(rgo.URLs)
continue
}
cfg := &gatewayCfg{
RemoteGatewayOpts: rgo.clone(),
hash: getGWHash(rgo.Name),
oldHash: getOldHash(rgo.Name),
urls: make(map[string]*url.URL, len(rgo.URLs)),
}
// The remote inherits the main gateway TLS config when it has none.
if opts.Gateway.TLSConfig != nil && cfg.TLSConfig == nil {
cfg.TLSConfig = opts.Gateway.TLSConfig.Clone()
}
// Same for the TLS timeout.
if cfg.TLSTimeout == 0 {
cfg.TLSTimeout = opts.Gateway.TLSTimeout
}
for _, u := range rgo.URLs {
// For TLS, look for a hostname that we can use for TLSConfig.ServerName
cfg.saveTLSHostname(u)
cfg.urls[u.Host] = u
}
gateway.remotes[cfg.Name] = cfg
}
// Max buffer size for the queue subs protocol; only overridden in tests.
gateway.sqbsz = opts.Gateway.sendQSubsBufSize
if gateway.sqbsz == 0 {
gateway.sqbsz = maxBufSize
}
gateway.recSubExp = defaultGatewayRecentSubExpiration
gateway.enabled = opts.Gateway.Name != "" && opts.Gateway.Port != 0
s.gateway = gateway
return nil
}
// updateRemotesTLSConfig updates the remote gateways TLS configurations
// after a config reload.
//
// For each remote listed in opts (other than one carrying this server's own
// gateway name) that is already registered in g.remotes, the remote's own
// TLS config is used if present, otherwise the main gateway TLS config.
// Remotes that have a TLS config in neither place keep their previous one.
//
// When the OCSP mode is "always", the client certificate and connection
// verification callbacks of the main gateway TLS config are copied into the
// remote's TLS config, unless the remote already defines its own.
func (g *srvGateway) updateRemotesTLSConfig(opts *Options) {
	// The OCSP callbacks depend only on opts, so resolve them once rather
	// than for every remote. They stay nil when they must not be applied.
	var (
		clientCB func(*tls.CertificateRequestInfo) (*tls.Certificate, error)
		verifyCB func(tls.ConnectionState) error
	)
	mustStaple := opts.OCSPConfig != nil && opts.OCSPConfig.Mode == OCSPModeAlways
	if mustStaple && opts.Gateway.TLSConfig != nil {
		clientCB = opts.Gateway.TLSConfig.GetClientCertificate
		verifyCB = opts.Gateway.TLSConfig.VerifyConnection
	}

	g.Lock()
	defer g.Unlock()

	for _, ro := range opts.Gateway.Gateways {
		if ro.Name == g.name {
			continue
		}
		cfg, ok := g.remotes[ro.Name]
		if !ok {
			continue
		}
		cfg.Lock()
		// If TLS config is in remote, use that one, otherwise,
		// use the TLS config from the main block.
		if ro.TLSConfig != nil {
			cfg.TLSConfig = ro.TLSConfig.Clone()
		} else if opts.Gateway.TLSConfig != nil {
			cfg.TLSConfig = opts.Gateway.TLSConfig.Clone()
		}
		// Ensure that OCSP callbacks are always setup after a reload if needed.
		if cfg.TLSConfig != nil {
			if clientCB != nil && cfg.TLSConfig.GetClientCertificate == nil {
				cfg.TLSConfig.GetClientCertificate = clientCB
			}
			if verifyCB != nil && cfg.TLSConfig.VerifyConnection == nil {
				cfg.TLSConfig.VerifyConnection = verifyCB
			}
		}
		cfg.Unlock()
	}
}
// rejectUnknown reports whether this server rejects connections from
// gateways that are not explicitly configured. Acquires the read lock.
func (g *srvGateway) rejectUnknown() bool {
	g.RLock()
	defer g.RUnlock()
	return g.runknown
}
// startGateways starts the gateways accept loop, then solicits the explicit
// gateways after an initial delay. The delay gives the local cluster a chance
// to form, so that this server gathers the gateway URLs of its cluster before
// sending them as part of the connect/info process. The delay comes from the
// options when set, otherwise from gatewaySolicitDelay. Soliciting is skipped
// if the server quits first.
func (s *Server) startGateways() {
	s.startGatewayAcceptLoop()

	s.startGoRoutine(func() {
		defer s.grWG.Done()

		delay := s.getOpts().gatewaysSolicitDelay
		if delay == 0 {
			delay = time.Duration(atomic.LoadInt64(&gatewaySolicitDelay))
		}

		select {
		case <-s.quitCh:
			return
		case <-time.After(delay):
		}
		s.solicitGateways()
	})
}
// startGatewayAcceptLoop starts the gateway accept loop in a go routine,
// unless it is detected that the server has already been shutdown.
//
// It listens on opts.Gateway.Host/Port. A port of -1 selects a random port,
// which is then written back into the options snapshot. It then builds the
// gateway INFO protocol and spawns acceptConnections, creating an inbound
// gateway for each accepted connection. The listen error (or nil) is stored
// in s.gatewayListenerErr.
//
// s.mu is held while the listener and INFO are set up. On every error path
// it is released, and the listener closed, before Fatalf is called.
func (s *Server) startGatewayAcceptLoop() {
	if s.isShuttingDown() {
		return
	}

	// Snapshot server options.
	opts := s.getOpts()

	port := opts.Gateway.Port
	if port == -1 {
		port = 0
	}

	s.mu.Lock()
	hp := net.JoinHostPort(opts.Gateway.Host, strconv.Itoa(port))
	l, e := natsListen("tcp", hp)
	s.gatewayListenerErr = e
	if e != nil {
		s.mu.Unlock()
		s.Fatalf("Error listening on gateway port: %d - %v", opts.Gateway.Port, e)
		return
	}
	s.Noticef("Gateway name is %s", s.getGatewayName())
	s.Noticef("Listening for gateways connections on %s",
		net.JoinHostPort(opts.Gateway.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))

	tlsReq := opts.Gateway.TLSConfig != nil
	authRequired := opts.Gateway.Username != ""
	info := &Info{
		ID:           s.info.ID,
		Name:         opts.ServerName,
		Version:      s.info.Version,
		AuthRequired: authRequired,
		TLSRequired:  tlsReq,
		TLSVerify:    tlsReq,
		MaxPayload:   s.info.MaxPayload,
		Gateway:      opts.Gateway.Name,
		GatewayNRP:   true,
		Headers:      s.supportsHeaders(),
	}
	// Unless in some tests we want to keep the old behavior, we now
	// (since v2.9.0) indicate that this server will switch all accounts
	// to InterestOnly mode when accepting an inbound or when a new
	// account is fetched.
	if !gwDoNotForceInterestOnlyMode {
		info.GatewayIOM = true
	}

	// If we have selected a random port...
	if port == 0 {
		// Write resolved port back to options.
		opts.Gateway.Port = l.Addr().(*net.TCPAddr).Port
	}
	// Possibly override Host/Port based on Gateway.Advertise
	if err := s.setGatewayInfoHostPort(info, opts); err != nil {
		// Release the listener and the server lock before reporting, as in
		// the listen error path above, so that nothing keeps running with
		// s.mu held if Fatalf does not terminate the process.
		l.Close()
		s.mu.Unlock()
		s.Fatalf("Error setting gateway INFO with Gateway.Advertise value of %s, err=%v", opts.Gateway.Advertise, err)
		return
	}
	// Setup state that can enable shutdown
	s.gatewayListener = l

	// Warn if insecure is configured in the main Gateway configuration
	// or any of the RemoteGateway's. This means that we need to check
	// remotes even if TLS would not be configured for the accept.
	warn := tlsReq && opts.Gateway.TLSConfig.InsecureSkipVerify
	if !warn {
		for _, g := range opts.Gateway.Gateways {
			if g.TLSConfig != nil && g.TLSConfig.InsecureSkipVerify {
				warn = true
				break
			}
		}
	}
	if warn {
		s.Warnf(gatewayTLSInsecureWarning)
	}
	go s.acceptConnections(l, "Gateway", func(conn net.Conn) { s.createGateway(nil, nil, conn) }, nil)
	s.mu.Unlock()
}
// Similar to setInfoHostPortAndGenerateJSON, but for gatewayInfo.
//
// It sets info.Host/Port to the address this server advertises to other
// gateways: Gateway.Advertise when set, otherwise Gateway.Host/Port, where an
// "any" host (0.0.0.0 or ::) is replaced by a non-local IP if one is found.
// The resulting host:port becomes gw.URL (replacing the previous URL in
// gw.URLs), info is stored as gw.info and the INFO JSON is regenerated.
// Acquires the gateway lock. Returns an error if the advertise address cannot
// be parsed or if looking up non-local IPs fails.
func (s *Server) setGatewayInfoHostPort(info *Info, o *Options) error {
gw := s.gateway
gw.Lock()
defer gw.Unlock()
// Drop the previous URL of this server; the new one is added below.
gw.URLs.removeUrl(gw.URL)
if o.Gateway.Advertise != "" {
advHost, advPort, err := parseHostPort(o.Gateway.Advertise, o.Gateway.Port)
if err != nil {
return err
}
info.Host = advHost
info.Port = advPort
} else {
info.Host = o.Gateway.Host
info.Port = o.Gateway.Port
// If the host is "0.0.0.0" or "::" we need to resolve to a public IP.
// This will return at most 1 IP.
hostIsIPAny, ips, err := s.getNonLocalIPsIfHostIsIPAny(info.Host, false)
if err != nil {
return err
}
if hostIsIPAny {
if len(ips) == 0 {
// TODO(ik): Should we fail here (prevent starting)? If not, we
// are going to "advertise" the 0.0.0.0:<port> url, which means
// that remote are going to try to connect to 0.0.0.0:<port>,
// which means a connect to loopback address, which is going
// to fail with either TLS error, conn refused if the remote
// is using different gateway port than this one, or error
// saying that it tried to connect to itself.
s.Errorf("Could not find any non-local IP for gateway %q with listen specification %q",
gw.name, info.Host)
} else {
// Take the first from the list...
info.Host = ips[0]
}
}
}
gw.URL = net.JoinHostPort(info.Host, strconv.Itoa(info.Port))
if o.Gateway.Advertise != "" {
s.Noticef("Advertise address for gateway %q is set to %s", gw.name, gw.URL)
} else {
s.Noticef("Address for gateway %q is %s", gw.name, gw.URL)
}
// URLs is reference counted; account for this server's URL.
gw.URLs[gw.URL]++
gw.info = info
info.GatewayURL = gw.URL
// (re)generate the gatewayInfoJSON byte array
gw.generateInfoJSON()
return nil
}
// generateInfoJSON regenerates the Gateway INFO protocol (g.infoJSON) from
// g.info, refreshing its list of gateway URLs first.
//
// It does nothing when gateways are not enabled or no INFO exists yet: we
// could be here when processing a route INFO that has a gateway URL while
// this server is not configured for gateways. That configuration mismatch is
// reported somewhere else.
//
// The gateway lock is held on entry.
func (g *srvGateway) generateInfoJSON() {
	if g.info == nil || !g.enabled {
		return
	}
	g.info.GatewayURLs = g.URLs.getAsStringSlice()
	payload, err := json.Marshal(g.info)
	if err != nil {
		panic(err)
	}
	g.infoJSON = []byte(fmt.Sprintf(InfoProto, payload))
}
// solicitGateways goes through the registered remote gateways and starts one
// go routine per explicit (configured) gateway that tries to connect to it.
//
// The remotes list initially contains the explicit remote gateways, but it is
// augmented with implicit (discovered) gateways. Because the creation of
// gateways is delayed, this server may already have received inbounds from
// other clusters and created outbounds in turn, so implicit ones are skipped.
func (s *Server) solicitGateways() {
	gw := s.gateway
	gw.RLock()
	defer gw.RUnlock()
	for _, remote := range gw.remotes {
		if remote.isImplicit() {
			continue
		}
		rcfg := remote // Dedicated copy for the goroutine.
		s.startGoRoutine(func() {
			s.solicitGateway(rcfg, true)
			s.grWG.Done()
		})
	}
}
// reconnectGateway waits a short random period (under 100ms) and then tries
// to reconnect to the remote gateway described by cfg. For explicit gateways,
// gatewayReconnectDelay is added to that wait. It returns without reconnecting
// if the server quits in the meantime, and calls s.grWG.Done() on return.
func (s *Server) reconnectGateway(cfg *gatewayCfg) {
	defer s.grWG.Done()

	wait := time.Millisecond * time.Duration(rand.Intn(100))
	if !cfg.isImplicit() {
		wait += gatewayReconnectDelay
	}

	select {
	case <-s.quitCh:
		return
	case <-time.After(wait):
		s.solicitGateway(cfg, false)
	}
}
// solicitGateway loops trying to connect to any URL attached to the given
// remote gateway configuration. It returns once a connection has been created
// (handed off to createGateway), when the server stops running or quits, or
// when the configuration has no URL.
//
// For implicit (discovered) gateways, once opts.Gateway.ConnectRetries
// attempts have been made (immediately if it is 0), the remote is removed
// from s.gateway.remotes and the function returns. The exception is when an
// inbound from that gateway exists; in that case connecting keeps being
// retried, still waiting gatewayConnectDelay between rounds.
//
// firstConnect is passed to shouldReportConnectErr, which decides whether
// attempts are logged at notice/error level or at debug level.
func (s *Server) solicitGateway(cfg *gatewayCfg, firstConnect bool) {
	var (
		opts       = s.getOpts()
		isImplicit = cfg.isImplicit()
		attempts   int
		typeStr    string
	)
	if isImplicit {
		typeStr = "implicit"
	} else {
		typeStr = "explicit"
	}

	const connFmt = "Connecting to %s gateway %q (%s) at %s (attempt %v)"
	const connErrFmt = "Error connecting to %s gateway %q (%s) at %s (attempt %v): %v"

	for s.isRunning() {
		urls := cfg.getURLs()
		if len(urls) == 0 {
			break
		}
		attempts++
		report := s.shouldReportConnectErr(firstConnect, attempts)
		// Iteration is random
		for _, u := range urls {
			address, err := s.getRandomIP(s.gateway.resolver, u.Host, nil)
			if err != nil {
				s.Errorf("Error getting IP for %s gateway %q (%s): %v", typeStr, cfg.Name, u.Host, err)
				continue
			}
			if report {
				s.Noticef(connFmt, typeStr, cfg.Name, u.Host, address, attempts)
			} else {
				s.Debugf(connFmt, typeStr, cfg.Name, u.Host, address, attempts)
			}
			conn, err := natsDialTimeout("tcp", address, DEFAULT_ROUTE_DIAL)
			if err == nil {
				// We could connect, create the gateway connection and return.
				s.createGateway(cfg, u, conn)
				return
			}
			if report {
				s.Errorf(connErrFmt, typeStr, cfg.Name, u.Host, address, attempts, err)
			} else {
				s.Debugf(connErrFmt, typeStr, cfg.Name, u.Host, address, attempts, err)
			}
			// Break this loop if server is being shutdown...
			if !s.isRunning() {
				break
			}
		}
		if isImplicit && (opts.Gateway.ConnectRetries == 0 || attempts > opts.Gateway.ConnectRetries) {
			s.gateway.Lock()
			// We could have just accepted an inbound for this remote gateway.
			// If so, keep trying to connect, but still wait for the connect
			// delay below. Retrying immediately would spin (resolving, dialing
			// and logging in a tight loop) for as long as the inbound exists
			// and the outbound dials keep failing.
			if !s.gateway.hasInbound(cfg.Name) {
				delete(s.gateway.remotes, cfg.Name)
				s.gateway.Unlock()
				return
			}
			s.gateway.Unlock()
		}
		select {
		case <-s.quitCh:
			return
		case <-time.After(gatewayConnectDelay):
		}
	}
}
// hasInbound reports whether one of the inbound gateway connections comes
// from the gateway named `name`. Each connection's lock is taken briefly to
// read its gateway name.
// Gateway lock held on entry.
func (g *srvGateway) hasInbound(name string) bool {
	for _, inbound := range g.in {
		inbound.mu.Lock()
		matches := inbound.gw.name == name
		inbound.mu.Unlock()
		if matches {
			return true
		}
	}
	return false
}
// createGateway is called when a gateway connection is either accepted or
// solicited. If cfg is nil, the connection was accepted and the gateway is
// marked as inbound. Otherwise it was solicited for the remote described by
// cfg (url being the URL that was dialed), and the gateway is marked as
// outbound.
//
// It initializes the client and performs the TLS handshake when required.
// Outbound connections use the remote's TLS config; inbound ones use the main
// gateway TLS config. It then registers the client in the temporary clients,
// sends the INFO protocol (inbound only) and starts the read and write loops.
// It returns early if the TLS handshake fails, or if the client cannot be
// registered, in which case the connection is closed with ServerShutdown.
func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) {
	// Snapshot server options.
	opts := s.getOpts()

	now := time.Now()
	c := &client{srv: s, nc: conn, start: now, last: now, kind: GATEWAY}

	// Are we creating the gateway based on the configuration
	solicit := cfg != nil
	var tlsRequired bool

	s.gateway.RLock()
	infoJSON := s.gateway.infoJSON
	s.gateway.RUnlock()

	// Perform some initialization under the client lock
	c.mu.Lock()
	c.initClient()
	c.gw = &gateway{}
	if solicit {
		// This is an outbound gateway connection
		cfg.RLock()
		tlsRequired = cfg.TLSConfig != nil
		cfgName := cfg.Name
		cfg.RUnlock()
		c.gw.outbound = true
		c.gw.name = cfgName
		c.gw.cfg = cfg
		cfg.bumpConnAttempts()
		// Since we are delaying the connect until after receiving
		// the remote's INFO protocol, save the URL we need to connect to.
		c.gw.connectURL = url

		c.Noticef("Creating outbound gateway connection to %q", cfgName)
	} else {
		c.flags.set(expectConnect)
		// Inbound gateway connection
		c.Noticef("Processing inbound gateway connection")
		// Check if TLS is required for inbound GW connections.
		tlsRequired = opts.Gateway.TLSConfig != nil
		// We expect a CONNECT from the accepted connection.
		c.setAuthTimer(secondsToDuration(opts.Gateway.AuthTimeout))
	}

	// Check for TLS
	if tlsRequired {
		var tlsConfig *tls.Config
		var tlsName string
		var timeout float64

		if solicit {
			var (
				mustStaple = opts.OCSPConfig != nil && opts.OCSPConfig.Mode == OCSPModeAlways
				clientCB   func(*tls.CertificateRequestInfo) (*tls.Certificate, error)
				verifyCB   func(tls.ConnectionState) error
			)
			// Snapshot callbacks for OCSP outside an ongoing reload which might be happening.
			// An outbound may require TLS only because the remote has its own TLS
			// config while the main gateway block has none, so guard against a
			// nil main TLS config (as updateRemotesTLSConfig does).
			if mustStaple {
				s.reloadMu.RLock()
				s.optsMu.RLock()
				if gwTLS := s.opts.Gateway.TLSConfig; gwTLS != nil {
					clientCB = gwTLS.GetClientCertificate
					verifyCB = gwTLS.VerifyConnection
				}
				s.optsMu.RUnlock()
				s.reloadMu.RUnlock()
			}

			cfg.RLock()
			tlsName = cfg.tlsName
			tlsConfig = cfg.TLSConfig.Clone()
			timeout = cfg.TLSTimeout

			// Ensure that OCSP callbacks are always setup on gateway reconnect when OCSP policy is set to always.
			if mustStaple {
				if clientCB != nil && tlsConfig.GetClientCertificate == nil {
					tlsConfig.GetClientCertificate = clientCB
				}
				if verifyCB != nil && tlsConfig.VerifyConnection == nil {
					tlsConfig.VerifyConnection = verifyCB
				}
			}
			cfg.RUnlock()
		} else {
			tlsConfig = opts.Gateway.TLSConfig
			timeout = opts.Gateway.TLSTimeout
		}

		// Perform (either server or client side) TLS handshake.
		if resetTLSName, err := c.doTLSHandshake("gateway", solicit, url, tlsConfig, tlsName, timeout, opts.Gateway.TLSPinnedCerts); err != nil {
			if resetTLSName {
				cfg.Lock()
				cfg.tlsName = _EMPTY_
				cfg.Unlock()
			}
			c.mu.Unlock()
			return
		}
	}

	// Do final client initialization
	c.in.pacache = make(map[string]*perAccountCache)
	if solicit {
		// This is an outbound gateway connection
		c.gw.outsim = &sync.Map{}
	} else {
		// Inbound gateway connection
		c.gw.insim = make(map[string]*insie)
	}

	// Register in temp map for now until gateway properly registered
	// in out or in gateways.
	if !s.addToTempClients(c.cid, c) {
		c.mu.Unlock()
		c.closeConnection(ServerShutdown)
		return
	}

	// Only send if we accept a connection. Will send CONNECT+INFO as an
	// outbound only after processing peer's INFO protocol.
	if !solicit {
		c.enqueueProto(infoJSON)
	}

	// Spin up the read loop.
	s.startGoRoutine(func() { c.readLoop(nil) })

	// Spin up the write loop.
	s.startGoRoutine(func() { c.writeLoop() })

	if tlsRequired {
		c.Debugf("TLS handshake complete")
		cs := c.nc.(*tls.Conn).ConnectionState()
		c.Debugf("TLS version %s, cipher suite %s", tlsVersion(cs.Version), tlsCipher(cs.CipherSuite))
	}

	c.mu.Unlock()

	// Announce ourselves again to new connections.
	if solicit && s.EventsEnabled() {
		s.sendStatszUpdate()
	}
}
// sendGatewayConnect builds and enqueues the CONNECT protocol for an outbound
// gateway connection. The URL saved in c.gw.connectURL is consumed (reset to
// nil). Credentials come from that URL's user info when present, otherwise
// from the gateway options (when opts is not nil).
// Client lock held on entry.
func (c *client) sendGatewayConnect(opts *Options) {
	// FIXME: This can race with updateRemotesTLSConfig
	tlsRequired := c.gw.cfg.TLSConfig != nil
	connectURL := c.gw.connectURL
	c.gw.connectURL = nil

	var user, pass string
	switch userInfo := connectURL.User; {
	case userInfo != nil:
		user = userInfo.Username()
		pass, _ = userInfo.Password()
	case opts != nil:
		user, pass = opts.Gateway.Username, opts.Gateway.Password
	}

	proto, err := json.Marshal(connectInfo{
		Verbose:  false,
		Pedantic: false,
		User:     user,
		Pass:     pass,
		TLS:      tlsRequired,
		Name:     c.srv.info.ID,
		Gateway:  c.srv.gateway.name,
	})
	if err != nil {
		panic(err)
	}
	c.enqueueProto([]byte(fmt.Sprintf(ConProto, proto)))
}
// processGatewayConnect handles the CONNECT protocol received on an inbound
// gateway connection. It returns the error if arg is not valid JSON.
//
// A CONNECT without a Gateway name comes from a client or a route that
// reached the gateway port. In that case an error is sent, the connection is
// closed with WrongPort and ErrClientOrRouteConnectedToGatewayPort is
// returned. If this server rejects unknown gateways and the remote is not
// configured, the connection is closed with WrongGateway and ErrWrongGateway
// is returned. Otherwise the connection is marked as connected and the first
// PING timer is set.
//
// <Invoked from inbound connection's readLoop>
func (c *client) processGatewayConnect(arg []byte) error {
	var connect connectInfo
	if err := json.Unmarshal(arg, &connect); err != nil {
		return err
	}

	// Coming from a client or a route, reject
	if connect.Gateway == "" {
		c.sendErrAndErr(ErrClientOrRouteConnectedToGatewayPort.Error())
		c.closeConnection(WrongPort)
		return ErrClientOrRouteConnectedToGatewayPort
	}

	c.mu.Lock()
	srv := c.srv
	c.mu.Unlock()

	// If we reject unknown gateways, make sure we have it configured,
	// otherwise return an error.
	if srv.gateway.rejectUnknown() && srv.getRemoteGateway(connect.Gateway) == nil {
		c.Errorf("Rejecting connection from gateway %q", connect.Gateway)
		c.sendErr(fmt.Sprintf("Connection to gateway %q rejected", srv.getGatewayName()))
		c.closeConnection(WrongGateway)
		return ErrWrongGateway
	}

	c.mu.Lock()
	c.gw.connected = true
	// Set the Ping timer after sending connect and info.
	c.setFirstPingTimer()
	c.mu.Unlock()

	return nil
}
// Process the INFO protocol from a gateway connection.
//
// If the gateway connection is an outbound (this server initiated the connection),
// this function checks that the incoming INFO contains the Gateway field. If empty,
// it means that this is a response from an older server or that this server connected
// to the wrong port.
// The outbound gateway may also receive a gossip INFO protocol from the remote gateway,
// indicating other gateways that the remote knows about. This server will try to connect
// to those gateways (if not explicitly configured or already implicitly connected).
// In both cases (explicit or implicit), the local cluster is notified about the existence
// of this new gateway. This allows servers in the cluster to ensure that they have an