-
Notifications
You must be signed in to change notification settings - Fork 256
/
tun.go
3080 lines (2499 loc) · 97.2 KB
/
tun.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright (c) 2017, Psiphon Inc.
* All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
/*
Package tun is an IP packet tunnel server and client. It supports tunneling
both IPv4 and IPv6.

    .........................................................       .-,(  ),-.
    . [server]                                     .-----.  .    .-(          )-.
    .                                              | NIC |<---->(    Internet    )
    . .......................................      '-----'  .    '-(          ).-'
    . . [packet tunnel daemon]              .         ^     .        '-.( ).-'
    . .                                     .         |     .
    . . ...........................         .         |     .
    . . . [session]               .         .        NAT    .
    . . .                         .         .         |     .
    . . .                         .         .         v     .
    . . .                         .         .       .---.   .
    . . .                         .         .       | t |   .
    . . .                         .         .       | u |   .
    . . .                 .---.   .  .---.  .       | n |   .
    . . .                 | q |   .  | d |  .       |   |   .
    . . .                 | u |   .  | e |  .       | d |   .
    . . .          .------| e |<-----| m |<---------| e |   .
    . . .          |      | u |   .  | u |  .       | v |   .
    . . .          |      | e |   .  | x |  .       | i |   .
    . . .       rewrite   '---'   .  '---'  .       | c |   .
    . . .          |              .         .       | e |   .
    . . .          v              .         .       '---'   .
    . . .     .---------.         .         .         ^     .
    . . .     | channel |--rewrite--------------------'     .
    . . .     '---------'         .         .               .
    . . ...........^...............         .               .
    . .............|.........................               .
    ...............|.........................................
                   |
                   | (typically via Internet)
                   |
    ...............|.................
    . [client]     |                .
    .              |                .
    . .............|............... .
    . .            v              . .
    . .       .---------.         . .
    . .       | channel |         . .
    . .       '---------'         . .
    . .            ^              . .
    . .............|............... .
    .              v                .
    .        .------------.         .
    .        | tun device |         .
    .        '------------'         .
    .................................

The client relays IP packets between a local tun device and a channel, which
is a transport to the server. In Psiphon, the channel will be an SSH channel
within an SSH connection to a Psiphon server.
The server relays packets between each client and its own tun device. The
server tun device is NATed to the Internet via an external network interface.
In this way, client traffic is tunneled and will egress from the server host.
Similar to a typical VPN, IP addresses are assigned to each client. Unlike
a typical VPN, the assignment is not transmitted to the client. Instead, the
server transparently rewrites the source addresses of client packets to
the assigned IP address. The server also rewrites the destination address of
certain DNS packets. The purpose of this is to allow clients to reconnect
to different servers without having to tear down or change their local
network configuration. Clients may configure their local tun device with an
arbitrary IP address and a static DNS resolver address.
The server uses the 24-bit 10.0.0.0/8 IPv4 private address space to maximize
the number of addresses available, due to Psiphon client churn and minimum
address lease time constraints. For IPv6, a 24-bit unique local space is used.
When a client is allocated addresses, a unique, unused 24-bit "index" is
reserved/leased. This index maps to and from IPv4 and IPv6 private addresses.
The server multiplexes all client packets into a single tun device. When a
packet is read, the destination address is used to map the packet back to the
correct index, which maps back to the client.
The server maintains client "sessions". A session maintains client IP
address state and effectively holds the lease on assigned addresses. If a
client is disconnected and quickly reconnects, it will resume its previous
session, retaining its IP address and network connection states. Idle
sessions with no client connection will eventually expire.
Packet count and bytes transferred metrics are logged for each client session.
The server integrates with and enforces Psiphon traffic rules and logging
facilities. The server parses and validates packets. Client-to-client packets
are not permitted. Only global unicast packets are permitted. Only TCP and UDP
packets are permitted. The client also filters out, before sending, packets
that the server won't route.
Certain aspects of packet tunneling are outside the scope of this package;
e.g, the Psiphon client and server are responsible for establishing an SSH
channel and negotiating the correct MTU and DNS settings. The Psiphon
server will call Server.ClientConnected when a client connects and establishes
a packet tunnel channel; and Server.ClientDisconnected when the client closes
the channel and/or disconnects.
*/
package tun
import (
"context"
"encoding/binary"
"fmt"
"io"
"math/rand"
"net"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/errors"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/monotime"
"github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng"
)
// Default values and timing parameters for the packet tunnel.
const (
	// DEFAULT_MTU is the default maximum transmission unit for the
	// packet tunnel.
	DEFAULT_MTU = 1500

	// DEFAULT_DOWNSTREAM_PACKET_QUEUE_SIZE is the downstream packet
	// queue size used when ServerConfig.DownstreamPacketQueueSize is
	// not a positive value.
	DEFAULT_DOWNSTREAM_PACKET_QUEUE_SIZE = 512 * 1024

	// DEFAULT_UPSTREAM_PACKET_QUEUE_SIZE is the default upstream packet
	// queue size.
	DEFAULT_UPSTREAM_PACKET_QUEUE_SIZE = 32 * 1024

	// DEFAULT_IDLE_SESSION_EXPIRY_SECONDS is the idle session expiry, in
	// seconds, used when ServerConfig.SessionIdleExpirySeconds is not
	// greater than 2.
	DEFAULT_IDLE_SESSION_EXPIRY_SECONDS = 5 * 60

	// ORPHAN_METRICS_CHECKPOINTER_PERIOD is the interval between
	// periodic orphan packet metrics checkpoints.
	ORPHAN_METRICS_CHECKPOINTER_PERIOD = 30 * time.Minute

	// FLOW_IDLE_EXPIRY is the flow idle expiry period.
	// NOTE(review): its use is outside this part of the file; presumably
	// an idle tracked flow expires after this period -- confirm.
	FLOW_IDLE_EXPIRY = time.Minute
)
// ServerConfig specifies the configuration of a packet tunnel server.
// A ServerConfig is passed to NewServer and retained by the Server.
type ServerConfig struct {
// Logger is used for logging events and metrics.
Logger common.Logger
// SudoNetworkConfigCommands specifies whether to use "sudo"
// when executing network configuration commands. This is required
// when the packet tunnel server is not run as root and when
// process capabilities are not available (only Linux kernel 4.3+
// has the required capabilities support). The host sudoers file
// must be configured to allow the tunnel server process user to
// execute the commands invoked in configureServerInterface; see
// the implementation for the appropriate platform.
SudoNetworkConfigCommands bool
// AllowNoIPv6NetworkConfiguration indicates that failures while
// configuring tun interfaces and routing for IPv6 are to be
// logged as warnings only. This option is intended to support
// test cases on hosts without IPv6 and is not for production use;
// the packet tunnel server will still accept IPv6 packets and
// relay them to the tun device.
// AllowNoIPv6NetworkConfiguration may not be supported on all
// platforms.
AllowNoIPv6NetworkConfiguration bool
// EgressInterface is the interface to which client traffic is
// masqueraded/NATed. For example, "eth0". If blank, a platform-
// appropriate default is used.
EgressInterface string
// GetDNSResolverIPv4Addresses is a function which returns the
// DNS resolvers to use as transparent DNS rewrite targets for
// IPv4 DNS traffic.
//
// GetDNSResolverIPv4Addresses is invoked for each new client
// session and the list of resolvers is stored with the session.
// This is a compromise between checking current resolvers for
// each packet (too expensive) and simply passing in a static
// list (won't pick up resolver changes). As implemented, only
// new client sessions will pick up resolver changes.
//
// Entries are expected to be IPv4 addresses; each one is converted
// to its 4-byte representation with net.IP.To4 when a session is
// created.
//
// Transparent DNS rewriting occurs when the client uses the
// specific, target transparent DNS addresses specified by
// GetTransparentDNSResolverIPv4/6Address.
//
// For outbound DNS packets with a target resolver IP address,
// a random resolver is selected and used for the rewrite.
// For inbound packets, _any_ resolver in the list is rewritten
// back to the target resolver IP address. As a side-effect,
// responses to client DNS packets originally destined for a
// resolver in GetDNSResolverIPv4Addresses will be lost.
GetDNSResolverIPv4Addresses func() []net.IP
// GetDNSResolverIPv6Addresses is a function which returns the
// DNS resolvers to use as transparent DNS rewrite targets for
// IPv6 DNS traffic. It functions like GetDNSResolverIPv4Addresses.
GetDNSResolverIPv6Addresses func() []net.IP
// EnableDNSFlowTracking specifies whether to apply flow tracking to DNS
// flows, as required for DNS quality metrics. Typically there are many
// short-lived DNS flows to track and each tracked flow adds some overhead,
// so this defaults to off.
EnableDNSFlowTracking bool
// DownstreamPacketQueueSize specifies the size of the downstream
// packet queue. The packet tunnel server multiplexes all client
// packets through a single tun device, so when a packet is read,
// it must be queued or dropped if it cannot be immediately routed
// to the appropriate client. Note that the TCP and SSH windows
// for the underlying channel transport will impact transfer rate
// and queuing.
// When DownstreamPacketQueueSize is 0 or negative, the default value
// DEFAULT_DOWNSTREAM_PACKET_QUEUE_SIZE, tuned for Psiphon, is used.
DownstreamPacketQueueSize int
// MTU specifies the maximum transmission unit for the packet
// tunnel. Clients must be configured with the same MTU. The
// server's tun device will be set to this MTU value and is
// assumed not to change for the duration of the server.
// When MTU is 0, a default value is used.
MTU int
// SessionIdleExpirySeconds specifies how long to retain client
// sessions which have no client attached. Sessions are retained
// across client connections so reconnecting clients can resume
// a previous session. Resuming avoids leasing new IP addresses
// for reconnection, and also retains NAT state for active
// tunneled connections.
//
// SessionIdleExpirySeconds is also, effectively, the lease
// time for assigned IP addresses.
//
// Values of 2 or less are ignored and
// DEFAULT_IDLE_SESSION_EXPIRY_SECONDS is used instead. The session
// reaper checks for expired sessions at half the expiry period.
SessionIdleExpirySeconds int
// AllowBogons disables bogon checks. This should be used only
// for testing.
AllowBogons bool
}
// Server is a packet tunnel server. A packet tunnel server
// maintains client sessions, relays packets through client
// channels, and multiplexes packets through a single tun
// device. The server assigns IP addresses to clients, performs
// IP address and transparent DNS rewriting, and enforces
// traffic rules.
//
// Create a Server with NewServer, then call Start and, to shut
// down, Stop.
type Server struct {
// config is the configuration supplied to NewServer.
config *ServerConfig
// device is the server tun device created by NewServerDevice;
// it is read by runDeviceDownstream and closed by Stop.
device *Device
// indexToSession maps a session index (int32) to its *session.
indexToSession sync.Map
// sessionIDToIndex maps a session ID (string) to its session
// index (int32).
sessionIDToIndex sync.Map
// connectedInProgress tracks in-progress ClientConnected calls;
// Stop waits on it before interrupting sessions.
connectedInProgress *sync.WaitGroup
// workers tracks the server goroutines launched by Start; Stop
// waits on it.
workers *sync.WaitGroup
// runContext is done once Stop has been called; stopRunning is
// its cancel function.
runContext context.Context
stopRunning context.CancelFunc
// orphanMetrics accumulates metrics for packets that are not
// associated with any session; it is periodically logged by
// runOrphanMetricsCheckpointer.
orphanMetrics *packetMetrics
}
// NewServer creates the server tun device for the given configuration and
// returns a Server ready to be started with Start. An error is returned
// when the tun device cannot be created.
func NewServer(config *ServerConfig) (*Server, error) {

	serverDevice, err := NewServerDevice(config)
	if err != nil {
		return nil, errors.Trace(err)
	}

	// The run context is cancelled by Stop.
	ctx, cancel := context.WithCancel(context.Background())

	server := &Server{
		config:              config,
		device:              serverDevice,
		connectedInProgress: &sync.WaitGroup{},
		workers:             &sync.WaitGroup{},
		runContext:          ctx,
		stopRunning:         cancel,
		orphanMetrics:       new(packetMetrics),
	}

	return server, nil
}
// Start launches the server's background goroutines -- the session
// reaper, the orphan metrics checkpointer, and the tun device downstream
// reader -- and returns with them running. Each goroutine is registered
// with server.workers so that Stop can wait for it to exit.
func (server *Server) Start() {

	server.config.Logger.WithTrace().Info("starting")

	backgroundWorkers := []func(){
		server.runSessionReaper,
		server.runOrphanMetricsCheckpointer,
		server.runDeviceDownstream,
	}

	for _, worker := range backgroundWorkers {
		server.workers.Add(1)
		go worker()
	}
}
// Stop halts a running server: the run context is cancelled, the tun
// device is closed, every session is interrupted, and all server workers
// are joined before Stop returns.
func (server *Server) Stop() {

	logger := server.config.Logger

	logger.WithTrace().Info("stopping")

	server.stopRunning()

	// Closing the device interrupts blocked device read/writes.
	server.device.Close()

	// Let any in-progress ClientConnected calls run to completion.
	//
	// Once this wait returns, no further clients will be added: every
	// in-progress ClientConnected call has finished, and any later
	// ClientConnected call won't get past its server.runContext.Done()
	// check.
	server.connectedInProgress.Wait()

	// Close all clients. Client workers will be joined by the
	// server.workers.Wait() below.
	interruptEach := func(_, value interface{}) bool {
		server.interruptSession(value.(*session))
		return true
	}
	server.indexToSession.Range(interruptEach)

	server.workers.Wait()

	logger.WithTrace().Info("stopped")
}
// AllowedPortChecker is a function which returns true when it is
// permitted to relay packets to the specified upstream IP address
// and/or port.
//
// ClientConnected accepts one checker for TCP and one for UDP; the
// corresponding checker is called for each TCP/UDP flow. Checkers must
// be efficient and safe for concurrent calls.
type AllowedPortChecker func(upstreamIPAddress net.IP, port int) bool
// AllowedDomainChecker is a function which returns true when it is
// permitted to resolve the specified domain name.
//
// The checker passed to ClientConnected is called for upstream DNS
// query packets. It must be efficient and safe for concurrent calls.
type AllowedDomainChecker func(string) bool
// FlowActivityUpdater defines an interface for receiving updates for
// flow activity. Values passed to UpdateProgress are bytes transferred
// and flow duration since the previous UpdateProgress.
type FlowActivityUpdater interface {
// UpdateProgress reports the downstream and upstream bytes
// transferred, and the flow duration in nanoseconds, since the
// previous UpdateProgress call.
UpdateProgress(downstreamBytes, upstreamBytes, durationNanoseconds int64)
}
// FlowActivityUpdaterMaker is a function which returns a list of
// appropriate updaters for a new flow to the specified upstream
// hostname (if known -- may be ""), and IP address.
// The flow is TCP when isTCP is true, and UDP otherwise.
//
// The maker passed to ClientConnected is invoked for each new packet
// flow.
type FlowActivityUpdaterMaker func(
isTCP bool, upstreamHostname string, upstreamIPAddress net.IP) []FlowActivityUpdater
// MetricsUpdater is a function which receives a checkpoint summary
// of application bytes transferred through a packet tunnel.
//
// The updater passed to ClientConnected is invoked at metrics
// checkpoints (usually when the client disconnects). The arguments are
// the TCP and UDP application byte counts, each split into downstream
// and upstream.
type MetricsUpdater func(
TCPApplicationBytesDown, TCPApplicationBytesUp,
UDPApplicationBytesDown, UDPApplicationBytesUp int64)
// DNSQualityReporter is a function which receives a DNS quality report:
// whether a DNS request received a response, the elapsed time, and the
// resolver used.
type DNSQualityReporter func(
receivedResponse bool, requestDuration time.Duration, resolverIP net.IP)
// ClientConnected handles new client connections, creating or resuming
// a session and returns with client packet handlers running.
//
// sessionID is used to identify sessions for resumption.
//
// transport provides the channel for relaying packets to and from
// the client.
//
// checkAllowedTCPPortFunc/checkAllowedUDPPortFunc/checkAllowedDomainFunc
// are callbacks used to enforce traffic rules. For each TCP/UDP flow, the
// corresponding AllowedPort function is called to check if traffic to the
// packet's port is permitted. For upstream DNS query packets,
// checkAllowedDomainFunc is called to check if domain resolution is
// permitted. These callbacks must be efficient and safe for concurrent
// calls.
//
// flowActivityUpdaterMaker is a callback invoked for each new packet
// flow; it may create updaters to track flow activity.
//
// metricsUpdater is a callback invoked at metrics checkpoints (usually
// when the client disconnects) with a summary of application bytes
// transferred.
//
// dnsQualityReporter is a callback which receives DNS quality reports;
// see DNSQualityReporter.
//
// An error is returned when the server is stopping or when no session
// index can be allocated for a new session.
//
// It is safe to make concurrent calls to ClientConnected for distinct
// session IDs. The caller is responsible for serializing calls with the
// same session ID. Further, the caller must ensure, in the case of a client
// transport reconnect when an existing transport has not yet disconnected,
// that ClientDisconnected is called first -- so it doesn't undo the new
// ClientConnected. (psiphond meets these constraints by closing any
// existing SSH client with duplicate session ID early in the lifecycle of
// a new SSH client connection.)
func (server *Server) ClientConnected(
	sessionID string,
	transport io.ReadWriteCloser,
	checkAllowedTCPPortFunc, checkAllowedUDPPortFunc AllowedPortChecker,
	checkAllowedDomainFunc AllowedDomainChecker,
	flowActivityUpdaterMaker FlowActivityUpdaterMaker,
	metricsUpdater MetricsUpdater,
	dnsQualityReporter DNSQualityReporter) error {

	// It's unusual to call both sync.WaitGroup.Add() _and_ Done() in the same
	// goroutine. There's no other place to call Add() since ClientConnected is
	// an API entrypoint. And Done() works because the invariant enforced by
	// connectedInProgress.Wait() is not that no ClientConnected calls are in
	// progress, but that no such calls are in progress past the
	// server.runContext.Done() check.

	// TODO: will this violate https://golang.org/pkg/sync/#WaitGroup.Add:
	// "calls with a positive delta that occur when the counter is zero must happen before a Wait"?

	server.connectedInProgress.Add(1)
	defer server.connectedInProgress.Done()

	select {
	case <-server.runContext.Done():
		return errors.TraceNew("server stopping")
	default:
	}

	server.config.Logger.WithTraceFields(
		common.LogFields{"sessionID": sessionID}).Debug("client connected")

	MTU := getMTU(server.config.MTU)

	clientSession := server.getSession(sessionID)

	if clientSession != nil {

		// Call interruptSession to ensure session is in the
		// expected idle state.

		server.interruptSession(clientSession)

		// Note: we don't check the session expiry; whether it has
		// already expired and not yet been reaped; or is about
		// to expire very shortly. It could happen that the reaper
		// will kill this session between now and when the expiry
		// is reset in the following resumeSession call. In this
		// unlikely case, the packet tunnel client should reconnect.

	} else {

		// Store IPv4 resolver addresses in 4-byte representation
		// for use in rewriting.
		//
		// To4 returns nil for an address that is not IPv4. Such an
		// entry cannot be used as an IPv4 rewrite target, so it is
		// skipped, with a warning, rather than stored as a nil IP.

		resolvers := server.config.GetDNSResolverIPv4Addresses()
		DNSResolverIPv4Addresses := make([]net.IP, 0, len(resolvers))
		for _, resolver := range resolvers {
			resolverIPv4 := resolver.To4()
			if resolverIPv4 == nil {
				server.config.Logger.WithTraceFields(
					common.LogFields{"resolver": resolver.String()}).Warning(
					"skipping non-IPv4 DNS resolver address")
				continue
			}
			DNSResolverIPv4Addresses = append(DNSResolverIPv4Addresses, resolverIPv4)
		}

		clientSession = &session{
			allowBogons:              server.config.AllowBogons,
			lastActivity:             int64(monotime.Now()),
			sessionID:                sessionID,
			metrics:                  new(packetMetrics),
			enableDNSFlowTracking:    server.config.EnableDNSFlowTracking,
			DNSResolverIPv4Addresses: append([]net.IP(nil), DNSResolverIPv4Addresses...),
			DNSResolverIPv6Addresses: append([]net.IP(nil), server.config.GetDNSResolverIPv6Addresses()...),
			workers:                  new(sync.WaitGroup),
		}

		// One-time, for this session, random resolver selection for TCP transparent
		// DNS forwarding. See comment in processPacket.
		if len(clientSession.DNSResolverIPv4Addresses) > 0 {
			clientSession.TCPDNSResolverIPv4Index = prng.Intn(len(clientSession.DNSResolverIPv4Addresses))
		}
		if len(clientSession.DNSResolverIPv6Addresses) > 0 {
			clientSession.TCPDNSResolverIPv6Index = prng.Intn(len(clientSession.DNSResolverIPv6Addresses))
		}

		// allocateIndex initializes session.index, session.assignedIPv4Address,
		// and session.assignedIPv6Address; and updates server.indexToSession and
		// server.sessionIDToIndex.

		err := server.allocateIndex(clientSession)
		if err != nil {
			return errors.Trace(err)
		}
	}

	// Note: it's possible that a client disconnects (or reconnects before a
	// disconnect is detected) and interruptSession is called between
	// allocateIndex and resumeSession calls here, so interruptSession and
	// related code must not assume resumeSession has been called.

	server.resumeSession(
		clientSession,
		NewChannel(transport, MTU),
		checkAllowedTCPPortFunc,
		checkAllowedUDPPortFunc,
		checkAllowedDomainFunc,
		flowActivityUpdaterMaker,
		metricsUpdater,
		dnsQualityReporter)

	return nil
}
// ClientDisconnected is called when a client disconnects. The session's
// packet handlers are stopped, but the session itself is kept so that the
// assigned IP addresses stay reserved and network state is retained
// should the client reconnect soon. Unknown session IDs are ignored.
func (server *Server) ClientDisconnected(sessionID string) {

	clientSession := server.getSession(sessionID)
	if clientSession == nil {
		return
	}

	logFields := common.LogFields{"sessionID": sessionID}
	server.config.Logger.WithTraceFields(logFields).Debug("client disconnected")

	server.interruptSession(clientSession)
}
// getSession looks up the session for sessionID, going from session ID to
// index and then from index to session. It returns nil when the session ID
// is unknown. A warning is logged when the ID maps to an index which has
// no session, and nil is returned in that case too.
func (server *Server) getSession(sessionID string) *session {

	index, found := server.sessionIDToIndex.Load(sessionID)
	if !found {
		return nil
	}

	value, found := server.indexToSession.Load(index.(int32))
	if !found {
		server.config.Logger.WithTrace().Warning("unexpected missing session")
		return nil
	}

	return value.(*session)
}
// resumeSession attaches a client connection to a session: it installs a
// fresh downstream packet queue, the per-connection callbacks, and the
// channel; creates a new session run context; and starts the upstream and
// downstream client workers. The session mutex is held throughout.
func (server *Server) resumeSession(
	session *session,
	channel *Channel,
	checkAllowedTCPPortFunc, checkAllowedUDPPortFunc AllowedPortChecker,
	checkAllowedDomainFunc AllowedDomainChecker,
	flowActivityUpdaterMaker FlowActivityUpdaterMaker,
	metricsUpdater MetricsUpdater,
	dnsQualityReporter DNSQualityReporter) {

	session.mutex.Lock()
	defer session.mutex.Unlock()

	// Performance/concurrency note: the runDeviceDownstream goroutine runs
	// concurrently with resumeSession/interruptSession calls, and may
	// access the downstream packet queue and the packet event callbacks
	// even while the session is idle. All accesses to these fields must
	// therefore be synchronized.
	//
	// Benchmarking indicates the atomic.LoadPointer mechanism outperforms
	// a mutex; approx. 2 ns/op vs. 20 ns/op in the case of
	// getCheckAllowedTCPPortFunc. These accesses occur multiple times per
	// packet, so atomic.LoadPointer is used and each of these fields is an
	// unsafe.Pointer in the session struct.

	// Begin buffering downstream packets. A non-positive configured size
	// selects the default.

	queueSize := server.config.DownstreamPacketQueueSize
	if queueSize <= 0 {
		queueSize = DEFAULT_DOWNSTREAM_PACKET_QUEUE_SIZE
	}
	session.setDownstreamPackets(NewPacketQueue(queueSize))

	// Install the access control, flow monitoring, and metrics callbacks
	// belonging to the new client connection.
	//
	// IMPORTANT: any new callbacks or references to the outer client added
	// here must be cleared in interruptSession to ensure that a paused
	// session does not retain references to old client connection objects
	// after the client disconnects.

	session.setCheckAllowedTCPPortFunc(&checkAllowedTCPPortFunc)
	session.setCheckAllowedUDPPortFunc(&checkAllowedUDPPortFunc)
	session.setCheckAllowedDomainFunc(&checkAllowedDomainFunc)
	session.setFlowActivityUpdaterMaker(&flowActivityUpdaterMaker)
	session.setMetricsUpdater(&metricsUpdater)
	session.setDNSQualityReporter(&dnsQualityReporter)

	session.channel = channel

	// The parent context is not server.runContext, so session workers
	// need only check session.stopRunning to act on shutdown events.

	runContext, stopRunning := context.WithCancel(context.Background())
	session.runContext = runContext
	session.stopRunning = stopRunning

	// All goroutines in session.workers are joined when a session is
	// interrupted, and all goroutines in server.workers are joined when
	// the server is stopped. In both cases all workers associated with
	// this session are stopped synchronously.

	session.workers.Add(2)
	go server.runClientUpstream(session)
	go server.runClientDownstream(session)

	session.touch()
}
// interruptSession detaches any client connection from a session: the
// session workers are stopped and joined, the channel is closed and
// released, metrics are checkpointed if a client was attached, and the
// downstream packet queue and per-connection callbacks are cleared. The
// session itself remains registered with the server. It is safe to call
// for an already idle session. The session mutex is held throughout.
func (server *Server) interruptSession(session *session) {

	session.mutex.Lock()
	defer session.mutex.Unlock()

	// A non-nil channel indicates that a client connection is attached.
	wasRunning := session.channel != nil

	if cancel := session.stopRunning; cancel != nil {
		cancel()
	}

	if wasRunning {
		// Interrupt blocked channel read/writes.
		session.channel.Close()
	}

	session.workers.Wait()

	if wasRunning {
		// Drop the reference to the channel so that both it and its conn
		// may be garbage collected. This must come after workers.Wait()
		// so that no goroutine which may access session.channel remains.
		session.channel = nil
	}

	metricsUpdater := session.getMetricsUpdater()

	// interruptSession is also called for idle sessions, in
	// ClientConnected and in server.Stop(), to ensure the session is in
	// an expected state; nothing is logged in those cases.
	if wasRunning {
		session.metrics.checkpoint(
			server.config.Logger,
			metricsUpdater,
			"server_packet_metrics",
			packetMetricsAll)
	}

	// Release the downstream packet buffer so that its memory is not
	// consumed while no client is connected.
	//
	// runDeviceDownstream continues to run and will access
	// session.downstreamPackets, so an atomic pointer is used to
	// synchronize access.
	session.setDownstreamPackets(nil)

	// Clear all callbacks associated with the old client connection.
	session.setCheckAllowedTCPPortFunc(nil)
	session.setCheckAllowedUDPPortFunc(nil)
	session.setCheckAllowedDomainFunc(nil)
	session.setFlowActivityUpdaterMaker(nil)
	session.setMetricsUpdater(nil)
	session.setDNSQualityReporter(nil)
}
// runSessionReaper is a server worker which, at half the session idle
// expiry period, scans all sessions and removes those that have expired.
// It exits when the server run context is done.
func (server *Server) runSessionReaper() {

	defer server.workers.Done()

	// Removing an expired session deletes its index from
	// server.indexToSession, which releases the IP addresses assigned to
	// the session.

	// TODO: As-is, this will discard sessions for live SSH tunnels,
	// as long as the SSH channel for such a session has been idle for
	// a sufficient period. Should the session be retained as long as
	// the SSH tunnel is alive (e.g., expose and call session.touch()
	// on keepalive events)? Or is it better to free up resources held
	// by idle sessions?

	expiry := server.sessionIdleExpiry()

	reapIfExpired := func(_, value interface{}) bool {
		if candidate := value.(*session); candidate.expired(expiry) {
			server.removeSession(candidate)
		}
		// Always continue on to the next session.
		return true
	}

	ticker := time.NewTicker(expiry / 2)
	defer ticker.Stop()

	for {
		select {
		case <-server.runContext.Done():
			return
		case <-ticker.C:
			server.indexToSession.Range(reapIfExpired)
		}
	}
}
// sessionIdleExpiry returns the idle session expiry period. The configured
// SessionIdleExpirySeconds is used when it is greater than 2; otherwise
// DEFAULT_IDLE_SESSION_EXPIRY_SECONDS applies.
func (server *Server) sessionIdleExpiry() time.Duration {
	seconds := server.config.SessionIdleExpirySeconds
	if seconds <= 2 {
		seconds = DEFAULT_IDLE_SESSION_EXPIRY_SECONDS
	}
	return time.Second * time.Duration(seconds)
}
// removeSession discards a session: its entries are deleted from
// server.sessionIDToIndex and server.indexToSession, any attached client
// connection is stopped via interruptSession, and the session's flows are
// deleted. It is called by the session reaper for expired sessions.
func (server *Server) removeSession(session *session) {
// Unregister the session first, so that it can no longer be found by
// session ID or by index.
server.sessionIDToIndex.Delete(session.sessionID)
server.indexToSession.Delete(session.index)
// Stop and join any running client workers for the session.
server.interruptSession(session)
// Delete flows to ensure any pending flow metrics are reported.
session.deleteFlows()
}
// runOrphanMetricsCheckpointer is a server worker which logs the orphan
// packet metrics every ORPHAN_METRICS_CHECKPOINTER_PERIOD, and once more
// when the server run context is done, after which it exits.
func (server *Server) runOrphanMetricsCheckpointer() {

	defer server.workers.Done()

	// Orphan metrics are not associated with any session. This includes
	// packets that are rejected before they can be associated with a
	// session.

	ticker := time.NewTicker(ORPHAN_METRICS_CHECKPOINTER_PERIOD)
	defer ticker.Stop()

	for stopping := false; !stopping; {

		select {
		case <-ticker.C:
		case <-server.runContext.Done():
			// Perform one final checkpoint before exiting.
			stopping = true
		}

		// TODO: skip log if all zeros?
		server.orphanMetrics.checkpoint(
			server.config.Logger,
			nil,
			"server_orphan_packet_metrics",
			packetMetricsRejected)
	}
}
func (server *Server) runDeviceDownstream() {
defer server.workers.Done()
// Read incoming packets from the tun device, parse and validate the
// packets, map them to a session/client, perform rewriting, and relay
// the packets to the client.
for {
readPacket, err := server.device.ReadPacket()
select {
case <-server.runContext.Done():
// No error is logged as shutdown may have interrupted read.
return
default:
}
if err != nil {
server.config.Logger.WithTraceFields(
common.LogFields{"error": err}).Warning("read device packet failed")
// May be temporary error condition, keep reading.
continue
}
// destinationIPAddress determines which client receives this packet.
// At this point, only enough of the packet is inspected to determine
// this routing info; further validation happens in subsequent
// processPacket in runClientDownstream.
// Note that masquerading/NAT stands between the Internet and the tun
// device, so arbitrary packets cannot be sent through to this point.
// TODO: getPacketDestinationIPAddress and processPacket perform redundant
// packet parsing; refactor to avoid extra work?
destinationIPAddress, ok := getPacketDestinationIPAddress(
server.orphanMetrics, packetDirectionServerDownstream, readPacket)
if !ok {
// Packet is dropped. Reason will be counted in orphan metrics.
continue
}
// Map destination IP address to client session.
index := server.convertIPAddressToIndex(destinationIPAddress)
s, ok := server.indexToSession.Load(index)
if !ok {
server.orphanMetrics.rejectedPacket(
packetDirectionServerDownstream, packetRejectNoSession)
continue
}
session := s.(*session)
downstreamPackets := session.getDownstreamPackets()
// No downstreamPackets buffer is maintained when no client is
// connected, so the packet is dropped.
if downstreamPackets == nil {
server.orphanMetrics.rejectedPacket(
packetDirectionServerDownstream, packetRejectNoClient)
continue
}
// Simply enqueue the packet for client handling, and move on to
// read the next packet. The packet tunnel server multiplexes all
// client packets through a single tun device, so we must not block
// on client channel I/O here.
//
// When the queue is full, the packet is dropped. This is standard
// behavior for routers, VPN servers, etc.
//
// TODO: processPacket is performed here, instead of runClientDownstream,
// since packets are packed contiguously into the packet queue and if
// the packet is to be omitted, that should be done before enqueuing.
// The potential downside is that all packet processing is done in this
// single thread of execution, blocking the next packet for the next
// client. Try handing off the packet to another worker which will
// call processPacket and Enqueue?
// In downstream mode, processPacket rewrites the destination address
// to the original client source IP address, and also rewrites DNS
// packets. As documented in runClientUpstream, the original address
// should already be populated via an upstream packet; if not, the
// packet will be rejected.
if !processPacket(
session.metrics,
session,
packetDirectionServerDownstream,
readPacket) {
// Packet is rejected and dropped. Reason will be counted in metrics.
continue
}
downstreamPackets.Enqueue(readPacket)
}
}
// runClientUpstream relays packets from the session's client channel to the
// tun device. Each packet read from the channel is run through processPacket
// in the upstream direction and, when accepted, written to the device.
//
// The loop ends when session.runContext is done, or when a channel read
// fails, in which case the session is interrupted. session.workers.Done is
// called on return.
func (server *Server) runClientUpstream(session *session) {
	defer session.workers.Done()

	for {
		packet, readErr := session.channel.ReadPacket()

		// Check for shutdown before looking at the read result: shutdown may
		// be what interrupted the read, so no error is logged in that case.
		select {
		case <-session.runContext.Done():
			return
		default:
		}

		if readErr != nil {
			// Channel I/O errors occur during normal operation, so this is
			// logged at Debug level only.
			server.config.Logger.WithTraceFields(
				common.LogFields{"error": readErr}).Debug("read channel packet failed")

			// Tear down the session. interruptSession must be invoked
			// asynchronously from this worker.
			go server.interruptSession(session)
			return
		}

		session.touch()

		// In the upstream direction, processPacket transparently rewrites
		// the packet's source address to the session's assigned address, and
		// rewrites the destination of DNS packets destined to the target DNS
		// resolver.
		//
		// On the first source address rewrite the original value is
		// recorded, so that the reverse rewrite can be applied to inbound
		// packets. This relies on the client sending a packet before it
		// receives one, which holds because only clients can initiate TCP or
		// UDP connections or flows.
		accepted := processPacket(
			session.metrics,
			session,
			packetDirectionServerUpstream,
			packet)
		if !accepted {
			// The packet is dropped; the rejection reason is counted in the
			// session metrics.
			continue
		}

		if writeErr := server.device.WritePacket(packet); writeErr != nil {
			// This may be a temporary error condition, so keep working. The
			// packet is most likely dropped.
			server.config.Logger.WithTraceFields(
				common.LogFields{"error": writeErr}).Warning("write device packet failed")
		}
	}
}
// runClientDownstream relays queued downstream packets to the session's
// client channel. It repeatedly dequeues a buffer of framed packets from the
// session's downstream packet queue and writes that buffer to the channel.
//
// The loop ends when the dequeue is aborted by session.runContext, or when a
// channel write fails, in which case the session is interrupted.
// session.workers.Done is called on return.
func (server *Server) runClientDownstream(session *session) {
	defer session.workers.Done()

	for {
		// The queue is expected to be non-nil here, since this goroutine
		// only runs while the session has a connected client.
		queue := session.getDownstreamPackets()

		framed, dequeued := queue.DequeueFramedPackets(session.runContext)
		if !dequeued {
			// The dequeue was aborted due to session.runContext.Done().
			return
		}

		writeErr := session.channel.WriteFramedPackets(framed)
		if writeErr == nil {
			session.touch()
			// Hand the buffer back to the queue now that it has been sent
			// (presumably so it can be reused -- confirm against the queue
			// implementation).
			queue.Replace(framed)
			continue
		}

		// Channel I/O errors occur during normal operation, so this is
		// logged at Debug level only.
		server.config.Logger.WithTraceFields(
			common.LogFields{"error": writeErr}).Debug("write channel packets failed")

		// The buffer is handed back to the queue on the failure path as well.
		queue.Replace(framed)

		// Tear down the session. interruptSession must be invoked
		// asynchronously from this worker.
		go server.interruptSession(session)
		return
	}
}
// IPv4 addressing for the packet tunnel's private network.
var (
	// serverIPv4AddressCIDR is the 10.0.0.1 address with its /8 prefix
	// length, in CIDR notation.
	serverIPv4AddressCIDR = "10.0.0.1/8"

	// transparentDNSResolverIPv4Address is held in 4-byte form for use in
	// packet rewriting.
	transparentDNSResolverIPv4Address = net.ParseIP("10.0.0.2").To4()

	// privateSubnetIPv4 is the parsed 10.0.0.0/8 network. The parsed IP and
	// the error result are discarded; the input is a fixed literal.
	_, privateSubnetIPv4, _ = net.ParseCIDR("10.0.0.0/8")

	// assignedIPv4AddressTemplate is a format string with three %d verbs,
	// one for each of the last three octets of an address within 10.0.0.0/8.
	assignedIPv4AddressTemplate = "10.%d.%d.%d"
)

// IPv6 addressing for the packet tunnel's private network.
var (
	// serverIPv6AddressCIDR is the ...:0001 address with its /64 prefix
	// length, in CIDR notation.
	serverIPv6AddressCIDR = "fd19:ca83:e6d5:1c44:0000:0000:0000:0001/64"

	// transparentDNSResolverIPv6Address is the IPv6 counterpart of
	// transparentDNSResolverIPv4Address.
	transparentDNSResolverIPv6Address = net.ParseIP("fd19:ca83:e6d5:1c44:0000:0000:0000:0002")

	// privateSubnetIPv6 is the parsed fd19:ca83:e6d5:1c44::/64 network. The
	// parsed IP and the error result are discarded; the input is a fixed
	// literal.
	_, privateSubnetIPv6, _ = net.ParseCIDR("fd19:ca83:e6d5:1c44::/64")

	// assignedIPv6AddressTemplate is a format string with three %02x verbs
	// which fill in the low-order bytes of an address within the /64 subnet.
	assignedIPv6AddressTemplate = "fd19:ca83:e6d5:1c44:8c57:4434:ee%02x:%02x%02x"
)
func (server *Server) allocateIndex(newSession *session) error {
// Find and assign an available index in the 24-bit index space.
// The index directly maps to and so determines the assigned
// IPv4 and IPv6 addresses.
// Search is a random index selection followed by a linear probe.
// TODO: is this the most effective (fast on average, simple) algorithm?
max := 0x00FFFFFF
randomInt := prng.Intn(max + 1)
index := int32(randomInt)
index &= int32(max)
idleExpiry := server.sessionIdleExpiry()
for tries := 0; tries < 100000; index++ {
tries++
// The index/address space isn't exactly 24-bits:
// - 0 and 0x00FFFFFF are reserved since they map to
// the network identifier (10.0.0.0) and broadcast