Skip to content

Commit 3d369fc

Browse files
authored
Merge pull request #24 from horike37/change-behavior-kinesis
feat: change default behavior of kinesis proxy
2 parents 84bdc00 + 1aaae26 commit 3d369fc

File tree

7 files changed

+357
-85
lines changed

7 files changed

+357
-85
lines changed

README.md

Lines changed: 51 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,36 @@ Sample syntax for Kinesis proxy in `serverless.yml`.
3333
```yaml
3434
custom:
3535
apiGatewayServiceProxies:
36+
- kinesis: # partitionkey is set apigateway requestid by default
37+
path: /kinesis
38+
method: post
39+
streamName: { Ref: 'YourStream' }
40+
cors: true
41+
- kinesis:
42+
path: /kinesis
43+
method: post
44+
partitionKey: 'hardcordedkey' # use static partitionkey
45+
streamName: { Ref: 'YourStream' }
46+
cors: true
47+
- kinesis:
48+
path: /kinesis/{myKey} # use path parameter
49+
method: post
50+
partitionKey:
51+
pathParam: myKey
52+
streamName: { Ref: 'YourStream' }
53+
cors: true
54+
- kinesis:
55+
path: /kinesis
56+
method: post
57+
partitionKey:
58+
bodyParam: data.myKey # use body parameter
59+
streamName: { Ref: 'YourStream' }
60+
cors: true
3661
- kinesis:
3762
path: /kinesis
3863
method: post
64+
partitionKey:
65+
queryStringParam: myKey # use query string param
3966
streamName: { Ref: 'YourStream' }
4067
cors: true
4168

@@ -50,7 +77,7 @@ resources:
5077
Sample request after deploying.
5178
5279
```bash
53-
curl -X POST https://xxxxxxx.execute-api.us-east-1.amazonaws.com/dev/kinesis -d '{"Data": "some data","PartitionKey": "some key"}' -H 'Content-Type:application/json'
80+
curl -X POST https://xxxxxxx.execute-api.us-east-1.amazonaws.com/dev/kinesis -d '{"message": "some data"}' -H 'Content-Type:application/json'
5481
```
5582
5683
### SQS
@@ -78,6 +105,28 @@ Sample request after deploying.
78105
curl -X POST https://xxxxxx.execute-api.us-east-1.amazonaws.com/dev/sqs -d '{"message": "testtest"}' -H 'Content-Type:application/json'
79106
```
80107

108+
#### Customizing request parameters
109+
110+
If you'd like to pass additional data to the integration request, you can do so by including your custom [API Gateway request parameters](https://docs.aws.amazon.com/apigateway/latest/developerguide/request-response-data-mappings.html) in `serverless.yml` like so:
111+
112+
```yml
113+
custom:
114+
apiGatewayServiceProxies:
115+
- sqs:
116+
path: /queue
117+
method: post
118+
queueName: !GetAtt MyQueue.QueueName
119+
cors: true
120+
121+
requestParameters:
122+
'integration.request.querystring.MessageAttribute.1.Name': "'cognitoIdentityId'"
123+
'integration.request.querystring.MessageAttribute.1.Value.StringValue': 'context.identity.cognitoIdentityId'
124+
'integration.request.querystring.MessageAttribute.1.Value.DataType': "'String'"
125+
'integration.request.querystring.MessageAttribute.2.Name': "'cognitoAuthenticationProvider'"
126+
'integration.request.querystring.MessageAttribute.2.Value.StringValue': 'context.identity.cognitoAuthenticationProvider'
127+
'integration.request.querystring.MessageAttribute.2.Value.DataType': "'String'"
128+
```
129+
81130
### S3
82131

83132
Sample syntax for S3 proxy in `serverless.yml`.
@@ -231,11 +280,7 @@ resources:
231280

232281
Source: [AWS::ApiGateway::Method docs](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-apigateway-method.html#cfn-apigateway-method-authorizationtype)
233282

234-
## Specific features
235-
236-
### Kinesis
237-
238-
#### Customizing request body mapping templates
283+
### Customizing request body mapping templates
239284

240285
If you'd like to add content types or customize the default templates, you can do so by including your custom [API Gateway request mapping template](https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-mapping-template-reference.html) in `serverless.yml` like so:
241286

@@ -263,27 +308,3 @@ custom:
263308
```
264309

265310
Source: [How to connect SNS to Kinesis for cross-account delivery via API Gateway](https://theburningmonk.com/2019/07/how-to-connect-sns-to-kinesis-for-cross-account-delivery-via-api-gateway/)
266-
267-
### SQS
268-
269-
#### Customizing request parameters
270-
271-
If you'd like to pass additional data to the integration request, you can do so by including your custom [API Gateway request parameters](https://docs.aws.amazon.com/apigateway/latest/developerguide/request-response-data-mappings.html) in `serverless.yml` like so:
272-
273-
```yml
274-
custom:
275-
apiGatewayServiceProxies:
276-
- sqs:
277-
path: /queue
278-
method: post
279-
queueName: !GetAtt MyQueue.QueueName
280-
cors: true
281-
282-
requestParameters:
283-
'integration.request.querystring.MessageAttribute.1.Name': "'cognitoIdentityId'"
284-
'integration.request.querystring.MessageAttribute.1.Value.StringValue': 'context.identity.cognitoIdentityId'
285-
'integration.request.querystring.MessageAttribute.1.Value.DataType': "'String'"
286-
'integration.request.querystring.MessageAttribute.2.Name': "'cognitoAuthenticationProvider'"
287-
'integration.request.querystring.MessageAttribute.2.Value.StringValue': 'context.identity.cognitoAuthenticationProvider'
288-
'integration.request.querystring.MessageAttribute.2.Value.DataType': "'String'"
289-
```

__tests__/integration/kinesis/multiple-integrations/service/serverless.yml

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,32 +14,39 @@ custom:
1414
- kinesis:
1515
path: /kinesis1
1616
method: post
17-
streamName: { Ref: 'YourStream1' }
17+
streamName: { Ref: 'YourStream' }
1818
cors: true
19-
2019
- kinesis:
2120
path: /kinesis2
2221
method: post
23-
streamName: { Ref: 'YourStream2' }
22+
partitionKey: 'hardcordedkey' # use static partitionkey
23+
streamName: { Ref: 'YourStream' }
24+
cors: true
25+
- kinesis:
26+
path: /kinesis3/{myKey} # use path parameter
27+
method: post
28+
partitionKey:
29+
pathParam: myKey
30+
streamName: { Ref: 'YourStream' }
31+
cors: true
32+
- kinesis:
33+
path: /kinesis4
34+
method: post
35+
partitionKey:
36+
bodyParam: data.myKey # use body parameter
37+
streamName: { Ref: 'YourStream' }
2438
cors: true
25-
2639
- kinesis:
27-
path: /kinesis3
40+
path: /kinesis5
2841
method: post
29-
streamName: { Ref: 'YourStream3' }
42+
partitionKey:
43+
queryStringParam: myKey # use query string param
44+
streamName: { Ref: 'YourStream' }
3045
cors: true
3146

3247
resources:
3348
Resources:
34-
YourStream1:
35-
Type: AWS::Kinesis::Stream
36-
Properties:
37-
ShardCount: 1
38-
YourStream2:
39-
Type: AWS::Kinesis::Stream
40-
Properties:
41-
ShardCount: 1
42-
YourStream3:
49+
YourStream:
4350
Type: AWS::Kinesis::Stream
4451
Properties:
4552
ShardCount: 1

__tests__/integration/kinesis/multiple-integrations/tests.js

Lines changed: 76 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,81 @@ describe('Multiple Kinesis Proxy Integrations Test', () => {
1919
removeService(stage, config)
2020
})
2121

22-
it('should get correct response from multiple kinesis proxy endpoints', async () => {
23-
const streams = ['kinesis1', 'kinesis2', 'kinesis3']
24-
25-
for (const stream of streams) {
26-
const testEndpoint = `${endpoint}/${stream}`
27-
28-
const response = await fetch(testEndpoint, {
29-
method: 'POST',
30-
headers: { 'Content-Type': 'application/json' },
31-
body: JSON.stringify({ Data: `data for stream ${stream}`, PartitionKey: 'some key' })
32-
})
33-
expect(response.headers.get('access-control-allow-origin')).to.deep.equal('*')
34-
expect(response.status).to.be.equal(200)
35-
const body = await response.json()
36-
expect(body).to.have.own.property('ShardId')
37-
expect(body).to.have.own.property('SequenceNumber')
38-
}
22+
it('should get correct response from kinesis proxy endpoints with default partitionkey', async () => {
23+
const stream = 'kinesis1'
24+
const testEndpoint = `${endpoint}/${stream}`
25+
const response = await fetch(testEndpoint, {
26+
method: 'POST',
27+
headers: { 'Content-Type': 'application/json' },
28+
body: JSON.stringify({ message: `data for stream ${stream}` })
29+
})
30+
expect(response.headers.get('access-control-allow-origin')).to.deep.equal('*')
31+
expect(response.status).to.be.equal(200)
32+
const body = await response.json()
33+
expect(body).to.have.own.property('ShardId')
34+
expect(body).to.have.own.property('SequenceNumber')
35+
})
36+
37+
it('should get correct response from kinesis proxy endpoints with hardcorded partitionkey', async () => {
38+
const stream = 'kinesis2'
39+
const testEndpoint = `${endpoint}/${stream}`
40+
const response = await fetch(testEndpoint, {
41+
method: 'POST',
42+
headers: { 'Content-Type': 'application/json' },
43+
body: JSON.stringify({ message: `data for stream ${stream}` })
44+
})
45+
expect(response.headers.get('access-control-allow-origin')).to.deep.equal('*')
46+
expect(response.status).to.be.equal(200)
47+
const body = await response.json()
48+
expect(body).to.have.own.property('ShardId')
49+
expect(body).to.have.own.property('SequenceNumber')
50+
})
51+
52+
it('should get correct response from kinesis proxy endpoints with pathparam partitionkey', async () => {
53+
const stream = 'kinesis3'
54+
const partitionkey = 'partitionkey'
55+
const testEndpoint = `${endpoint}/${stream}/${partitionkey}`
56+
const response = await fetch(testEndpoint, {
57+
method: 'POST',
58+
headers: { 'Content-Type': 'application/json' },
59+
body: JSON.stringify({ message: `data for stream ${stream}` })
60+
})
61+
expect(response.headers.get('access-control-allow-origin')).to.deep.equal('*')
62+
expect(response.status).to.be.equal(200)
63+
const body = await response.json()
64+
expect(body).to.have.own.property('ShardId')
65+
expect(body).to.have.own.property('SequenceNumber')
66+
})
67+
68+
it('should get correct response from kinesis proxy endpoints with bodyparam partitionkey', async () => {
69+
const stream = 'kinesis4'
70+
const partitionkey = 'partitionkey'
71+
const testEndpoint = `${endpoint}/${stream}`
72+
const response = await fetch(testEndpoint, {
73+
method: 'POST',
74+
headers: { 'Content-Type': 'application/json' },
75+
body: JSON.stringify({ message: `data for stream ${stream}`, data: { myKey: partitionkey } })
76+
})
77+
expect(response.headers.get('access-control-allow-origin')).to.deep.equal('*')
78+
expect(response.status).to.be.equal(200)
79+
const body = await response.json()
80+
expect(body).to.have.own.property('ShardId')
81+
expect(body).to.have.own.property('SequenceNumber')
82+
})
83+
84+
it('should get correct response from kinesis proxy endpoints with queryStringParam partitionkey', async () => {
85+
const stream = 'kinesis5'
86+
const partitionkey = 'partitionkey'
87+
const testEndpoint = `${endpoint}/${stream}?myKey=${partitionkey}`
88+
const response = await fetch(testEndpoint, {
89+
method: 'POST',
90+
headers: { 'Content-Type': 'application/json' },
91+
body: JSON.stringify({ message: `data for stream ${stream}`, data: { mykey: partitionkey } })
92+
})
93+
expect(response.headers.get('access-control-allow-origin')).to.deep.equal('*')
94+
expect(response.status).to.be.equal(200)
95+
const body = await response.json()
96+
expect(body).to.have.own.property('ShardId')
97+
expect(body).to.have.own.property('SequenceNumber')
3998
})
4099
})

__tests__/integration/kinesis/single-integration/tests.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ describe('Single Kinesis Proxy Integration Test', () => {
2525
const response = await fetch(testEndpoint, {
2626
method: 'POST',
2727
headers: { 'Content-Type': 'application/json' },
28-
body: JSON.stringify({ Data: 'some data', PartitionKey: 'some key' })
28+
body: JSON.stringify({ message: 'some data' })
2929
})
3030
expect(response.headers.get('access-control-allow-origin')).to.deep.equal('*')
3131
expect(response.status).to.be.equal(200)

lib/package/kinesis/compileMethodsToKinesis.js

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,26 @@ module.exports = {
126126
}
127127
},
128128

129+
getKinesisObjectRequestParameter(http) {
130+
if (!_.has(http, 'partitionKey')) {
131+
return '$context.requestId'
132+
}
133+
134+
if (http.partitionKey.pathParam) {
135+
return `$input.params().path.${http.partitionKey.pathParam}`
136+
}
137+
138+
if (http.partitionKey.queryStringParam) {
139+
return `$input.params().querystring.${http.partitionKey.queryStringParam}`
140+
}
141+
142+
if (http.partitionKey.bodyParam) {
143+
return `$util.parseJson($input.body).${http.partitionKey.bodyParam}`
144+
}
145+
146+
return `${http.partitionKey}`
147+
},
148+
129149
buildDefaultKinesisRequestTemplate(http) {
130150
let streamName
131151
if (typeof http.streamName == 'object') {
@@ -134,6 +154,8 @@ module.exports = {
134154
streamName = `"${http.streamName}"`
135155
}
136156

157+
const objectRequestParam = this.getKinesisObjectRequestParameter(http)
158+
137159
return {
138160
'Fn::Join': [
139161
'',
@@ -142,8 +164,8 @@ module.exports = {
142164
'"StreamName": "',
143165
streamName,
144166
'",',
145-
'"Data": "$util.base64Encode($input.json(\'$.Data\'))",',
146-
'"PartitionKey": "$input.path(\'$.PartitionKey\')"',
167+
'"Data": "$util.base64Encode($input.body)",',
168+
`"PartitionKey": "${objectRequestParam}"`,
147169
'}'
148170
]
149171
]

0 commit comments

Comments
 (0)