Skip to content
This repository has been archived by the owner on Jul 28, 2021. It is now read-only.

Commit

Permalink
UDP dial fix (second candidate)
Browse files Browse the repository at this point in the history
  • Loading branch information
nickname76 committed Aug 30, 2020
1 parent c0d4925 commit 2d809b5
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 149 deletions.
2 changes: 1 addition & 1 deletion cmd/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ run: run-tui
run-tui:
cd ./tui; $(GOCMD) run main.go $(ARGS)
run-cli:
cd ./tui; $(GOCMD) run main.go $(ARGS)
cd ./cli; $(GOCMD) run main.go $(ARGS)

deps:
$(GOCMD) get
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/libp2p/go-ws-transport v0.3.1
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/nsf/termbox-go v0.0.0-20200418040025-38ba6e5628f1 // indirect
github.com/pion/udp v0.1.0
github.com/sparkymat/appdir v0.0.0-20190803090504-1c2ab64aee87
go.uber.org/zap v1.15.0
)
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,11 @@ github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/transport v0.10.0 h1:9M12BSneJm6ggGhJyWpDveFOstJsTiQjkLf4M44rm80=
github.com/pion/transport v0.10.0/go.mod h1:BnHnUipd0rZQyTVB2SBGojFHT9CBt5C5TcsJSQGkvSE=
github.com/pion/udp v0.1.0 h1:uGxQsNyrqG3GLINv36Ff60covYmfrLoxzwnCsIYspXI=
github.com/pion/udp v0.1.0/go.mod h1:BPELIjbwE9PRbd/zxI/KYBnbo7B6+oA6YuEaNE8lths=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -678,6 +683,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
Expand Down
186 changes: 38 additions & 148 deletions protdial.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"math/rand"
"net"
"strconv"
"sync"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/pion/udp"
)

const dialProtID protocol.ID = "/p2pforwarder/dial/1.0.0"
Expand Down Expand Up @@ -86,34 +86,51 @@ func setDialHandler(f *Forwarder) {
})
}

func createAddrInfoString(network string, listenip string, lport int, port int) string {
return network + ":" + listenip + ":" + strconv.Itoa(lport) + " -> " + network + ":" + strconv.Itoa(port)
}

func (f *Forwarder) dial(ctx context.Context, peerid peer.ID, protocolType byte, listenip string, port uint16) {
lport := int(port)

var addressinfostr string

var lnfunc func(lip net.IP, port int) (net.Listener, error)

switch protocolType {
case protocolTypeTCP:
f.dialTCP(ctx, peerid, protocolType, listenip, port)
addressinfostr = createAddrInfoString("tcp", listenip, lport, int(port))

lnfunc = func(lip net.IP, port int) (net.Listener, error) {
return net.ListenTCP("tcp", &net.TCPAddr{
IP: lip,
Port: port,
})
}
case protocolTypeUDP:
f.dialUDP(ctx, peerid, protocolType, listenip, port)
addressinfostr = createAddrInfoString("udp", listenip, lport, int(port))

lnfunc = func(lip net.IP, port int) (net.Listener, error) {
return udp.Listen("udp", &net.UDPAddr{
IP: lip,
Port: port,
})
}
}
}

func (f *Forwarder) dialTCP(ctx context.Context, peerid peer.ID, protocolType byte, listenip string, port uint16) {
lport := int(port)
ln, err := net.ListenTCP("tcp", &net.TCPAddr{
IP: net.ParseIP(listenip),
Port: lport,
})
lip := net.ParseIP(listenip)

ln, err := lnfunc(lip, lport)
if err != nil {
onErrFn(fmt.Errorf("dialTCP: %s", err))
onErrFn(fmt.Errorf("dial: %s", err))

for i := 0; i < 4; i++ {
lport = rand.Intn(65535-1024) + 1024

ln, err = net.ListenTCP("tcp", &net.TCPAddr{
IP: net.ParseIP(listenip),
Port: lport,
})
ln, err = lnfunc(lip, lport)

if err != nil {
onErrFn(fmt.Errorf("dialTCP: %s", err))
onErrFn(fmt.Errorf("dial: %s", err))
} else {
break
}
Expand All @@ -124,16 +141,14 @@ func (f *Forwarder) dialTCP(ctx context.Context, peerid peer.ID, protocolType by
}
}

addressstr := "tcp:" + listenip + ":" + strconv.Itoa(lport) + " -> " + "tcp:" + strconv.FormatUint(uint64(port), 10)

onInfoFn("Listening " + addressstr)
onInfoFn("Listening " + addressinfostr)

go func() {
loop:
for {
conn, err := ln.Accept()
if err != nil {
onErrFn(fmt.Errorf("dialTCP: %s", err))
onErrFn(fmt.Errorf("dial: %s", err))
select {
case <-ctx.Done():
break loop
Expand All @@ -147,7 +162,7 @@ func (f *Forwarder) dialTCP(ctx context.Context, peerid peer.ID, protocolType by

s, err := f.host.NewStream(ctx, peerid, dialProtID)
if err != nil {
onErrFn(fmt.Errorf("dialTCP: %s", err))
onErrFn(fmt.Errorf("dial: %s", err))
return
}
defer s.Close()
Expand All @@ -158,7 +173,7 @@ func (f *Forwarder) dialTCP(ctx context.Context, peerid peer.ID, protocolType by

_, err = s.Write(p)
if err != nil {
onErrFn(fmt.Errorf("dialTCP: %s", err))
onErrFn(fmt.Errorf("dial: %s", err))
return
}

Expand All @@ -170,132 +185,7 @@ func (f *Forwarder) dialTCP(ctx context.Context, peerid peer.ID, protocolType by
<-ctx.Done()
ln.Close()

onInfoFn("Closed " + addressstr)
}

type udpConnAddrWriter struct {
conn *net.UDPConn
addr *net.UDPAddr
}

func (ucaw *udpConnAddrWriter) Write(p []byte) (int, error) {
return ucaw.conn.WriteToUDP(p, ucaw.addr)
}

func (f *Forwarder) dialUDP(ctx context.Context, peerid peer.ID, protocolType byte, listenip string, port uint16) {
lport := int(port)

conn, err := net.ListenUDP("udp", &net.UDPAddr{
IP: net.ParseIP(listenip),
Port: lport,
})

if err != nil {
onErrFn(fmt.Errorf("dialUDP: %s", err))

for i := 0; i < 4; i++ {
lport = rand.Intn(65535-1024) + 1024

conn, err = net.ListenUDP("udp", &net.UDPAddr{
IP: net.ParseIP(listenip),
Port: lport,
})

if err != nil {
onErrFn(fmt.Errorf("dialUDP: %s", err))
} else {
break
}
}

if err != nil {
return
}
}

addressstr := "udp:" + listenip + ":" + strconv.Itoa(lport) + " -> " + "udp:" + strconv.FormatUint(uint64(port), 10)

onInfoFn("Listening " + addressstr)

var (
buf = make([]byte, 1024)
conns = map[string]network.Stream{}
connsMux sync.Mutex
)
go func() {
loop:
for {
select {
case <-ctx.Done():
break loop
default:
n, udpaddr, err := conn.ReadFromUDP(buf)
if err != nil {
onErrFn(fmt.Errorf("dialUDP: %s", err))
continue loop
}

connsMux.Lock()
s, ok := conns[udpaddr.String()]
if !ok {
s, err = f.host.NewStream(ctx, peerid, dialProtID)
if err != nil {
connsMux.Unlock()
onErrFn(fmt.Errorf("dialUDP: %s", err))
continue loop
}

p := make([]byte, 3)
p[0] = protocolType
binary.BigEndian.PutUint16(p[1:3], port)

_, err = s.Write(p)
if err != nil {
connsMux.Unlock()
s.Close()
onErrFn(fmt.Errorf("dialUDP: %s", err))
continue loop
}

conns[udpaddr.String()] = s

go func() {
_, err := io.Copy(&udpConnAddrWriter{
conn: conn,
addr: udpaddr,
}, s)
if err != nil {
onErrFn(fmt.Errorf("dialUDP: %s", err))
}

s.Close()

connsMux.Lock()
delete(conns, udpaddr.String())
connsMux.Unlock()

}()
}
connsMux.Unlock()

_, err = s.Write(buf[:n])
if err != nil {
onErrFn(fmt.Errorf("dialUDP: %s", err))

s.Close()

connsMux.Lock()
delete(conns, udpaddr.String())
connsMux.Unlock()
}
}
}
}()

<-ctx.Done()
conn.Close()

onInfoFn("Closed " + addressstr)
onInfoFn("Closed " + addressinfostr)
}

func pipeBothIOs(ctx context.Context, a io.ReadWriter, b io.ReadWriter) {
Expand Down

0 comments on commit 2d809b5

Please sign in to comment.