Skip to content

Commit

Permalink
[WIP] Active TCP
Browse files Browse the repository at this point in the history
  • Loading branch information
ashellunts committed Feb 4, 2023
1 parent c0be5d1 commit ed64aab
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 56 deletions.
166 changes: 166 additions & 0 deletions active_tcp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package ice

import (
"context"
"fmt"
"net"
"testing"

"github.com/pion/logging"
"github.com/stretchr/testify/require"
)

func connect_2(aAgent, bAgent *Agent) (*Conn, *Conn) {

fmt.Println("1")
gatherAndExchangeCandidates(aAgent, bAgent)
fmt.Println("2")

accepted := make(chan struct{})
var aConn *Conn

go func() {
var acceptErr error
bUfrag, bPwd, acceptErr := bAgent.GetLocalUserCredentials()
fmt.Println("3")

check(acceptErr)
aConn, acceptErr = aAgent.Accept(context.TODO(), bUfrag, bPwd)
fmt.Println("4")

check(acceptErr)
close(accepted)
fmt.Println("5")

}()
aUfrag, aPwd, err := aAgent.GetLocalUserCredentials()
fmt.Println("6")

check(err)
bConn, err := bAgent.Dial(context.TODO(), aUfrag, aPwd)
check(err)
fmt.Println("7")

// Ensure accepted
<-accepted
fmt.Println("8")

return aConn, bConn
}

func TestActiveTCP(t *testing.T) {
r := require.New(t)

const port = 7686

listener, err := net.ListenTCP("tcp", &net.TCPAddr{
IP: net.IPv4(192, 168, 1, 92),
Port: port,
})
r.NoError(err)
defer func() {
_ = listener.Close()
}()

loggerFactory := logging.NewDefaultLoggerFactory()
loggerFactory.DefaultLogLevel.Set(logging.LogLevelTrace)

tcpMux := NewTCPMuxDefault(TCPMuxParams{
Listener: listener,
Logger: loggerFactory.NewLogger("passive-ice"),
ReadBufferSize: 20,
})

defer func() {
_ = tcpMux.Close()
}()

r.NotNil(tcpMux.LocalAddr(), "tcpMux.LocalAddr() is nil")
fmt.Println(tcpMux.LocalAddr())

ipFilter := func(ip net.IP) bool {
println("-------------------", ip.To16().String())
// panic(1)
return true
}

passiveAgent, err := NewAgent(&AgentConfig{
TCPMux: tcpMux,
CandidateTypes: []CandidateType{CandidateTypeHost},
NetworkTypes: []NetworkType{NetworkTypeTCP4},
LoggerFactory: loggerFactory,
activeTCP: false,
IPFilter: ipFilter,
})
r.NoError(err)
r.NotNil(passiveAgent)

activeAgent, err := NewAgent(&AgentConfig{
CandidateTypes: []CandidateType{CandidateTypeHost},
NetworkTypes: []NetworkType{NetworkTypeTCP4},
LoggerFactory: loggerFactory,
activeTCP: true,
})
r.NoError(err)
r.NotNil(activeAgent)

// gatherAndExchangeCandidates(activeAgent, passiveAgent)

passiveAgentConn, activeAgenConn := connect_2(passiveAgent, activeAgent)
r.NotNil(passiveAgentConn)
r.NotNil(activeAgenConn)

// pair := passiveAgent.getSelectedPair()
// r.NotNil(pair)
// r.Equal(port, pair.Local.Port())

// // send a packet from mux
// data := []byte("hello world")
// _, err = passiveAgentConn.Write(data)
// r.NoError(err)

// buffer := make([]byte, 1024)
// n, err := activeAgenConn.Read(buffer)
// r.NoError(err)
// r.Equal(data, buffer[:n])

// // send a packet to mux
// data2 := []byte("hello world 2")
// _, err = activeAgenConn.Write(data2)
// r.NoError(err)

// n, err = passiveAgentConn.Read(buffer)
// r.NoError(err)
// r.Equal(data2, buffer[:n])

// r.NoError(activeAgenConn.Close())
// r.NoError(passiveAgentConn.Close())
// r.NoError(tcpMux.Close())
}

func TestUDP2(t *testing.T) {
r := require.New(t)

loggerFactory := logging.NewDefaultLoggerFactory()
loggerFactory.DefaultLogLevel.Set(logging.LogLevelTrace)

passiveAgent, err := NewAgent(&AgentConfig{
CandidateTypes: []CandidateType{CandidateTypeHost},
NetworkTypes: []NetworkType{NetworkTypeUDP4},
LoggerFactory: loggerFactory,
})
r.NoError(err)
r.NotNil(passiveAgent)

activeAgent, err := NewAgent(&AgentConfig{
CandidateTypes: []CandidateType{CandidateTypeHost},
NetworkTypes: []NetworkType{NetworkTypeUDP4},
LoggerFactory: loggerFactory,
})
r.NoError(err)
r.NotNil(activeAgent)

passiveAgentConn, activeAgenConn := connect(passiveAgent, activeAgent)
r.NotNil(passiveAgentConn)
r.NotNil(activeAgenConn)
}
80 changes: 69 additions & 11 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ package ice

import (
"context"
"fmt"
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -92,6 +94,9 @@ type Agent struct {
remotePwd string
remoteCandidates map[NetworkType][]Candidate

activeTCP bool
tcpReadBufferSize int

checklist []*CandidatePair
selector pairCandidateSelector

Expand Down Expand Up @@ -320,6 +325,13 @@ func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
insecureSkipVerify: config.InsecureSkipVerify,

includeLoopback: config.IncludeLoopback,
activeTCP: config.activeTCP,
}

const defaultTCPReadBufferSize = 8
a.tcpReadBufferSize = config.tcpReadBufferSize
if a.tcpReadBufferSize == 0 {
a.tcpReadBufferSize = defaultTCPReadBufferSize
}

a.tcpMux = config.TCPMux
Expand Down Expand Up @@ -656,6 +668,43 @@ func (a *Agent) getBestValidCandidatePair() *CandidatePair {
}

func (a *Agent) addPair(local, remote Candidate) *CandidatePair {
if local.TCPType() == TCPTypeActive && remote.TCPType() == TCPTypePassive {
a.log.Debugf("artur, addPair: local %s, remote %s", local, remote)
addressToConnect := remote.Address() + ":" + fmt.Sprint(remote.Port())
a.log.Debugf("artur, addressToConnect %s", addressToConnect)

// connect
conn, err := net.Dial("tcp", addressToConnect)
if err != nil {
a.log.Errorf("Failed to dial TCP address %s: %v", addressToConnect, err)
return nil
}
a.log.Debugf("artur, socket connected, local %s, remote %s", conn.LocalAddr(), conn.RemoteAddr())

// create PacketCon from tcp connection
packetConn := newTCPPacketConn(tcpPacketParams{
ReadBuffer: a.tcpReadBufferSize,
LocalAddr: conn.LocalAddr(),
Logger: a.log,
})

if err = packetConn.AddConn(conn, nil); err != nil {
a.log.Errorf("Failed to add TCP connection: %v", err)
return nil
}

// updating local candidate with its real local port
localPort, err := strconv.Atoi(strings.Split(conn.LocalAddr().String(), ":")[1])
if err != nil {
panic(err)
}

localCandidateHost := local.(*CandidateHost)
localCandidateHost.port = localPort // FIXME: this causes a data race with candidateBase.Port() artur

local.start(a, packetConn, a.startedCh)
}

p := newCandidatePair(local, remote, a.isControlling)
a.checklist = append(a.checklist, p)
return p
Expand Down Expand Up @@ -722,14 +771,14 @@ func (a *Agent) AddRemoteCandidate(c Candidate) error {
return nil
}

// cannot check for network yet because it might not be applied
// when mDNS hostname is used.
if c.TCPType() == TCPTypeActive {
// TCP Candidates with TCP type active will probe server passive ones, so
// no need to do anything with them.
a.log.Infof("Ignoring remote candidate with tcpType active: %s", c)
return nil
}
// // cannot check for network yet because it might not be applied
// // when mDNS hostname is used.
// if c.TCPType() == TCPTypeActive {
// // TCP Candidates with TCP type active will probe server passive ones, so
// // no need to do anything with them.
// a.log.Infof("Ignoring remote candidate with tcpType active: %s", c)
// return nil
// }

// If we have a mDNS Candidate lets fully resolve it before adding it locally
if c.Type() == CandidateTypeHost && strings.HasSuffix(c.Address(), ".local") {
Expand Down Expand Up @@ -832,7 +881,9 @@ func (a *Agent) addCandidate(ctx context.Context, c Candidate, candidateConn net
}
}

c.start(a, candidateConn, a.startedCh)
if !a.activeTCP {
c.start(a, candidateConn, a.startedCh)
}

set = append(set, c)
a.localCandidates[c.NetworkType()] = set
Expand Down Expand Up @@ -960,7 +1011,14 @@ func (a *Agent) findRemoteCandidate(networkType NetworkType, addr net.Addr) Cand

set := a.remoteCandidates[networkType]
for _, c := range set {
if c.Address() == ip.String() && c.Port() == port {
if c.Address() == ip.String() && c.TCPType() == TCPTypeActive && c.Port() == 0 {
// remote active TCP candidate, locally port was zero ()
// change port to match remote candidate
candidateRemote := c.(*CandidateHost)
candidateRemote.port = port
candidateRemote.resolvedAddr = createAddr(networkType, ip, port)
return c
} else if c.Address() == ip.String() && c.Port() == port {
return c
}
}
Expand Down Expand Up @@ -1091,7 +1149,7 @@ func (a *Agent) handleInbound(m *stun.Message, local Candidate, remote net.Addr)
return
}

if remoteCandidate == nil {
if remoteCandidate == nil && local.TCPType() != TCPTypePassive {
ip, port, networkType, ok := parseAddr(remote)
if !ok {
a.log.Errorf("Failed to create parse remote net.Addr when creating remote prflx candidate")
Expand Down
3 changes: 3 additions & 0 deletions agent_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ type AgentConfig struct {
// experimental and the API might change in the future.
TCPMux TCPMux

activeTCP bool
tcpReadBufferSize int

// UDPMux is used for multiplexing multiple incoming UDP connections on a single port
// when this is set, the agent ignores PortMin and PortMax configurations and will
// defer to UDPMux for incoming connections
Expand Down
8 changes: 7 additions & 1 deletion candidate_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ func (c *candidateBase) start(a *Agent, conn net.PacketConn, initializedCh <-cha
}

func (c *candidateBase) recvLoop(initializedCh <-chan struct{}) {

if c.tcpType == TCPTypeActive {
c.agent().log.Debugf("%s is an active TCP candidate, skipping recvLoop", c.ID())
return
}

a := c.agent()

defer close(c.closedCh)
Expand Down Expand Up @@ -271,7 +277,7 @@ func (c *candidateBase) handleInboundPacket(buf []byte, srcAddr net.Addr) {
// close stops the recvLoop
func (c *candidateBase) close() error {
// If conn has never been started will be nil
if c.Done() == nil {
if c.Done() == nil || c.TCPType() == TCPTypeActive {
return nil
}

Expand Down
5 changes: 5 additions & 0 deletions candidate_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ func NewCandidateHost(config *CandidateHostConfig) (*CandidateHost, error) {
}

if !strings.HasSuffix(config.Address, ".local") {
if config.TCPType == TCPTypeActive {
c.candidateBase.networkType = NetworkTypeTCP4
return c, nil
}

ip := net.ParseIP(config.Address)
if ip == nil {
return nil, ErrAddressParseFailed
Expand Down
Loading

0 comments on commit ed64aab

Please sign in to comment.