
Disable KeepAlive on proxy (#21)

* Service lookup

* Disable KeepAlive to force round-robin (RR) for replicas.

* set KeepAlive to 0

* Remove temporary file

* Support wildcard and rename function

* Remove debug from node sample
alexellis committed Mar 21, 2017
1 parent 9cf4376 commit 2aeadfda1c9e98fb7d7b7f18f77837f355a3bd08
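
Context for the headline change: Go's http.Transport keeps idle connections alive by default, which pins every proxied request to the same backend task and defeats replica load-balancing. A minimal, repo-independent sketch (names here are illustrative only) showing that `DisableKeepAlives` opens a fresh TCP connection per request:

```
package main

import (
	"fmt"
	"net"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
)

func main() {
	// Count the TCP connections the test server sees.
	var conns int32
	srv := httptest.NewUnstartedServer(http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) {}))
	srv.Config.ConnState = func(c net.Conn, s http.ConnState) {
		if s == http.StateNew {
			atomic.AddInt32(&conns, 1)
		}
	}
	srv.Start()
	defer srv.Close()

	client := &http.Client{
		Transport: &http.Transport{DisableKeepAlives: true},
	}
	for i := 0; i < 3; i++ {
		resp, err := client.Get(srv.URL)
		if err != nil {
			fmt.Println(err)
			return
		}
		resp.Body.Close()
	}
	// With keep-alives on, this would print 1; disabled, it prints 3.
	fmt.Println("connections opened:", atomic.LoadInt32(&conns))
}
```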
@@ -8,3 +8,8 @@ This is a useful Prometheus query to show:
http://localhost:9090/graph?g0.range_input=15m&g0.expr=gateway_service_count&g0.tab=0&g1.range_input=15m&g1.expr=rate(gateway_function_invocation_total%5B20s%5D)&g1.tab=0&g2.range_input=15m&g2.expr=gateway_functions_seconds_sum+%2F+gateway_functions_seconds_count&g2.tab=0
```
$ docker service ls -q | xargs -n 1 -I {} docker service scale {}=10; docker service scale func_gateway=1
$ docker service scale func_prometheus=1; docker service scale func_alertmanager=1
```
@@ -12,6 +12,38 @@ import (
"github.com/docker/docker/client"
)
// MakeAlertHandler handles alerts from Prometheus Alertmanager
func MakeAlertHandler(c *client.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
log.Println("Alert received.")
body, readErr := ioutil.ReadAll(r.Body)
log.Println(string(body))
if readErr != nil {
log.Println(readErr)
return
}
var req requests.PrometheusAlert
err := json.Unmarshal(body, &req)
if err != nil {
log.Println(err)
return
}
if len(req.Alerts) > 0 {
if err := scaleService(req, c); err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
} else {
w.WriteHeader(http.StatusOK)
}
}
}
}
// CalculateReplicas decides what replica count to set depending on a Prometheus alert
func CalculateReplicas(status string, currentReplicas uint64) uint64 {
newReplicas := currentReplicas
@@ -34,61 +66,35 @@ func CalculateReplicas(status string, currentReplicas uint64) uint64 {
func scaleService(req requests.PrometheusAlert, c *client.Client) error {
	var err error
	// TODO: convert to loop / handler.
	serviceName := req.Alerts[0].Labels.FunctionName
	if len(serviceName) > 0 {
		service, _, inspectErr := c.ServiceInspectWithRaw(context.Background(), serviceName)
		if inspectErr == nil {
			currentReplicas := *service.Spec.Mode.Replicated.Replicas
			status := req.Status
			newReplicas := CalculateReplicas(status, currentReplicas)
			if newReplicas == currentReplicas {
				return nil
			}

			log.Printf("Scaling %s to %d replicas.\n", serviceName, newReplicas)
			service.Spec.Mode.Replicated.Replicas = &newReplicas
			updateOpts := types.ServiceUpdateOptions{}
			updateOpts.RegistryAuthFrom = types.RegistryAuthFromSpec

			response, updateErr := c.ServiceUpdate(context.Background(), service.ID, service.Version, service.Spec, updateOpts)
			if updateErr != nil {
				err = updateErr
			}
			log.Println(response)
		} else {
			err = inspectErr
		}
	}

	return err
}
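
CalculateReplicas' body falls between the hunks above, so only its signature and call site are visible here. A hedged test sketch of the contract scaleService relies on (the package name and the assumption that a firing alert raises the count are mine, not shown in this diff):

```
package handlers

import "testing"

func TestCalculateReplicasFiring(t *testing.T) {
	current := uint64(1)
	// Assumption: a "firing" alert should return a higher replica count;
	// scaleService treats an unchanged count as a no-op.
	if got := CalculateReplicas("firing", current); got <= current {
		t.Errorf("want more than %d replicas for a firing alert, got %d", current, got)
	}
}
```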
@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io/ioutil"
"net"
"net/http"
"strconv"
"time"
@@ -20,6 +21,20 @@ import (
// MakeProxy creates a proxy for HTTP web requests which can be routed to a function.
func MakeProxy(metrics metrics.MetricOptions, wildcard bool, c *client.Client, logger *logrus.Logger) http.HandlerFunc {
	proxyClient := http.Client{
		Transport: &http.Transport{
			Proxy: http.ProxyFromEnvironment,
			DialContext: (&net.Dialer{
				// TCP keep-alive probes off (Go of this era treated 0 as disabled).
				Timeout:   3 * time.Second,
				KeepAlive: 0,
			}).DialContext,
			MaxIdleConns: 1,
			// No HTTP keep-alives: every request dials a new connection,
			// so Swarm can round-robin invocations across replicas.
			DisableKeepAlives:     true,
			IdleConnTimeout:       120 * time.Millisecond,
			ExpectContinueTimeout: 1500 * time.Millisecond,
		},
	}

	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method == "POST" {
@@ -31,11 +46,11 @@ func MakeProxy(metrics metrics.MetricOptions, wildcard bool, c *client.Client, l
			vars := mux.Vars(r)
			name := vars["name"]
			fmt.Println("invoke by name")
			lookupInvoke(w, r, metrics, name, c, logger, &proxyClient)
			defer r.Body.Close()
		} else if len(header) > 0 {
			lookupInvoke(w, r, metrics, header[0], c, logger, &proxyClient)
			defer r.Body.Close()
		} else {
			w.WriteHeader(http.StatusBadRequest)
@@ -59,7 +74,7 @@ func trackTime(then time.Time, metrics metrics.MetricOptions, name string) {
	metrics.GatewayFunctionsHistogram.WithLabelValues(name).Observe(since.Seconds())
}

func lookupInvoke(w http.ResponseWriter, r *http.Request, metrics metrics.MetricOptions, name string, c *client.Client, logger *logrus.Logger, proxyClient *http.Client) {
	exists, err := lookupSwarmService(name, c)

	if err != nil || exists == false {
@@ -75,7 +90,7 @@ func lookupInvoke(w http.ResponseWriter, r *http.Request, metrics metrics.Metric
	if exists {
		defer trackTime(time.Now(), metrics, name)
		requestBody, _ := ioutil.ReadAll(r.Body)
		invokeService(w, r, metrics, name, requestBody, logger, proxyClient)
	}
}
@@ -88,7 +103,7 @@ func lookupSwarmService(serviceName string, c *client.Client) (bool, error) {
	return len(services) > 0, err
}

func invokeService(w http.ResponseWriter, r *http.Request, metrics metrics.MetricOptions, service string, requestBody []byte, logger *logrus.Logger, proxyClient *http.Client) {
	stamp := strconv.FormatInt(time.Now().Unix(), 10)

	defer func(when time.Time) {
@@ -98,17 +113,27 @@ func invokeService(w http.ResponseWriter, r *http.Request, metrics metrics.Metri
		metrics.GatewayFunctionsHistogram.WithLabelValues(service).Observe(seconds)
	}(time.Now())

	watchdogPort := 8080
	addr, lookupErr := net.LookupIP(service)
	var url string
	if len(addr) > 0 && lookupErr == nil {
		url = fmt.Sprintf("http://%s:%d/", addr[0].String(), watchdogPort)
	} else {
		url = fmt.Sprintf("http://%s:%d/", service, watchdogPort)
	}

	contentType := r.Header.Get("Content-Type")
	if len(contentType) == 0 {
		contentType = "text/plain"
	}

	fmt.Printf("[%s] Forwarding request [%s] to: %s\n", stamp, contentType, url)

	request, err := http.NewRequest("POST", url, bytes.NewReader(requestBody))
	request.Header.Add("Content-Type", contentType)
	defer request.Body.Close()

	response, err := proxyClient.Do(request)
	if err != nil {
		logger.Infoln(err)
		writeHead(service, metrics, http.StatusInternalServerError, w)
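
The lookup added above dials the first resolved address directly, falling back to the service name when resolution fails. If the function's Swarm service is registered with DNS round-robin (endpoint_mode: dnsrr), repeated lookups rotate through task IPs, which together with the disabled keep-alives spreads invocations across replicas. A small illustration (hypothetical service name):

```
package main

import (
	"fmt"
	"net"
)

func main() {
	// "func_echoit" is a hypothetical Swarm service name. With
	// endpoint_mode: dnsrr, repeated lookups can return task IPs in
	// rotating order; with the default VIP mode, one address comes back.
	for i := 0; i < 3; i++ {
		addrs, err := net.LookupIP("func_echoit")
		if err != nil || len(addrs) == 0 {
			fmt.Println("lookup failed:", err)
			return
		}
		fmt.Printf("attempt %d -> %s\n", i+1, addrs[0].String())
	}
}
```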
@@ -1,47 +1,47 @@
{
"receiver":"scale-up",
"status":"firing",
"alerts":[
{
"status":"firing",
"labels":{
"alertname":"APIHighInvocationRate",
"function_name":"func_echoit",
"instance":"gateway:8080",
"job":"gateway",
"monitor":"faas-monitor",
"service":"gateway",
"severity":"major",
"value":"8"
},
"annotations":{
"description":"High invocation total on gateway:8080",
"summary":"High invocation total on gateway:8080"
},
"startsAt":"2017-01-22T10:40:52.804Z",
"endsAt":"0001-01-01T00:00:00Z",
"generatorURL":"http://bb1b23e87070:9090/graph?g0.expr=rate%28gateway_function_invocation_total%5B10s%5D%29+%3E+5\u0026g0.tab=0"
}
],
"groupLabels":{
"alertname":"APIHighInvocationRate",
"service":"gateway"
},
"commonLabels":{
"alertname":"APIHighInvocationRate",
"function_name":"func_echoit",
"instance":"gateway:8080",
"job":"gateway",
"monitor":"faas-monitor",
"service":"gateway",
"severity":"major",
"value":"8"
},
"commonAnnotations":{
"description":"High invocation total on gateway:8080",
"summary":"High invocation total on gateway:8080"
},
"externalURL":"http://c052c835bcee:9093",
"version":"3",
"groupKey":18195285354214864953
"receiver": "scale-up",
"status": "firing",
"alerts": [{
"status": "firing",
"labels": {
"alertname": "APIHighInvocationRate",
"code": "200",
"function_name": "func_nodeinfo",
"instance": "gateway:8080",
"job": "gateway",
"monitor": "faas-monitor",
"service": "gateway",
"severity": "major",
"value": "8.998200359928017"
},
"annotations": {
"description": "High invocation total on gateway:8080",
"summary": "High invocation total on gateway:8080"
},
"startsAt": "2017-03-15T15:52:57.805Z",
"endsAt": "0001-01-01T00:00:00Z",
"generatorURL": "http://4156cb797423:9090/graph?g0.expr=rate%28gateway_function_invocation_total%5B10s%5D%29+%3E+5\u0026g0.tab=0"
}],
"groupLabels": {
"alertname": "APIHighInvocationRate",
"service": "gateway"
},
"commonLabels": {
"alertname": "APIHighInvocationRate",
"code": "200",
"function_name": "func_nodeinfo",
"instance": "gateway:8080",
"job": "gateway",
"monitor": "faas-monitor",
"service": "gateway",
"severity": "major",
"value": "8.998200359928017"
},
"commonAnnotations": {
"description": "High invocation total on gateway:8080",
"summary": "High invocation total on gateway:8080"
},
"externalURL": "http://f054879d97db:9093",
"version": "3",
"groupKey": 18195285354214864953
}
@@ -0,0 +1,47 @@
{
"receiver": "scale-up",
"status": "resolved",
"alerts": [{
"status": "resolved",
"labels": {
"alertname": "APIHighInvocationRate",
"code": "200",
"function_name": "func_nodeinfo",
"instance": "gateway:8080",
"job": "gateway",
"monitor": "faas-monitor",
"service": "gateway",
"severity": "major",
"value": "8.998200359928017"
},
"annotations": {
"description": "High invocation total on gateway:8080",
"summary": "High invocation total on gateway:8080"
},
"startsAt": "2017-03-15T15:52:57.805Z",
"endsAt": "2017-03-15T15:53:52.806Z",
"generatorURL": "http://4156cb797423:9090/graph?g0.expr=rate%28gateway_function_invocation_total%5B10s%5D%29+%3E+5\u0026g0.tab=0"
}],
"groupLabels": {
"alertname": "APIHighInvocationRate",
"service": "gateway"
},
"commonLabels": {
"alertname": "APIHighInvocationRate",
"code": "200",
"function_name": "func_nodeinfo",
"instance": "gateway:8080",
"job": "gateway",
"monitor": "faas-monitor",
"service": "gateway",
"severity": "major",
"value": "8.998200359928017"
},
"commonAnnotations": {
"description": "High invocation total on gateway:8080",
"summary": "High invocation total on gateway:8080"
},
"externalURL": "http://f054879d97db:9093",
"version": "3",
"groupKey": 18195285354214864953
}
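
These two fixtures mirror the payloads Alertmanager POSTs to the gateway's alert handler. A sketch for replaying one against a local gateway (the file name and the /system/alert route are assumptions for illustration; confirm the route against the gateway's handler registration):

```
package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// "sample_scale_up.json" is a hypothetical local copy of the
	// firing fixture above; the route below is also an assumption.
	payload, err := ioutil.ReadFile("sample_scale_up.json")
	if err != nil {
		fmt.Println(err)
		return
	}
	resp, err := http.Post("http://localhost:8080/system/alert",
		"application/json", bytes.NewReader(payload))
	if err != nil {
		fmt.Println(err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("gateway replied:", resp.Status)
}
```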
BIN -153 Bytes monitor/._prometheus.yml
Binary file not shown.
@@ -6,11 +6,11 @@ ADD https://github.com/alexellis/faas/releases/download/v0.3-alpha/fwatchdog /us
RUN chmod +x /usr/bin/fwatchdog
COPY package.json .
RUN npm i

COPY handler.js .
COPY sendColor.js .
COPY sample_response.json .

ENV fprocess="node handler.js"
CMD ["fwatchdog"]
@@ -1,4 +1,3 @@
#!/bin/sh
docker build -t functions/alexa-leds:latest .