Go toolkit for server-to-device communication through intermediary bridges. WebSocket hub, request-response correlation, offline command queue.
Your server needs to send commands to a BLE device, but BLE is a local radio protocol — servers can't reach devices directly. A phone physically near the device acts as a bridge.
Server ←──WebSocket──→ Phone App ←──BLE──→ Device
This library handles the server side of that architecture:
- WebSocket hub — manages connections from phones and users, with lifecycle hooks and subscription-based broadcasting
- Session manager — correlates outgoing commands with responses using request IDs and timeouts
- Offline queue — stores commands when a device is offline, delivers them on reconnect
- Device config registry — maps device types to their BLE service/characteristic UUIDs
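
The request-ID correlation described for the session manager can be pictured roughly as follows. This is a minimal sketch under stated assumptions, not bridgekit's implementation: Correlator, Send, Resolve, and the req-N ID format are hypothetical names chosen for illustration (the library lists google/uuid for its correlation IDs). The idea is that each outgoing command parks a channel in a map keyed by request ID, and the response handler looks that channel up again.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// ErrTimeout is returned when no response arrives in time.
var ErrTimeout = errors.New("command timed out")

// Correlator matches responses to the commands that caused them.
// Hypothetical type for illustration; not part of bridgekit.
type Correlator struct {
	mu      sync.Mutex
	nextID  int
	pending map[string]chan string // request ID -> waiting caller
}

func NewCorrelator() *Correlator {
	return &Correlator{pending: make(map[string]chan string)}
}

// Send registers a pending request, hands it to the transport, and blocks
// until Resolve is called with the same ID or the timeout expires.
func (c *Correlator) Send(payload string, timeout time.Duration, transport func(id, payload string)) (string, error) {
	c.mu.Lock()
	c.nextID++
	id := fmt.Sprintf("req-%d", c.nextID)
	ch := make(chan string, 1) // buffered so Resolve never blocks
	c.pending[id] = ch
	c.mu.Unlock()

	// Always clean up so a late response is dropped instead of leaking.
	defer func() {
		c.mu.Lock()
		delete(c.pending, id)
		c.mu.Unlock()
	}()

	transport(id, payload)

	select {
	case resp := <-ch:
		return resp, nil
	case <-time.After(timeout):
		return "", ErrTimeout
	}
}

// Resolve routes a response to its waiting caller. It reports false when the
// ID is unknown, for example because the request already timed out.
func (c *Correlator) Resolve(id, response string) bool {
	c.mu.Lock()
	ch, ok := c.pending[id]
	delete(c.pending, id) // each request is answered at most once
	c.mu.Unlock()
	if !ok {
		return false
	}
	ch <- response
	return true
}

func main() {
	c := NewCorrelator()

	// A fake phone bridge that answers asynchronously.
	echo := func(id, payload string) { go c.Resolve(id, "ack:"+payload) }
	resp, err := c.Send("aabbccdd", time.Second, echo)
	fmt.Println(resp, err)

	// A bridge that never answers exercises the timeout path.
	silent := func(id, payload string) {}
	_, err = c.Send("aabbccdd", 50*time.Millisecond, silent)
	fmt.Println(err)
}
```

Removing the entry both in Resolve and on exit from Send keeps the map from growing when the phone answers late or never.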
go get github.com/khavrks/bridgekit

| Package | What it does |
|---|---|
| relay | BLE protocol types, session manager, device config registry |
| ws | WebSocket hub, client lifecycle, read/write pumps, rate limiter |
| queue | Offline command queue interface + 4 backends (Memory, Postgres, Redis, RabbitMQ) |

package main

import (
	"log"
	"time"

	"github.com/khavrks/bridgekit/relay"
	"github.com/khavrks/bridgekit/ws"
)

func main() {
	// 1. Create and run the WebSocket hub
	hub := ws.NewHub()
	go hub.Run()

	// 2. Register device BLE configs
	configs := relay.NewConfigRegistry(relay.DeviceConfig{
		ServiceUUID: "0000fff0-0000-1000-8000-00805f9b34fb",
		WriteUUID:   "0000fff1-0000-1000-8000-00805f9b34fb",
		ListenUUID:  "0000fff2-0000-1000-8000-00805f9b34fb",
	})

	// 3. Create session manager (sends commands through the hub)
	sessions := relay.NewSessionManager(func(deviceID string, req relay.WriteRequest) bool {
		return hub.SendToDevice(deviceID, ws.Message{Type: "ble_write", DeviceID: deviceID})
	}, configs)

	// 4. Send a command (blocks until response or timeout)
	resp, err := sessions.SendCommand("device-123", "smart-lock", "aabbccdd", 10*time.Second)
	if err != nil {
		log.Fatalf("send command: %v", err)
	}
	log.Printf("response: %+v", resp)
}

The hub manages two types of connections:
- User connections — a user can have multiple (tabs, devices). The hub detects first-connect and last-disconnect.
- Device connections — one per device. This is the phone bridging BLE.
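
Detecting first-connect and last-disconnect for a user with several open connections usually comes down to counting connections per user. The sketch below shows that idea in isolation; the presence type and its methods are hypothetical and are not taken from the ws package.

```go
package main

import (
	"fmt"
	"sync"
)

// presence counts open connections per user.
// Hypothetical type for illustration; not part of bridgekit.
type presence struct {
	mu    sync.Mutex
	conns map[string]int
}

func newPresence() *presence {
	return &presence{conns: make(map[string]int)}
}

// add records a new connection and reports whether it is the user's first.
func (p *presence) add(userID string) (first bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.conns[userID]++
	return p.conns[userID] == 1
}

// remove records a closed connection and reports whether it was the user's last.
func (p *presence) remove(userID string) (last bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.conns[userID] == 0 {
		return false // unknown user or already fully disconnected
	}
	p.conns[userID]--
	if p.conns[userID] == 0 {
		delete(p.conns, userID)
		return true
	}
	return false
}

func main() {
	p := newPresence()
	fmt.Println(p.add("alice"))    // true: first tab opened
	fmt.Println(p.add("alice"))    // false: second tab
	fmt.Println(p.remove("alice")) // false: one tab still open
	fmt.Println(p.remove("alice")) // true: last tab closed
}
```

Device connections need no counter, since there is at most one per device.
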
hub := ws.NewHub()

hub.SetOnDeviceConnect(func(deviceID string) {
	log.Printf("device %s is online", deviceID)
})
hub.SetOnDeviceDisconnect(func(deviceID string) {
	log.Printf("device %s went offline", deviceID)
})

// Subscription-based broadcasting: notify all users who care about a device
hub.SetSubscriptionLoader(func(userID string) []string {
	return db.GetDeviceIDsForUser(userID) // your DB query
})
hub.BroadcastToDevice("device-123", ws.Message{Type: "state_changed"})

Request-response correlation over an async WebSocket channel:
// Server sends command → phone writes to BLE → device responds → phone sends back
resp, err := sessions.SendCommand("device-123", "lock-type", "hex-payload", 10*time.Second)
// In your WebSocket message handler, route responses back:
sessions.HandleResponse(bleResponse)

Commands for offline devices are stored and delivered on reconnect:
// Pick one backend (all implement queue.Store)
q := queue.NewMemoryStore() // dev/testing
// q := queue.NewPostgresStore(pgPool) // durable
// q := queue.NewRedisStore(redisClient) // fast, natural TTL
// q, err := queue.NewRabbitMQStore(amqpConn, queue.RabbitMQConfig{}) // reliable delivery

// Queue a command (with dedup: a pending command of the same type is overwritten)
q.EnqueueOrUpdate(ctx, "device-123", "lock", payload, 1*time.Hour)

// On reconnect, drain all pending commands
payloads, _ := q.DrainPending(ctx, "device-123")

// Commands requiring approval before execution
cmdID, _ := q.EnqueueWithConfirmation(ctx, "device-123", "unlock", payload, 1*time.Hour)
q.ConfirmCommand(ctx, cmdID) // user approves
// or: q.CancelCommand(ctx, cmdID) // user denies

RabbitMQ also supports real-time consumption from per-device queues:
rmq, _ := queue.NewRabbitMQStore(conn, queue.RabbitMQConfig{})

// When a device connects, start consuming its queue
payloads, _ := rmq.Consume(ctx, "device-123")
go func() {
	for payload := range payloads {
		sessions.SendCommand("device-123", "lock-type", string(payload), 10*time.Second)
	}
}()

| Backend | Durability | Speed | Best for |
|---|---|---|---|
| Memory | None (lost on restart) | Fastest | Testing, development |
| Postgres | Full | Moderate | Primary store, complex queries, existing PG infra |
| Redis | Configurable (AOF/RDB) | Fast | High throughput, natural TTL, existing Redis infra |
| RabbitMQ | Full (persistent msgs) | Fast | Reliable delivery, fan-out, existing AMQP infra |
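
Whatever the backend, the queue semantics described above are the same: one pending command per device and command type, a TTL, and a drain on reconnect. The following in-memory sketch illustrates those semantics only. dedupQueue and its methods are hypothetical, it is not the library's MemoryStore, and the choice to return drained commands in order of last update is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

type key struct{ deviceID, commandType string }

type entry struct {
	payload   []byte
	expiresAt time.Time
	seq       int // order of last update, used to sort drained commands
}

// dedupQueue keeps at most one pending command per (device, type).
// Hypothetical type for illustration; not part of bridgekit.
type dedupQueue struct {
	mu      sync.Mutex
	seq     int
	pending map[key]entry
}

func newDedupQueue() *dedupQueue {
	return &dedupQueue{pending: make(map[key]entry)}
}

// enqueueOrUpdate stores a command, overwriting any pending one of the same type.
func (q *dedupQueue) enqueueOrUpdate(deviceID, commandType string, payload []byte, ttl time.Duration) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.seq++
	q.pending[key{deviceID, commandType}] = entry{payload, time.Now().Add(ttl), q.seq}
}

// drainPending removes every command for the device and returns the
// unexpired payloads.
func (q *dedupQueue) drainPending(deviceID string) [][]byte {
	q.mu.Lock()
	defer q.mu.Unlock()
	now := time.Now()
	var live []entry
	for k, e := range q.pending {
		if k.deviceID != deviceID {
			continue
		}
		delete(q.pending, k) // deleting while ranging over a map is safe in Go
		if e.expiresAt.After(now) {
			live = append(live, e)
		}
	}
	sort.Slice(live, func(i, j int) bool { return live[i].seq < live[j].seq })
	out := make([][]byte, 0, len(live))
	for _, e := range live {
		out = append(out, e.payload)
	}
	return out
}

func main() {
	q := newDedupQueue()
	q.enqueueOrUpdate("device-123", "lock", []byte("v1"), time.Hour)
	q.enqueueOrUpdate("device-123", "lock", []byte("v2"), time.Hour) // overwrites v1
	q.enqueueOrUpdate("device-123", "sync", []byte("s1"), time.Hour)
	q.enqueueOrUpdate("device-123", "ping", []byte("p1"), -time.Second) // already expired
	for _, p := range q.drainPending("device-123") {
		fmt.Println(string(p)) // prints v2, then s1
	}
}
```

The Postgres schema expresses the same dedup rule with a partial unique index on (device_id, command_type) for undelivered rows.
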
The handler works with any HTTP framework that can hand you an http.ResponseWriter and an *http.Request:

// net/http
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
	// The phone connects with ?userId=X&deviceId=Y; authenticate before trusting these.
	userID := r.URL.Query().Get("userId")
	deviceID := r.URL.Query().Get("deviceId")
	ws.HandleWebSocket(hub, w, r, userID, deviceID, onMessage)
})

// Fiber (via fasthttpadaptor)
// Chi, Gin, Echo: same pattern, extract w and r from your framework

If using PostgresStore, create this table:
CREATE TABLE command_queue (
    id                    UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    device_id             TEXT NOT NULL,
    command_type          TEXT,
    payload               BYTEA NOT NULL,
    expires_at            TIMESTAMPTZ NOT NULL,
    delivered             BOOLEAN NOT NULL DEFAULT false,
    requires_confirmation BOOLEAN NOT NULL DEFAULT false,
    confirmed             BOOLEAN NOT NULL DEFAULT false,
    created_at            TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE UNIQUE INDEX idx_command_queue_dedup
    ON command_queue (device_id, command_type)
    WHERE delivered = false;

Your phone app needs to:
- Connect to the WebSocket server with ?userId=X&deviceId=Y
- Connect to the BLE device
- When receiving a ble_write message:
  - Write payload (hex-decoded) to the device's write characteristic
  - Read the response from the listen characteristic
  - Send back a ble_response with the same requestId
- Send ble_status messages when BLE connectivity changes
See the protocol types in relay/protocol.go for message formats.
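
The ble_write handling steps listed above can be sketched as a single function. A real phone app would be written in Swift, Kotlin, or similar; Go is used here only to match the rest of this README. The JSON shape (type, requestId, payload as a hex string) is an assumption pieced together from the field names mentioned above, and bridgeMessage and handleWrite are hypothetical names; relay/protocol.go remains the authoritative definition.

```go
package main

import (
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// bridgeMessage is an assumed wire shape, not copied from relay/protocol.go.
type bridgeMessage struct {
	Type      string `json:"type"`
	RequestID string `json:"requestId"`
	Payload   string `json:"payload"` // hex-encoded bytes
}

// handleWrite decodes a ble_write, passes the bytes to the BLE layer, and
// builds a ble_response that carries the same request ID.
func handleWrite(raw []byte, ble func([]byte) ([]byte, error)) ([]byte, error) {
	var in bridgeMessage
	if err := json.Unmarshal(raw, &in); err != nil {
		return nil, fmt.Errorf("decode message: %w", err)
	}
	if in.Type != "ble_write" {
		return nil, fmt.Errorf("unexpected message type %q", in.Type)
	}
	data, err := hex.DecodeString(in.Payload)
	if err != nil {
		return nil, fmt.Errorf("decode payload: %w", err)
	}
	reply, err := ble(data) // write characteristic, then read the listen characteristic
	if err != nil {
		return nil, fmt.Errorf("ble exchange: %w", err)
	}
	return json.Marshal(bridgeMessage{
		Type:      "ble_response",
		RequestID: in.RequestID,
		Payload:   hex.EncodeToString(reply),
	})
}

func main() {
	// Stand-in for the real BLE round trip: echo the written bytes back.
	echo := func(b []byte) ([]byte, error) { return b, nil }
	msg := []byte(`{"type":"ble_write","requestId":"r1","payload":"aabbccdd"}`)
	out, err := handleWrite(msg, echo)
	fmt.Println(string(out), err)
}
```

Echoing the requestId unchanged is what lets the server's session manager match the response to the command that is still waiting.
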
| Dependency | Purpose |
|---|---|
| gorilla/websocket | WebSocket protocol |
| google/uuid | Request correlation IDs |
| jackc/pgx | Postgres queue backend |
| redis/go-redis | Redis queue backend |
| rabbitmq/amqp091-go | RabbitMQ queue backend |
MIT