diff --git a/.gitignore b/.gitignore index a7747886..426601d3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /build +.DS_Store diff --git a/Makefile b/Makefile index c72305bf..b001f905 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ GO_DOCKER_RUN = docker run --rm -v $(shell pwd)/cmd/sync:/go/src/github.com/nginxinc/nginx-asg-sync/cmd/sync -v $(shell pwd)/build:/build -w /go/src/github.com/nginxinc/nginx-asg-sync/cmd/sync -GOLANG_CONTAINER = golang:1.8 +GOLANG_CONTAINER = golang:1.10 all: amazon centos7 ubuntu-trusty ubuntu-xenial diff --git a/README.md b/README.md index 28bfb600..9df8e52b 100644 --- a/README.md +++ b/README.md @@ -1,21 +1,34 @@ # NGINX Plus Integration with AWS Auto Scaling groups -- nginx-asg-sync -**nginx-asg-sync** allows [NGINX Plus](https://www.nginx.com/products/) to support scaling when load balancing [AWS Auto Scaling groups](http://docs.aws.amazon.com/autoscaling/latest/userguide/WhatIsAutoScaling.html): when the number of instances in an Auto Scaling group changes, nginx-asg-sync adds the new instances to the NGINX Plus configuration and removes the terminated ones. +**nginx-asg-sync** allows [NGINX Plus](https://www.nginx.com/products/) to discover instances of [AWS Auto Scaling groups](http://docs.aws.amazon.com/autoscaling/latest/userguide/WhatIsAutoScaling.html). When the number of instances in an Auto Scaling group changes, nginx-asg-sync adds the new instances to the NGINX Plus configuration and removes the terminated ones. -More details on this solution are available in the blog post [Load Balancing AWS Auto Scaling Groups with NGINX Plus](https://www.nginx.com/blog/load-balancing-aws-auto-scaling-groups-nginx-plus/). +## How It Works +nginx-asg-sync must be installed on the same EC2 instance with NGINX Plus. nginx-asg-sync constantly monitors backend Auto Scaling groups via the AWS Auto Scaling API. +When it sees that a scaling event has happened, it adds or removes the corresponding backend instances from the NGINX Plus configuration via the NGINX Plus API. -Below you will find instructions on how to use nginx-asg-sync. +**Note:** nginx-asg-sync does not scale Auto Scaling groups, it only gets the IP addresses of the instances of Auto Scaling groups. -## Contents +In the example below, NGINX Plus is configured to load balance among the instances of two Auto Scaling groups -- Backend One and Backend Two. +nginx-asg-sync, running on the same instance as NGINX Plus, ensures that whenever you scale the Auto Scaling groups, the corresponding instances are added (or removed) from the NGINX Plus configuration. -1. [Supported Operating Systems](#supported-operating-systems) -1. [Setting up Access to the AWS API](#setting-up-access-to-the-aws-api) -1. [Installation](#installation) -1. [Configuration](#configuration) -1. [Usage](#usage) -1. [Troubleshooting](#troubleshooting) -1. [Building a Software Package](#building-a-software-package) -1. [Support](#support) +![nginx-asg-sync-architecture](https://cdn-1.wp.nginx.com/wp-content/uploads/2017/03/aws-auto-scaling-group-asg-sync.png) + +Below you will find documentation on how to use nginx-asg-sync. + +## Documentation +**Note:** the documentation for **the latest stable release** is available via a link in the description of the release. See the [releases page](https://github.com/nginxinc/nginx-asg-sync/releases). + +**Contents:** +- [Supported Operating Systems](#supported-operating-systems) +- [Setting up Access to the AWS API](#setting-up-access-to-the-aws-api) +- [Installation](#installation) +- [Configuration](#configuration) + - [NGINX Plus Configuration](#nginx-plus-configuration) + - [nginx-asg-sync Configuration](#nginx-asg-sync-configuration) +- [Usage](#usage) +- [Troubleshooting](#troubleshooting) +- [Building a Software Package](#building-a-software-package) +- [Support](#support) ## Supported Operating Systems @@ -34,18 +47,14 @@ nginx-asg-sync uses the AWS API to get the list of IP addresses of the instances 1. [Create an IAM role](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html) and attach the predefined `AmazonEC2ReadOnlyAccess` policy to it. This policy allows read-only access to EC2 APIs. 1. When you launch the NGINX Plus instance, add this IAM role to the instance. -Alternatively, you can use the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environmental variables to provide credentials to nginx-asg-sync. - ## Installation -To install nginx-asg-sync: - -1. Download a software package for your OS with the latest version of nginx-asg-sync from the [Releases page](https://github.com/nginxinc/nginx-asg-sync/releases). -1. Install the package: - - For Amazon Linux or CentOS/RHEL, run: `$ sudo rpm -i .rpm` - - For Ubuntu, run: `$ sudo dpkg -i .deb` +1. Get a software package for your OS: + * For a stable release, download a package from the [releases page](https://github.com/nginxinc/nginx-asg-sync/releases). + * For the latest source code from the master branch, build a software package by following [these instructions](#building-a-software-package). +2. Install the package: + * For Amazon Linux or CentOS/RHEL, run: `$ sudo rpm -i .rpm` + * For Ubuntu, run: `$ sudo dpkg -i .deb` ## Configuration @@ -54,6 +63,8 @@ As an example, we configure NGINX Plus to load balance two AWS Auto Scaling grou * Requests for /backend-one go to Backend One group. * Requests for /backend-two go to Backend Two group. +This example corresponds to [the diagram](#how-it-works) at the top of this README. + ### NGINX Plus Configuration ```nginx @@ -86,31 +97,20 @@ server { server { listen 8080; - root /usr/share/nginx/html; - - location = / { - return 302 /status.html; - } - - location = /status.html { - } - - location /status { - access_log off; - status; + location /api { + api write=on; } - location /upstream_conf { - upstream_conf; + location /dashboard.html { + root /usr/share/nginx/html; } } ``` * We declare two upstream groups – **backend-one** and **backend-two**, which correspond to our Auto Scaling groups. However, we do not add any servers to the upstream groups, because the servers will be added by nginx-aws-sync. The `state` directive names the file where the dynamically configurable list of servers is stored, enabling it to persist across restarts of NGINX Plus. * We define a virtual server that listens on port 80. NGINX Plus passes requests for **/backend-one** to the instances of the Backend One group, and requests for **/backend-two** to the instances of the Backend Two group. -* We define a second virtual server listening on port 8080 and configure the NGINX Plus APIs on it, which are required by nginx-asg-sync: - * The on-the-fly API is available at **127.0.0.1:8080/upstream_conf** - * The status API is available at **127.0.0.1:8080/status** +* We define a second virtual server listening on port 8080 and configure the NGINX Plus API on it, which is required by nginx-asg-sync: + * The API is available at **127.0.0.1:8080/api** ### nginx-asg-sync Configuration @@ -118,8 +118,7 @@ nginx-asg-sync is configured in the file **aws.yaml** in the **/etc/nginx** fold ```yaml region: us-west-2 -upstream_conf_endpoint: http://127.0.0.1:8080/upstream_conf -status_endpoint: http://127.0.0.1:8080/status +api_endpoint: http://127.0.0.1:8080/api sync_interval_in_seconds: 5 upstreams: - name: backend-one @@ -133,13 +132,13 @@ upstreams: ``` * The `region` key defines the AWS region where we deploy NGINX Plus and the Auto Scaling groups. -* The `upstream_conf` and the `status_endpoint` keys define the NGINX Plus API endpoints. +* The `api_endpoint` key defines the NGINX Plus API endpoint. * The `sync_interval_in_seconds` key defines the synchronization interval: nginx-asg-sync checks for scaling updates every 5 seconds. * The `upstreams` key defines the list of upstream groups. For each upstream group we specify: * `name` – The name we specified for the upstream block in the NGINX Plus configuration. * `autoscaling_group` – The name of the corresponding Auto Scaling group. * `port` – The port on which our backend applications are exposed. - * `protocol` – The protocol of the traffic NGINX Plus load balances to the backend application, here `http`. If the application uses TCP/UDP, specify `stream` instead. + * `kind` – The protocol of the traffic NGINX Plus load balances to the backend application, here `http`. If the application uses TCP/UDP, specify `stream` instead. ## Usage diff --git a/cmd/sync/config.go b/cmd/sync/config.go index 492cf550..434499dc 100644 --- a/cmd/sync/config.go +++ b/cmd/sync/config.go @@ -9,8 +9,7 @@ import ( type config struct { Region string - UpstreamConfEndpont string `yaml:"upstream_conf_endpoint"` - StatusEndpoint string `yaml:"status_endpoint"` + APIEndpoint string `yaml:"api_endpoint"` SyncIntervalInSeconds time.Duration `yaml:"sync_interval_in_seconds"` Upstreams []upstream } @@ -58,11 +57,8 @@ func validateConfig(cfg *config) error { if cfg.Region == "" { return fmt.Errorf(errorMsgFormat, "region") } - if cfg.UpstreamConfEndpont == "" { - return fmt.Errorf(errorMsgFormat, "upstream_conf_endpoint") - } - if cfg.StatusEndpoint == "" { - return fmt.Errorf(errorMsgFormat, "status_endpoint") + if cfg.APIEndpoint == "" { + return fmt.Errorf(errorMsgFormat, "api_endpoint") } if cfg.SyncIntervalInSeconds == 0 { return fmt.Errorf(intervalErrorMsg) diff --git a/cmd/sync/config_test.go b/cmd/sync/config_test.go index 81dbdbb5..56f64731 100644 --- a/cmd/sync/config_test.go +++ b/cmd/sync/config_test.go @@ -3,8 +3,7 @@ package main import "testing" var validYaml = []byte(`region: us-west-2 -upstream_conf_endpoint: http://127.0.0.1:8080/upstream_conf -status_endpoint: http://127.0.0.1:8080/status +api_endpoint: http://127.0.0.1:8080/api sync_interval_in_seconds: 5 upstreams: - name: backend1 @@ -33,8 +32,7 @@ func getValidConfig() *config { } cfg := config{ Region: "us-west-2", - UpstreamConfEndpont: "http://127.0.0.1:8080/upstream_conf", - StatusEndpoint: "http://127.0.0.1:8080/status", + APIEndpoint: "http://127.0.0.1:8080/api", SyncIntervalInSeconds: 1, Upstreams: upstreams, } @@ -49,13 +47,9 @@ func getInvalidConfigInput() []*testInput { invalidRegionCfg.Region = "" input = append(input, &testInput{invalidRegionCfg, "invalid region"}) - invalidUpstreamConfEndponCfg := getValidConfig() - invalidUpstreamConfEndponCfg.UpstreamConfEndpont = "" - input = append(input, &testInput{invalidUpstreamConfEndponCfg, "invalid upstream_conf_endpoint"}) - - invalidStatusEndpointCfg := getValidConfig() - invalidStatusEndpointCfg.StatusEndpoint = "" - input = append(input, &testInput{invalidStatusEndpointCfg, "invalid status_endpoint"}) + invalidAPIEndpointCfg := getValidConfig() + invalidAPIEndpointCfg.APIEndpoint = "" + input = append(input, &testInput{invalidAPIEndpointCfg, "invalid api_endpoint"}) invalidSyncIntervalInSecondsCfg := getValidConfig() invalidSyncIntervalInSecondsCfg.SyncIntervalInSeconds = 0 diff --git a/cmd/sync/main.go b/cmd/sync/main.go index 4617aaf3..2f349661 100644 --- a/cmd/sync/main.go +++ b/cmd/sync/main.go @@ -50,7 +50,9 @@ func main() { os.Exit(10) } - nginx, err := NewNginxClient(cfg.UpstreamConfEndpont, cfg.StatusEndpoint, connTimeoutInSecs*time.Second) + httpClient := &http.Client{Timeout: connTimeoutInSecs * time.Second} + nginx, err := NewNginxClient(httpClient, cfg.APIEndpoint) + if err != nil { log.Printf("Couldn't create NGINX client: %v", err) os.Exit(10) @@ -60,10 +62,11 @@ func main() { for _, ups := range cfg.Upstreams { if ups.Kind == "http" { - err = nginx.CheckIfHTTPUpstreamExists(ups.Name) + err = nginx.CheckIfUpstreamExists(ups.Name) } else { err = nginx.CheckIfStreamUpstreamExists(ups.Name) } + if err != nil { log.Printf("Problem with the NGINX configuration: %v", err) os.Exit(10) @@ -81,33 +84,53 @@ func main() { signal.Notify(sigterm, syscall.SIGTERM) for { - for _, ups := range cfg.Upstreams { - ips, err := awsClient.GetPrivateIPsOfInstancesOfAutoscalingGroup(ups.AutoscalingGroup) + for _, upstream := range cfg.Upstreams { + ips, err := awsClient.GetPrivateIPsOfInstancesOfAutoscalingGroup(upstream.AutoscalingGroup) if err != nil { - log.Printf("Couldn't get the IP addresses of instances of the Auto Scaling group %v: %v", ups.AutoscalingGroup, err) + log.Printf("Couldn't get the IP addresses of instances of the Auto Scaling group %v: %v", upstream.AutoscalingGroup, err) continue } - var backends []string - for _, ip := range ips { - backend := fmt.Sprintf("%v:%v", ip, ups.Port) - backends = append(backends, backend) - } - - var added, removed []string - - if ups.Kind == "http" { - added, removed, err = nginx.UpdateHTTPServers(ups.Name, backends) + if upstream.Kind == "http" { + var upsServers []UpstreamServer + for _, ip := range ips { + backend := fmt.Sprintf("%v:%v", ip, upstream.Port) + upsServers = append(upsServers, UpstreamServer{ + Server: backend, + MaxFails: 1, + }) + } + + added, removed, err := nginx.UpdateHTTPServers(upstream.Name, upsServers) + if err != nil { + log.Printf("Couldn't update HTTP servers in NGINX: %v", err) + continue + } + + if len(added) > 0 || len(removed) > 0 { + log.Printf("Updated HTTP servers of %v; Added: %v, Removed: %v", upstream, added, removed) + } } else { - added, removed, err = nginx.UpdateStreamServers(ups.Name, backends) - } - if err != nil { - log.Printf("Couldn't update servers in NGINX: %v", err) - continue - } - if len(removed) > 0 || len(added) > 0 { - log.Printf("Upstream: %v has been updated; Added: %v; Removed: %v\n", ups.Name, added, removed) + var upsServers []StreamUpstreamServer + for _, ip := range ips { + backend := fmt.Sprintf("%v:%v", ip, upstream.Port) + upsServers = append(upsServers, StreamUpstreamServer{ + Server: backend, + MaxFails: 1, + }) + } + + added, removed, err := nginx.UpdateStreamServers(upstream.Name, upsServers) + if err != nil { + log.Printf("Couldn't update Steam servers in NGINX: %v", err) + continue + } + + if len(added) > 0 || len(removed) > 0 { + log.Printf("Updated Stream servers of %v; Added: %v, Removed: %v", upstream, added, removed) + } } + } select { diff --git a/cmd/sync/nginx.go b/cmd/sync/nginx.go index 822737fb..4d0b3fbe 100644 --- a/cmd/sync/nginx.go +++ b/cmd/sync/nginx.go @@ -1,266 +1,554 @@ package main import ( + "bytes" "encoding/json" "fmt" + "io" "io/ioutil" "net/http" - "time" ) -// NginxClient lets you update HTTP/Stream servers in NGINX Plus via its upstream_conf API +// Client version - 75018e7 + +// APIVersion is a version of NGINX Plus API. +const APIVersion = 2 + +// NginxClient lets you access NGINX Plus API. type NginxClient struct { - httpClient *Client - streamClient *Client + apiEndpoint string + httpClient *http.Client } -// NewNginxClient creates an NginxClient. -func NewNginxClient(upstreamConfEndpoint string, statusEndpoint string, timeout time.Duration) (*NginxClient, error) { - httpClient, err := NewHTTPClient(upstreamConfEndpoint, statusEndpoint, timeout) - if err != nil { - return nil, err - } +type versions []int - streamClient, err := NewStreamClient(upstreamConfEndpoint, statusEndpoint, timeout) - if err != nil { - return nil, err - } +// UpstreamServer lets you configure HTTP upstreams. +type UpstreamServer struct { + ID int `json:"id,omitempty"` + Server string `json:"server"` + MaxFails int `json:"max_fails"` + FailTimeout string `json:"fail_timeout,omitempty"` + SlowStart string `json:"slow_start,omitempty"` +} - return &NginxClient{httpClient, streamClient}, nil +// StreamUpstreamServer lets you configure Stream upstreams. +type StreamUpstreamServer struct { + ID int `json:"id,omitempty"` + Server string `json:"server"` + MaxFails int `json:"max_fails"` + FailTimeout string `json:"fail_timeout,omitempty"` + SlowStart string `json:"slow_start,omitempty"` +} +type apiErrorResponse struct { + Path string + Method string + Error apiError + RequestID string `json:"request_id"` + Href string } -// CheckIfHTTPUpstreamExists checks if the HTTP upstream exists in NGINX. If the upstream doesn't exist, it returns an error. -func (client *NginxClient) CheckIfHTTPUpstreamExists(upstream string) error { - return client.httpClient.CheckIfUpstreamExists(upstream) +func (resp *apiErrorResponse) toString() string { + return fmt.Sprintf("path=%v; method=%v; error.status=%v; error.text=%v; error.code=%v; request_id=%v; href=%v", + resp.Path, resp.Method, resp.Error.Status, resp.Error.Text, resp.Error.Code, resp.RequestID, resp.Href) } -// CheckIfStreamUpstreamExists checks if the Stream upstream exists in NGINX. If the upstream doesn't exist, it returns an error. -func (client *NginxClient) CheckIfStreamUpstreamExists(upstream string) error { - return client.streamClient.CheckIfUpstreamExists(upstream) +type apiError struct { + Status int + Text string + Code string } -// UpdateHTTPServers updates the servers of the HTTP upstream. -// Servers that are in the slice, but don't exist in NGINX will be added to NGINX. -// Servers that aren't in the slice, but exist in NGINX, will be removed from NGINX. -func (client *NginxClient) UpdateHTTPServers(upstream string, servers []string) ([]string, []string, error) { - return client.httpClient.UpdateServers(upstream, servers) +// Stats represents NGINX Plus stats fetched from the NGINX Plus API. +// https://nginx.org/en/docs/http/ngx_http_api_module.html +type Stats struct { + Connections Connections + HTTPRequests HTTPRequests + SSL SSL + ServerZones ServerZones + Upstreams Upstreams } -// UpdateStreamServers updates the servers of the Stream upstream. -// Servers that are in the slice, but don't exist in NGINX will be added to NGINX. -// Servers that aren't in the slice, but exist in NGINX, will be removed from NGINX. -func (client *NginxClient) UpdateStreamServers(upstream string, servers []string) ([]string, []string, error) { - return client.streamClient.UpdateServers(upstream, servers) +// Connections represents connection related stats. +type Connections struct { + Accepted uint64 + Dropped uint64 + Active uint64 + Idle uint64 +} + +// HTTPRequests represents HTTP request related stats. +type HTTPRequests struct { + Total uint64 + Current uint64 } -// Client lets you add/remove servers to/from NGINX Plus via its upstream_conf API -type Client struct { - upstreamConfEndpoint string - statusEndpoint string - httpClient *http.Client +// SSL represents SSL related stats. +type SSL struct { + Handshakes uint64 + HandshakesFailed uint64 `json:"handshakes_failed"` + SessionReuses uint64 `json:"session_reuses"` } -type peers struct { - Peers []peer +// ServerZones is map of server zone stats by zone name +type ServerZones map[string]ServerZone + +// ServerZone represents server zone related stats. +type ServerZone struct { + Processing uint64 + Requests uint64 + Responses Responses + Discarded uint64 + Received uint64 + Sent uint64 } -type peer struct { - ID int - Server string +// Responses represents HTTP reponse related stats. +type Responses struct { + Responses1xx uint64 `json:"1xx"` + Responses2xx uint64 `json:"2xx"` + Responses3xx uint64 `json:"3xx"` + Responses4xx uint64 `json:"4xx"` + Responses5xx uint64 `json:"5xx"` + Total uint64 } -// NewHTTPClient creates a new HTTP client. -func NewHTTPClient(upstreamConfEndpoint string, statusEndpoint string, timeout time.Duration) (*Client, error) { - httpClient := &http.Client{Timeout: timeout} +// Upstreams is a map of upstream stats by upstream name. +type Upstreams map[string]Upstream + +// Upstream represents upstream related stats. +type Upstream struct { + Peers []Peer + Keepalives int + Zombies int + Zone string + Queue Queue +} + +// Queue represents queue related stats for an upstream. +type Queue struct { + Size int + MaxSize int `json:"max_size"` + Overflows uint64 +} + +// Peer represents peer (upstream server) related stats. +type Peer struct { + ID int + Server string + Service string + Name string + Backup bool + Weight int + State string + Active uint64 + MaxConns int `json:"max_conns"` + Requests uint64 + Responses Responses + Sent uint64 + Received uint64 + Fails uint64 + Unavail uint64 + HealthChecks HealthChecks `json:"health_checks"` + Downtime uint64 + Downstart string + Selected string + HeaderTime uint64 `json:"header_time"` + ResponseTime uint64 `json:"response_time"` +} + +// HealthChecks represents health check related stats for a peer. +type HealthChecks struct { + Checks uint64 + Fails uint64 + Unhealthy uint64 + LastPassed bool `json:"last_passed"` +} + +// NewNginxClient creates an NginxClient. +func NewNginxClient(httpClient *http.Client, apiEndpoint string) (*NginxClient, error) { + versions, err := getAPIVersions(httpClient, apiEndpoint) - err := checkIfUpstreamConfIsAccessible(httpClient, upstreamConfEndpoint) if err != nil { - return nil, err + return nil, fmt.Errorf("error accessing the API: %v", err) } - err = checkIfStatusIsAccessible(httpClient, statusEndpoint) - if err != nil { - return nil, err + found := false + for _, v := range *versions { + if v == APIVersion { + found = true + break + } + } + + if !found { + return nil, fmt.Errorf("API version %v of the client is not supported by API versions of NGINX Plus: %v", APIVersion, *versions) } - client := &Client{upstreamConfEndpoint: upstreamConfEndpoint + "?", statusEndpoint: statusEndpoint, httpClient: httpClient} - return client, nil + return &NginxClient{ + apiEndpoint: apiEndpoint, + httpClient: httpClient, + }, nil } -// NewStreamClient creates a new Stream client. -func NewStreamClient(upstreamConfEndpoint string, statusEndpoint string, timeout time.Duration) (*Client, error) { - httpClient := &http.Client{Timeout: timeout} +func getAPIVersions(httpClient *http.Client, endpoint string) (*versions, error) { + resp, err := httpClient.Get(endpoint) + if err != nil { + return nil, fmt.Errorf("%v is not accessible: %v", endpoint, err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("%v is not accessible: expected %v response, got %v", endpoint, http.StatusOK, resp.StatusCode) + } - err := checkIfUpstreamConfIsAccessible(httpClient, upstreamConfEndpoint) + body, err := ioutil.ReadAll(resp.Body) if err != nil { - return nil, err + return nil, fmt.Errorf("error while reading body of the response: %v", err) } - err = checkIfStatusIsAccessible(httpClient, statusEndpoint) + var vers versions + err = json.Unmarshal(body, &vers) if err != nil { - return nil, err + return nil, fmt.Errorf("error unmarshalling versions, got %q response: %v", string(body), err) } - client := &Client{upstreamConfEndpoint: upstreamConfEndpoint + "?stream=", statusEndpoint: statusEndpoint + "/stream", httpClient: httpClient} - return client, nil + return &vers, nil } -func checkIfUpstreamConfIsAccessible(httpClient *http.Client, endpoint string) error { - resp, err := httpClient.Get(endpoint) +func createResponseMismatchError(respBody io.ReadCloser, mainErr error) error { + apiErr, err := readAPIErrorResponse(respBody) if err != nil { - return fmt.Errorf("upstream_conf endpoint %v is not accessible: %v", endpoint, err) + return fmt.Errorf("%v; failed to read the response body: %v", mainErr, err) } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + return fmt.Errorf("%v; error: %v", mainErr, apiErr.toString()) +} + +func readAPIErrorResponse(respBody io.ReadCloser) (*apiErrorResponse, error) { + body, err := ioutil.ReadAll(respBody) if err != nil { - return fmt.Errorf("upstream_conf endpoint %v is not accessible: %v", endpoint, err) + return nil, fmt.Errorf("failed to read the response body: %v", err) } - if resp.StatusCode != http.StatusNotFound && resp.StatusCode != http.StatusBadRequest { - return fmt.Errorf("upstream_conf endpoint %v is not accessible: expected 404 or 400 response, got %v", endpoint, resp.StatusCode) + var apiErr apiErrorResponse + err = json.Unmarshal(body, &apiErr) + if err != nil { + return nil, fmt.Errorf("error unmarshalling apiErrorResponse: got %q response: %v", string(body), err) } - bodyStr := string(body) - expected := "missing \"upstream\" argument\n" - if bodyStr != expected { - return fmt.Errorf("upstream_conf endpoint %v is not accessible: expected %q body, got %q", endpoint, expected, bodyStr) + return &apiErr, nil +} + +// CheckIfUpstreamExists checks if the upstream exists in NGINX. If the upstream doesn't exist, it returns the error. +func (client *NginxClient) CheckIfUpstreamExists(upstream string) error { + _, err := client.GetHTTPServers(upstream) + return err +} + +// GetHTTPServers returns the servers of the upstream from NGINX. +func (client *NginxClient) GetHTTPServers(upstream string) ([]UpstreamServer, error) { + path := fmt.Sprintf("http/upstreams/%v/servers", upstream) + + var servers []UpstreamServer + err := client.get(path, &servers) + + if err != nil { + return nil, fmt.Errorf("failed to get the HTTP servers of upstream %v: %v", upstream, err) + } + + return servers, nil +} + +// AddHTTPServer adds the server to the upstream. +func (client *NginxClient) AddHTTPServer(upstream string, server UpstreamServer) error { + id, err := client.getIDOfHTTPServer(upstream, server.Server) + + if err != nil { + return fmt.Errorf("failed to add %v server to %v upstream: %v", server.Server, upstream, err) + } + if id != -1 { + return fmt.Errorf("failed to add %v server to %v upstream: server already exists", server.Server, upstream) + } + + path := fmt.Sprintf("http/upstreams/%v/servers/", upstream) + err = client.post(path, &server) + if err != nil { + return fmt.Errorf("failed to add %v server to %v upstream: %v", server.Server, upstream, err) } return nil } -func checkIfStatusIsAccessible(httpClient *http.Client, endpoint string) error { - resp, err := httpClient.Get(endpoint) +// DeleteHTTPServer the server from the upstream. +func (client *NginxClient) DeleteHTTPServer(upstream string, server string) error { + id, err := client.getIDOfHTTPServer(upstream, server) if err != nil { - return fmt.Errorf("status endpoint is %v not accessible: %v", endpoint, err) + return fmt.Errorf("failed to remove %v server from %v upstream: %v", server, upstream, err) + } + if id == -1 { + return fmt.Errorf("failed to remove %v server from %v upstream: server doesn't exist", server, upstream) } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("status endpoint is %v not accessible: expected 200 response, got %v", endpoint, resp.StatusCode) + path := fmt.Sprintf("http/upstreams/%v/servers/%v", upstream, id) + err = client.delete(path) + + if err != nil { + return fmt.Errorf("failed to remove %v server from %v upstream: %v", server, upstream, err) } return nil } -// CheckIfUpstreamExists checks if the upstream exists in NGINX. If the upstream doesn't exist, it returns an error. -func (client *Client) CheckIfUpstreamExists(upstream string) error { - _, err := client.getUpstreamPeers(upstream) - return err +// UpdateHTTPServers updates the servers of the upstream. +// Servers that are in the slice, but don't exist in NGINX will be added to NGINX. +// Servers that aren't in the slice, but exist in NGINX, will be removed from NGINX. +func (client *NginxClient) UpdateHTTPServers(upstream string, servers []UpstreamServer) ([]UpstreamServer, []UpstreamServer, error) { + serversInNginx, err := client.GetHTTPServers(upstream) + if err != nil { + return nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err) + } + + toAdd, toDelete := determineUpdates(servers, serversInNginx) + + for _, server := range toAdd { + err := client.AddHTTPServer(upstream, server) + if err != nil { + return nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err) + } + } + + for _, server := range toDelete { + err := client.DeleteHTTPServer(upstream, server.Server) + if err != nil { + return nil, nil, fmt.Errorf("failed to update servers of %v upstream: %v", upstream, err) + } + } + + return toAdd, toDelete, nil +} + +func determineUpdates(updatedServers []UpstreamServer, nginxServers []UpstreamServer) (toAdd []UpstreamServer, toRemove []UpstreamServer) { + for _, server := range updatedServers { + found := false + for _, serverNGX := range nginxServers { + if server.Server == serverNGX.Server { + found = true + break + } + } + if !found { + toAdd = append(toAdd, server) + } + } + + for _, serverNGX := range nginxServers { + found := false + for _, server := range updatedServers { + if serverNGX.Server == server.Server { + found = true + break + } + } + if !found { + toRemove = append(toRemove, serverNGX) + } + } + + return } -func (client *Client) getUpstreamPeers(upstream string) (*peers, error) { - request := fmt.Sprintf("%v/upstreams/%v", client.statusEndpoint, upstream) +func (client *NginxClient) getIDOfHTTPServer(upstream string, name string) (int, error) { + servers, err := client.GetHTTPServers(upstream) + if err != nil { + return -1, fmt.Errorf("error getting id of server %v of upstream %v: %v", name, upstream, err) + } - resp, err := client.httpClient.Get(request) + for _, s := range servers { + if s.Server == name { + return s.ID, nil + } + } + + return -1, nil +} + +func (client *NginxClient) get(path string, data interface{}) error { + url := fmt.Sprintf("%v/%v/%v", client.apiEndpoint, APIVersion, path) + resp, err := client.httpClient.Get(url) if err != nil { - return nil, fmt.Errorf("Failed to connect to the status api to get upstream %v info: %v", upstream, err) + return fmt.Errorf("failed to get %v: %v", path, err) } defer resp.Body.Close() - - if resp.StatusCode == http.StatusNotFound { - return nil, fmt.Errorf("Upstream %v is not found", upstream) + if resp.StatusCode != http.StatusOK { + mainErr := fmt.Errorf("expected %v response, got %v", http.StatusOK, resp.StatusCode) + return createResponseMismatchError(resp.Body, mainErr) } body, err := ioutil.ReadAll(resp.Body) if err != nil { - return nil, fmt.Errorf("Failed to read the response body with upstream %v info: %v", upstream, err) + return fmt.Errorf("failed to read the response body: %v", err) } - var prs peers - err = json.Unmarshal(body, &prs) + + err = json.Unmarshal(body, data) if err != nil { - return nil, fmt.Errorf("Error unmarshaling upstream %v: got %q response: %v", upstream, string(body), err) + return fmt.Errorf("error unmarshaling response %q: %v", string(body), err) } - - return &prs, nil + return nil } -// AddServer adds the server to the upstream. -func (client *Client) AddServer(upstream string, server string) error { - id, err := client.getIDOfServer(upstream, server) +func (client *NginxClient) post(path string, input interface{}) error { + url := fmt.Sprintf("%v/%v/%v", client.apiEndpoint, APIVersion, path) + jsonInput, err := json.Marshal(input) if err != nil { - return fmt.Errorf("Failed to add %v server to %v upstream: %v", server, upstream, err) + return fmt.Errorf("failed to marshall input: %v", err) } - if id != -1 { - return fmt.Errorf("Failed to add %v server to %v upstream: server already exists", server, upstream) + + resp, err := client.httpClient.Post(url, "application/json", bytes.NewBuffer(jsonInput)) + if err != nil { + return fmt.Errorf("failed to post %v: %v", path, err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusCreated { + mainErr := fmt.Errorf("expected %v response, got %v", http.StatusCreated, resp.StatusCode) + return createResponseMismatchError(resp.Body, mainErr) } - request := fmt.Sprintf("%v&upstream=%v&add=&server=%v", client.upstreamConfEndpoint, upstream, server) + return nil +} + +func (client *NginxClient) delete(path string) error { + path = fmt.Sprintf("%v/%v/%v/", client.apiEndpoint, APIVersion, path) + + req, err := http.NewRequest(http.MethodDelete, path, nil) + if err != nil { + return fmt.Errorf("failed to create a delete request: %v", err) + } - resp, err := client.httpClient.Get(request) + resp, err := client.httpClient.Do(req) if err != nil { - return fmt.Errorf("Failed to add %v server to %v upstream: %v", server, upstream, err) + return fmt.Errorf("failed to create delete request: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return fmt.Errorf("Failed to add %v server to %v upstream: expected 200 response, got %v", server, upstream, resp.StatusCode) + mainErr := fmt.Errorf("failed to complete delete request: expected %v response, got %v", + http.StatusOK, resp.StatusCode) + return createResponseMismatchError(resp.Body, mainErr) } - return nil } -// DeleteServer the server from the upstream. -func (client *Client) DeleteServer(upstream string, server string) error { - id, err := client.getIDOfServer(upstream, server) +// CheckIfStreamUpstreamExists checks if the stream upstream exists in NGINX. If the upstream doesn't exist, it returns the error. +func (client *NginxClient) CheckIfStreamUpstreamExists(upstream string) error { + _, err := client.GetStreamServers(upstream) + return err +} + +// GetStreamServers returns the stream servers of the upstream from NGINX. +func (client *NginxClient) GetStreamServers(upstream string) ([]StreamUpstreamServer, error) { + path := fmt.Sprintf("stream/upstreams/%v/servers", upstream) + + var servers []StreamUpstreamServer + err := client.get(path, &servers) + if err != nil { - return fmt.Errorf("Failed to remove %v server from %v upstream: %v", server, upstream, err) + return nil, fmt.Errorf("failed to get stream servers of upstream server %v: %v", upstream, err) } - if id == -1 { - return fmt.Errorf("Failed to remove %v server from %v upstream: server doesn't exists", server, upstream) + + return servers, nil +} + +// AddStreamServer adds the server to the upstream. +func (client *NginxClient) AddStreamServer(upstream string, server StreamUpstreamServer) error { + id, err := client.getIDOfStreamServer(upstream, server.Server) + + if err != nil { + return fmt.Errorf("failed to add %v stream server to %v upstream: %v", server.Server, upstream, err) + } + if id != -1 { + return fmt.Errorf("failed to add %v stream server to %v upstream: server already exists", server.Server, upstream) } - request := fmt.Sprintf("%v&upstream=%v&remove=&id=%v", client.upstreamConfEndpoint, upstream, id) + path := fmt.Sprintf("stream/upstreams/%v/servers/", upstream) + err = client.post(path, &server) - resp, err := client.httpClient.Get(request) if err != nil { - return fmt.Errorf("Failed to remove %v server from %v upstream: %v", server, upstream, err) + return fmt.Errorf("failed to add %v stream server to %v upstream: %v", server.Server, upstream, err) } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { - return fmt.Errorf("Failed to remove %v server to %v upstream: expected 200 or 204 response, got %v", server, upstream, resp.StatusCode) + return nil +} + +// DeleteStreamServer the server from the upstream. +func (client *NginxClient) DeleteStreamServer(upstream string, server string) error { + id, err := client.getIDOfStreamServer(upstream, server) + if err != nil { + return fmt.Errorf("failed to remove %v stream server from %v upstream: %v", server, upstream, err) + } + if id == -1 { + return fmt.Errorf("failed to remove %v stream server from %v upstream: server doesn't exist", server, upstream) + } + + path := fmt.Sprintf("stream/upstreams/%v/servers/%v", upstream, id) + err = client.delete(path) + + if err != nil { + return fmt.Errorf("failed to remove %v stream server from %v upstream: %v", server, upstream, err) } return nil } -// UpdateServers updates the servers of the upstream. +// UpdateStreamServers updates the servers of the upstream. // Servers that are in the slice, but don't exist in NGINX will be added to NGINX. // Servers that aren't in the slice, but exist in NGINX, will be removed from NGINX. -func (client *Client) UpdateServers(upstream string, servers []string) ([]string, []string, error) { - serversInNginx, err := client.GetServers(upstream) +func (client *NginxClient) UpdateStreamServers(upstream string, servers []StreamUpstreamServer) ([]StreamUpstreamServer, []StreamUpstreamServer, error) { + serversInNginx, err := client.GetStreamServers(upstream) if err != nil { - return nil, nil, fmt.Errorf("Failed to update servers of %v upstream: %v", upstream, err) + return nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err) } - toAdd, toDelete := determineUpdates(servers, serversInNginx) + toAdd, toDelete := determineStreamUpdates(servers, serversInNginx) for _, server := range toAdd { - err := client.AddServer(upstream, server) + err := client.AddStreamServer(upstream, server) if err != nil { - return nil, nil, fmt.Errorf("Failed to update servers of %v upstream: %v", upstream, err) + return nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err) } } for _, server := range toDelete { - err := client.DeleteServer(upstream, server) + err := client.DeleteStreamServer(upstream, server.Server) if err != nil { - return nil, nil, fmt.Errorf("Failed to update servers of %v upstream: %v", upstream, err) + return nil, nil, fmt.Errorf("failed to update stream servers of %v upstream: %v", upstream, err) } } return toAdd, toDelete, nil } -func determineUpdates(updatedServers []string, nginxServers []string) (toAdd []string, toRemove []string) { +func (client *NginxClient) getIDOfStreamServer(upstream string, name string) (int, error) { + servers, err := client.GetStreamServers(upstream) + if err != nil { + return -1, fmt.Errorf("error getting id of stream server %v of upstream %v: %v", name, upstream, err) + } + + for _, s := range servers { + if s.Server == name { + return s.ID, nil + } + } + + return -1, nil +} + +func determineStreamUpdates(updatedServers []StreamUpstreamServer, nginxServers []StreamUpstreamServer) (toAdd []StreamUpstreamServer, toRemove []StreamUpstreamServer) { for _, server := range updatedServers { found := false for _, serverNGX := range nginxServers { - if server == serverNGX { + if server.Server == serverNGX.Server { found = true break } @@ -273,7 +561,7 @@ func determineUpdates(updatedServers []string, nginxServers []string) (toAdd []s for _, serverNGX := range nginxServers { found := false for _, server := range updatedServers { - if serverNGX == server { + if serverNGX.Server == server.Server { found = true break } @@ -286,32 +574,85 @@ func determineUpdates(updatedServers []string, nginxServers []string) (toAdd []s return } -// GetServers returns the servers of the upsteam from NGINX. -func (client *Client) GetServers(upstream string) ([]string, error) { - peers, err := client.getUpstreamPeers(upstream) +// GetStats gets connection, request, ssl, zone, and upstream related stats from the NGINX Plus API. +func (client *NginxClient) GetStats() (*Stats, error) { + cons, err := client.getConnections() if err != nil { - return nil, fmt.Errorf("Error getting servers of %v upstream: %v", upstream, err) + return nil, fmt.Errorf("failed to get stats: %v", err) } - var servers []string - for _, peer := range peers.Peers { - servers = append(servers, peer.Server) + requests, err := client.getHTTPRequests() + if err != nil { + return nil, fmt.Errorf("Failed to get stats: %v", err) } - return servers, nil + ssl, err := client.getSSL() + if err != nil { + return nil, fmt.Errorf("failed to get stats: %v", err) + } + + zones, err := client.getServerZones() + if err != nil { + return nil, fmt.Errorf("failed to get stats: %v", err) + } + + upstreams, err := client.getUpstreams() + if err != nil { + return nil, fmt.Errorf("failed to get stats: %v", err) + } + + return &Stats{ + Connections: *cons, + HTTPRequests: *requests, + SSL: *ssl, + ServerZones: *zones, + Upstreams: *upstreams, + }, nil } -func (client *Client) getIDOfServer(upstream string, name string) (int, error) { - peers, err := client.getUpstreamPeers(upstream) +func (client *NginxClient) getConnections() (*Connections, error) { + var cons Connections + err := client.get("connections", &cons) if err != nil { - return -1, fmt.Errorf("Error getting id of server %v of upstream %v:", name, upstream) + return nil, fmt.Errorf("failed to get connections: %v", err) } + return &cons, nil +} - for _, p := range peers.Peers { - if p.Server == name { - return p.ID, nil - } +func (client *NginxClient) getHTTPRequests() (*HTTPRequests, error) { + var requests HTTPRequests + + err := client.get("http/requests", &requests) + if err != nil { + return nil, fmt.Errorf("failed to get http requests: %v", err) } - return -1, nil + return &requests, nil +} + +func (client *NginxClient) getSSL() (*SSL, error) { + var ssl SSL + err := client.get("ssl", &ssl) + if err != nil { + return nil, fmt.Errorf("failed to get ssl: %v", err) + } + return &ssl, nil +} + +func (client *NginxClient) getServerZones() (*ServerZones, error) { + var zones ServerZones + err := client.get("http/server_zones", &zones) + if err != nil { + return nil, fmt.Errorf("failed to get server zones: %v", err) + } + return &zones, err +} + +func (client *NginxClient) getUpstreams() (*Upstreams, error) { + var upstreams Upstreams + err := client.get("http/upstreams", &upstreams) + if err != nil { + return nil, fmt.Errorf("failed to get upstreams: %v", err) + } + return &upstreams, nil } diff --git a/cmd/sync/nginx_test.go b/cmd/sync/nginx_test.go deleted file mode 100644 index 8a3a9eae..00000000 --- a/cmd/sync/nginx_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package main - -import ( - "reflect" - "testing" -) - -func TestDetermineUpdates(t *testing.T) { - var tests = []struct { - updated []string - nginx []string - expectedToAdd []string - expectedToDelete []string - }{{ - updated: []string{"10.0.0.3:80", "10.0.0.4:80"}, - nginx: []string{"10.0.0.1:80", "10.0.0.2:80"}, - expectedToAdd: []string{"10.0.0.3:80", "10.0.0.4:80"}, - expectedToDelete: []string{"10.0.0.1:80", "10.0.0.2:80"}, - }, { - updated: []string{"10.0.0.2:80", "10.0.0.3:80", "10.0.0.4:80"}, - nginx: []string{"10.0.0.1:80", "10.0.0.2:80", "10.0.0.3:80"}, - expectedToAdd: []string{"10.0.0.4:80"}, - expectedToDelete: []string{"10.0.0.1:80"}, - }, { - updated: []string{"10.0.0.1:80", "10.0.0.2:80", "10.0.0.3:80"}, - nginx: []string{"10.0.0.1:80", "10.0.0.2:80", "10.0.0.3:80"}, - }, { - // empty values - }} - - for _, test := range tests { - toAdd, toDelete := determineUpdates(test.updated, test.nginx) - if !reflect.DeepEqual(toAdd, test.expectedToAdd) || !reflect.DeepEqual(toDelete, test.expectedToDelete) { - t.Errorf("determiteUpdates(%v, %v) = (%v, %v)", test.updated, test.nginx, toAdd, toDelete) - } - } -}