Skip to content

Commit

Permalink
support struct and session in sqs handler
Browse files Browse the repository at this point in the history
  • Loading branch information
Maximiliano Danelutti committed Jul 13, 2021
1 parent 1a1e113 commit 337ba46
Show file tree
Hide file tree
Showing 5 changed files with 348 additions and 18 deletions.
10 changes: 10 additions & 0 deletions lib/sqs-consumer.js
Expand Up @@ -6,6 +6,8 @@
* @typedef {import('./log-transport')} LogTransport
*/

const { ApiSession } = require('@janiscommerce/api-session');

module.exports = class SQSConsumer {

/**
Expand Down Expand Up @@ -35,4 +37,12 @@ module.exports = class SQSConsumer {
// eslint-disable-next-line no-unused-vars,no-empty-function
async processSingleRecord(record, logger) {}

/**
*
* @param {ApiSession.AuthenticationData} authenticationData
* @returns {ApiSession}
*/
setSession(authenticationData) {
this.session = new ApiSession(authenticationData);
}
};
144 changes: 131 additions & 13 deletions lib/sqs-handler.js
Expand Up @@ -33,6 +33,15 @@
* @property {Array<SQSRecord>} Records
*/

/**
* @typedef SQSRecordAndLoggerReadyToProcess
* @property {ParsedSQSRecord|ParsedSQSRecordWithLogger} preparedRecord
* @property {string} clientCode
* @property {LogTransport} logger
*/

const { struct } = require('@janiscommerce/superstruct');

const LogTransport = require('./log-transport');
const { SQSEventStruct } = require('./structs');

Expand All @@ -50,21 +59,69 @@ module.exports = class SQSHandler {

SQSEventStruct(event);

const consumer = new Consumer();
if(consumer.handlesBatch(event)) {
await consumer.processBatch(event.Records.map(record => {
const parsedRecord = this.parseRecord(record);
const logger = new LogTransport(record.messageId);
parsedRecord[Symbol.for('logger')] = logger;
return parsedRecord;
}));
} else {
await Promise.all(event.Records.map(record => {
const logger = new LogTransport(record.messageId);
return consumer.processSingleRecord(this.parseRecord(record), logger);
}));
const isBatch = (new Consumer()).handlesBatch(event);
if(isBatch)
await this.handleBatch(Consumer, event);
else
await this.handleSingle(Consumer, event);
}

/**
* Process records in batch but splitting by client or without client
*
* @param {import('./sqs-consumer')} Consumer
* @param {SQSEvent} event
* @param {boolean} isBatch
* @returns {Promise}
*/
static handleBatch(Consumer, event, isBatch = true) {
const recordsWithoutClient = [];
const recordsWithClient = {};
const consumerWithoutClient = new Consumer();
event.Records.forEach(record => {
const { preparedRecord, clientCode } = this.prepareRecord(consumerWithoutClient, record, isBatch);
if(clientCode) {
if(!recordsWithClient[clientCode])
recordsWithClient[clientCode] = [];

recordsWithClient[clientCode].push(preparedRecord);
} else
recordsWithoutClient.push(preparedRecord);
});

const batches = [];
if(recordsWithoutClient.length)
batches.push(consumerWithoutClient.processBatch(recordsWithoutClient));

if(Object.keys(recordsWithClient).length) {
Object.keys(recordsWithClient).forEach(clientCode => {
const consumer = new Consumer();
consumer.setSession({ clientCode });
batches.push(consumer.processBatch(recordsWithClient[clientCode]));
});
}

return Promise.all(batches);
}

/**
* Process records one by one
*
* @param {import('./sqs-consumer')} Consumer
* @param {SQSEvent} event
* @param {boolean} isBatch
* @returns {Promise}
*/
static handleSingle(Consumer, event, isBatch = false) {
return Promise.all(event.Records.map(record => {
const consumer = new Consumer();
const { preparedRecord, logger, clientCode } = this.prepareRecord(consumer, record, isBatch);

if(clientCode)
consumer.setSession({ clientCode });

return consumer.processSingleRecord(preparedRecord, logger);
}));
}

/**
Expand All @@ -83,4 +140,65 @@ module.exports = class SQSHandler {

}

/**
* Validates the struct if any
*
*/
static validateRecordStruct(consumer, record) {

if(!consumer.struct)
return record;

const args = !Array.isArray(consumer.struct) ? [consumer.struct] : consumer.struct;

const Schema = struct(...args);

const { body: bodyRecord, ...argsRecord } = record;

const [error, parsed] = Schema.validate(bodyRecord);

if(error)
throw new Error(error.reason || error.message);

return { body: parsed, ...argsRecord };
}

/**
* Prepare record to process in handler
*
* @param {import('./sqs-consumer')} consumer
* @param {SQSRecord} record
* @param {boolean} isBatch
* @returns {SQSRecordAndLoggerReadyToProcess} Ready to process
*/
static prepareRecord(consumer, record, isBatch) {
const parsedRecord = this.parseRecord(record);
const logger = new LogTransport(record.messageId);
const clientCode = this.getClient(record.messageAttributes);
const preparedRecord = this.validateRecordStruct(consumer, parsedRecord);

if(isBatch)
preparedRecord[Symbol.for('logger')] = logger;

return {
preparedRecord,
clientCode,
logger
};
}

/**
* Extract the client code from messageAttributes
*
* @param {SQSRecord.messageAttributes} messageAttributes
* @returns {string | undefined}
*/
static getClient(messageAttributes) {
if(!messageAttributes || !messageAttributes['janis-client'])
return;

const { stringValue: clientCode } = messageAttributes['janis-client'];

return clientCode;
}
};
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -38,6 +38,7 @@
"typescript": "^4.1.3"
},
"dependencies": {
"@janiscommerce/api-session": "^3.1.1",
"@janiscommerce/superstruct": "^1.1.1",
"lllog": "^1.1.2"
}
Expand Down
13 changes: 13 additions & 0 deletions tests/sqs-consumer.js
@@ -1,5 +1,6 @@
'use strict';

const { ApiSession } = require('@janiscommerce/api-session');
const assert = require('assert');
const { SQSConsumer } = require('../lib');
const LogTransport = require('../lib/log-transport');
Expand Down Expand Up @@ -32,4 +33,16 @@ describe('SQS Consumer', () => {
});
});

describe('setSession', () => {
it('Should set a session in consumer correctly', async () => {
const myConsumer = new SQSConsumer();
const clientCode = 'clientTest';

myConsumer.setSession({ clientCode });

assert(myConsumer.session instanceof ApiSession);
assert(myConsumer.session.clientCode === clientCode);
});
});

});

0 comments on commit 337ba46

Please sign in to comment.