This repository has been archived by the owner on Nov 16, 2020. It is now read-only.
/
driver.go
129 lines (101 loc) · 3.54 KB
/
driver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
///////////////////////////////////////////////////////////////////////
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
///////////////////////////////////////////////////////////////////////
package riff
import (
"context"
"encoding/json"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
kapi "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"github.com/vmware/dispatch/lib/riff"
"github.com/vmware/dispatch/pkg/functions"
"github.com/vmware/dispatch/pkg/trace"
)
// Identifiers this driver hands to riff.NewRequester in New.
const (
	// consumerGroupID is the consumer group identifier passed to the requester.
	consumerGroupID = "dispatch-riff-driver"
	// correlationIDHeader is the header propagated by riff function-sidecar.
	correlationIDHeader = "correlationId"
)
// Config holds the settings read by New when it builds a riff driver.
type Config struct {
	// KafkaBrokers is forwarded to riff.NewRequester.
	KafkaBrokers []string
	// K8sConfig is forwarded to riff.NewRiffTalk.
	K8sConfig string
	// FuncNamespace is forwarded to riff.NewRiffTalk together with K8sConfig.
	FuncNamespace string
	// FuncDefaultLimits is optional; when non-nil, New parses its CPU and
	// Memory values as Kubernetes resource quantities.
	FuncDefaultLimits *functions.FunctionResources
	// FuncDefaultRequests is optional; when non-nil, New parses its CPU and
	// Memory values as Kubernetes resource quantities.
	FuncDefaultRequests *functions.FunctionResources
	// ZookeeperLocation is forwarded to riff.NewRequester.
	ZookeeperLocation string
}
// riffDriver is the driver value constructed by New.
type riffDriver struct {
	// requester sends invocation requests (see GetRunnable) and is closed by Close.
	requester *riff.Requester
	// riffTalk carries out the Create and Delete operations.
	riffTalk *riff.RiffTalk
}
// systemError wraps an underlying error so that it can be reported through
// the Error, AsSystemErrorObject and StackTrace methods.
type systemError struct {
	// Err is the wrapped error; it is tagged with the JSON key "err".
	Err error `json:"err"`
}
// Error returns the message of the wrapped error, which makes *systemError
// satisfy the built-in error interface.
func (se *systemError) Error() string {
	wrapped := se.Err
	return wrapped.Error()
}
// AsSystemErrorObject returns the receiver itself as the system error object.
func (se *systemError) AsSystemErrorObject() interface{} {
	var obj interface{} = se
	return obj
}
// StackTrace returns the stack trace carried by the wrapped error when that
// error implements functions.StackTracer, and nil otherwise.
func (se *systemError) StackTrace() errors.StackTrace {
	tracer, hasTrace := se.Err.(functions.StackTracer)
	if !hasTrace {
		return nil
	}
	return tracer.StackTrace()
}
// Close closes the driver's requester and returns whatever error that yields.
func (d *riffDriver) Close() error {
	closeErr := d.requester.Close()
	return closeErr
}
// New creates a new riff driver from config.
//
// The optional default limits and requests are parsed as Kubernetes resource
// quantities. A malformed value is reported through the returned error rather
// than a panic, and validation runs before the requester is created so that a
// bad configuration never leaves an open requester behind.
//
// It returns an error when config is nil, when a default CPU/memory value
// cannot be parsed, or when riff.NewRequester fails.
func New(config *Config) (functions.FaaSDriver, error) {
	if config == nil {
		return nil, errors.New("riff: nil config")
	}

	// parseResources converts optional default resources into a resource list.
	// A nil input yields a nil list, matching an unset Limits/Requests field.
	parseResources := func(kind string, res *functions.FunctionResources) (kapi.ResourceList, error) {
		if res == nil {
			return nil, nil
		}
		cpu, err := resource.ParseQuantity(res.CPU)
		if err != nil {
			return nil, errors.Wrapf(err, "riff: invalid default CPU %s: '%s'", kind, res.CPU)
		}
		memory, err := resource.ParseQuantity(res.Memory)
		if err != nil {
			return nil, errors.Wrapf(err, "riff: invalid default memory %s: '%s'", kind, res.Memory)
		}
		return kapi.ResourceList{
			kapi.ResourceCPU:    cpu,
			kapi.ResourceMemory: memory,
		}, nil
	}

	// NOTE(review): funcDefaultResourceReq is built (and thereby validated) but
	// is not passed to anything in this function — confirm whether it should be
	// handed to riffTalk.
	funcDefaultResourceReq := kapi.ResourceRequirements{}
	var err error
	if funcDefaultResourceReq.Limits, err = parseResources("limit", config.FuncDefaultLimits); err != nil {
		return nil, err
	}
	if funcDefaultResourceReq.Requests, err = parseResources("request", config.FuncDefaultRequests); err != nil {
		return nil, err
	}

	requester, err := riff.NewRequester(correlationIDHeader, consumerGroupID, config.KafkaBrokers, config.ZookeeperLocation)
	if err != nil {
		return nil, err
	}

	d := &riffDriver{
		requester: requester,
		riffTalk:  riff.NewRiffTalk(config.K8sConfig, config.FuncNamespace),
	}
	return d, nil
}
// Create delegates to riffTalk.Create, passing the fn-prefixed FaasID of f and
// its function image URL. The call runs inside a trace span that is finished
// when Create returns.
func (d *riffDriver) Create(ctx context.Context, f *functions.Function) error {
	// The context returned by trace.Trace is not needed further down.
	span, _ := trace.Trace(ctx, "")
	defer span.Finish()

	name, image := fnID(f.FaasID), f.FunctionImageURL
	return d.riffTalk.Create(name, image)
}
// Delete delegates to riffTalk.Delete, passing the fn-prefixed FaasID of f.
// The ctx argument is not used.
func (d *riffDriver) Delete(ctx context.Context, f *functions.Function) error {
	name := fnID(f.FaasID)
	return d.riffTalk.Delete(name)
}
// GetRunnable returns a Runnable that invokes the function execution e via the
// driver's requester.
//
// The Runnable JSON-encodes the invocation context and payload as a
// functions.Message, posts it to the topic named by the fn-prefixed FaasID
// with e.RunID, and JSON-decodes the reply into another functions.Message.
// Logs and the error recorded in the reply's context are copied onto the
// caller's ctx, and the reply payload is returned.
//
// Encoding, request and decoding failures are all returned as *systemError.
func (d *riffDriver) GetRunnable(e *functions.FunctionExecution) functions.Runnable {
	return func(ctx functions.Context, in interface{}) (interface{}, error) {
		// Do not post anything if the input cannot be encoded: a discarded
		// Marshal error would otherwise send an empty message to the topic.
		// NOTE(review): reported as a system error to match the other failure
		// paths here — confirm whether an input-error type is more appropriate.
		bytesIn, err := json.Marshal(functions.Message{Context: ctx, Payload: in})
		if err != nil {
			return nil, &systemError{errors.Wrapf(err, "riff: error encoding input for function: '%s', runID: '%s'", e.FunctionID, e.RunID)}
		}

		topic := fnID(e.FaasID)
		log.Debugf("Posting to topic '%s': '%s'", topic, string(bytesIn))

		resBytes, err := d.requester.Request(topic, e.RunID, bytesIn)
		if err != nil {
			return nil, &systemError{errors.Wrapf(err, "riff: error invoking function: '%s', runID: '%s'", e.FunctionID, e.RunID)}
		}

		var out functions.Message
		if err := json.Unmarshal(resBytes, &out); err != nil {
			return nil, &systemError{errors.Errorf("cannot JSON-parse result from riff: %s %s", err, string(resBytes))}
		}

		// Surface what the function recorded in its own context to the caller.
		ctx.AddLogs(out.Context.Logs())
		ctx.SetError(out.Context.GetError())
		return out.Payload, nil
	}
}
// fnID builds the name this driver uses for a function's riff resources and
// topic by prefixing id with "fn-".
func fnID(id string) string {
	const prefix = "fn-"
	return prefix + id
}