Skip to content

Commit

Permalink
Merge 283538f into 80f3d8b
Browse files Browse the repository at this point in the history
  • Loading branch information
cristinaleonr committed Dec 4, 2023
2 parents 80f3d8b + 283538f commit b618d73
Show file tree
Hide file tree
Showing 7 changed files with 475 additions and 260 deletions.
62 changes: 62 additions & 0 deletions cmd/heartbeat/health/gcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package health

import (
"context"
"strings"

"cloud.google.com/go/compute/apiv1/computepb"
"github.com/googleapis/gax-go"
)

// GCPChecker queries the VM's load balancer to check its status.
type GCPChecker struct {
client GCEClient
md Metadata
}

// Metadata returns environmental metadata for a machine.
type Metadata interface {
Project() string
InstanceName() string
Region() string
Group() string
}

// GCEClient queries the Compute API for health updates.
type GCEClient interface {
GetHealth(context.Context, *computepb.GetHealthRegionBackendServiceRequest, ...gax.CallOption) (*computepb.BackendServiceGroupHealth, error)
}

// NewGCPChecker returns a new instance of GCPChecker.
func NewGCPChecker(c GCEClient, md Metadata) *GCPChecker {
return &GCPChecker{
client: c,
md: md,
}
}

// GetHealth contacts the GCP load balancer to get the latest VM health status
// and uses the data to generate a health score.
func (c *GCPChecker) GetHealth(ctx context.Context) float64 {
g := c.md.Group()
req := &computepb.GetHealthRegionBackendServiceRequest{
BackendService: c.md.InstanceName(),
Project: c.md.Project(),
Region: c.md.Region(),
ResourceGroupReferenceResource: &computepb.ResourceGroupReference{
Group: &g,
},
}
lbHealth, err := c.client.GetHealth(ctx, req)
if err != nil {
return 0
}

for _, h := range lbHealth.HealthStatus {
if !strings.EqualFold(*h.HealthState, "HEALTHY") {
return 0
}
}

return 1
}
86 changes: 86 additions & 0 deletions cmd/heartbeat/health/gcp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package health

import (
"context"
"errors"
"testing"

"cloud.google.com/go/compute/apiv1/computepb"
"github.com/googleapis/gax-go"
"github.com/m-lab/locate/cmd/heartbeat/metadata"
)

func TestGCPChecker_GetHealth(t *testing.T) {
tests := []struct {
name string
client GCEClient
want float64
}{
{
name: "healthy",
client: &fakeGCEClient{
status: []string{"HEALTHY"},
err: false,
},
want: 1,
},
{
name: "unhealthy",
client: &fakeGCEClient{
status: []string{"UNHEALTHY"},
err: false,
},
want: 0,
},
{
name: "mix",
client: &fakeGCEClient{
status: []string{"UNHEALTHY", "HEALTHY"},
err: false,
},
want: 1,
},
{
name: "healthy-lower-case",
client: &fakeGCEClient{
status: []string{"healthy"},
err: false,
},
want: 1,
},
{
name: "error",
client: &fakeGCEClient{
err: true,
},
want: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := NewGCPChecker(tt.client, &metadata.GCPMetadata{})
if got := c.GetHealth(context.Background()); got != tt.want {
t.Errorf("GCPChecker.GetHealth() = %v, want %v", got, tt.want)
}
})
}
}

type fakeGCEClient struct {
status []string
err bool
}

func (c *fakeGCEClient) GetHealth(ctx context.Context, req *computepb.GetHealthRegionBackendServiceRequest, opts ...gax.CallOption) (*computepb.BackendServiceGroupHealth, error) {
if c.err {
return nil, errors.New("health error")
}

health := make([]*computepb.HealthStatus, 0)
for _, s := range c.status {
health = append(health, &computepb.HealthStatus{HealthState: &s})
}
return &computepb.BackendServiceGroupHealth{
HealthStatus: health,
}, nil
}
25 changes: 21 additions & 4 deletions cmd/heartbeat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ import (
"syscall"
"time"

compute "cloud.google.com/go/compute/apiv1"
md "cloud.google.com/go/compute/metadata"
"github.com/gorilla/websocket"
"github.com/m-lab/go/flagx"
"github.com/m-lab/go/memoryless"
"github.com/m-lab/go/prometheusx"
"github.com/m-lab/go/rtx"
v2 "github.com/m-lab/locate/api/v2"
"github.com/m-lab/locate/cmd/heartbeat/health"
"github.com/m-lab/locate/cmd/heartbeat/metadata"
"github.com/m-lab/locate/cmd/heartbeat/registration"
"github.com/m-lab/locate/connection"
"github.com/m-lab/locate/metrics"
Expand All @@ -31,14 +34,21 @@ var (
pod string
node string
namespace string
project string
kubernetesAuth = "/var/run/secrets/kubernetes.io/serviceaccount/"
kubernetesURL = flagx.URL{}
registrationURL = flagx.URL{}
services = flagx.KeyValueArray{}
heartbeatPeriod = static.HeartbeatPeriod
mainCtx, mainCancel = context.WithCancel(context.Background())
lbPath = "metadata/loadbalanced"
)

// Checker generates a health score for the heartbeat instance (0, 1).
type Checker interface {
GetHealth(ctx context.Context) float64 // Health score.
}

func init() {
flag.StringVar(&heartbeatURL, "heartbeat-url", "ws://localhost:8080/v2/platform/heartbeat",
"URL for locate service")
Expand Down Expand Up @@ -78,10 +88,17 @@ func main() {
err = conn.Dial(heartbeatURL, http.Header{}, hbm)
rtx.Must(err, "failed to establish a websocket connection with %s", heartbeatURL)

_, lberr := os.ReadFile(lbPath)
probe := health.NewPortProbe(svcs)
ec := health.NewEndpointClient(static.HealthEndpointTimeout)
hc := &health.Checker{}
if kubernetesURL.URL == nil {
var hc Checker
if lberr == nil { // Check if "loadbalanced" file exists.
md, err := metadata.NewGCPMetadata(md.NewClient(http.DefaultClient), hostname)
rtx.Must(err, "failed to get VM metadata")
gceClient, err := compute.NewRegionBackendServicesRESTClient(mainCtx)
rtx.Must(err, "failed to create GCE client")
hc = health.NewGCPChecker(gceClient, md)
} else if kubernetesURL.URL == nil {
hc = health.NewChecker(probe, ec)
} else {
k8s := health.MustNewKubernetesClient(kubernetesURL.URL, pod, node, namespace, kubernetesAuth)
Expand All @@ -93,7 +110,7 @@ func main() {

// write starts a write loop to send health messages every
// HeartbeatPeriod.
func write(ws *connection.Conn, hc *health.Checker, ldr *registration.Loader) {
func write(ws *connection.Conn, hc Checker, ldr *registration.Loader) {
defer ws.Close()
hbTicker := *time.NewTicker(heartbeatPeriod)
defer hbTicker.Stop()
Expand Down Expand Up @@ -139,7 +156,7 @@ func write(ws *connection.Conn, hc *health.Checker, ldr *registration.Loader) {
}
}

func getHealth(hc *health.Checker) float64 {
func getHealth(hc Checker) float64 {
ctx, cancel := context.WithTimeout(mainCtx, heartbeatPeriod)
defer cancel()
return hc.GetHealth(ctx)
Expand Down
71 changes: 71 additions & 0 deletions cmd/heartbeat/metadata/gcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package metadata

import (
"fmt"
"strings"

"github.com/m-lab/go/host"
)

const groupTemplate = "https://www.googleapis.com/compute/v1/projects/%s/regions/%s/instanceGroups/%s"

// GCPMetadata contains metadata about a GCP VM.
type GCPMetadata struct {
project string
backend string
region string
group string
}

// Client uses HTTP requests to query the metadata service.
type Client interface {
ProjectID() (string, error)
Zone() (string, error)
}

// NewGCPMetadata returns a new instance of GCPMetadata.
func NewGCPMetadata(c Client, hostname string) (*GCPMetadata, error) {
h, err := host.Parse(hostname)
if err != nil {
return nil, err
}
backend := strings.ReplaceAll(h.String(), ".", "-")

project, err := c.ProjectID()
if err != nil {
return nil, err
}

zone, err := c.Zone()
if err != nil {
return nil, err
}
region := zone[:len(zone)-2]

return &GCPMetadata{
project: project,
backend: backend,
region: region,
group: fmt.Sprintf(groupTemplate, project, region, backend),
}, nil
}

// Project ID (e.g., mlab-sandbox).
func (m *GCPMetadata) Project() string {
return m.project
}

// InstanceName in GCE.
func (m *GCPMetadata) InstanceName() string {
return m.backend
}

// Region derived from zone (e.g., us-west1).
func (m *GCPMetadata) Region() string {
return m.region
}

// Group is the the URI referencing the instance group.
func (m *GCPMetadata) Group() string {
return m.group
}
Loading

0 comments on commit b618d73

Please sign in to comment.