From e8055f26654cc8c22f66c0980ee53870324d21fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Tue, 22 Nov 2022 08:03:42 +0100 Subject: [PATCH 1/2] Rate lmit check for slack integration and integration processor rate limit error detection and delaying --- .../services/integrationProcessor.ts | 50 +++++++++++++++++-- .../integrations/types/slackTypes.ts | 16 ++---- .../usecases/slack/getChannels.ts | 11 +++- .../integrations/usecases/slack/getMember.ts | 11 +++- .../integrations/usecases/slack/getMembers.ts | 15 +++--- .../usecases/slack/getMessages.ts | 19 ++++--- .../usecases/slack/getMessagesInThreads.ts | 19 ++++--- .../integrations/usecases/slack/getTeam.ts | 17 ++++--- backend/src/types/baseError.ts | 14 ++++++ .../src/types/integration/rateLimitError.ts | 10 ++++ 10 files changed, 134 insertions(+), 48 deletions(-) create mode 100644 backend/src/types/baseError.ts create mode 100644 backend/src/types/integration/rateLimitError.ts diff --git a/backend/src/serverless/integrations/services/integrationProcessor.ts b/backend/src/serverless/integrations/services/integrationProcessor.ts index 2d7fd928ad..80432fd21a 100644 --- a/backend/src/serverless/integrations/services/integrationProcessor.ts +++ b/backend/src/serverless/integrations/services/integrationProcessor.ts @@ -1,3 +1,5 @@ +// noinspection ExceptionCaughtLocallyJS + import moment from 'moment' import { v4 as uuid } from 'uuid' import { createChildLogger } from '../../../utils/logging' @@ -204,7 +206,18 @@ export class IntegrationProcessor extends LoggingBase { // preprocess if needed logger.trace('Preprocessing integration!') - await intService.preprocess(stepContext) + try { + await intService.preprocess(stepContext) + } catch (err) { + if (err.rateLimitResetSeconds) { + // need to delay integration processing + logger.warn(err, 'Rate limit reached while preprocessing integration! Delaying...') + await sendNodeWorkerMessage(req.tenantId, req, err.rateLimitResetSeconds + 5) + return + } + + throw err + } // detect streams to process for this integration let streams: IIntegrationStream[] @@ -227,11 +240,23 @@ export class IntegrationProcessor extends LoggingBase { } } else { logger.trace('Detecting streams!') - streams = await intService.getStreams(stepContext) + try { + streams = await intService.getStreams(stepContext) + } catch (err) { + if (err.rateLimitResetSeconds) { + // need to delay integration processing + logger.warn(err, 'Rate limit reached while getting integration streams! Delaying...') + await sendNodeWorkerMessage(req.tenantId, req, err.rateLimitResetSeconds + 5) + return + } + + throw err + } } // delay for retries/continuing with the remaining streams (in seconds) let delay: number = 5 + let stopProcessing = false if (streams.length > 0) { logger.info({ streamCount: streams.length }, 'Detected streams to process!') @@ -246,7 +271,22 @@ export class IntegrationProcessor extends LoggingBase { { stream: stream.value, remainingStreams: streams.length }, `Processing stream.`, ) - const processStreamResult = await intService.processStream(stream, stepContext) + let processStreamResult + try { + processStreamResult = await intService.processStream(stream, stepContext) + } catch (err) { + if (err.rateLimitResetSeconds) { + delay = err.RateLimitResetSeconds + 5 + stopProcessing = true + } else { + throw err + } + } + + if (stopProcessing) { + failedStreams.push(stream) + break + } if (processStreamResult.newStreams && processStreamResult.newStreams.length > 0) { logger.info( @@ -368,6 +408,10 @@ export class IntegrationProcessor extends LoggingBase { retryCount, stream: failedStream, }) + + if (delay < retryCount * 5) { + delay = retryCount * 5 + } } } diff --git a/backend/src/serverless/integrations/types/slackTypes.ts b/backend/src/serverless/integrations/types/slackTypes.ts index f2863c9477..cfa50dabbd 100644 --- a/backend/src/serverless/integrations/types/slackTypes.ts +++ b/backend/src/serverless/integrations/types/slackTypes.ts @@ -71,25 +71,15 @@ export interface SlackMember { export type SlackMembers = SlackMember[] -export interface SlackParsedReponse { +export interface SlackParsedResponse { records: any nextPage: string - limit: number - timeUntilReset: number } -export interface SlackGetChannelsOutput extends SlackParsedReponse { - records: SlackChannels -} - -export interface SlackGetMessagesOutput extends SlackParsedReponse { - records: SlackMessages | [] -} - -export interface SlackGetMembersOutput extends SlackParsedReponse { +export interface SlackGetMembersOutput extends SlackParsedResponse { records: SlackMembers | [] } -export interface SlackGetMemberOutput extends SlackParsedReponse { +export interface SlackGetMemberOutput extends SlackParsedResponse { records: SlackMember } diff --git a/backend/src/serverless/integrations/usecases/slack/getChannels.ts b/backend/src/serverless/integrations/usecases/slack/getChannels.ts index 6ed92d7e13..fafbb710b2 100644 --- a/backend/src/serverless/integrations/usecases/slack/getChannels.ts +++ b/backend/src/serverless/integrations/usecases/slack/getChannels.ts @@ -2,6 +2,7 @@ import axios from 'axios' import { SlackChannels, SlackGetChannelsInput } from '../../types/slackTypes' import { Logger } from '../../../../utils/logging' import { timeout } from '../../../../utils/timing' +import { RateLimitError } from '../../../../types/integration/rateLimitError' async function getChannels(input: SlackGetChannelsInput, logger: Logger): Promise { await timeout(2000) @@ -25,8 +26,14 @@ async function getChannels(input: SlackGetChannelsInput, logger: Logger): Promis id: c.id, })) } catch (err) { - logger.error({ err, input }, 'Error while getting channels from Slack') - throw err + if (err && err.response && err.response.status === 429 && err.response.headers['Retry-After']) { + logger.warn('Slack API rate limit exceeded') + const rateLimitResetSeconds = parseInt(err.response.headers['Retry-After'], 10) + throw new RateLimitError(rateLimitResetSeconds, '/conversations.list') + } else { + logger.error({ err, input }, 'Error while getting channels from Slack') + throw err + } } } diff --git a/backend/src/serverless/integrations/usecases/slack/getMember.ts b/backend/src/serverless/integrations/usecases/slack/getMember.ts index 4958353859..ded25ffad5 100644 --- a/backend/src/serverless/integrations/usecases/slack/getMember.ts +++ b/backend/src/serverless/integrations/usecases/slack/getMember.ts @@ -2,6 +2,7 @@ import axios from 'axios' import { SlackGetMemberInput, SlackGetMemberOutput } from '../../types/slackTypes' import { Logger } from '../../../../utils/logging' import { timeout } from '../../../../utils/timing' +import { RateLimitError } from '../../../../types/integration/rateLimitError' async function getMembers( input: SlackGetMemberInput, @@ -30,8 +31,14 @@ async function getMembers( timeUntilReset, } } catch (err) { - logger.error({ err, input }, 'Error while getting members from Slack') - throw err + if (err && err.response && err.response.status === 429 && err.response.headers['Retry-After']) { + logger.warn('Slack API rate limit exceeded') + const rateLimitResetSeconds = parseInt(err.response.headers['Retry-After'], 10) + throw new RateLimitError(rateLimitResetSeconds, '/users.info') + } else { + logger.error({ err, input }, 'Error while getting members from Slack') + throw err + } } } diff --git a/backend/src/serverless/integrations/usecases/slack/getMembers.ts b/backend/src/serverless/integrations/usecases/slack/getMembers.ts index 66b4f1318b..7286f01401 100644 --- a/backend/src/serverless/integrations/usecases/slack/getMembers.ts +++ b/backend/src/serverless/integrations/usecases/slack/getMembers.ts @@ -2,6 +2,7 @@ import axios from 'axios' import { SlackGetMembersInput, SlackGetMembersOutput, SlackMembers } from '../../types/slackTypes' import { Logger } from '../../../../utils/logging' import { timeout } from '../../../../utils/timing' +import { RateLimitError } from '../../../../types/integration/rateLimitError' async function getMembers( input: SlackGetMembersInput, @@ -20,18 +21,20 @@ async function getMembers( const response = await axios(config) const records: SlackMembers = response.data.members - const limit = 100 - const timeUntilReset = 0 const nextPage = response.data.response_metadata?.next_cursor || '' return { records, nextPage, - limit, - timeUntilReset, } } catch (err) { - logger.error({ err, input }, 'Error while getting members from Slack') - throw err + if (err && err.response && err.response.status === 429 && err.response.headers['Retry-After']) { + logger.warn('Slack API rate limit exceeded') + const rateLimitResetSeconds = parseInt(err.response.headers['Retry-After'], 10) + throw new RateLimitError(rateLimitResetSeconds, '/users.list') + } else { + logger.error({ err, input }, 'Error while getting members from Slack') + throw err + } } } diff --git a/backend/src/serverless/integrations/usecases/slack/getMessages.ts b/backend/src/serverless/integrations/usecases/slack/getMessages.ts index 50c1dcfa8a..d365259205 100644 --- a/backend/src/serverless/integrations/usecases/slack/getMessages.ts +++ b/backend/src/serverless/integrations/usecases/slack/getMessages.ts @@ -1,12 +1,13 @@ import axios from 'axios' -import { SlackMessages, SlackParsedReponse, SlackGetMessagesInput } from '../../types/slackTypes' +import { SlackMessages, SlackParsedResponse, SlackGetMessagesInput } from '../../types/slackTypes' import { Logger } from '../../../../utils/logging' import { timeout } from '../../../../utils/timing' +import { RateLimitError } from '../../../../types/integration/rateLimitError' async function getMessages( input: SlackGetMessagesInput, logger: Logger, -): Promise { +): Promise { await timeout(2000) try { @@ -21,18 +22,20 @@ async function getMessages( const response = await axios(config) const records: SlackMessages = response.data.messages - const limit = 100 - const timeUntilReset = 0 const nextPage = response.data.response_metadata?.next_cursor || '' return { records, nextPage, - limit, - timeUntilReset, } } catch (err) { - logger.error({ err, input }, 'Error while getting messages from Slack') - throw err + if (err && err.response && err.response.status === 429 && err.response.headers['Retry-After']) { + logger.warn('Slack API rate limit exceeded') + const rateLimitResetSeconds = parseInt(err.response.headers['Retry-After'], 10) + throw new RateLimitError(rateLimitResetSeconds, '/conversation.history') + } else { + logger.error({ err, input }, 'Error while getting messages from Slack') + throw err + } } } diff --git a/backend/src/serverless/integrations/usecases/slack/getMessagesInThreads.ts b/backend/src/serverless/integrations/usecases/slack/getMessagesInThreads.ts index 41df5104d4..8c34087d49 100644 --- a/backend/src/serverless/integrations/usecases/slack/getMessagesInThreads.ts +++ b/backend/src/serverless/integrations/usecases/slack/getMessagesInThreads.ts @@ -1,16 +1,17 @@ import axios from 'axios' import { SlackMessages, - SlackParsedReponse, + SlackParsedResponse, SlackGetMessagesInThreadsInput, } from '../../types/slackTypes' import { Logger } from '../../../../utils/logging' import { timeout } from '../../../../utils/timing' +import { RateLimitError } from '../../../../types/integration/rateLimitError' async function getMessagesInThreads( input: SlackGetMessagesInThreadsInput, logger: Logger, -): Promise { +): Promise { await timeout(2000) try { @@ -24,18 +25,20 @@ async function getMessagesInThreads( const response = await axios(config) const records: SlackMessages = response.data.messages - const limit = 100 - const timeUntilReset = 0 const nextPage = response.data.response_metadata?.next_cursor || '' return { records, nextPage, - limit, - timeUntilReset, } } catch (err) { - logger.error({ err, input }, 'Error while getting messages from Slack') - throw err + if (err && err.response && err.response.status === 429 && err.response.headers['Retry-After']) { + logger.warn('Slack API rate limit exceeded') + const rateLimitResetSeconds = parseInt(err.response.headers['Retry-After'], 10) + throw new RateLimitError(rateLimitResetSeconds, '/conversation.replies') + } else { + logger.error({ err, input }, 'Error while getting messages from Slack') + throw err + } } } diff --git a/backend/src/serverless/integrations/usecases/slack/getTeam.ts b/backend/src/serverless/integrations/usecases/slack/getTeam.ts index c28867819b..030480b796 100644 --- a/backend/src/serverless/integrations/usecases/slack/getTeam.ts +++ b/backend/src/serverless/integrations/usecases/slack/getTeam.ts @@ -1,7 +1,8 @@ import axios from 'axios' -import { SlackTeam, SlackGetChannelsInput } from '../../types/slackTypes' +import { SlackGetChannelsInput, SlackTeam } from '../../types/slackTypes' import { Logger } from '../../../../utils/logging' import { timeout } from '../../../../utils/timing' +import { RateLimitError } from '../../../../types/integration/rateLimitError' async function getChannels(input: SlackGetChannelsInput, logger: Logger): Promise { await timeout(2000) @@ -16,12 +17,16 @@ async function getChannels(input: SlackGetChannelsInput, logger: Logger): Promis } const response = await axios(config) - const result: SlackTeam = response.data.team - - return result + return response.data.team } catch (err) { - logger.error({ err, input }, 'Error while getting channels from Slack') - throw err + if (err && err.response && err.response.status === 429 && err.response.headers['Retry-After']) { + logger.warn('Slack API rate limit exceeded') + const rateLimitResetSeconds = parseInt(err.response.headers['Retry-After'], 10) + throw new RateLimitError(rateLimitResetSeconds, '/team.info') + } else { + logger.error({ err, input }, 'Error while getting channels from Slack') + throw err + } } } diff --git a/backend/src/types/baseError.ts b/backend/src/types/baseError.ts new file mode 100644 index 0000000000..a03fa4147a --- /dev/null +++ b/backend/src/types/baseError.ts @@ -0,0 +1,14 @@ +export class BaseError extends Error { + public name: string + + public message: string + + public stack?: string + + constructor(message: string) { + super(message) + this.name = this.constructor.name + this.message = message + Error.captureStackTrace(this, this.constructor) + } +} diff --git a/backend/src/types/integration/rateLimitError.ts b/backend/src/types/integration/rateLimitError.ts new file mode 100644 index 0000000000..746502ce6f --- /dev/null +++ b/backend/src/types/integration/rateLimitError.ts @@ -0,0 +1,10 @@ +import { BaseError } from '../baseError' + +export class RateLimitError extends BaseError { + public rateLimitResetSeconds: number + + constructor(rateLimitResetSeconds: number, endpoint: string) { + super(`Endpoint: '${endpoint}' rate limit exceeded`) + this.rateLimitResetSeconds = rateLimitResetSeconds + } +} From 37237daa1fe646008449de429c3f0c362d7332e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Tue, 22 Nov 2022 15:52:13 +0100 Subject: [PATCH 2/2] bugfix --- .../serverless/integrations/services/integrationProcessor.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/serverless/integrations/services/integrationProcessor.ts b/backend/src/serverless/integrations/services/integrationProcessor.ts index 80432fd21a..7718888b65 100644 --- a/backend/src/serverless/integrations/services/integrationProcessor.ts +++ b/backend/src/serverless/integrations/services/integrationProcessor.ts @@ -276,7 +276,7 @@ export class IntegrationProcessor extends LoggingBase { processStreamResult = await intService.processStream(stream, stepContext) } catch (err) { if (err.rateLimitResetSeconds) { - delay = err.RateLimitResetSeconds + 5 + delay = err.rateLimitResetSeconds + 5 stopProcessing = true } else { throw err