Skip to content

Commit

Permalink
GH-47 Make acceptance test pass
Browse files Browse the repository at this point in the history
Implemented connect-websocket, disconnect-websocket, and update-user lambda
  • Loading branch information
ceilfors committed Jun 3, 2019
1 parent 4e40b0c commit cdb799e
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 16 deletions.
42 changes: 26 additions & 16 deletions packages/laconia-acceptance-test/acceptance-test/acceptance.spec.js
Expand Up @@ -23,13 +23,13 @@ frisby.globalSetup({
}
});

const deleteAllItems = async tableName => {
const deleteAllItems = async (tableName, keyName, keyValue) => {
const params = { TableName: tableName };
const data = await documentClient.scan(params).promise();
for (const item of data.Items) {
const deleteParams = {
TableName: tableName,
Key: { orderId: item.orderId }
Key: { [keyName]: keyValue(item) }
};
await documentClient.delete(deleteParams).promise();
}
Expand Down Expand Up @@ -72,16 +72,14 @@ const getOrderUrl = async () => {
};

const getWebSocketUrl = async () => {
const apig = new AWS.APIGateway();
const restApis = await apig.getRestApis().promise();
const restApiName = `${SERVERLESS_STAGE}-${SERVERLESS_SERVICE_NAME}`;
const restApi = restApis.items.find(i => i.name === restApiName);
if (!restApi) {
throw new Error(`${restApiName} could not be found!`);
const apig = new AWS.ApiGatewayV2();
const wsApis = await apig.getApis().promise();
const wsApiName = `${SERVERLESS_STAGE}-${SERVERLESS_SERVICE_NAME}-websockets`;
const wsApi = wsApis.Items.find(i => i.Name === wsApiName);
if (!wsApi) {
throw new Error(`${wsApiName} could not be found!`);
}
return `wss://${
restApi.id
}.execute-api.eu-west-1.amazonaws.com/${SERVERLESS_STAGE}`;
return `${wsApi.ApiEndpoint}/${SERVERLESS_STAGE}`;
};

const waitForOrderMessage = url => {
Expand All @@ -94,8 +92,14 @@ const waitForOrderMessage = url => {

ws.on("error", err => {
console.log("websocket error", err);
ws.close();
reject(err);
});

setTimeout(() => {
ws.close();
reject(new Error("Did not get message from web socket"));
}, 20000);
});
};

Expand All @@ -118,7 +122,12 @@ describe("order flow", () => {
});

beforeAll(async () => {
await deleteAllItems(name("order"));
await deleteAllItems(name("order"), "orderId", item => item.orderId);
await deleteAllItems(
name("connection"),
"connectionId",
item => item.connectionId
);
orderRepository = new DynamoDbOrderRepository(name("order"));
totalOrderStorage = new S3TotalOrderStorage(
new AWS.S3(),
Expand Down Expand Up @@ -156,6 +165,8 @@ describe("order flow", () => {
acc[orderIds[i]] = order;
return acc;
}, {});

orderMessagePromise = waitForOrderMessage(await getWebSocketUrl());
});

describe("happy path", () => {
Expand All @@ -179,15 +190,14 @@ describe("order flow", () => {
}, 20000);

it("should receive a reply from order websocket", async () => {
orderMessagePromise = waitForOrderMessage(await getWebSocketUrl());

const message = await orderMessagePromise;
expect(JSON.parse(message)).toEqual(
expect.objectContaining({
eventType: "accepted",
orderId: expect.any(String)
message: "order accepted"
})
);

// TODO: send message back, and probably close?
}, 20000);

it("should capture all card payments", async () => {
Expand Down
5 changes: 5 additions & 0 deletions packages/laconia-acceptance-test/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/laconia-acceptance-test/package.json
Expand Up @@ -41,6 +41,7 @@
"@laconia/middleware-lambda-warmer": "^1.2.0",
"@laconia/test": "^1.2.0",
"@laconia/xray": "^1.2.0",
"aws-apigatewaymanagementapi": "^0.1.0",
"p-wait-for": "^1.0.0",
"pino": "^4.16.1",
"r2": "^2.0.1",
Expand Down
48 changes: 48 additions & 0 deletions packages/laconia-acceptance-test/serverless.yml
Expand Up @@ -24,6 +24,7 @@ provider:
- "dynamodb:Scan"
- "dynamodb:Query"
- "dynamodb:PutItem"
- "dynamodb:DeleteItem"
- "xray:PutTraceSegments"
- "xray:PutTelemetryRecords"
Resource: "*"
Expand Down Expand Up @@ -56,6 +57,7 @@ custom:
orderEventsStreamName: ${self:custom.fullName}-order-events
restaurantNotificationTopicName: ${self:custom.fullName}-restaurant-notification
orderDynamoDbBatchTableName: ${self:custom.fullName}-order
connectionDynamoDbTableName: ${self:custom.fullName}-connection
remover:
buckets:
- ${self:custom.trackerBucketName}
Expand All @@ -68,6 +70,12 @@ custom:
- - "https://"
- Ref: "ApiGatewayRestApi"
- ".execute-api.#{AWS::Region}.amazonaws.com/${self:provider.stage}"
webSocketEndpoint:
Fn::Join:
- ""
- - "https://"
- Ref: "WebsocketsApi"
- ".execute-api.#{AWS::Region}.amazonaws.com/${self:provider.stage}"

functions:
place-order:
Expand Down Expand Up @@ -164,6 +172,33 @@ functions:
environment:
API_BASE_URL: ${self:custom.apigatewayUrl}

update-user:
handler: src/update-user.handler
events:
- stream:
type: kinesis
arn:
Fn::GetAtt:
- OrderEventsStream
- Arn
environment:
CONNECTION_TABLE_NAME: ${self:custom.connectionDynamoDbTableName}
WEBSOCKET_ENDPOINT: ${self:custom.webSocketEndpoint}
connect-websocket:
handler: src/connect-websocket.handler
events:
- websocket:
route: $connect
environment:
CONNECTION_TABLE_NAME: ${self:custom.connectionDynamoDbTableName}
disconnect-websocket:
handler: src/disconnect-websocket.handler
events:
- websocket:
route: $disconnect
environment:
CONNECTION_TABLE_NAME: ${self:custom.connectionDynamoDbTableName}

resources:
Resources:
S3Tracker:
Expand Down Expand Up @@ -201,3 +236,16 @@ resources:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
TableName: ${self:custom.orderDynamoDbBatchTableName}
ConnectionTable:
Type: "AWS::DynamoDB::Table"
Properties:
AttributeDefinitions:
- AttributeName: "connectionId"
AttributeType: "S"
KeySchema:
- AttributeName: "connectionId"
KeyType: "HASH"
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
TableName: ${self:custom.connectionDynamoDbTableName}
55 changes: 55 additions & 0 deletions packages/laconia-acceptance-test/src/DynamoDbWebSocketServer.js
@@ -0,0 +1,55 @@
const AWS = require("aws-sdk");
const ensureApiGatewayManagementApi = require("aws-apigatewaymanagementapi");

ensureApiGatewayManagementApi(AWS);

module.exports = class DynamoDbWebSocketServer {
constructor(connectionTableName, endpoint) {
this.connectionTableName = connectionTableName;
this.DDB = new AWS.DynamoDB();
this.documentClient = new AWS.DynamoDB.DocumentClient();
this.endpoint = endpoint;
}

addConnection(connectionId) {
const putParams = {
TableName: this.connectionTableName,
Item: { connectionId }
};

return this.documentClient.put(putParams).promise();
}

removeConnection(connectionId) {
const deleteParams = {
TableName: this.connectionTableName,
Key: { connectionId }
};

return this.documentClient.delete(deleteParams).promise();
}

async broadcast(message) {
const client = new AWS.ApiGatewayManagementApi({
apiVersion: "2018-11-29",
endpoint: this.endpoint
});

const data = await this.documentClient
.scan({
TableName: this.connectionTableName
})
.promise();

return Promise.all(
data.Items.map(item =>
client
.postToConnection({
ConnectionId: item.connectionId,
Data: JSON.stringify(message)
})
.promise()
)
);
}
};
18 changes: 18 additions & 0 deletions packages/laconia-acceptance-test/src/connect-websocket.js
@@ -0,0 +1,18 @@
const laconia = require("@laconia/core");
const { res } = require("@laconia/event").apigateway;
const DynamoDbWebSocketServer = require("./DynamoDbWebSocketServer");

const app = async (connectionId, { webSocketServer }) => {
return webSocketServer.addConnection(connectionId);
};

const adapter = app => async (event, laconiaContext) => {
await app(event.requestContext.connectionId, laconiaContext);
return res("success", 200);
};

exports.handler = laconia(adapter(app)).register(() => ({
webSocketServer: new DynamoDbWebSocketServer(
process.env.CONNECTION_TABLE_NAME
)
}));
18 changes: 18 additions & 0 deletions packages/laconia-acceptance-test/src/disconnect-websocket.js
@@ -0,0 +1,18 @@
const laconia = require("@laconia/core");
const { res } = require("@laconia/event").apigateway;
const DynamoDbWebSocketServer = require("./DynamoDbWebSocketServer");

const app = async (connectionId, { webSocketServer }) => {
return webSocketServer.removeConnection(connectionId);
};

const adapter = app => async (event, laconiaContext) => {
await app(event.requestContext.connectionId, laconiaContext);
return res("success", 200);
};

exports.handler = laconia(adapter(app)).register(() => ({
webSocketServer: new DynamoDbWebSocketServer(
process.env.CONNECTION_TABLE_NAME
)
}));
18 changes: 18 additions & 0 deletions packages/laconia-acceptance-test/src/update-user.js
@@ -0,0 +1,18 @@
const laconia = require("@laconia/core");
const kinesis = require("@laconia/adapter").kinesis();
const DynamoDbWebSocketServer = require("./DynamoDbWebSocketServer");

const app = async (orderEvents, { webSocketServer, event }) => {
const acceptedEvents = orderEvents.filter(o => o.eventType === "accepted");

if (acceptedEvents.length > 0) {
return webSocketServer.broadcast({ message: "order accepted" });
}
};

exports.handler = laconia(kinesis(app)).register(() => ({
webSocketServer: new DynamoDbWebSocketServer(
process.env.CONNECTION_TABLE_NAME,
process.env.WEBSOCKET_ENDPOINT
)
}));

0 comments on commit cdb799e

Please sign in to comment.