/
kinesis.go
67 lines (57 loc) · 2.19 KB
/
kinesis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package archetype
import (
"context"
"reflect"
"runtime"
awsLambdaEvents "github.com/aws/aws-lambda-go/events"
sparta "github.com/mweagle/Sparta"
gocf "github.com/mweagle/go-cloudformation"
"github.com/pkg/errors"
)
// KinesisReactor represents a lambda function that responds to Kinesis messages
type KinesisReactor interface {
// OnEvent when an SNS event occurs. Check the snsEvent field
// for the specific event
OnKinesisMessage(ctx context.Context,
kinesisEvent awsLambdaEvents.KinesisEvent) (interface{}, error)
}
// KinesisReactorFunc is a free function that adapts a KinesisReactor
// compliant signature into a function that exposes an OnEvent
// function
type KinesisReactorFunc func(ctx context.Context,
kinesisEvent awsLambdaEvents.KinesisEvent) (interface{}, error)
// OnKinesisMessage satisfies the KinesisReactor interface
func (reactorFunc KinesisReactorFunc) OnKinesisMessage(ctx context.Context,
kinesisEvent awsLambdaEvents.KinesisEvent) (interface{}, error) {
return reactorFunc(ctx, kinesisEvent)
}
// ReactorName provides the name of the reactor func
func (reactorFunc KinesisReactorFunc) ReactorName() string {
return runtime.FuncForPC(reflect.ValueOf(reactorFunc).Pointer()).Name()
}
// NewKinesisReactor returns an Kinesis reactor lambda function
func NewKinesisReactor(reactor KinesisReactor,
kinesisStream gocf.Stringable,
startingPosition string,
batchSize int64,
additionalLambdaPermissions []sparta.IAMRolePrivilege) (*sparta.LambdaAWSInfo, error) {
reactorLambda := func(ctx context.Context, kinesisEvent awsLambdaEvents.KinesisEvent) (interface{}, error) {
return reactor.OnKinesisMessage(ctx, kinesisEvent)
}
lambdaFn, lambdaFnErr := sparta.NewAWSLambda(reactorName(reactor),
reactorLambda,
sparta.IAMRoleDefinition{})
if lambdaFnErr != nil {
return nil, errors.Wrapf(lambdaFnErr, "attempting to create reactor")
}
lambdaFn.EventSourceMappings = append(lambdaFn.EventSourceMappings,
&sparta.EventSourceMapping{
EventSourceArn: kinesisStream,
StartingPosition: startingPosition,
BatchSize: batchSize,
})
if len(additionalLambdaPermissions) != 0 {
lambdaFn.RoleDefinition.Privileges = additionalLambdaPermissions
}
return lambdaFn, nil
}