A general-purpose TCP/UDP data transport library written in Go.
NetPipe handles the networking complexity so you don't have to. Import the library, call a few functions, and you have a working server or client. Framing, encryption, streaming, reconnection, heartbeats -- all of it is internal. You just send bytes and receive bytes.
go get github.com/ipaz12/NetPipe
Requires Go 1.21 or later. One external dependency: github.com/google/uuid.
Note: The import path uses a capital N to match the GitHub repo name. In your code, you reference the package as netpipe (lowercase) since that is the declared package name.
Server (15 lines):
package main
import (
"fmt"
"github.com/ipaz12/NetPipe"
)
func main() {
server := netpipe.NewServer(netpipe.Config{})
server.OnConnect(func(id string) {
fmt.Printf("%s connected (%d online)\n", id[:8], server.ClientCount())
})
server.OnData(func(id string, data []byte) {
fmt.Printf("[%s]: %s", id[:8], string(data))
server.BroadcastExcept(id, data) // relay to everyone else
})
server.Listen()
}

Client (20 lines):
package main
import (
"bufio"
"fmt"
"os"
"github.com/ipaz12/NetPipe"
)
func main() {
client := netpipe.NewClient(netpipe.Config{})
client.OnData(func(data []byte) {
fmt.Print(string(data))
})
client.Connect()
go client.Listen()
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
client.Send([]byte(scanner.Text() + "\n"))
}
}

That's a working chat system. The server relays messages between clients. No framing code, no buffer management, no connection handling. Just Send() and OnData.
Transport: TCP and UDP. Switch protocols with one config change. The rest of the API stays the same.
Message Framing: TCP has no concept of message boundaries. NetPipe adds length-prefix framing so messages always arrive whole, no matter how the network fragments them. You never think about this.
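To make the idea concrete, here is a minimal sketch of length-prefix framing using only the standard library. It is not NetPipe's actual wire format (PROTOCOL.md is the reference for that); the 4-byte big-endian header and the helper names `writeFrame`, `readFrame`, and `roundTrip` are assumptions made for illustration.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"testing/iotest"
)

// writeFrame sends one message as a 4-byte big-endian length followed by the
// payload. The 4-byte header is an assumption made for this sketch.
func writeFrame(w io.Writer, payload []byte) error {
	var header [4]byte
	binary.BigEndian.PutUint32(header[:], uint32(len(payload)))
	if _, err := w.Write(header[:]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFrame blocks until one complete message is available, however many
// reads the underlying stream needs to deliver it.
func readFrame(r io.Reader) ([]byte, error) {
	var header [4]byte
	if _, err := io.ReadFull(r, header[:]); err != nil {
		return nil, err
	}
	payload := make([]byte, binary.BigEndian.Uint32(header[:]))
	if _, err := io.ReadFull(r, payload); err != nil {
		return nil, err
	}
	return payload, nil
}

// roundTrip frames every message into one buffer, then reads the frames back
// through a reader that yields a single byte per Read call, which simulates
// heavy fragmentation on the wire.
func roundTrip(msgs ...[]byte) ([][]byte, error) {
	var wire bytes.Buffer
	for _, m := range msgs {
		if err := writeFrame(&wire, m); err != nil {
			return nil, err
		}
	}
	r := iotest.OneByteReader(&wire)
	out := make([][]byte, 0, len(msgs))
	for range msgs {
		m, err := readFrame(r)
		if err != nil {
			return nil, err
		}
		out = append(out, m)
	}
	return out, nil
}

func main() {
	out, err := roundTrip([]byte("hello"), []byte("world"))
	if err != nil {
		panic(err)
	}
	for _, m := range out {
		fmt.Printf("%q\n", m)
	}
}
```

A production reader would also cap the declared length before allocating, so a hostile peer cannot request a multi-gigabyte buffer with a single header.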
Encryption: AES-256-GCM with SHA-256 key derivation. Send encrypted and unencrypted messages side by side.
client.Send([]byte("public"))
client.SendEncrypted([]byte("private"), "my-secret-key")

The server auto-decrypts if you set EncryptionKey in the config.
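For readers curious what "AES-256-GCM with SHA-256 key derivation" can look like in practice, the sketch below derives a 32-byte key by hashing the passphrase and prepends a random nonce to each message. The nonce layout and the helper names `encrypt`, `decrypt`, and `newGCM` are assumptions for illustration; NetPipe's own crypto.go may differ.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"crypto/sha256"
	"errors"
	"fmt"
)

// newGCM derives a 256-bit key as SHA-256(passphrase) and wraps it in AES-GCM.
func newGCM(passphrase string) (cipher.AEAD, error) {
	key := sha256.Sum256([]byte(passphrase))
	block, err := aes.NewCipher(key[:])
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(block)
}

// encrypt returns nonce || ciphertext || tag. Prepending the nonce is an
// assumption of this sketch, not a documented NetPipe layout.
func encrypt(plaintext []byte, passphrase string) ([]byte, error) {
	gcm, err := newGCM(passphrase)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	return gcm.Seal(nonce, nonce, plaintext, nil), nil
}

// decrypt splits off the nonce and authenticates the rest. It fails if the
// passphrase is wrong or the message was tampered with.
func decrypt(sealed []byte, passphrase string) ([]byte, error) {
	gcm, err := newGCM(passphrase)
	if err != nil {
		return nil, err
	}
	if len(sealed) < gcm.NonceSize() {
		return nil, errors.New("message shorter than nonce")
	}
	nonce, ciphertext := sealed[:gcm.NonceSize()], sealed[gcm.NonceSize():]
	return gcm.Open(nil, nonce, ciphertext, nil)
}

func main() {
	sealed, err := encrypt([]byte("private"), "my-secret-key")
	if err != nil {
		panic(err)
	}
	plain, err := decrypt(sealed, "my-secret-key")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d bytes on the wire, decrypted: %s\n", len(sealed), plain)
}
```

Because GCM is authenticated, decrypting with the wrong key returns an error rather than garbage, which is what lets a receiver tell encrypted and unencrypted messages apart safely.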
Streaming: Large payloads are automatically chunked, sent, and reassembled on the other end. You get the whole thing in one callback.
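Conceptually, chunking and reassembly can be pictured like the sketch below. The `chunk` type, its `Index`/`Total` fields, and the `split` and `assembler` helpers are hypothetical names for illustration; only the 64 KB default chunk size comes from this README.

```go
package main

import (
	"bytes"
	"fmt"
)

// chunk is one piece of a larger payload (hypothetical layout).
type chunk struct {
	Index int    // position of this piece within the stream
	Total int    // number of pieces in the whole stream
	Data  []byte // slice of the original payload
}

// split cuts data into pieces of at most chunkSize bytes.
// A non-positive chunkSize falls back to 65536.
func split(data []byte, chunkSize int) []chunk {
	if chunkSize <= 0 {
		chunkSize = 65536
	}
	total := (len(data) + chunkSize - 1) / chunkSize
	chunks := make([]chunk, 0, total)
	for i := 0; i < total; i++ {
		end := (i + 1) * chunkSize
		if end > len(data) {
			end = len(data)
		}
		chunks = append(chunks, chunk{Index: i, Total: total, Data: data[i*chunkSize : end]})
	}
	return chunks
}

// assembler collects the chunks of a single stream.
type assembler struct {
	parts    [][]byte
	received int
}

// add stores one chunk and returns the full payload once every piece has
// arrived. Out-of-range and duplicate chunks are ignored.
func (a *assembler) add(c chunk) ([]byte, bool) {
	if c.Total <= 0 {
		return nil, false
	}
	if a.parts == nil {
		a.parts = make([][]byte, c.Total)
	}
	if c.Index < 0 || c.Index >= len(a.parts) || a.parts[c.Index] != nil {
		return nil, false
	}
	a.parts[c.Index] = c.Data
	a.received++
	if a.received < len(a.parts) {
		return nil, false
	}
	return bytes.Join(a.parts, nil), true
}

func main() {
	payload := bytes.Repeat([]byte("x"), 150000)
	var a assembler
	for _, c := range split(payload, 65536) {
		if whole, done := a.add(c); done {
			fmt.Println("reassembled bytes:", len(whole))
		}
	}
}
```

In the library itself none of this is visible to the caller; the calls that follow are the whole public surface.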
client.SendStream(bigData) // auto-chunked at 64 KB
server.OnStream(func(id string, data []byte) {
// data is fully reassembled, could be megabytes
})

Heartbeat: The server pings clients periodically. Clients auto-respond. Dead connections are detected and cleaned up. Zero developer effort.
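One way to picture dead-connection detection is a table of last-pong timestamps that is swept periodically. The sketch below is an assumption about the general technique, not NetPipe's heartbeat.go; `tracker`, `pong`, `reap`, `interval`, and `start` are hypothetical names.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

// interval and start are fixed values so the example is deterministic.
const interval = 5 * time.Second

var start = time.Unix(0, 0)

// tracker remembers when each client last answered a ping.
type tracker struct {
	mu       sync.Mutex
	timeout  time.Duration
	lastPong map[string]time.Time
}

func newTracker(timeout time.Duration) *tracker {
	return &tracker{timeout: timeout, lastPong: make(map[string]time.Time)}
}

// pong records that the client responded at time now.
func (t *tracker) pong(id string, now time.Time) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.lastPong[id] = now
}

// reap removes and returns (sorted) every client whose last pong is older
// than the timeout. A server would close those connections.
func (t *tracker) reap(now time.Time) []string {
	t.mu.Lock()
	defer t.mu.Unlock()
	var dead []string
	for id, seen := range t.lastPong {
		if now.Sub(seen) > t.timeout {
			dead = append(dead, id)
			delete(t.lastPong, id)
		}
	}
	sort.Strings(dead)
	return dead
}

func main() {
	// Timeout of 2x the ping interval, matching the documented default.
	tr := newTracker(2 * interval)
	tr.pong("alive", start)
	tr.pong("silent", start)
	tr.pong("alive", start.Add(2*interval)) // only one client keeps answering
	fmt.Println("reaped:", tr.reap(start.Add(3*interval)))
}
```

Passing the clock in as an argument keeps the sweep logic testable without sleeping.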
Auto-Reconnect: The client reconnects automatically with exponential backoff and jitter. Set AutoReconnect: true and forget about it.
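Exponential backoff with jitter can be sketched as follows. The 2s base matches the documented ReconnectInterval default; the 60s ceiling, the "equal jitter" formula, and the names `backoffDelay`, `baseInterval`, and `maxInterval` are assumptions for illustration, not NetPipe's actual schedule.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	baseInterval = 2 * time.Second  // documented ReconnectInterval default
	maxInterval  = 60 * time.Second // assumed ceiling for this sketch
)

// backoffDelay doubles base once per previous attempt, caps the result at
// ceiling, then picks a delay in the upper half of that window. rnd must
// return a value in [0, 1); it is a parameter so the schedule can be tested.
func backoffDelay(base, ceiling time.Duration, attempt int, rnd func() float64) time.Duration {
	d := base
	for i := 0; i < attempt && d < ceiling; i++ {
		d *= 2
	}
	if d > ceiling {
		d = ceiling
	}
	half := d / 2
	return half + time.Duration(rnd()*float64(d-half))
}

func main() {
	for attempt := 0; attempt < 6; attempt++ {
		d := backoffDelay(baseInterval, maxInterval, attempt, rand.Float64)
		fmt.Printf("attempt %d: wait %v\n", attempt, d.Round(time.Millisecond))
	}
}
```

The jitter matters when many clients lose the same server at once: without it they would all retry on the same schedule and hit the server in synchronized waves.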
Session Resumption: When a client reconnects, the server recognizes it and keeps the same client ID. Your application code sees continuity across disconnections.
Graceful Drain: Stop accepting new connections but let existing clients finish their work.
remaining := server.Drain() // blocks until all clients leave or timeout

Security Hardening: Semaphore-based MaxClients (no race conditions), per-IP connection limits, connect timeout (anti-slowloris), per-client stream limits, deferred cleanup (panic-safe), and rejection frames so clients know why they were dropped.
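The "semaphore-based MaxClients" idea can be illustrated with a buffered channel: admitting a client is a non-blocking send, so the check and the increment happen in one atomic step. This is a sketch of the general pattern, with hypothetical names `limiter`, `tryAcquire`, and `release`; it is not taken from NetPipe's source.

```go
package main

import (
	"fmt"
	"sync"
)

// limiter admits at most cap(slots) holders at a time.
// A nil slots channel means "unlimited" (MaxClients = 0).
type limiter struct {
	slots chan struct{}
}

func newLimiter(max int) *limiter {
	if max <= 0 {
		return &limiter{}
	}
	return &limiter{slots: make(chan struct{}, max)}
}

// tryAcquire claims a slot without blocking and reports whether it succeeded.
func (l *limiter) tryAcquire() bool {
	if l.slots == nil {
		return true
	}
	select {
	case l.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

// release frees a slot. It is a no-op if nothing is held.
func (l *limiter) release() {
	if l.slots == nil {
		return
	}
	select {
	case <-l.slots:
	default:
	}
}

func main() {
	l := newLimiter(3)
	var wg sync.WaitGroup
	var mu sync.Mutex
	admitted := 0
	// 100 simultaneous connection attempts, none of which disconnect.
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if l.tryAcquire() {
				mu.Lock()
				admitted++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	fmt.Println("admitted:", admitted) // prints "admitted: 3"
}
```

A counter guarded by a separate "is there room?" check can over-admit when two connections arrive together; the channel send avoids that window. Pairing `tryAcquire` with a deferred `release` in the connection handler is what keeps the count right even if the handler panics.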
ATC Dashboard: An optional visual dashboard that shows your server or mesh as a live node graph with animated data transfers. One config flag to enable.
server := netpipe.NewServer(netpipe.Config{
EnableATC: true, // opens dashboard at http://localhost:5001
})

NetPipe includes a full P2P mode for direct peer-to-peer communication without a central server. Every connection performs an automatic X25519 Diffie-Hellman key exchange. All traffic is encrypted by default.
peer := netpipe.NewPeer(netpipe.PeerConfig{Port: 6000})
peer.OnData(func(peerID string, data []byte) {
fmt.Printf("[%s]: %s\n", peerID[:8], string(data))
})
go peer.Listen() // accept incoming peers
peerID, _ := peer.Connect("192.168.1.10", 6000) // connect to another peer
peer.Broadcast([]byte("hello mesh"))

P2P features include full mesh topology, LAN broadcast discovery, partial mesh routing with TTL, and per-connection encryption where each peer pair has its own independent shared secret.
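To show why each peer pair ends up with its own secret, here is a minimal X25519 exchange using the standard library's crypto/ecdh package. Hashing the raw shared secret with SHA-256 to get a 32-byte key is an assumption of this sketch, and `newKeyPair` and `sharedKey` are hypothetical helper names; the real handshake lives in pp_crypto.go.

```go
package main

import (
	"crypto/ecdh"
	"crypto/rand"
	"crypto/sha256"
	"fmt"
)

// newKeyPair generates a fresh X25519 key pair for one connection.
func newKeyPair() (*ecdh.PrivateKey, error) {
	return ecdh.X25519().GenerateKey(rand.Reader)
}

// sharedKey combines our private key with the peer's public key bytes and
// hashes the result into a 32-byte symmetric key (assumed derivation step).
func sharedKey(priv *ecdh.PrivateKey, peerPublic []byte) ([32]byte, error) {
	pub, err := ecdh.X25519().NewPublicKey(peerPublic)
	if err != nil {
		return [32]byte{}, err
	}
	secret, err := priv.ECDH(pub)
	if err != nil {
		return [32]byte{}, err
	}
	return sha256.Sum256(secret), nil
}

func main() {
	alice, err := newKeyPair()
	if err != nil {
		panic(err)
	}
	bob, err := newKeyPair()
	if err != nil {
		panic(err)
	}
	// Each side only ever sees the other's public key bytes.
	kAlice, err := sharedKey(alice, bob.PublicKey().Bytes())
	if err != nil {
		panic(err)
	}
	kBob, err := sharedKey(bob, alice.PublicKey().Bytes())
	if err != nil {
		panic(err)
	}
	fmt.Println("keys match:", kAlice == kBob) // prints "keys match: true"
}
```

Both sides arrive at the same key without ever sending it, and a third peer with a different key pair derives a different key, which is what isolates each connection in the mesh.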
LAN Discovery:
go peer.StartDiscoveryListener() // make this peer discoverable
peer.Discover(func(found []netpipe.PeerInfo) {
for _, p := range found {
peer.ConnectTo(p)
}
})

Routed Messages (partial mesh, not every peer directly connected):
peer.SendRouted(targetPeerID, []byte("reaches target through intermediaries"))

Config (server fields):

| Field | Type | Default | Description |
|---|---|---|---|
| Port | int | 5000 | Listening port |
| Protocol | string | "tcp" | "tcp" or "udp" |
| EncryptionKey | string | "" | Auto-decrypt incoming encrypted messages |
| ChunkSize | int | 65536 | Stream chunk size in bytes |
| MaxStreamsPerClient | int | 0 | Max concurrent incomplete streams per client (0 = unlimited) |
| MaxClients | int | 0 | Max simultaneous connections (0 = unlimited) |
| HeartbeatInterval | Duration | 0 | Ping interval (0 = disabled) |
| HeartbeatTimeout | Duration | 2x interval | Time to wait for pong |
| IdleTimeout | Duration | 0 | Disconnect idle clients (0 = disabled) |
| ConnectTimeout | Duration | 30s | Max time before first message |
| MaxConnsPerIP | int | 0 | Per-IP connection limit (0 = unlimited) |
| DrainTimeout | Duration | 30s | Max wait for Drain() |
| EnableATC | bool | false | Enable ATC visual dashboard |
| ATCPort | int | Port+1 | Dashboard HTTP port |

Config (client fields):

| Field | Type | Default | Description |
|---|---|---|---|
| Host | string | "localhost" | Server address |
| Port | int | 5000 | Server port |
| Protocol | string | "tcp" | "tcp" or "udp" |
| AutoReconnect | bool | false | Auto-reconnect on disconnect |
| ReconnectInterval | Duration | 2s | Base interval (doubles with backoff) |
| MaxReconnectAttempts | int | 0 | Max retries (0 = infinite) |

PeerConfig:

| Field | Type | Default | Description |
|---|---|---|---|
| Port | int | 6000 | Listening port |
| Protocol | string | "tcp" | Protocol |
| Name | string | "" | Human-readable name for LAN discovery |
| NoEncrypt | bool | false | Disable DH encryption (testing only) |
| ChunkSize | int | 65536 | Stream chunk size |
| EnableATC | bool | false | Enable ATC dashboard |
| ATCPort | int | Port+1 | Dashboard port |
server := netpipe.NewServer(config)
// Callbacks
server.OnConnect(func(clientID string) { })
server.OnDisconnect(func(clientID string) { })
server.OnData(func(clientID string, data []byte) { })
server.OnStream(func(clientID string, data []byte) { })
// Lifecycle
server.Listen() // blocks
server.Close() // shut down
server.Drain() // graceful shutdown
// Send
server.SendTo(clientID, data)
server.SendToEncrypted(clientID, data, key)
server.Broadcast(data)
server.BroadcastEncrypted(data, key)
server.BroadcastExcept(excludeID, data)
server.BroadcastExceptEncrypted(excludeID, data, key)
server.SendStream(clientID, data)
server.BroadcastStream(data)
// Utility
server.GetClients() // []string
server.ClientCount() // int
server.Kick(clientID) // force disconnect

client := netpipe.NewClient(config)
// Callbacks
client.OnData(func(data []byte) { })
client.OnDisconnect(func() { })
client.OnStream(func(data []byte) { })
client.OnReject(func(reason string) { })
// Lifecycle
client.Connect()
client.Disconnect()
client.Listen() // blocks (run in goroutine)
client.IsConnected()
// Send
client.Send(data)
client.SendEncrypted(data, key)
client.SendStream(data)
client.SetEncryptionKey(key) // auto-decrypt incoming

peer := netpipe.NewPeer(config)
// Callbacks
peer.OnConnect(func(peerID string) { })
peer.OnDisconnect(func(peerID string) { })
peer.OnData(func(peerID string, data []byte) { })
peer.OnStream(func(peerID string, data []byte) { })
// Lifecycle
peer.Listen() // blocks (run in goroutine)
peer.Connect(host, port) // returns peerID, error
peer.Close()
// Send
peer.Send(data) // to first connected peer
peer.SendTo(peerID, data)
peer.Broadcast(data)
peer.SendStream(peerID, data)
peer.BroadcastStream(data)
peer.SendRouted(destID, data) // through intermediaries
// Discovery
peer.Discover(callback)
peer.ConnectTo(peerInfo)
peer.StartDiscoveryListener()
// Utility
peer.GetPeerID()
peer.GetPeers()
peer.PeerCount()
peer.IsConnected()
peer.Disconnect(peerID)
peer.DisconnectAll()

See PROTOCOL.md for the full wire protocol specification. Any client in any language that implements the protocol can talk to a NetPipe server. A Python client is available at netpipe-python.
config.go Config structs and defaults
server.go TCP server with client registry
client.go TCP client with auto-reconnect
framing.go Length-prefix wire protocol
crypto.go AES-256-GCM encryption
stream.go Chunked streaming with reassembly
heartbeat.go Ping/pong, idle timeout, reconnect logic
safeconn.go Write-mutex connection wrapper
session.go Session resumption across reconnects
udp.go UDP server and client
pp.go P2P peer with DH-encrypted mesh
pp_crypto.go X25519 key exchange
pp_discovery.go LAN broadcast discovery
pp_routing.go Partial mesh message routing
atc_event.go ATC telemetry event bus
atc_server.go ATC HTTP/SSE server
atc_hooks.go ATC integration with Server and Peer
atc_dashboard.go ATC embedded web dashboard
PROTOCOL.md Wire protocol specification
cmd/
server/main.go Demo chat server
client/main.go Demo chat client
p2p/main.go Demo P2P mesh chat
git clone https://github.com/ipaz12/NetPipe
cd NetPipe
# Chat server + clients
go run cmd/server/main.go # terminal 1
go run cmd/client/main.go # terminal 2
go run cmd/client/main.go # terminal 3
# P2P mesh
go run cmd/p2p/main.go -listen -port 6000 # terminal 1
go run cmd/p2p/main.go -connect 127.0.0.1:6000 -port 6001 # terminal 2
go run cmd/p2p/main.go -connect 127.0.0.1:6000 -port 6002 # terminal 3

go test ./... -v

22 tests covering framing, encryption, streaming, integration (server/client round-trip, encrypted round-trip, stream reassembly, broadcast, max clients, graceful drain), and P2P mesh.
MIT