Skip to content

Commit

Permalink
cluster: WatchServiceStatus, RegisterService
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwinger233 authored and CMGS committed Aug 13, 2020
1 parent 117b5f7 commit da4e6ef
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 12 deletions.
6 changes: 5 additions & 1 deletion cluster/calcium/calcium.go
@@ -1,6 +1,7 @@
package calcium

import (
"context"
"strings"

"github.com/projecteru2/core/cluster"
Expand All @@ -21,6 +22,9 @@ type Calcium struct {
store store.Store
scheduler scheduler.Scheduler
source source.Source
watcher *serviceWatcher

cancelServiceHeartbeat context.CancelFunc
}

// New returns a new cluster config
Expand Down Expand Up @@ -49,7 +53,7 @@ func New(config types.Config, embeddedStorage bool) (*Calcium, error) {
log.Warn("[Calcium] SCM not set, build API disabled")
}

return &Calcium{store: store, config: config, scheduler: scheduler, source: scm}, err
return &Calcium{store: store, config: config, scheduler: scheduler, source: scm, watcher: &serviceWatcher{}}, err
}

// Finalizer use for defer
Expand Down
134 changes: 134 additions & 0 deletions cluster/calcium/service.go
@@ -0,0 +1,134 @@
package calcium

import (
"context"
"sync"
"time"

"github.com/google/uuid"
"github.com/projecteru2/core/store"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
log "github.com/sirupsen/logrus"
)

type serviceWatcher struct {
once sync.Once
subs sync.Map
}

func (w *serviceWatcher) Start(s store.Store, pushInterval time.Duration) {
w.once.Do(func() {
w.start(s, pushInterval)
})
}

func (w *serviceWatcher) start(s store.Store, pushInterval time.Duration) {
ch, err := s.ServiceStatusStream(context.Background())
if err != nil {
log.Errorf("[WatchServiceStatus] failed to start watch: %v", err)
return
}

go func() {
defer log.Error("[WatchServiceStatus] goroutine exited")
var (
latestStatus types.ServiceStatus
timer *time.Timer = time.NewTimer(pushInterval / 2)
)
for {
select {
case addresses, ok := <-ch:
if !ok {
log.Error("[WatchServiceStatus] watch channel closed")
return
}

latestStatus = types.ServiceStatus{
Addresses: addresses,
Interval: pushInterval,
}
w.dispatch(latestStatus)

case <-timer.C:
w.dispatch(latestStatus)
}
timer.Stop()
timer.Reset(pushInterval / 2)
}
}()
}

func (w *serviceWatcher) dispatch(status types.ServiceStatus) {
w.subs.Range(func(k, v interface{}) bool {
c, ok := v.(chan<- types.ServiceStatus)
if !ok {
log.Error("[WatchServiceStatus] failed to cast channel from map")
return true
}
c <- status
return true
})
}

func (w *serviceWatcher) Subscribe(ch chan<- types.ServiceStatus) uuid.UUID {
id := uuid.New()
_, _ = w.subs.LoadOrStore(id, ch)
return id
}

func (w *serviceWatcher) Unsubscribe(id uuid.UUID) {
w.subs.Delete(id)
}

func (c *Calcium) WatchServiceStatus(ctx context.Context) (<-chan types.ServiceStatus, error) {
ch := make(chan types.ServiceStatus)
c.watcher.Start(c.store, c.config.ServiceDiscoveryPushInterval)
id := c.watcher.Subscribe(ch)
go func() {
<-ctx.Done()
c.watcher.Unsubscribe(id)
close(ch)
}()
return ch, nil
}

func (c *Calcium) RegisterService(ctx context.Context) (unregister func(), err error) {
ctx, cancel := context.WithCancel(ctx)
serviceAddress, err := utils.GetOutboundAddress(c.config.Bind)
if err != nil {
log.Errorf("[RegisterService] failed to get outbound address: %v", err)
return
}
if err = c.store.RegisterService(ctx, serviceAddress, c.config.ServiceHeartbeatInterval); err != nil {
log.Errorf("[RegisterService] failed to register service: %v", err)
return
}

done := make(chan struct{})
go func() {
defer func() {
if err := c.store.UnregisterService(context.Background(), serviceAddress); err != nil {
log.Errorf("[RegisterService] failed to unregister service: %v", err)
}
close(done)
}()

timer := time.NewTicker(c.config.ServiceHeartbeatInterval / 2)
for {
select {
case <-timer.C:
if err := c.store.RegisterService(ctx, serviceAddress, c.config.ServiceHeartbeatInterval); err != nil {
log.Errorf("[RegisterService] failed to register service: %v", err)
}
case <-ctx.Done():
log.Infof("[RegisterService] context done: %v", ctx.Err())
return
}
}
}()
return func() {
cancel()
<-done
}, err
}
2 changes: 2 additions & 0 deletions cluster/cluster.go
Expand Up @@ -44,6 +44,8 @@ const (

// Cluster define all interface
type Cluster interface {
// meta service
WatchServiceStatus(context.Context) (<-chan types.ServiceStatus, error)
// meta networks
ListNetworks(ctx context.Context, podname string, driver string) ([]*enginetypes.Network, error)
ConnectNetwork(ctx context.Context, network, target, ipv4, ipv6 string) ([]string, error)
Expand Down
11 changes: 9 additions & 2 deletions core.go
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -89,18 +90,23 @@ func serve() {
pb.RegisterCoreRPCServer(grpcServer, vibranium)
go func() {
if err := grpcServer.Serve(s); err != nil {
log.Fatalf("start grpc failed %v", err)
log.Fatalf("[main] start grpc failed %v", err)
}
}()
if config.Profile != "" {
http.Handle("/metrics", metrics.Client.ResourceMiddleware(cluster)(promhttp.Handler()))
go func() {
if err := http.ListenAndServe(config.Profile, nil); err != nil {
log.Errorf("start http failed %v", err)
log.Errorf("[main] start http failed %v", err)
}
}()
}

unregisterService, err := cluster.RegisterService(context.Background())
if err != nil {
log.Errorf("[main] failed to register service: %v", err)
return
}
log.Info("[main] Cluster started successfully.")

// wait for unix signals and try to GracefulStop
Expand All @@ -109,6 +115,7 @@ func serve() {
sig := <-sigs
log.Infof("[main] Get signal %v.", sig)
close(rpcch)
unregisterService()
grpcServer.GracefulStop()
log.Info("[main] gRPC server gracefully stopped.")

Expand Down
1 change: 1 addition & 0 deletions core.yaml.sample
@@ -1,5 +1,6 @@
log_level: "DEBUG"
bind: ":5001"
service_address: 10.22.12.87:5001
statsd: "127.0.0.1:8125"
profile: ":12346"
global_timeout: 300s
Expand Down
17 changes: 17 additions & 0 deletions rpc/rpc.go
Expand Up @@ -39,6 +39,23 @@ func (v *Vibranium) Info(ctx context.Context, opts *pb.Empty) (*pb.CoreInfo, err
}, nil
}

// WatchServiceStatus pushes sibling services
func (v *Vibranium) WatchServiceStatus(_ *pb.Empty, stream pb.CoreRPC_WatchServiceStatusServer) (err error) {
ch, err := v.cluster.WatchServiceStatus(stream.Context())
if err != nil {
log.Errorf("[WatchServicesStatus] failed to create watch channel: %v", err)
return err
}
for status := range ch {
s := toRPCServiceStatus(status)
if err = stream.Send(s); err != nil {
v.logUnsentMessages("WatchServicesStatus", s)
return err
}
}
return nil
}

// ListNetworks list networks for pod
func (v *Vibranium) ListNetworks(ctx context.Context, opts *pb.ListNetworkOptions) (*pb.Networks, error) {
networks, err := v.cluster.ListNetworks(ctx, opts.Podname, opts.Driver)
Expand Down
8 changes: 8 additions & 0 deletions rpc/transform.go
Expand Up @@ -3,6 +3,7 @@ package rpc
import (
"bytes"
"encoding/json"
"time"

enginetypes "github.com/projecteru2/core/engine/types"
pb "github.com/projecteru2/core/rpc/gen"
Expand All @@ -12,6 +13,13 @@ import (
"golang.org/x/net/context"
)

func toRPCServiceStatus(status types.ServiceStatus) *pb.ServiceStatus {
return &pb.ServiceStatus{
Addresses: status.Addresses,
IntervalInSecond: int64(status.Interval / time.Second),
}
}

func toRPCCPUMap(m types.CPUMap) map[string]int32 {
cpu := make(map[string]int32)
for label, value := range m {
Expand Down
21 changes: 12 additions & 9 deletions types/config.go
Expand Up @@ -6,15 +6,18 @@ import (

// Config holds eru-core config
type Config struct {
LogLevel string `yaml:"log_level" required:"true" default:"INFO"`
Bind string `yaml:"bind" required:"true" default:"5001"` // HTTP API address
LockTimeout time.Duration `yaml:"lock_timeout" required:"true" default:"30s"` // timeout for lock (ttl)
GlobalTimeout time.Duration `yaml:"global_timeout" required:"true" default:"300s"` // timeout for remove, run_and_wait and build, in second
Statsd string `yaml:"statsd"` // statsd host and port
Profile string `yaml:"profile"` // profile ip:port
CertPath string `yaml:"cert_path"` // docker cert files path
Auth AuthConfig `yaml:"auth"` // grpc auth
GRPCConfig GRPCConfig `yaml:"grpc"` // grpc config
LogLevel string `yaml:"log_level" required:"true" default:"INFO"`
Bind string `yaml:"bind" required:"true" default:"5001"` // HTTP API address
ServiceAddress string `yaml:"service_address" required:"true"`
ServiceDiscoveryPushInterval time.Duration `yaml:"service_discovery_interval" required:"true" default:"10s"`
ServiceHeartbeatInterval time.Duration `yaml:"service_heartbeat_interval" required:"true" default:"10s"`
LockTimeout time.Duration `yaml:"lock_timeout" required:"true" default:"30s"` // timeout for lock (ttl)
GlobalTimeout time.Duration `yaml:"global_timeout" required:"true" default:"300s"` // timeout for remove, run_and_wait and build, in second
Statsd string `yaml:"statsd"` // statsd host and port
Profile string `yaml:"profile"` // profile ip:port
CertPath string `yaml:"cert_path"` // docker cert files path
Auth AuthConfig `yaml:"auth"` // grpc auth
GRPCConfig GRPCConfig `yaml:"grpc"` // grpc config

Git GitConfig `yaml:"git"`
Etcd EtcdConfig `yaml:"etcd"`
Expand Down
8 changes: 8 additions & 0 deletions types/service.go
@@ -0,0 +1,8 @@
package types

import "time"

type ServiceStatus struct {
Addresses []string
Interval time.Duration
}
19 changes: 19 additions & 0 deletions utils/service.go
@@ -0,0 +1,19 @@
package utils

import (
"fmt"
"net"
"strings"
)

func GetOutboundAddress(bind string) (string, error) {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
return "", err
}
defer conn.Close()

localAddr := conn.LocalAddr().(*net.UDPAddr)
port := strings.Split(bind, ":")[1]
return fmt.Sprintf("%s:%s", localAddr.IP, port), nil
}

0 comments on commit da4e6ef

Please sign in to comment.