Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions .config/wire.yaml.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Wire configuration file example.
# Copy to wire.yaml and customize as needed.
#
# Precedence (highest to lowest):
# CLI flags → environment variables → config file(s) → defaults
#
# Environment variable substitution is supported:
# ${VAR} — replaced by the value of VAR (error if unset)
# ${VAR:-default} — replaced by VAR, or "default" if unset

node:
# id: "wire-node-1" # Node ID (defaults to hostname)
data_dir: "data/coordinator" # Directory for coordinator metadata (PebbleDB)
store_db: "pebble" # Storage backend

http:
addr: ":4001" # HTTP API listen address
# adv_addr: "" # Advertised address for cluster peers
# allow_origin: "" # CORS allowed origin

# tls:
# cert: "/path/to/cert.pem"
# key: "/path/to/key.pem"
# ca_cert: "/path/to/ca.pem"

# node_tls: # Wire protocol TLS
# cert: "/path/to/cert.pem"
# key: "/path/to/key.pem"
# ca_cert: "/path/to/ca.pem"
# verify_client: false

# auth:
# file: "/path/to/auth.json"

write_queue:
capacity: 1024
batch_size: 128
timeout: "50ms"
# transactional: false

election:
backend: "noop" # "noop" or "filelock"
lock_path: "data/coordinator/leader.lock" # Lock file path for the "filelock" backend
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ tmp/
# Log files
*.log

# Pal MCP config
.mcp.json

# misc files
wsnapshots/
raft/
Expand All @@ -48,3 +51,4 @@ raft_database2/
raft_database3/
raft.db
wire
data/
30 changes: 26 additions & 4 deletions cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ type Config struct {

// MaxFrameSize is the maximum wire protocol frame size in bytes.
MaxFrameSize uint32

// CoordinatorDataDir is the directory for coordinator metadata (PebbleDB).
CoordinatorDataDir string

// CoordinatorNodeID is the unique identifier for this coordinator node.
CoordinatorNodeID string

// HTTPListenAddr is the HTTP API listen address.
HTTPListenAddr string

// ElectionBackend selects the leader election backend ("noop" or "filelock").
ElectionBackend string

// ElectionLockPath is the path to the lock file for the filelock election backend.
ElectionLockPath string
}

// BuildInfo holds version metadata populated at build time.
Expand All @@ -42,10 +57,10 @@ type BuildInfo struct {
Branch string
}

func initFlags(name, desc string, build *BuildInfo) (*Config, error) {
func initFlags(name, desc string, build *BuildInfo) (*Config, *pflag.FlagSet, error) {

if pflag.Parsed() {
return nil, fmt.Errorf("command-line flags already parsed")
return nil, nil, fmt.Errorf("command-line flags already parsed")
}

config := &Config{}
Expand All @@ -70,13 +85,20 @@ func initFlags(name, desc string, build *BuildInfo) (*Config, error) {
f.BoolVar(&config.NodeVerifyClient, "node-verify-client", false, "require mutual TLS")
f.Uint32Var(&config.MaxFrameSize, "max-frame-size", 16777216, "max wire protocol frame size")

// Coordinator flags
f.StringVar(&config.CoordinatorDataDir, "coordinator-data-dir", "data/coordinator", "coordinator metadata storage directory")
f.StringVar(&config.CoordinatorNodeID, "node-id", "", "coordinator node ID (defaults to hostname)")
f.StringVar(&config.HTTPListenAddr, "http-listen", ":4001", "HTTP API listen address")
f.StringVar(&config.ElectionBackend, "election-backend", "noop", "leader election backend (noop, filelock)")
f.StringVar(&config.ElectionLockPath, "election-lock-path", "data/coordinator/leader.lock", "file path for filelock election backend")

f.Usage = func() {
fmt.Fprintf(os.Stderr, "\n%s\n\n", desc)
fmt.Fprintf(os.Stderr, "Usage: %s [flags]\n\n", name)
f.PrintDefaults()
}

pflag.CommandLine.MarkHidden("help")
_ = pflag.CommandLine.MarkHidden("help")

if err := f.Parse(os.Args[1:]); err != nil {
fmt.Printf("error when loading flags: %v\n", err)
Expand All @@ -89,7 +111,7 @@ func initFlags(name, desc string, build *BuildInfo) (*Config, error) {
errorExit(0, msg)
}

return config, nil
return config, f, nil
}

func errorExit(code int, msg string) {
Expand Down
99 changes: 92 additions & 7 deletions cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
package main

import (
"context"
"fmt"
"net/http"
"os"
"syscall"

"github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup"

"github.com/tarungka/wire/internal/cmd"
"github.com/tarungka/wire/internal/config"
"github.com/tarungka/wire/internal/coordinator"
"github.com/tarungka/wire/internal/logger"
)

Expand Down Expand Up @@ -34,17 +40,18 @@ func main() {
// Handle signals first, so signal handling is established before anything else.
sigCh := HandleSignals(syscall.SIGINT, syscall.SIGTERM, os.Interrupt)
// Main context
mainCtx, _ := CreateContext(sigCh)
mainCtx, mainCancel := CreateContext(sigCh)
defer mainCancel()

// Setup logging
// logs will be written to both server.log and stdout
logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
fmt.Printf("failed to create log file")
}
defer logFile.Close()
defer func() { _ = logFile.Close() }()

cfg, err := initFlags(name, desc, &BuildInfo{
cliCfg, flagSet, err := initFlags(name, desc, &BuildInfo{
Version: cmd.Version,
Commit: cmd.Commit,
Branch: cmd.Branch,
Expand All @@ -54,18 +61,96 @@ func main() {
}
fmt.Print(logo)

logger.SetDevelopment(cfg.DebugMode)
// Load config files, apply CLI flag overrides, and validate.
wireCfg, err := config.Load(cliCfg.ConfigPath)
if err != nil {
fmt.Fprintf(os.Stderr, "fatal: %v\n", err)
os.Exit(1)
}
if err := config.ApplyFlags(&wireCfg, flagSet); err != nil {
fmt.Fprintf(os.Stderr, "fatal: %v\n", err)
os.Exit(1)
}
if err := wireCfg.Validate(); err != nil {
fmt.Fprintf(os.Stderr, "fatal: %v\n", err)
os.Exit(1)
}

logger.SetDevelopment(wireCfg.Node.Debug)
logger.SetLogFile(logFile)

log.Logger = logger.GetLogger("main")

if cfg.DebugMode {
if wireCfg.Node.Debug {
log.Debug().Msgf("PID: %v | PPID: %v", os.Getpid(), os.Getppid())
}

log.Info().Msg("Starting wire...")

// Block until context is canceled by signal.
<-mainCtx.Done()
// Resolve coordinator node ID.
nodeID := wireCfg.Node.ID
if nodeID == "" {
nodeID, _ = os.Hostname()
if nodeID == "" {
nodeID = "wire-node-1"
}
}

// Create metadata store (PebbleDB).
store, err := coordinator.NewPebbleStore(wireCfg.Node.DataDir)
if err != nil {
log.Fatal().Err(err).Msg("failed to open coordinator metadata store")
}
defer func() { _ = store.Close() }()

// Create leader election backend.
var election coordinator.LeaderElection
switch wireCfg.Election.Backend {
case "filelock":
election = coordinator.NewFileLockElection(wireCfg.Election.LockPath, wireCfg.HTTP.Addr)
case "noop", "":
// Single-node mode: no election needed.
default:
log.Fatal().Str("backend", wireCfg.Election.Backend).Msg("unknown election backend")
}

// Create coordinator.
coordCfg := coordinator.CoordinatorConfig{
DataDir: wireCfg.Node.DataDir,
NodeID: nodeID,
ListenAddr: wireCfg.HTTP.Addr,
}
coord := coordinator.New(coordCfg, store, election, log.Logger)

// Create HTTP server.
httpSrv := coordinator.NewHTTPServer(coord, wireCfg.HTTP.Addr, log.Logger)

// Start everything in an errgroup.
g, gCtx := errgroup.WithContext(mainCtx)

g.Go(func() error {
return coord.Run(gCtx)
})

g.Go(func() error {
err := httpSrv.ListenAndServe()
if err == http.ErrServerClosed {
return nil
}
return err
})

g.Go(func() error {
<-gCtx.Done()
log.Info().Msg("Shutting down...")
_ = coord.Shutdown(context.Background())
_ = httpSrv.Shutdown(context.Background())
return nil
})

if err := g.Wait(); err != nil && err != context.Canceled {
log.Fatal().Err(err).Msg("wire exited with error")
}

log.Info().Msg("Shutting down.")
}
1 change: 1 addition & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Central index for all Wire project documentation.
| [diagrams.md](diagrams.md) | Visual reference — Mermaid architecture, data flow, checkpointing, and state diagrams | Canon v1.0.0 |
| [techinical-documentation.md](techinical-documentation.md) | Full engineering spec — strategy, API surface, GPU design, and connectors | Draft v0.1.0 |
| [gemini-conversations-export.md](gemini-conversations-export.md) | Exported Gemini research conversations — actor model, PebbleDB, Flink internals, language choice | Reference |
| [usage.md](usage.md) | Getting started — building, running, HTTP API reference, and SDK quick start | Canon v1.0.0 |
| [glossary.md](glossary.md) | Glossary of Wire-specific terms and definitions | Reference |

## Conventions
Expand Down
2 changes: 1 addition & 1 deletion docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
Wire implements a classic **Master-Worker** distributed architecture.

### 1.1 The Coordinator (Control Plane)
The central brain of the cluster. It is lightweight and generally stateless (relying on an external metadata store or leader election for HA).
The central brain of the cluster. It persists metadata locally via PebbleDB and supports a phased HA strategy: pluggable leader election, fencing tokens for split-brain prevention, and recovery from durable storage (see WIP-09).

**Responsibilities:**
* **Job Management:** Accepts JobGraphs, optimizes them, and schedules execution.
Expand Down
Loading