64 changes: 39 additions & 25 deletions pkg/chassis/builder.go
@@ -9,9 +9,7 @@ import (
 	"net"
 	"net/http"
 	"os"
-	"os/signal"
 	"reflect"
-	"syscall"
 	"time"
 
 	ntv1 "github.com/steady-bytes/draft/api/core/control_plane/networking/v1"
@@ -34,8 +32,18 @@
 		RPCRegistrar
 		ConsensusRegistrar
 	}
+	blueprintConn struct {
+		stream *connect.BidiStreamForClient[sdv1.ClientDetails, sdv1.ClusterDetails]
+		closer chan struct{}
+	}
 )
 
+var closer CloseChan
+
+func Closer() CloseChan {
+	return closer
+}
+
 ////////////////////////////
 // Plugin Register Functions
 ////////////////////////////
@@ -134,8 +142,10 @@ func (c *Runtime) newBlueprintClient(useEntrypoint bool) {
 	} else {
 		if node := c.blueprintCluster.Pop(); node == nil {
 			// TODO: determine what to do if there are not any nodes to connect to
+			c.logger.Warn("no blueprint node within known cluster. falling back on service config")
+			entrypoint = c.config.GetString("service.entrypoint")
 		} else {
-			entrypoint = fmt.Sprintf("http://%s", node.Address)
+			entrypoint = node.Address
 		}
 	}
 
@@ -161,11 +171,11 @@ func (c *Runtime) Register(options RegistrationOptions) *Runtime {
 		err error
 	)
 
-	for range [INTITIALIZE_LIMIT]int{} {
+	for range INTITIALIZE_LIMIT {
 		// connect with `blueprint` to get an identity
 		pid, err = c.initialize()
 		if err != nil {
-			c.logger.WithError(err).Error("failed to initialize process")
+			c.logger.WithError(err).Error("failed to initialize process (may retry)")
 			time.Sleep(SYNC_INTERVAL)
 			continue
 		}
@@ -209,11 +219,13 @@ func (c *Runtime) initialize() (*sdv1.ProcessIdentity, error) {
 // - Add a state machine around health, and running state of the process that can be reported by the service layer
 func (c *Runtime) synchronize(ctx context.Context, pid *sdv1.ProcessIdentity, opts RegistrationOptions) {
 	var (
-		stream = c.blueprintClient.Synchronize(ctx)
-		closer = make(chan struct{})
+		conn = &blueprintConn{
+			stream: c.blueprintClient.Synchronize(ctx),
+			closer: make(chan struct{}),
+		}
 	)
 
-	go c.receiveAck(stream, closer)
+	go c.receiveAck(conn)
 
 	for {
 		meta := make([]*sdv1.Metadata, 0)
@@ -225,7 +237,8 @@
 			})
 		}
 
-		adder := fmt.Sprintf("%s:%d", c.config.GetString("service.network.advertise_address"), c.config.GetInt("service.network.port"))
+		// TODO: should we also save external host/port?
+		adder := fmt.Sprintf("%s:%d", c.config.GetString("service.network.internal.host"), c.config.GetInt("service.network.internal.port"))
 
 		req := connect.NewRequest(&sdv1.ClientDetails{
 			Pid: pid.GetPid(),
@@ -238,18 +251,21 @@
 			AdvertiseAddress: adder,
 		})
 
-		if err := stream.Send(req.Msg); err != nil {
+		err := conn.stream.Send(req.Msg)
+		if err != nil {
 			// If the connection to the leader is lost, attempt to connect to other known
 			// blueprint instances to find the new leader to send status to
-			c.logger.WithError(err).Error("failed to send process details to blueprint, starting recovery process")
+			c.logger.WithError(err).Error("failed to send process details to blueprint (will retry)")
 			c.newBlueprintClient(false)
 
 			if c.blueprintClient == nil {
 				c.logger.Error("can't connect to blueprint")
 				return
 			}
 
-			c.synchronize(context.Background(), pid, opts)
+			conn.stream = c.blueprintClient.Synchronize(ctx)
+		} else {
+			c.logger.WithField("message", req.Msg).Trace("sync successful")
 		}
 
 		time.Sleep(SYNC_INTERVAL)
@@ -259,19 +275,19 @@ func (c *Runtime) synchronize(ctx context.Context, pid *sdv1.ProcessIdentity, opts RegistrationOptions) {
 // `receiveAck` processes all incoming messages from the synchronize stream. `ClusterDetails` are received and updated in a local store
 // wrapped with a mutex to store any changes to a connected blueprint cluster. This lends itself to more realtime, gossip-style dissemination
 // of blueprint cluster details.
-func (c *Runtime) receiveAck(stream *connect.BidiStreamForClient[sdv1.ClientDetails, sdv1.ClusterDetails], closer chan struct{}) {
+func (c *Runtime) receiveAck(conn *blueprintConn) {
 	for {
-		in, err := stream.Receive()
+		in, err := conn.stream.Receive()
 		if err == io.EOF {
-			close(closer)
+			close(conn.closer)
 			return
 		}
 		if err != nil {
 			c.logger.WithError(err).Error("stream closed")
-			close(closer)
+			close(conn.closer)
 			return
 		}
-		c.logger.WithField("nodes", in.GetNodes()).Info("got message")
+		c.logger.WithField("nodes", in.GetNodes()).Trace("got message")
 		// when an ack message is received from the connected blueprint node,
 		// save the nodes to memory so that when a failure occurs on the blueprint cluster the
 		// chassis synchronize connection can be reestablished to the blueprint leader to report
@@ -294,15 +310,13 @@ func (c *Runtime) Start() {
 	if c.mux == nil {
 		c.mux = http.NewServeMux()
 	}
-	close := make(chan os.Signal, 1)
-	signal.Notify(close, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
 
 	if c.isRPC {
-		c.runRPC(close, handler)
+		c.runRPC(handler)
 	}
 
 	if !c.noMux {
-		go c.runMux(close, handler)
+		go c.runMux(handler)
 	}
 
 	// TODO: start consumers
@@ -315,7 +329,7 @@
 	}
 
 	// wait for close signal
-	<-close
+	<-closer
 	c.shutdown()
 }
 
@@ -366,7 +380,7 @@ func (c *Runtime) shutdown() {
 }
 
 // TODO -> use closer
-func (c *Runtime) runRPC(_ CloseChan, _ http.Handler) {
+func (c *Runtime) runRPC(_ http.Handler) {
 	if len(c.rpcReflectionServiceNames) > 0 {
 		reflector := grpcreflect.NewStaticReflector(c.rpcReflectionServiceNames...)
 		c.mux.Handle(grpcreflect.NewHandlerV1(reflector))
@@ -375,8 +389,8 @@ func (c *Runtime) runRPC(_ CloseChan, _ http.Handler) {
 }
 
 // TODO -> use closer
-func (c *Runtime) runMux(_ CloseChan, handler http.Handler) {
-	addr := fmt.Sprintf("%s:%d", c.config.GetString("service.network.bind_address"), c.config.GetInt("service.network.port"))
+func (c *Runtime) runMux(handler http.Handler) {
+	addr := fmt.Sprintf("%s:%d", c.config.GetString("service.network.bind_address"), c.config.GetInt("service.network.bind_port"))
 	c.logger.Info(fmt.Sprintf("running server on: %s", addr))
 	if err := http.ListenAndServe(addr, h2c.NewHandler(handler, &http2.Server{})); err != nil {
 		c.logger.WithError(err).Panic("failed to start mux server")
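The recovery path above no longer recurses into synchronize() after a failed Send; it swaps a fresh stream into the blueprintConn and lets the existing loop carry on. A minimal sketch of that loop-instead-of-recursion pattern (dial(), send(), and the failure simulation are hypothetical stand-ins, not chassis APIs):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// stream stands in for the bidi client stream; dial() for Synchronize().
type stream struct{ healthy bool }

func dial() *stream { return &stream{healthy: true} }

func (s *stream) send() error {
	if !s.healthy {
		return errors.New("connection lost")
	}
	return nil
}

func main() {
	s := dial()
	for i := 0; i < 5; i++ {
		if err := s.send(); err != nil {
			// Rebuild the stream in place and keep looping; recursing here
			// instead would grow the stack on every leader failover.
			s = dial()
			continue
		}
		fmt.Println("sync successful")
		if i == 2 {
			s.healthy = false // simulate losing the leader once
		}
		time.Sleep(10 * time.Millisecond)
	}
}
```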
5 changes: 2 additions & 3 deletions pkg/chassis/config.go
@@ -88,9 +88,8 @@ func LoadConfig() Config {
 }
 
 func setDefaults(v *viper.Viper) {
-	v.SetDefault("service.network.port", 8090)
-	v.SetDefault("service.network.bind_address", "0.0.0.0")
-	v.SetDefault("service.network.advertise_address", "127.0.0.1")
+	v.SetDefault("service.network.bind_address", "localhost")
+	v.SetDefault("service.network.bind_port", 8090)
 	v.SetDefault("service.env", "local")
 	v.SetDefault("service.logging.level", "info")
 }
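With these defaults, the mux binds to service.network.bind_address:bind_port; the old service.network.port and advertise_address keys are gone (synchronize now reads service.network.internal.* instead). A quick standalone sketch of how the renamed keys resolve with viper, assuming nothing beyond the two defaults shown above:

```go
package main

import (
	"fmt"

	"github.com/spf13/viper"
)

func main() {
	v := viper.New()
	// Mirror the new setDefaults values.
	v.SetDefault("service.network.bind_address", "localhost")
	v.SetDefault("service.network.bind_port", 8090)

	// The same composition runMux performs when building its listen address.
	addr := fmt.Sprintf("%s:%d",
		v.GetString("service.network.bind_address"),
		v.GetInt("service.network.bind_port"))
	fmt.Println(addr) // localhost:8090
}
```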
2 changes: 1 addition & 1 deletion pkg/chassis/consensus.go
@@ -81,7 +81,7 @@ func (c *Runtime) bootstrapRaft(registrar ConsensusRegistrar) {
 		for leader := range notifyCh {
 			registrar.LeadershipChange(c.logger, leader, url)
 		}
-	}(raftScheme, raftHost, c.config.GetString("service.network.port"))
+	}(raftScheme, raftHost, c.config.GetString("service.network.bind_port"))
 
 	// configuration for raft
 	if raftPort == "" || raftNodeID == "" {
4 changes: 0 additions & 4 deletions pkg/chassis/consensus_controller.go
@@ -3,7 +3,6 @@ package chassis
 import (
 	"context"
 	"errors"
-	"fmt"
 
 	"github.com/hashicorp/raft"
 )
@@ -29,19 +28,16 @@
 
 func (c *raftController) Join(ctx context.Context, nodeID, raftAddress string) error {
 	if c.raft.State() != raft.Leader {
-		fmt.Println("must join leader")
 		return errors.New("must join leader")
 	}
 
 	config := c.raft.GetConfiguration()
 	if err := config.Error(); err != nil {
-		fmt.Println("failed to get configuration")
 		return errors.New("failed to get configuration")
 	}
 
 	index := c.raft.AddVoter(raft.ServerID(nodeID), raft.ServerAddress(raftAddress), 0, 0)
 	if index.Error() != nil {
-		fmt.Println("failed to add new voter", index.Error())
 		return errors.New("failed to add new voter")
 	}
 
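With the fmt.Println calls gone, Join reports failures only through its return value and the caller chooses where to log them, once. A hypothetical caller-side sketch (the controller stub is illustrative, not the raft-backed implementation):

```go
package main

import (
	"context"
	"errors"
	"log"
)

type controller struct{ isLeader bool }

func (c *controller) Join(ctx context.Context, nodeID, addr string) error {
	if !c.isLeader {
		return errors.New("must join leader") // returned, not printed
	}
	return nil
}

func main() {
	c := &controller{isLeader: false}
	if err := c.Join(context.Background(), "node-2", "127.0.0.1:7000"); err != nil {
		// The caller decides how the failure surfaces.
		log.Printf("failed to join raft cluster: %v", err)
	}
}
```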
7 changes: 7 additions & 0 deletions pkg/chassis/runtime.go
@@ -3,7 +3,10 @@ package chassis
 import (
 	"net"
 	"net/http"
+	"os"
+	"os/signal"
 	"sync"
+	"syscall"
 
 	sdv1 "github.com/steady-bytes/draft/api/core/registry/service_discovery/v1"
 	sdv1Cnt "github.com/steady-bytes/draft/api/core/registry/service_discovery/v1/v1connect"
@@ -51,6 +54,10 @@ func (b *BlueprintCluster) Pop() *sdv1.Node {
 }
 
 func New(logger Logger) *Runtime {
+	// set up closer channel to handle graceful shutdown
+	closer = make(chan os.Signal, 1)
+	signal.Notify(closer, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
 	if logger == nil {
 		panic("logger cannot be nil")
 	}
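New() now wires the package-level closer to the OS signals, so Start()'s final <-closer and any caller of the exported Closer() observe the same SIGINT/SIGTERM. A standalone sketch of that pattern, assuming CloseChan is chan os.Signal (which the make and signal.Notify calls here imply):

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// CloseChan mirrors the chassis type; assumed to be chan os.Signal.
type CloseChan = chan os.Signal

var closer CloseChan

// Closer exposes the shared shutdown channel, as builder.go now does.
func Closer() CloseChan { return closer }

func main() {
	// What New() sets up: one buffered channel wired to the OS signals.
	closer = make(chan os.Signal, 1)
	signal.Notify(closer, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)

	fmt.Println("waiting for SIGINT/SIGTERM...")
	<-Closer() // Start() blocks the same way before calling shutdown()
	fmt.Println("shutting down")
}
```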