Skip to content

Commit dee2e78

Browse files
omerdemirokactions-user
authored andcommitted
feat: add AWS Lambda Event Source Mapping adapter (#2708)
- Add lambda-event-source-mapping adapter using GetListAdapterV2 - Support Get, List, and Search operations by UUID and ARN - Link to Lambda functions via FunctionArn with SEARCH method - Link to event sources (SQS, DynamoDB, Kinesis, Kafka, MQ, RDS/DocumentDB, ElastiCache) - Map event source mapping states to appropriate health statuses - Handle DocumentDB ARNs correctly (use RDS service identifier) - Skip creating links for unknown event source services - Include comprehensive test coverage with mock implementations - Register adapter in main proc.go registry Resolves: Add support for Lambda Event Source Mapping discovery GitOrigin-RevId: d17e1ea296f30407ddd34dc18b09a9f8f19216f6
1 parent ed35129 commit dee2e78

File tree

5 files changed

+696
-18
lines changed

5 files changed

+696
-18
lines changed
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
package adapters
2+
3+
import (
4+
"context"
5+
6+
"github.com/aws/aws-sdk-go-v2/service/lambda"
7+
"github.com/aws/aws-sdk-go-v2/service/lambda/types"
8+
9+
"github.com/overmindtech/cli/aws-source/adapterhelpers"
10+
"github.com/overmindtech/cli/sdp-go"
11+
)
12+
13+
type lambdaEventSourceMappingClient interface {
14+
ListEventSourceMappings(ctx context.Context, params *lambda.ListEventSourceMappingsInput, optFns ...func(*lambda.Options)) (*lambda.ListEventSourceMappingsOutput, error)
15+
GetEventSourceMapping(ctx context.Context, params *lambda.GetEventSourceMappingInput, optFns ...func(*lambda.Options)) (*lambda.GetEventSourceMappingOutput, error)
16+
}
17+
18+
func eventSourceMappingListFunc(ctx context.Context, client lambdaEventSourceMappingClient, _ string) ([]*types.EventSourceMappingConfiguration, error) {
19+
out, err := client.ListEventSourceMappings(ctx, &lambda.ListEventSourceMappingsInput{})
20+
if err != nil {
21+
return nil, err
22+
}
23+
24+
var items []*types.EventSourceMappingConfiguration
25+
for _, mapping := range out.EventSourceMappings {
26+
items = append(items, &mapping)
27+
}
28+
29+
return items, nil
30+
}
31+
32+
// convertGetEventSourceMappingOutputToConfiguration converts a GetEventSourceMappingOutput to EventSourceMappingConfiguration
33+
func convertGetEventSourceMappingOutputToConfiguration(output *lambda.GetEventSourceMappingOutput) *types.EventSourceMappingConfiguration {
34+
return &types.EventSourceMappingConfiguration{
35+
AmazonManagedKafkaEventSourceConfig: output.AmazonManagedKafkaEventSourceConfig,
36+
BatchSize: output.BatchSize,
37+
BisectBatchOnFunctionError: output.BisectBatchOnFunctionError,
38+
DestinationConfig: output.DestinationConfig,
39+
DocumentDBEventSourceConfig: output.DocumentDBEventSourceConfig,
40+
EventSourceArn: output.EventSourceArn,
41+
EventSourceMappingArn: output.EventSourceMappingArn,
42+
FilterCriteria: output.FilterCriteria,
43+
FilterCriteriaError: output.FilterCriteriaError,
44+
FunctionArn: output.FunctionArn,
45+
FunctionResponseTypes: output.FunctionResponseTypes,
46+
KMSKeyArn: output.KMSKeyArn,
47+
LastModified: output.LastModified,
48+
LastProcessingResult: output.LastProcessingResult,
49+
MaximumBatchingWindowInSeconds: output.MaximumBatchingWindowInSeconds,
50+
MaximumRecordAgeInSeconds: output.MaximumRecordAgeInSeconds,
51+
MaximumRetryAttempts: output.MaximumRetryAttempts,
52+
MetricsConfig: output.MetricsConfig,
53+
ParallelizationFactor: output.ParallelizationFactor,
54+
ProvisionedPollerConfig: output.ProvisionedPollerConfig,
55+
Queues: output.Queues,
56+
ScalingConfig: output.ScalingConfig,
57+
SelfManagedEventSource: output.SelfManagedEventSource,
58+
SelfManagedKafkaEventSourceConfig: output.SelfManagedKafkaEventSourceConfig,
59+
SourceAccessConfigurations: output.SourceAccessConfigurations,
60+
StartingPosition: output.StartingPosition,
61+
StartingPositionTimestamp: output.StartingPositionTimestamp,
62+
State: output.State,
63+
StateTransitionReason: output.StateTransitionReason,
64+
Topics: output.Topics,
65+
TumblingWindowInSeconds: output.TumblingWindowInSeconds,
66+
UUID: output.UUID,
67+
}
68+
}
69+
70+
func eventSourceMappingOutputMapper(query, scope string, awsItem *types.EventSourceMappingConfiguration) (*sdp.Item, error) {
71+
attributes, err := adapterhelpers.ToAttributesWithExclude(awsItem)
72+
if err != nil {
73+
return nil, err
74+
}
75+
76+
// Set the unique attribute (UUID)
77+
if awsItem.UUID != nil {
78+
err = attributes.Set("UUID", *awsItem.UUID)
79+
if err != nil {
80+
return nil, err
81+
}
82+
}
83+
84+
item := sdp.Item{
85+
Type: "lambda-event-source-mapping",
86+
UniqueAttribute: "UUID",
87+
Attributes: attributes,
88+
Scope: scope,
89+
}
90+
91+
// Link to the Lambda function if FunctionArn is present
92+
if awsItem.FunctionArn != nil {
93+
parsedARN, err := adapterhelpers.ParseARN(*awsItem.FunctionArn)
94+
if err == nil {
95+
item.LinkedItemQueries = append(item.LinkedItemQueries, &sdp.LinkedItemQuery{
96+
Query: &sdp.Query{
97+
Type: "lambda-function",
98+
Method: sdp.QueryMethod_SEARCH,
99+
Query: *awsItem.FunctionArn,
100+
Scope: adapterhelpers.FormatScope(parsedARN.AccountID, parsedARN.Region),
101+
},
102+
BlastPropagation: &sdp.BlastPropagation{
103+
// They are tightly linked
104+
In: true,
105+
Out: true,
106+
},
107+
})
108+
}
109+
}
110+
111+
// Link to the event source if EventSourceArn is present
112+
if awsItem.EventSourceArn != nil {
113+
parsedARN, err := adapterhelpers.ParseARN(*awsItem.EventSourceArn)
114+
if err == nil {
115+
var queryType string
116+
117+
switch parsedARN.Service {
118+
case "dynamodb":
119+
queryType = "dynamodb-table"
120+
case "kinesis":
121+
queryType = "kinesis-stream"
122+
case "sqs":
123+
queryType = "sqs-queue"
124+
case "kafka":
125+
queryType = "kafka-cluster"
126+
case "mq":
127+
queryType = "mq-broker"
128+
// Note: DocumentDB clusters use the RDS service identifier ("rds") in their ARNs.
129+
// Therefore, we map both RDS and DocumentDB clusters to "rds-db-cluster" here.
130+
case "rds":
131+
queryType = "rds-db-cluster"
132+
default:
133+
// Skip creating links for unknown services
134+
queryType = ""
135+
}
136+
137+
// Only create link if we have a valid queryType
138+
if queryType != "" {
139+
item.LinkedItemQueries = append(item.LinkedItemQueries, &sdp.LinkedItemQuery{
140+
Query: &sdp.Query{
141+
Type: queryType,
142+
Method: sdp.QueryMethod_SEARCH,
143+
Query: *awsItem.EventSourceArn,
144+
Scope: adapterhelpers.FormatScope(parsedARN.AccountID, parsedARN.Region),
145+
},
146+
BlastPropagation: &sdp.BlastPropagation{
147+
// Changing the event source will affect the mapping
148+
In: true,
149+
// Changing the mapping won't affect the event source
150+
Out: false,
151+
},
152+
})
153+
}
154+
}
155+
}
156+
157+
// Set health status based on state
158+
if awsItem.State != nil {
159+
switch *awsItem.State {
160+
case "Enabled":
161+
item.Health = sdp.Health_HEALTH_OK.Enum()
162+
case "Creating":
163+
item.Health = sdp.Health_HEALTH_PENDING.Enum()
164+
case "Deleting":
165+
item.Health = sdp.Health_HEALTH_PENDING.Enum()
166+
case "Disabled":
167+
item.Health = nil
168+
case "Enabling":
169+
item.Health = sdp.Health_HEALTH_PENDING.Enum()
170+
case "Updating":
171+
item.Health = sdp.Health_HEALTH_PENDING.Enum()
172+
case "Disabling":
173+
item.Health = sdp.Health_HEALTH_PENDING.Enum()
174+
}
175+
}
176+
177+
return &item, nil
178+
}
179+
180+
func NewLambdaEventSourceMappingAdapter(client lambdaEventSourceMappingClient, accountID string, region string) *adapterhelpers.GetListAdapter[*types.EventSourceMappingConfiguration, lambdaEventSourceMappingClient, *lambda.Options] {
181+
return &adapterhelpers.GetListAdapter[*types.EventSourceMappingConfiguration, lambdaEventSourceMappingClient, *lambda.Options]{
182+
ItemType: "lambda-event-source-mapping",
183+
Client: client,
184+
AccountID: accountID,
185+
Region: region,
186+
AdapterMetadata: lambdaEventSourceMappingAdapterMetadata,
187+
GetFunc: func(ctx context.Context, client lambdaEventSourceMappingClient, scope, query string) (*types.EventSourceMappingConfiguration, error) {
188+
out, err := client.GetEventSourceMapping(ctx, &lambda.GetEventSourceMappingInput{
189+
UUID: &query,
190+
})
191+
if err != nil {
192+
return nil, err
193+
}
194+
return convertGetEventSourceMappingOutputToConfiguration(out), nil
195+
},
196+
ListFunc: eventSourceMappingListFunc,
197+
SearchFunc: func(ctx context.Context, client lambdaEventSourceMappingClient, scope string, query string) ([]*types.EventSourceMappingConfiguration, error) {
198+
// Use the query directly as event source ARN input to ListEventSourceMappings
199+
out, err := client.ListEventSourceMappings(ctx, &lambda.ListEventSourceMappingsInput{
200+
EventSourceArn: &query,
201+
})
202+
if err != nil {
203+
return nil, err
204+
}
205+
206+
response := make([]*types.EventSourceMappingConfiguration, 0, len(out.EventSourceMappings))
207+
for _, mapping := range out.EventSourceMappings {
208+
response = append(response, &mapping)
209+
}
210+
211+
return response, nil
212+
},
213+
ItemMapper: func(query, scope string, awsItem *types.EventSourceMappingConfiguration) (*sdp.Item, error) {
214+
return eventSourceMappingOutputMapper(query, scope, awsItem)
215+
},
216+
}
217+
}
218+
219+
var lambdaEventSourceMappingAdapterMetadata = Metadata.Register(&sdp.AdapterMetadata{
220+
Type: "lambda-event-source-mapping",
221+
DescriptiveName: "Lambda Event Source Mapping",
222+
SupportedQueryMethods: &sdp.AdapterSupportedQueryMethods{
223+
Get: true,
224+
Search: true,
225+
List: true,
226+
GetDescription: "Get a Lambda event source mapping by UUID",
227+
SearchDescription: "Search for Lambda event source mappings by Event Source ARN (SQS, DynamoDB, Kinesis, etc.)",
228+
ListDescription: "List all Lambda event source mappings",
229+
},
230+
TerraformMappings: []*sdp.TerraformMapping{
231+
{
232+
TerraformQueryMap: "aws_lambda_event_source_mapping.arn",
233+
TerraformMethod: sdp.QueryMethod_SEARCH,
234+
},
235+
},
236+
PotentialLinks: []string{
237+
"lambda-function",
238+
"dynamodb-table",
239+
"kinesis-stream",
240+
"sqs-queue",
241+
"kafka-cluster",
242+
"mq-broker",
243+
"rds-db-cluster",
244+
},
245+
Category: sdp.AdapterCategory_ADAPTER_CATEGORY_COMPUTE_APPLICATION,
246+
})

0 commit comments

Comments
 (0)