Skip to content

Commit

Permalink
[WIP] Active TCP
Browse files Browse the repository at this point in the history
  • Loading branch information
ashellunts committed Nov 8, 2021
1 parent 6d30128 commit bc02898
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 20 deletions.
1 change: 1 addition & 0 deletions AUTHORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Adam Kiss <masterada@gmail.com>
adwpc <adwpc@hotmail.com>
Aleksandr Razumov <ar@gortc.io>
Antoine Baché <antoine@tenten.app>
Artur Shellunts <shellunts.artur@gmail.com>
Assad Obaid <assad@lap5cg901003r.se.axis.com>
Atsushi Watanabe <atsushi.w@ieee.org>
backkem <mail@backkem.me>
Expand Down
117 changes: 117 additions & 0 deletions active_tcp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package ice

import (
"fmt"
"net"
"testing"

"github.com/pion/logging"
"github.com/stretchr/testify/require"
)

func TestActiveTCP(t *testing.T) {
r := require.New(t)

const port = 7686

listener, err := net.ListenTCP("tcp", &net.TCPAddr{
Port: port,
})
r.NoError(err)
defer func() {
_ = listener.Close()
}()

loggerFactory := logging.NewDefaultLoggerFactory()
loggerFactory.DefaultLogLevel.Set(logging.LogLevelTrace)

tcpMux := NewTCPMuxDefault(TCPMuxParams{
Listener: listener,
Logger: loggerFactory.NewLogger("ice"),
ReadBufferSize: 20,
})

defer func() {
_ = tcpMux.Close()
}()

r.NotNil(tcpMux.LocalAddr(), "tcpMux.LocalAddr() is nil")
fmt.Println(tcpMux.LocalAddr())

passiveAgent, err := NewAgent(&AgentConfig{
TCPMux: tcpMux,
CandidateTypes: []CandidateType{CandidateTypeHost},
NetworkTypes: []NetworkType{NetworkTypeTCP4},
LoggerFactory: loggerFactory,
activeTCP: false,
})
r.NoError(err)
r.NotNil(passiveAgent)

activeAgent, err := NewAgent(&AgentConfig{
CandidateTypes: []CandidateType{CandidateTypeHost},
NetworkTypes: []NetworkType{NetworkTypeTCP4},
LoggerFactory: loggerFactory,
activeTCP: true,
})
r.NoError(err)
r.NotNil(activeAgent)

passiveAgentConn, activeAgenConn := connect(passiveAgent, activeAgent)
r.NotNil(passiveAgentConn)
r.NotNil(activeAgenConn)

pair := passiveAgent.getSelectedPair()
r.NotNil(pair)
r.Equal(port, pair.Local.Port())

// send a packet from mux
data := []byte("hello world")
_, err = passiveAgentConn.Write(data)
r.NoError(err)

buffer := make([]byte, 1024)
n, err := activeAgenConn.Read(buffer)
r.NoError(err)
r.Equal(data, buffer[:n])

// send a packet to mux
data2 := []byte("hello world 2")
_, err = activeAgenConn.Write(data2)
r.NoError(err)

n, err = passiveAgentConn.Read(buffer)
r.NoError(err)
r.Equal(data2, buffer[:n])

//r.NoError(activeAgenConn.Close())
//r.NoError(passiveAgentConn.Close())
//r.NoError(tcpMux.Close())
}

func TestUDP(t *testing.T) {
r := require.New(t)

loggerFactory := logging.NewDefaultLoggerFactory()
loggerFactory.DefaultLogLevel.Set(logging.LogLevelTrace)

passiveAgent, err := NewAgent(&AgentConfig{
CandidateTypes: []CandidateType{CandidateTypeHost},
NetworkTypes: []NetworkType{NetworkTypeUDP4},
LoggerFactory: loggerFactory,
})
r.NoError(err)
r.NotNil(passiveAgent)

activeAgent, err := NewAgent(&AgentConfig{
CandidateTypes: []CandidateType{CandidateTypeHost},
NetworkTypes: []NetworkType{NetworkTypeUDP4},
LoggerFactory: loggerFactory,
})
r.NoError(err)
r.NotNil(activeAgent)

passiveAgentConn, activeAgenConn := connect(passiveAgent, activeAgent)
r.NotNil(passiveAgentConn)
r.NotNil(activeAgenConn)
}
62 changes: 54 additions & 8 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ package ice

import (
"context"
"fmt"
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -92,6 +94,8 @@ type Agent struct {
remotePwd string
remoteCandidates map[NetworkType][]Candidate

activeTCP bool

checklist []*CandidatePair
selector pairCandidateSelector

Expand Down Expand Up @@ -309,6 +313,7 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
interfaceFilter: config.InterfaceFilter,

insecureSkipVerify: config.InsecureSkipVerify,
activeTCP: config.activeTCP,
}

a.tcpMux = config.TCPMux
Expand Down Expand Up @@ -646,6 +651,40 @@ func (a *Agent) getBestValidCandidatePair() *CandidatePair {
}

func (a *Agent) addPair(local, remote Candidate) *CandidatePair {

if local.TCPType() == TCPTypeActive && remote.TCPType() == TCPTypePassive {
a.log.Debugf("artur, addPair: local %s, remote %s", local, remote)
addressToConnect := remote.Address() + ":" + fmt.Sprint(remote.Port())
a.log.Debugf("artur, addressToConnect %s", addressToConnect)

//connect
conn, err := net.Dial("tcp4", addressToConnect)
if err != nil {
panic(err)
}
a.log.Debugf("artur, socket connected, local %s, remote %s", conn.LocalAddr(), conn.RemoteAddr())

//create PacketCon from tcp connection
packetConn := newTCPPacketConn(tcpPacketParams{
ReadBuffer: 10000,
LocalAddr: conn.LocalAddr(),
Logger: a.log,
})

packetConn.AddConn(conn, nil)

//updating local candidate with its real local port
localPort, err := strconv.Atoi(strings.Split(conn.LocalAddr().String(), ":")[1])
if err != nil {
panic(err)
}

localCandidateHost := local.(*CandidateHost)
localCandidateHost.port = localPort

local.start(a, packetConn, a.startedCh)
}

p := newCandidatePair(local, remote, a.isControlling)
a.checklist = append(a.checklist, p)
return p
Expand Down Expand Up @@ -714,12 +753,12 @@ func (a *Agent) AddRemoteCandidate(c Candidate) error {

// cannot check for network yet because it might not be applied
// when mDNS hostame is used.
if c.TCPType() == TCPTypeActive {
// TCP Candidates with tcptype active will probe server passive ones, so
// no need to do anything with them.
a.log.Infof("Ignoring remote candidate with tcpType active: %s", c)
return nil
}
// if c.TCPType() == TCPTypeActive {
// // TCP Candidates with tcptype active will probe server passive ones, so
// // no need to do anything with them.
// a.log.Infof("Ignoring remote candidate with tcpType active: %s", c)
// return nil
// }

// If we have a mDNS Candidate lets fully resolve it before adding it locally
if c.Type() == CandidateTypeHost && strings.HasSuffix(c.Address(), ".local") {
Expand Down Expand Up @@ -942,7 +981,14 @@ func (a *Agent) findRemoteCandidate(networkType NetworkType, addr net.Addr) Cand

set := a.remoteCandidates[networkType]
for _, c := range set {
if c.Address() == ip.String() && c.Port() == port {
if c.Address() == ip.String() && c.TCPType() == TCPTypeActive && c.Port() == 0 {
//remote active TCP candidate, locally port was zero ()
//change port to match remote candidate
candidateRemote := c.(*CandidateHost)
candidateRemote.port = port
candidateRemote.resolvedAddr = createAddr(networkType, ip, port)
return c
} else if c.Address() == ip.String() && c.Port() == port {
return c
}
}
Expand Down Expand Up @@ -1074,7 +1120,7 @@ func (a *Agent) handleInbound(m *stun.Message, local Candidate, remote net.Addr)
return
}

if remoteCandidate == nil {
if remoteCandidate == nil && local.TCPType() != TCPTypePassive {
ip, port, networkType, ok := parseAddr(remote)
if !ok {
a.log.Errorf("Failed to create parse remote net.Addr when creating remote prflx candidate")
Expand Down
3 changes: 2 additions & 1 deletion agent_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ func defaultCandidateTypes() []CandidateType {
// AgentConfig collects the arguments to ice.Agent construction into
// a single structure, for future-proofness of the interface
type AgentConfig struct {
Urls []*URL
activeTCP bool
Urls []*URL

// PortMin and PortMax are optional. Leave them 0 for the default UDP port allocation strategy.
PortMin uint16
Expand Down
4 changes: 4 additions & 0 deletions candidate_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ func (c *candidateBase) start(a *Agent, conn net.PacketConn, initializedCh <-cha
c.closeCh = make(chan struct{})
c.closedCh = make(chan struct{})

// if c.tcpType == TCPTypeActive {
// return
// }

go c.recvLoop(initializedCh)
}

Expand Down
6 changes: 6 additions & 0 deletions candidate_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ func NewCandidateHost(config *CandidateHostConfig) (*CandidateHost, error) {
}

if !strings.HasSuffix(config.Address, ".local") {

if config.TCPType == TCPTypeActive {
c.candidateBase.networkType = NetworkTypeTCP4
return c, nil
}

ip := net.ParseIP(config.Address)
if ip == nil {
return nil, ErrAddressParseFailed
Expand Down
26 changes: 15 additions & 11 deletions gather.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,19 +171,23 @@ func (a *Agent) gatherCandidatesLocal(ctx context.Context, networkTypes []Networ

switch network {
case tcp:
// Handle ICE TCP passive mode
a.log.Debugf("GetConn by ufrag: %s\n", a.localUfrag)
conn, err = a.tcpMux.GetConnByUfrag(a.localUfrag)
if err != nil {
if !errors.Is(err, ErrTCPMuxNotInitialized) {
a.log.Warnf("error getting tcp conn by ufrag: %s %s %s\n", network, ip, a.localUfrag)
if a.activeTCP {
tcpType = TCPTypeActive
} else {
// Handle ICE TCP passive mode
a.log.Debugf("GetConn by ufrag: %s\n", a.localUfrag)
conn, err = a.tcpMux.GetConnByUfrag(a.localUfrag)
if err != nil {
if !errors.Is(err, ErrTCPMuxNotInitialized) {
a.log.Warnf("error getting tcp conn by ufrag: %s %s %s\n", network, ip, a.localUfrag)
}
continue
}
continue
port = conn.LocalAddr().(*net.TCPAddr).Port
tcpType = TCPTypePassive
// is there a way to verify that the listen address is even
// accessible from the current interface.
}
port = conn.LocalAddr().(*net.TCPAddr).Port
tcpType = TCPTypePassive
// is there a way to verify that the listen address is even
// accessible from the current interface.
case udp:
conn, err = listenUDPInPortRange(a.net, a.log, int(a.portmax), int(a.portmin), network, &net.UDPAddr{IP: ip, Port: 0})
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions selection.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ func (s *controlledSelector) ContactCandidates() {
}

func (s *controlledSelector) PingCandidate(local, remote Candidate) {

if remote.Port() == 0 {
return
}

msg, err := stun.Build(stun.BindingRequest, stun.TransactionID,
stun.NewUsername(s.agent.remoteUfrag+":"+s.agent.localUfrag),
AttrControlled(s.agent.tieBreaker),
Expand Down

0 comments on commit bc02898

Please sign in to comment.