Skip to content

mmadfox/go-wirenet

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

71 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

go-wirenet

Simple bidirectional TCP stream server. Useful for NAT traversal.

Coverage Status Go Documentation Go Report Card

Design

Design

Client-Server
// client <-> server
client1 join to the server   ---------NAT------> server
client2 join to the server   ---------NAT------> server
client3 join to the server   ---------NAT------> server
client4 join to the server   ---------NAT------> server

// call from the server
call client1 from the server ---------NAT------> client1
call client2 from the server ---------NAT------> client2
call client3 from the server ---------NAT------> client3
call client 4 from the server --------NAT------> client4

// call from the client
call server from the client1 ---------NAT------> server
call server from the client2 ---------NAT------> server  
call server from the client3  --------NAT------> server  
call server from the client4  --------NAT------> server
Client-Hub
// clients <-> hub
client1 join to the hub  ---------NAT------> hub
client2 join to the hub  ---------NAT------> hub
client3 join to the hub  ---------NAT------> hub
client4 join to the hub  ---------NAT------> hub

// call from the client
call client2 from the client1 ---------NAT------> client2
call client1 from the client2 ---------NAT------> client1 
call client2 from the client3  --------NAT------> client2  
call client1 from the client4  --------NAT------> client1

Table of contents

Installation

go get github.com/mediabuyerbot/go-wirenet

Examples

Creating connection

import "github.com/mediabuyerbot/go-wirenet"

// make server side
wire, err := wirenet.Mount(":8989", nil)
if err != nil {
    handleError(err)
}
if err := wire.Connect(); err != nil {
    handleError(err)
}

// OR make client side 
wire, err := wirenet.Join(":8989", nil)
if err != nil {
    handleError(err)
}

// connection
if err := wire.Connect(); err != nil {
    handleError(err)
}

Stream handling

import "github.com/mediabuyerbot/go-wirenet"

// server side
wire, err := wirenet.Mount(":8989", nil)
if err != nil {
    handleError(err)
}
// or client side
wire, err := wirenet.Join(":8989", nil)
if err != nil {
    handleError(err)
}

backupStream := func(ctx context.Context, stream wirenet.Stream) {
    file, err := os.Open("/backup.log")
    ...
    // write to stream
    n, err := stream.ReadFrom(file)
    ...
    stream.Close()
}

openChromeStream := func(ctx context.Context, stream wirenet.Stream) {
      // read from stream
      n, err := stream.WriteTo(os.Stdout)
} 

wire.Stream("backup", backupStream)
wire.Stream("openChrome", openChromeStream)

if err := wire.Connect(); err != nil {
    handleError(err)
}

Stream opening

// make options
opts := []wirenet.Option{
   wirenet.WithSessionOpenHook(func(session wirenet.Session) {
   		     hub.registerSession(session)	
   		}),
   		wirenet.WithSessionCloseHook(func(session wirenet.Session) {
   			 hub.unregisterSession(session)
        }),
}
// make client side
wire, err := wirenet.Join(":8989", opts...)
// OR make server side
wire, err := wirenet.Mount(":8989", opts...)

...

// find an open session in some repository
sess := hub.findSession("sessionID")
stream, err := sess.OpenStream("backup")
if err != nil {
   handleError(err)
}
defer stream.Close()
 
backup, err := os.Open("/backup.log")
if err != nil {
   handleError(err)
}
defer backup.Close()

// write to stream
n, err := stream.ReadFrom(backup)
...

Writing to stream

wire.Stream("account.set", func(ctx context.Context, stream wirenet.Stream) {
   // write to stream using writer 
   writer := stream.Writer()
   for {
      n, err := fileOne.Read(buf)
      if err != nil {
          handleError(err)
          break
      }
   	  n, err := writer.Write(buf[:n])
      ...
   }
   // EOF frame
   writer.Close()
   
   for {
         n, err := fileTwo.Read(buf)
         if err != nil {
             handleError(err)
             break
         }
      	  n, err := writer.Write(buf[:n])
         ...
      }
      // EOF frame
      writer.Close() 
   ...

   // or write to stream (recommended) 
   n, err := stream.ReadFrom(fileOne)
   ...
   n, err := stream.ReadFrom(fileTwo)
})

Reading from stream

wire.Stream("account.set", func(ctx context.Context, stream wirenet.Stream) {
   // reading from stream using reader 
   reader := stream.Reader()
   buf := make([]byte, wirenet.BufSize)
   n, err := reader.Read(buf)
   // EOF frame
   reader.Close()
   ...

   // or read from stream (recommended)
   n, err := stream.WriteTo(file)
   ...
})

Using authentication

server

tokenValidator := func(streamName string, id wirenet.Identification, token wirenet.Token) error {
   if streamName == "public" {
      return nil 
   }
   return validate(token)
}

wire, err := wirenet.Mount(":8989", wirenet.WithTokenValidator(tokenValidator))
go func() {
	if err := wire.Connect(); err != nil {
	   handleError(err)
    }
}()

<-terminate()
wire.Close()

client

 token := wirenet.Token("token")
 identification := wirenet.Identification("uuid")
 wire, err := wirenet.Join(":8989",
 		wirenet.WithIdentification(identification, token),
 )
 if err := wire.Connect(); err != nil {
    handleError(err)
 }

Using SSL/TLS certs

server

// make keys 
// ./certs/server.key
// ./certs/server.pem
tlsConf, err := wirenet.LoadCertificates("server", "./certs")
if err != nil {
	handleError(err)
}
wire, err := wirenet.Mount(":8989", wirenet.WithTLS(tlsConf))
go func() {
	if err := wire.Connect(); err != nil {
	   handleError(err)
    }
}()

<-terminate()
wire.Close()

client

// make keys 
// ./certs/client.key
// ./certs/client.pem
tlsConf, err := wirenet.LoadCertificates("client", "./certs")
if err != nil {
	handleError(err)
}
wire, err := wirenet.Join(":8989", wirenet.WithTLS(tlsConf))
if err := wire.Connect(); err != nil {
    handleError(err)
}

Shutdown

timeout := 120*time.Second 
wire, err := wirenet.Mount(":8989",
    // Waiting time for completion of all streams
    wirenet.WithSessionCloseTimeout(timeout),
)
go func() {
	if err := wire.Connect(); err != nil {
	   handleError(err)
    }
}()

<-terminate()

wire.Close()

KeepAlive

// server side
wire, err := wirenet.Mount(":8989",
     wirenet.WithKeepAlive(true),
     wirenet.WithKeepAliveInterval(30 * time.Second),
)

// OR client side
wire, err := wirenet.Join(":8989",
     wirenet.WithKeepAlive(true),
     wirenet.WithKeepAliveInterval(30 * time.Second),
)

go func() {
	if err := wire.Connect(); err != nil {
	   handleError(err)
    }
}()

<-terminate()

wire.Close()

Hub mode

Attention! The name of the stream for each client must be unique!

hub

 hub, err := wirenet.Hub(":8989")
 hub.Connect()

client1

client1, err := wirenet.Join(":8989")
client1.Stream("client1:readBalance", func(ctx context.Context, s Stream) {})
go func() {
   client1.Connect()
}()
...
sess, err := client1.Session("uuid")
stream, err := sess.OpenStream("client2:readBalance")
<-terminate()
client1.Close()

client2

client2, err := wirenet.Join(":8989")
client2.Stream("client2:readBalance", func(ctx context.Context, s Stream) {})
go func() {
   client2.Connect()
}()
...
sess, err := client2.Session("uuid")
stream, err := sess.OpenStream("client1:readBalance")
<-terminate()
client2.Close()

Options

wirenet.WithConnectHook(hook func(io.Closer)) Option
wirenet.WithSessionOpenHook(hook wirenet.SessionHook) Option
wirenet.WithSessionCloseHook(hook wirenet.SessionHook) Option
wirenet.WithIdentification(id wirenet.Identification, token wirenet.Token) Option
wirenet.WithTokenValidator(v wirenet.TokenValidator) Option                   // server side
wirenet.WithTLS(conf *tls.Config) Option
wirenet.WithRetryWait(min, max time.Duration) Option
wirenet.WithRetryMax(n int) Option
wirenet.WithReadWriteTimeouts(read, write time.Duration) Option
wirenet.WithSessionCloseTimeout(dur time.Duration) Option