Skip to content

Commit

Permalink
Publish to multiple topics
Browse files Browse the repository at this point in the history
Enables publishing to various topics according to annotations
on the functions. The function cache is moved up one level so
that it can be shared between the scale from zero code and the
queue proxy.

Unit tests added for new internal methods.

Tested e2e with arkade and the newest queue-worker and RC
gateway image with two queues and an annotation on one of the
functions of com.openfaas.queue. It worked as expected including
with multiple namespace support.

Signed-off-by: Alex Ellis (OpenFaaS Ltd) <alexellis2@gmail.com>
  • Loading branch information
alexellis committed Apr 22, 2020
1 parent a7c6c39 commit 2bfca6d
Show file tree
Hide file tree
Showing 22 changed files with 264 additions and 106 deletions.
1 change: 1 addition & 0 deletions gateway/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ COPY queue queue
COPY plugin plugin
COPY version version
COPY scaling scaling
COPY pkg pkg
COPY main.go .

# Run a gofmt and exclude all vendored code.
Expand Down
1 change: 1 addition & 0 deletions gateway/Dockerfile.arm64
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ COPY queue queue
COPY plugin plugin
COPY version version
COPY scaling scaling
COPY pkg pkg
COPY main.go .

# Run a gofmt and exclude all vendored code.
Expand Down
1 change: 1 addition & 0 deletions gateway/Dockerfile.armhf
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ COPY queue queue
COPY plugin plugin
COPY version version
COPY scaling scaling
COPY pkg pkg
COPY main.go .

# Run a gofmt and exclude all vendored code.
Expand Down
12 changes: 6 additions & 6 deletions gateway/Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion gateway/Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

[[constraint]]
name = "github.com/openfaas/nats-queue-worker"
version = "0.10.0"
version = "0.10.1"

[[constraint]]
name = "github.com/prometheus/client_golang"
Expand Down
5 changes: 3 additions & 2 deletions gateway/handlers/forwarding_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"strings"
"time"

"github.com/openfaas/faas/gateway/pkg/middleware"
"github.com/openfaas/faas/gateway/types"
)

Expand Down Expand Up @@ -43,7 +44,7 @@ func MakeForwardingProxyHandler(proxy *types.HTTPClientReverseProxy,
notifiers []HTTPNotifier,
baseURLResolver BaseURLResolver,
urlPathTransformer URLPathTransformer,
serviceAuthInjector AuthInjector) http.HandlerFunc {
serviceAuthInjector middleware.AuthInjector) http.HandlerFunc {

writeRequestURI := false
if _, exists := os.LookupEnv("write_request_uri"); exists {
Expand Down Expand Up @@ -108,7 +109,7 @@ func forwardRequest(w http.ResponseWriter,
requestURL string,
timeout time.Duration,
writeRequestURI bool,
serviceAuthInjector AuthInjector) (int, error) {
serviceAuthInjector middleware.AuthInjector) (int, error) {

upstreamReq := buildUpstreamRequest(r, baseURL, requestURL)
if upstreamReq.Body != nil {
Expand Down
90 changes: 69 additions & 21 deletions gateway/handlers/queue_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@ package handlers
import (
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"strings"

"github.com/gorilla/mux"
"github.com/openfaas/faas/gateway/metrics"
"github.com/openfaas/faas/gateway/queue"
"github.com/openfaas/faas/gateway/scaling"
)

// MakeQueuedProxy accepts work onto a queue
func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, canQueueRequests queue.CanQueueRequests, pathTransformer URLPathTransformer) http.HandlerFunc {
func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, queuer queue.RequestQueuer, pathTransformer URLPathTransformer, defaultNS string, functionCacher scaling.FunctionCacher, serviceQuery scaling.ServiceQuery) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Body != nil {
defer r.Body.Close()
Expand All @@ -24,29 +27,20 @@ func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, canQueueReque
body, err := ioutil.ReadAll(r.Body)

if err != nil {
w.WriteHeader(http.StatusBadRequest)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

w.Write([]byte(err.Error()))
callbackURL, err := getCallbackURLHeader(r.Header)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

vars := mux.Vars(r)
name := vars["name"]

callbackURLHeader := r.Header.Get("X-Callback-Url")
var callbackURL *url.URL

if len(callbackURLHeader) > 0 {
urlVal, urlErr := url.Parse(callbackURLHeader)
if urlErr != nil {
w.WriteHeader(http.StatusBadRequest)

w.Write([]byte(urlErr.Error()))
return
}

callbackURL = urlVal
}
queueName, err := getQueueName(name, functionCacher, serviceQuery)

req := &queue.Request{
Function: name,
Expand All @@ -57,15 +51,69 @@ func MakeQueuedProxy(metrics metrics.MetricOptions, wildcard bool, canQueueReque
Header: r.Header,
Host: r.Host,
CallbackURL: callbackURL,
QueueName: queueName,
}

if len(queueName) > 0 {
log.Printf("Queueing %s to: %s\n", name, queueName)
}

if err = canQueueRequests.Queue(req); err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
fmt.Println(err)
if err = queuer.Queue(req); err != nil {
fmt.Printf("Queue error: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusAccepted)
}
}

func getQueueName(name string, cache scaling.FunctionCacher, serviceQuery scaling.ServiceQuery) (queueName string, err error) {
fn, ns := getNameParts(name)

query, hit := cache.Get(fn, ns)
if !hit {
queryResponse, err := serviceQuery.GetReplicas(fn, ns)
if err != nil {
return "", err
}
cache.Set(fn, ns, queryResponse)
}

query, _ = cache.Get(fn, ns)

queueName = ""
if query.Annotations != nil {
if v := (*query.Annotations)["com.openfaas.queue"]; len(v) > 0 {
queueName = v
}
}
return queueName, err
}

func getCallbackURLHeader(header http.Header) (*url.URL, error) {
value := header.Get("X-Callback-Url")
var callbackURL *url.URL

if len(value) > 0 {
urlVal, err := url.Parse(value)
if err != nil {
return callbackURL, err
}

callbackURL = urlVal
}

return callbackURL, nil
}

func getNameParts(name string) (fn, ns string) {
fn = name
ns = ""

if index := strings.LastIndex(name, "."); index > 0 {
fn = name[:index]
ns = name[index+1:]
}
return fn, ns
}
74 changes: 74 additions & 0 deletions gateway/handlers/queue_proxy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (c) Alex Ellis 2017. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package handlers

import (
"net/http"
"testing"
)

func Test_getNameParts(t *testing.T) {
fn, ns := getNameParts("figlet.openfaas-fn")
wantFn := "figlet"
wantNs := "openfaas-fn"

if fn != wantFn {
t.Fatalf("want %s, got %s", wantFn, fn)
}
if ns != wantNs {
t.Fatalf("want %s, got %s", wantNs, ns)
}
}

func Test_getNamePartsDualDot(t *testing.T) {
fn, ns := getNameParts("dev.figlet.openfaas-fn")
wantFn := "dev.figlet"
wantNs := "openfaas-fn"

if fn != wantFn {
t.Fatalf("want %s, got %s", wantFn, fn)
}
if ns != wantNs {
t.Fatalf("want %s, got %s", wantNs, ns)
}
}

func Test_getNameParts_NoNs(t *testing.T) {
fn, ns := getNameParts("figlet")
wantFn := "figlet"
wantNs := ""

if fn != wantFn {
t.Fatalf("want %s, got %s", wantFn, fn)
}
if ns != wantNs {
t.Fatalf("want %s, got %s", wantNs, ns)
}
}

func Test_getCallbackURLHeader(t *testing.T) {
want := "http://localhost:8080"
header := http.Header{}
header.Add("X-Callback-Url", want)

uri, err := getCallbackURLHeader(header)
if err != nil {
t.Fatal(err)
}

if uri.String() != want {
t.Fatalf("want %s, but got %s", want, uri.String())
}
}

func Test_getCallbackURLHeader_ParseFails(t *testing.T) {
want := "ht tp://foo.com"
header := http.Header{}
header.Add("X-Callback-Url", want)

_, err := getCallbackURLHeader(header)
if err == nil {
t.Fatal("wanted a parsing error.")
}
}
4 changes: 1 addition & 3 deletions gateway/handlers/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ func getNamespace(defaultNamespace, fullName string) (string, string) {
// be called. If the function is not ready after the configured
// amount of attempts / queries then next will not be invoked and a status
// will be returned to the client.
func MakeScalingHandler(next http.HandlerFunc, config scaling.ScalingConfig, defaultNamespace string) http.HandlerFunc {

scaler := scaling.NewFunctionScaler(config)
func MakeScalingHandler(next http.HandlerFunc, scaler scaling.FunctionScaler, config scaling.ScalingConfig, defaultNamespace string) http.HandlerFunc {

return func(w http.ResponseWriter, r *http.Request) {

Expand Down
40 changes: 23 additions & 17 deletions gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/openfaas/faas-provider/auth"
"github.com/openfaas/faas/gateway/handlers"
"github.com/openfaas/faas/gateway/metrics"
"github.com/openfaas/faas/gateway/pkg/middleware"
"github.com/openfaas/faas/gateway/plugin"
"github.com/openfaas/faas/gateway/scaling"
"github.com/openfaas/faas/gateway/types"
Expand Down Expand Up @@ -60,6 +61,9 @@ func main() {

servicePollInterval := time.Second * 5

metadataQuery := metrics.NewMetadataQuery(credentials)
fmt.Println(metadataQuery)

metricsOptions := metrics.BuildMetricsOptions()
exporter := metrics.NewExporter(metricsOptions, credentials)
exporter.StartServiceWatcher(*config.FunctionsProviderURL, metricsOptions, "func", servicePollInterval)
Expand Down Expand Up @@ -100,10 +104,10 @@ func main() {
functionURLTransformer = nilURLTransformer
}

var serviceAuthInjector handlers.AuthInjector
var serviceAuthInjector middleware.AuthInjector

if config.UseBasicAuth {
serviceAuthInjector = &handlers.BasicAuthInjector{Credentials: credentials}
serviceAuthInjector = &middleware.BasicAuthInjector{Credentials: credentials}
}

decorateExternalAuth := handlers.MakeExternalAuthHandler
Expand All @@ -129,6 +133,21 @@ func main() {

faasHandlers.LogProxyHandler = handlers.NewLogHandlerFunc(*config.LogsProviderURL, config.WriteTimeout)

scalingConfig := scaling.ScalingConfig{
MaxPollCount: uint(1000),
SetScaleRetries: uint(20),
FunctionPollInterval: time.Millisecond * 50,
CacheExpiry: time.Second * 5, // freshness of replica values before going stale
ServiceQuery: externalServiceQuery,
}

functionProxy := faasHandlers.Proxy
if config.ScaleFromZero {
scalingFunctionCache := scaling.NewFunctionCache(scalingConfig.CacheExpiry)
scaler := scaling.NewFunctionScaler(scalingConfig, scalingFunctionCache)
functionProxy = handlers.MakeScalingHandler(faasHandlers.Proxy, scaler, scalingConfig, config.Namespace)
}

if config.UseNATS() {
log.Println("Async enabled: Using NATS Streaming.")
maxReconnect := 60
Expand All @@ -141,8 +160,9 @@ func main() {
log.Fatalln(queueErr)
}

queueFunctionCache := scaling.NewFunctionCache(scalingConfig.CacheExpiry)
faasHandlers.QueuedProxy = handlers.MakeNotifierWrapper(
handlers.MakeCallIDMiddleware(handlers.MakeQueuedProxy(metricsOptions, true, natsQueue, trimURLTransformer)),
handlers.MakeCallIDMiddleware(handlers.MakeQueuedProxy(metricsOptions, true, natsQueue, trimURLTransformer, config.Namespace, queueFunctionCache, externalServiceQuery)),
forwardingNotifiers,
)

Expand Down Expand Up @@ -188,20 +208,6 @@ func main() {
r := mux.NewRouter()
// max wait time to start a function = maxPollCount * functionPollInterval

functionProxy := faasHandlers.Proxy

if config.ScaleFromZero {
scalingConfig := scaling.ScalingConfig{
MaxPollCount: uint(1000),
SetScaleRetries: uint(20),
FunctionPollInterval: time.Millisecond * 50,
CacheExpiry: time.Second * 5, // freshness of replica values before going stale
ServiceQuery: externalServiceQuery,
}

functionProxy = handlers.MakeScalingHandler(faasHandlers.Proxy, scalingConfig, config.Namespace)
}

r.HandleFunc("/function/{name:["+NameExpression+"]+}", functionProxy)
r.HandleFunc("/function/{name:["+NameExpression+"]+}/", functionProxy)
r.HandleFunc("/function/{name:["+NameExpression+"]+}/{params:.*}", functionProxy)
Expand Down
Loading

0 comments on commit 2bfca6d

Please sign in to comment.