Skip to content

Commit

Permalink
remove unused providers (emulator, weighted) (#387)
Browse files Browse the repository at this point in the history
  • Loading branch information
mthenw committed Mar 16, 2018
1 parent cd14c40 commit 393787c
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 183 deletions.
22 changes: 22 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,24 @@
0.x.x / xxxx-xx-xx
==================

BACKWARDS INCOMPATIBILITIES:

* remove unused providers (emulator, weighted) (#387)
* refactor Prometheus metrics. Less metrics, more labels (#384)
* use different messages on lambda errors (#381)
* add more detailed metrics (#379)
* improve function invocation error (#377)

IMPROVEMENTS:

* hide sensitive information in logs (#385)
* minimalize Dockerfile (#378)

BUG FIXES:

* fix for UpdateFunction (#382)
* update create subscription docs regarding path param
* update README with info about Docker image. Closes #325 (#375)

0.6.0 / 2018-02-19
==================
Expand Down Expand Up @@ -281,3 +302,4 @@ BUG FIXES:
* start workers (#126)
* add info about default config api port
* update API path

110 changes: 1 addition & 109 deletions function/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@ package function

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/url"
"path"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -46,7 +42,7 @@ type ID string

// Provider provides provider specific info about a function.
type Provider struct {
Type ProviderType `json:"type" validate:"required,eq=awslambda|eq=http|eq=weighted|eq=emulator"`
Type ProviderType `json:"type" validate:"required,eq=awslambda|eq=http"`

// AWS Lambda function
ARN string `json:"arn,omitempty"`
Expand All @@ -55,15 +51,8 @@ type Provider struct {
AWSSecretAccessKey string `json:"awsSecretAccessKey,omitempty"`
AWSSessionToken string `json:"awsSessionToken,omitempty"`

// Group weighted function
Weighted WeightedFunctions `json:"weighted,omitempty"`

// HTTP function
URL string `json:"url,omitempty" validate:"omitempty,url"`

// Emulator function
EmulatorURL string `json:"emulatorUrl,omitempty"`
APIVersion string `json:"apiVersion,omitempty"`
}

// ProviderType represents what kind of function provider this is.
Expand All @@ -87,81 +76,29 @@ func (p Provider) MarshalLogObject(enc zapcore.ObjectEncoder) error {
if p.URL != "" {
enc.AddString("url", p.URL)
}
if p.EmulatorURL != "" {
enc.AddString("emulatorUrl", p.EmulatorURL)
}
if p.APIVersion != "" {
enc.AddString("apiVersion", p.APIVersion)
}

return nil
}

const (
// AWSLambda represents AWS Lambda function.
AWSLambda ProviderType = "awslambda"
// Weighted contains a set of other functions and their load balancing weights.
Weighted ProviderType = "weighted"
// HTTPEndpoint represents function defined as HTTP endpoint.
HTTPEndpoint ProviderType = "http"
// Emulator represents a function registered with the local emulator.
Emulator ProviderType = "emulator"
)

// Call tries to send a payload to a target function
func (f *Function) Call(payload []byte) ([]byte, error) {
switch f.Provider.Type {
case AWSLambda:
return f.callAWSLambda(payload)
case Emulator:
return f.callEmulator(payload)
case HTTPEndpoint:
return f.callHTTP(payload)
}

return []byte{}, errors.New("calling this kind of function is not implemented")
}

// WeightedFunction is a function along with its load-balacing proportional weight.
type WeightedFunction struct {
FunctionID ID `json:"functionId" validate:"required"`
Weight uint `json:"weight" validate:"required"`
}

// WeightedFunctions is a slice of WeightedFunction's that you can choose from based on weight
type WeightedFunctions []WeightedFunction

// Choose uses the function weights to pick a single one.
func (w WeightedFunctions) Choose() (ID, error) {
var chosenFunction ID

if len(w) == 1 {
chosenFunction = w[0].FunctionID
} else {
weightTotal := uint(0)
for _, wf := range w {
weightTotal += wf.Weight
}

if weightTotal < 1 {
err := errors.New("target function weights sum to 0, there is not one function to target")
return ID(""), err
}

chosenWeight := uint(1 + rand.Intn(int(weightTotal)))
weightsSoFar := uint(0)
for _, wf := range w {
chosenFunction = wf.FunctionID
weightsSoFar += wf.Weight
if weightsSoFar >= chosenWeight {
break
}
}
}

return chosenFunction, nil
}

// nolint: gocyclo
func (f *Function) callAWSLambda(payload []byte) ([]byte, error) {
config := aws.NewConfig().WithRegion(f.Provider.Region)
Expand Down Expand Up @@ -217,48 +154,3 @@ func (f *Function) callHTTP(payload []byte) ([]byte, error) {
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}

func (f *Function) callEmulator(payload []byte) ([]byte, error) {
type emulatorInvokeSchema struct {
FunctionID string `json:"functionId"`
Payload interface{} `json:"payload"`
}

client := http.Client{
Timeout: time.Second * 5,
}

var invokePayload interface{}
err := json.Unmarshal(payload, &invokePayload)
if err != nil {
return nil, err
}

emulatorURL, err := url.Parse(f.Provider.EmulatorURL)
if err != nil {
return nil, fmt.Errorf("Invalid Emulator URL %q for Function %q", f.Provider.EmulatorURL, f.ID)
}

switch f.Provider.APIVersion {
case "v0":
emulatorURL.Path = path.Join(f.Provider.APIVersion, "emulator/api/functions/invoke")
default:
return nil, fmt.Errorf("Invalid Emulator API version %q for Function %q", f.Provider.APIVersion, f.ID)
}

emulatorPayload, err := json.Marshal(emulatorInvokeSchema{FunctionID: string(f.ID), Payload: invokePayload})
if err != nil {
return nil, err
}

resp, err := client.Post(emulatorURL.String(), "application/json", bytes.NewReader(emulatorPayload))
if err != nil {
return nil, &ErrFunctionCallFailed{err}
}
if resp.StatusCode == http.StatusInternalServerError {
return nil, &ErrFunctionError{fmt.Errorf("HTTP status code: %d", http.StatusInternalServerError)}
}

defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}
34 changes: 0 additions & 34 deletions libkv/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,44 +163,10 @@ func (service Service) validateFunction(fn *function.Function) error {
}
}

if fn.Provider.Type == function.Emulator {
return service.validateEmulator(fn)
}

if fn.Provider.Type == function.HTTPEndpoint && fn.Provider.URL == "" {
return &function.ErrFunctionValidation{Message: "Missing required fields for HTTP endpoint."}
}

if fn.Provider.Type == function.Weighted {
return service.validateWeighted(fn)
}

return nil
}

func (service Service) validateEmulator(fn *function.Function) error {
if fn.Provider.EmulatorURL == "" {
return &function.ErrFunctionValidation{Message: "Missing required field emulatorURL for Emulator function."}
} else if fn.Provider.APIVersion == "" {
return &function.ErrFunctionValidation{Message: "Missing required field apiVersion for Emulator function."}
}
return nil
}

func (service Service) validateWeighted(fn *function.Function) error {
if len(fn.Provider.Weighted) == 0 {
return &function.ErrFunctionValidation{Message: "Missing required fields for weighted function."}
}

weightTotal := uint(0)
for _, wf := range fn.Provider.Weighted {
weightTotal += wf.Weight
}

if weightTotal < 1 {
return &function.ErrFunctionValidation{Message: "Function weights sum to zero."}
}

return nil
}

Expand Down
25 changes: 2 additions & 23 deletions libkv/function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,34 +317,13 @@ func TestValidateFunction_MissingID(t *testing.T) {
Message: "Key: 'Function.ID' Error:Field validation for 'ID' failed on the 'required' tag"})
}

func TestValidateFunction_EmulatorMissingURL(t *testing.T) {
service := &Service{Log: zap.NewNop()}

err := service.validateFunction(&function.Function{ID: "id", Provider: &function.Provider{Type: function.Emulator}})

assert.Equal(t, err, &function.ErrFunctionValidation{
Message: "Missing required field emulatorURL for Emulator function."})
}

func TestValidateFunction_EmulatorMissingAPIVersion(t *testing.T) {
service := &Service{Log: zap.NewNop()}

fn := &function.Function{
ID: "id",
Provider: &function.Provider{Type: function.Emulator, EmulatorURL: "http://example.com"}}
err := service.validateFunction(fn)

assert.Equal(t, err, &function.ErrFunctionValidation{
Message: "Missing required field apiVersion for Emulator function."})
}

func TestValidateFunction_SpaceInvalid(t *testing.T) {
service := &Service{Log: zap.NewNop()}

fn := &function.Function{
ID: "id",
Space: "///",
Provider: &function.Provider{Type: function.Emulator, EmulatorURL: "http://example.com"}}
Provider: &function.Provider{Type: function.HTTPEndpoint, URL: "http://example.com"}}
err := service.validateFunction(fn)

assert.Equal(t, err, &function.ErrFunctionValidation{
Expand All @@ -356,7 +335,7 @@ func TestValidateFunction_SetDefaultSpace(t *testing.T) {

fn := &function.Function{
ID: "id",
Provider: &function.Provider{Type: function.Emulator, EmulatorURL: "http://example.com"}}
Provider: &function.Provider{Type: function.HTTPEndpoint, URL: "http://example.com"}}
service.validateFunction(fn)

assert.Equal(t, "default", fn.Space)
Expand Down
16 changes: 1 addition & 15 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,20 +359,6 @@ func (router *Router) enqueueWork(path string, event *eventpkg.Event) {

// callFunction looks up a function and calls it.
func (router *Router) callFunction(space string, backingFunctionID function.ID, event eventpkg.Event) ([]byte, error) {
backingFunction := router.targetCache.Function(space, backingFunctionID)
if backingFunction == nil {
return []byte{}, errUnableToLookUpRegisteredFunction
}

var chosenFunction = backingFunction.ID
if backingFunction.Provider.Type == function.Weighted {
chosen, err := backingFunction.Provider.Weighted.Choose()
if err != nil {
return nil, err
}
chosenFunction = chosen
}

router.log.Debug("Invoking function.",
zap.String("space", space),
zap.String("functionId", string(backingFunctionID)),
Expand All @@ -386,7 +372,7 @@ func (router *Router) callFunction(space string, backingFunctionID function.ID,
}

// Call the target backing function.
f := router.targetCache.Function(space, chosenFunction)
f := router.targetCache.Function(space, backingFunctionID)
if f == nil {
return []byte{}, errUnableToLookUpRegisteredFunction
}
Expand Down
4 changes: 2 additions & 2 deletions router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestRouterServeHTTP_InvokeEventFunctionNotFound(t *testing.T) {
target := mock.NewMockTargeter(ctrl)
target.EXPECT().Function("default", function.ID("testfunc")).Return(nil).MaxTimes(1)
target.EXPECT().InvokableFunction("/", "default", function.ID("testfunc")).Return(true).MaxTimes(1)
target.EXPECT().SubscribersOfEvent("/", event.SystemEventReceivedType).Return([]router.FunctionInfo{}).MaxTimes(1)
target.EXPECT().SubscribersOfEvent("/", gomock.Any()).Return([]router.FunctionInfo{}).MaxTimes(2)
router := testrouter(target)

req, _ := http.NewRequest(http.MethodPost, "/", nil)
Expand All @@ -76,7 +76,7 @@ func TestRouterServeHTTP_InvokeEventDefaultSpace(t *testing.T) {
target := mock.NewMockTargeter(ctrl)
target.EXPECT().Function("default", function.ID("testfunc")).Return(nil).MaxTimes(1)
target.EXPECT().InvokableFunction("/", "default", function.ID("testfunc")).Return(true).MaxTimes(1)
target.EXPECT().SubscribersOfEvent("/", event.SystemEventReceivedType).Return([]router.FunctionInfo{}).MaxTimes(1)
target.EXPECT().SubscribersOfEvent("/", gomock.Any()).Return([]router.FunctionInfo{}).MaxTimes(2)
router := testrouter(target)

req, _ := http.NewRequest(http.MethodPost, "/", nil)
Expand Down

0 comments on commit 393787c

Please sign in to comment.