Skip to content

Commit

Permalink
Merge 93d8e10 into 80f3d8b
Browse files Browse the repository at this point in the history
  • Loading branch information
cristinaleonr committed Dec 5, 2023
2 parents 80f3d8b + 93d8e10 commit 1dfee6b
Show file tree
Hide file tree
Showing 8 changed files with 489 additions and 263 deletions.
6 changes: 3 additions & 3 deletions cloudbuild/cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ options:

steps:
# Run unit tests for environment.
- name: gcr.io/$PROJECT_ID/golang-cbif:1.18
- name: gcr.io/$PROJECT_ID/golang-cbif:1.20
env:
- GONOPROXY=github.com/m-lab/go/*
args:
Expand All @@ -18,7 +18,7 @@ steps:
- go test -v ./...

# Deployment of APIs in sandbox & staging.
- name: gcr.io/$PROJECT_ID/gcloud-jsonnet-cbif:1.18
- name: gcr.io/$PROJECT_ID/gcloud-jsonnet-cbif:1.1
env:
# Use cbif condition: only run these steps in one of these projects.
- PROJECT_IN=mlab-sandbox,mlab-staging
Expand All @@ -36,7 +36,7 @@ steps:
- gcloud endpoints services deploy openapi.yaml

# Deployment of APIs in mlab-ns.
- name: gcr.io/$PROJECT_ID/gcloud-jsonnet-cbif:1.18
- name: gcr.io/$PROJECT_ID/gcloud-jsonnet-cbif:1.1
env:
# Use cbif condition: only run these steps in this project.
- PROJECT_IN=mlab-ns
Expand Down
63 changes: 63 additions & 0 deletions cmd/heartbeat/health/gcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package health

import (
"context"
"strings"

"cloud.google.com/go/compute/apiv1/computepb"
"github.com/googleapis/gax-go"
)

// GCPChecker queries the VM's load balancer to check its status.
type GCPChecker struct {
client GCEClient
md Metadata
}

// Metadata returns environmental metadata for a machine.
type Metadata interface {
Project() string
Backend() string
Region() string
Group() string
}

// GCEClient queries the Compute API for health updates.
type GCEClient interface {
GetHealth(context.Context, *computepb.GetHealthRegionBackendServiceRequest, ...gax.CallOption) (*computepb.BackendServiceGroupHealth, error)
}

// NewGCPChecker returns a new instance of GCPChecker.
func NewGCPChecker(c GCEClient, md Metadata) *GCPChecker {
return &GCPChecker{
client: c,
md: md,
}
}

// GetHealth contacts the GCP load balancer to get the latest VM health status
// and uses the data to generate a health score.
func (c *GCPChecker) GetHealth(ctx context.Context) float64 {
g := c.md.Group()
req := &computepb.GetHealthRegionBackendServiceRequest{
BackendService: c.md.Backend(),
Project: c.md.Project(),
Region: c.md.Region(),
ResourceGroupReferenceResource: &computepb.ResourceGroupReference{
Group: &g,
},
}
lbHealth, err := c.client.GetHealth(ctx, req)
if err != nil {
return 0
}

for _, h := range lbHealth.HealthStatus {
// The group is healthy if at least one of the instances has a 'HEALTHY' health state.
if strings.EqualFold(*h.HealthState, "HEALTHY") {
return 1
}
}

return 0
}
87 changes: 87 additions & 0 deletions cmd/heartbeat/health/gcp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package health

import (
"context"
"errors"
"testing"

"cloud.google.com/go/compute/apiv1/computepb"
"github.com/googleapis/gax-go"
"github.com/m-lab/locate/cmd/heartbeat/metadata"
)

func TestGCPChecker_GetHealth(t *testing.T) {
tests := []struct {
name string
client GCEClient
want float64
}{
{
name: "healthy",
client: &fakeGCEClient{
status: []string{"HEALTHY"},
err: false,
},
want: 1,
},
{
name: "unhealthy",
client: &fakeGCEClient{
status: []string{"UNHEALTHY"},
err: false,
},
want: 0,
},
{
name: "mix",
client: &fakeGCEClient{
status: []string{"HEALTHY", "HEALTHY", "UNHEALTHY"},
err: false,
},
want: 1,
},
{
name: "healthy-lower-case",
client: &fakeGCEClient{
status: []string{"healthy"},
err: false,
},
want: 1,
},
{
name: "error",
client: &fakeGCEClient{
err: true,
},
want: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := NewGCPChecker(tt.client, &metadata.GCPMetadata{})
if got := c.GetHealth(context.Background()); got != tt.want {
t.Errorf("GCPChecker.GetHealth() = %v, want %v", got, tt.want)
}
})
}
}

type fakeGCEClient struct {
status []string
err bool
}

func (c *fakeGCEClient) GetHealth(ctx context.Context, req *computepb.GetHealthRegionBackendServiceRequest, opts ...gax.CallOption) (*computepb.BackendServiceGroupHealth, error) {
if c.err {
return nil, errors.New("health error")
}

health := make([]*computepb.HealthStatus, 0)
for _, s := range c.status {
statusPtr := s
health = append(health, &computepb.HealthStatus{HealthState: &statusPtr})
}
return &computepb.BackendServiceGroupHealth{
HealthStatus: health,
}, nil
}
28 changes: 24 additions & 4 deletions cmd/heartbeat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ import (
"syscall"
"time"

compute "cloud.google.com/go/compute/apiv1"
md "cloud.google.com/go/compute/metadata"
"github.com/gorilla/websocket"
"github.com/m-lab/go/flagx"
"github.com/m-lab/go/memoryless"
"github.com/m-lab/go/prometheusx"
"github.com/m-lab/go/rtx"
v2 "github.com/m-lab/locate/api/v2"
"github.com/m-lab/locate/cmd/heartbeat/health"
"github.com/m-lab/locate/cmd/heartbeat/metadata"
"github.com/m-lab/locate/cmd/heartbeat/registration"
"github.com/m-lab/locate/connection"
"github.com/m-lab/locate/metrics"
Expand All @@ -31,14 +34,21 @@ var (
pod string
node string
namespace string
project string
kubernetesAuth = "/var/run/secrets/kubernetes.io/serviceaccount/"
kubernetesURL = flagx.URL{}
registrationURL = flagx.URL{}
services = flagx.KeyValueArray{}
heartbeatPeriod = static.HeartbeatPeriod
mainCtx, mainCancel = context.WithCancel(context.Background())
lbPath = "/metadata/loadbalanced"
)

// Checker generates a health score for the heartbeat instance (0, 1).
type Checker interface {
GetHealth(ctx context.Context) float64 // Health score.
}

func init() {
flag.StringVar(&heartbeatURL, "heartbeat-url", "ws://localhost:8080/v2/platform/heartbeat",
"URL for locate service")
Expand Down Expand Up @@ -78,10 +88,20 @@ func main() {
err = conn.Dial(heartbeatURL, http.Header{}, hbm)
rtx.Must(err, "failed to establish a websocket connection with %s", heartbeatURL)

_, lberr := os.ReadFile(lbPath)
probe := health.NewPortProbe(svcs)
ec := health.NewEndpointClient(static.HealthEndpointTimeout)
hc := &health.Checker{}
if kubernetesURL.URL == nil {
var hc Checker

// If the "loadbalanced" file exists, then the instance is a load balanced VM.
// If not, then it is a standalone instance.
if lberr == nil {
md, err := metadata.NewGCPMetadata(md.NewClient(http.DefaultClient), hostname)
rtx.Must(err, "failed to get VM metadata")
gceClient, err := compute.NewRegionBackendServicesRESTClient(mainCtx)
rtx.Must(err, "failed to create GCE client")
hc = health.NewGCPChecker(gceClient, md)
} else if kubernetesURL.URL == nil {
hc = health.NewChecker(probe, ec)
} else {
k8s := health.MustNewKubernetesClient(kubernetesURL.URL, pod, node, namespace, kubernetesAuth)
Expand All @@ -93,7 +113,7 @@ func main() {

// write starts a write loop to send health messages every
// HeartbeatPeriod.
func write(ws *connection.Conn, hc *health.Checker, ldr *registration.Loader) {
func write(ws *connection.Conn, hc Checker, ldr *registration.Loader) {
defer ws.Close()
hbTicker := *time.NewTicker(heartbeatPeriod)
defer hbTicker.Stop()
Expand Down Expand Up @@ -139,7 +159,7 @@ func write(ws *connection.Conn, hc *health.Checker, ldr *registration.Loader) {
}
}

func getHealth(hc *health.Checker) float64 {
func getHealth(hc Checker) float64 {
ctx, cancel := context.WithTimeout(mainCtx, heartbeatPeriod)
defer cancel()
return hc.GetHealth(ctx)
Expand Down
75 changes: 75 additions & 0 deletions cmd/heartbeat/metadata/gcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package metadata

import (
"fmt"
"strings"

"github.com/m-lab/go/host"
)

const groupTemplate = "https://www.googleapis.com/compute/v1/projects/%s/regions/%s/instanceGroups/%s"

// GCPMetadata contains metadata about a GCP VM.
type GCPMetadata struct {
project string
backend string
region string
group string
}

// Client uses HTTP requests to query the metadata service.
type Client interface {
ProjectID() (string, error)
Zone() (string, error)
}

// NewGCPMetadata returns a new instance of GCPMetadata.
func NewGCPMetadata(c Client, hostname string) (*GCPMetadata, error) {
h, err := host.Parse(hostname)
if err != nil {
return nil, err
}
// Backend refers to the GCP load balancer.
// Resources for a GCP load balancer all have the same name. That is,
// the VM hostname with dots turned to dashes (since GCP does not allow
// dots in names).
backend := strings.ReplaceAll(h.String(), ".", "-")

project, err := c.ProjectID()
if err != nil {
return nil, err
}

zone, err := c.Zone()
if err != nil {
return nil, err
}
region := zone[:len(zone)-2]

return &GCPMetadata{
project: project,
backend: backend,
region: region,
group: fmt.Sprintf(groupTemplate, project, region, backend),
}, nil
}

// Project ID (e.g., mlab-sandbox).
func (m *GCPMetadata) Project() string {
return m.project
}

// Backend in GCE.
func (m *GCPMetadata) Backend() string {
return m.backend
}

// Region derived from zone (e.g., us-west1).
func (m *GCPMetadata) Region() string {
return m.region
}

// Group is the the URI referencing the instance group.
func (m *GCPMetadata) Group() string {
return m.group
}
Loading

0 comments on commit 1dfee6b

Please sign in to comment.