Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// noinspection ExceptionCaughtLocallyJS

import moment from 'moment'
import { v4 as uuid } from 'uuid'
import { createChildLogger } from '../../../utils/logging'
Expand Down Expand Up @@ -204,7 +206,18 @@ export class IntegrationProcessor extends LoggingBase {

// preprocess if needed
logger.trace('Preprocessing integration!')
await intService.preprocess(stepContext)
try {
await intService.preprocess(stepContext)
} catch (err) {
if (err.rateLimitResetSeconds) {
// need to delay integration processing
logger.warn(err, 'Rate limit reached while preprocessing integration! Delaying...')
await sendNodeWorkerMessage(req.tenantId, req, err.rateLimitResetSeconds + 5)
return
}

throw err
}

// detect streams to process for this integration
let streams: IIntegrationStream[]
Expand All @@ -227,11 +240,23 @@ export class IntegrationProcessor extends LoggingBase {
}
} else {
logger.trace('Detecting streams!')
streams = await intService.getStreams(stepContext)
try {
streams = await intService.getStreams(stepContext)
} catch (err) {
if (err.rateLimitResetSeconds) {
// need to delay integration processing
logger.warn(err, 'Rate limit reached while getting integration streams! Delaying...')
await sendNodeWorkerMessage(req.tenantId, req, err.rateLimitResetSeconds + 5)
return
}

throw err
}
}

// delay for retries/continuing with the remaining streams (in seconds)
let delay: number = 5
let stopProcessing = false

if (streams.length > 0) {
logger.info({ streamCount: streams.length }, 'Detected streams to process!')
Expand All @@ -246,7 +271,22 @@ export class IntegrationProcessor extends LoggingBase {
{ stream: stream.value, remainingStreams: streams.length },
`Processing stream.`,
)
const processStreamResult = await intService.processStream(stream, stepContext)
let processStreamResult
try {
processStreamResult = await intService.processStream(stream, stepContext)
} catch (err) {
if (err.rateLimitResetSeconds) {
delay = err.rateLimitResetSeconds + 5
stopProcessing = true
} else {
throw err
}
}

if (stopProcessing) {
failedStreams.push(stream)
break
}

if (processStreamResult.newStreams && processStreamResult.newStreams.length > 0) {
logger.info(
Expand Down Expand Up @@ -368,6 +408,10 @@ export class IntegrationProcessor extends LoggingBase {
retryCount,
stream: failedStream,
})

if (delay < retryCount * 5) {
delay = retryCount * 5
}
}
}

Expand Down
16 changes: 3 additions & 13 deletions backend/src/serverless/integrations/types/slackTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,25 +71,15 @@ export interface SlackMember {

export type SlackMembers = SlackMember[]

export interface SlackParsedReponse {
export interface SlackParsedResponse {
records: any
nextPage: string
limit: number
timeUntilReset: number
}

export interface SlackGetChannelsOutput extends SlackParsedReponse {
records: SlackChannels
}

export interface SlackGetMessagesOutput extends SlackParsedReponse {
records: SlackMessages | []
}

export interface SlackGetMembersOutput extends SlackParsedReponse {
export interface SlackGetMembersOutput extends SlackParsedResponse {
records: SlackMembers | []
}

export interface SlackGetMemberOutput extends SlackParsedReponse {
export interface SlackGetMemberOutput extends SlackParsedResponse {
records: SlackMember
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import axios from 'axios'
import { SlackChannels, SlackGetChannelsInput } from '../../types/slackTypes'
import { Logger } from '../../../../utils/logging'
import { timeout } from '../../../../utils/timing'
import { RateLimitError } from '../../../../types/integration/rateLimitError'

async function getChannels(input: SlackGetChannelsInput, logger: Logger): Promise<SlackChannels> {
await timeout(2000)
Expand All @@ -25,8 +26,14 @@ async function getChannels(input: SlackGetChannelsInput, logger: Logger): Promis
id: c.id,
}))
} catch (err) {
logger.error({ err, input }, 'Error while getting channels from Slack')
throw err
if (err && err.response && err.response.status === 429 && err.response.headers['Retry-After']) {
logger.warn('Slack API rate limit exceeded')
const rateLimitResetSeconds = parseInt(err.response.headers['Retry-After'], 10)
throw new RateLimitError(rateLimitResetSeconds, '/conversations.list')
} else {
logger.error({ err, input }, 'Error while getting channels from Slack')
throw err
}
}
}

Expand Down
11 changes: 9 additions & 2 deletions backend/src/serverless/integrations/usecases/slack/getMember.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import axios from 'axios'
import { SlackGetMemberInput, SlackGetMemberOutput } from '../../types/slackTypes'
import { Logger } from '../../../../utils/logging'
import { timeout } from '../../../../utils/timing'
import { RateLimitError } from '../../../../types/integration/rateLimitError'

async function getMembers(
input: SlackGetMemberInput,
Expand Down Expand Up @@ -30,8 +31,14 @@ async function getMembers(
timeUntilReset,
}
} catch (err) {
logger.error({ err, input }, 'Error while getting members from Slack')
throw err
if (err && err.response && err.response.status === 429 && err.response.headers['Retry-After']) {
logger.warn('Slack API rate limit exceeded')
const rateLimitResetSeconds = parseInt(err.response.headers['Retry-After'], 10)
throw new RateLimitError(rateLimitResetSeconds, '/users.info')
} else {
logger.error({ err, input }, 'Error while getting members from Slack')
throw err
}
}
}

Expand Down
15 changes: 9 additions & 6 deletions backend/src/serverless/integrations/usecases/slack/getMembers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import axios from 'axios'
import { SlackGetMembersInput, SlackGetMembersOutput, SlackMembers } from '../../types/slackTypes'
import { Logger } from '../../../../utils/logging'
import { timeout } from '../../../../utils/timing'
import { RateLimitError } from '../../../../types/integration/rateLimitError'

async function getMembers(
input: SlackGetMembersInput,
Expand All @@ -20,18 +21,20 @@ async function getMembers(

const response = await axios(config)
const records: SlackMembers = response.data.members
const limit = 100
const timeUntilReset = 0
const nextPage = response.data.response_metadata?.next_cursor || ''
return {
records,
nextPage,
limit,
timeUntilReset,
}
} catch (err) {
logger.error({ err, input }, 'Error while getting members from Slack')
throw err
if (err && err.response && err.response.status === 429 && err.response.headers['Retry-After']) {
logger.warn('Slack API rate limit exceeded')
const rateLimitResetSeconds = parseInt(err.response.headers['Retry-After'], 10)
throw new RateLimitError(rateLimitResetSeconds, '/users.list')
} else {
logger.error({ err, input }, 'Error while getting members from Slack')
throw err
}
}
}

Expand Down
19 changes: 11 additions & 8 deletions backend/src/serverless/integrations/usecases/slack/getMessages.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import axios from 'axios'
import { SlackMessages, SlackParsedReponse, SlackGetMessagesInput } from '../../types/slackTypes'
import { SlackMessages, SlackParsedResponse, SlackGetMessagesInput } from '../../types/slackTypes'
import { Logger } from '../../../../utils/logging'
import { timeout } from '../../../../utils/timing'
import { RateLimitError } from '../../../../types/integration/rateLimitError'

async function getMessages(
input: SlackGetMessagesInput,
logger: Logger,
): Promise<SlackParsedReponse> {
): Promise<SlackParsedResponse> {
await timeout(2000)

try {
Expand All @@ -21,18 +22,20 @@ async function getMessages(
const response = await axios(config)
const records: SlackMessages = response.data.messages

const limit = 100
const timeUntilReset = 0
const nextPage = response.data.response_metadata?.next_cursor || ''
return {
records,
nextPage,
limit,
timeUntilReset,
}
} catch (err) {
logger.error({ err, input }, 'Error while getting messages from Slack')
throw err
if (err && err.response && err.response.status === 429 && err.response.headers['Retry-After']) {
logger.warn('Slack API rate limit exceeded')
const rateLimitResetSeconds = parseInt(err.response.headers['Retry-After'], 10)
throw new RateLimitError(rateLimitResetSeconds, '/conversation.history')
} else {
logger.error({ err, input }, 'Error while getting messages from Slack')
throw err
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import axios from 'axios'
import {
SlackMessages,
SlackParsedReponse,
SlackParsedResponse,
SlackGetMessagesInThreadsInput,
} from '../../types/slackTypes'
import { Logger } from '../../../../utils/logging'
import { timeout } from '../../../../utils/timing'
import { RateLimitError } from '../../../../types/integration/rateLimitError'

async function getMessagesInThreads(
input: SlackGetMessagesInThreadsInput,
logger: Logger,
): Promise<SlackParsedReponse> {
): Promise<SlackParsedResponse> {
await timeout(2000)

try {
Expand All @@ -24,18 +25,20 @@ async function getMessagesInThreads(

const response = await axios(config)
const records: SlackMessages = response.data.messages
const limit = 100
const timeUntilReset = 0
const nextPage = response.data.response_metadata?.next_cursor || ''
return {
records,
nextPage,
limit,
timeUntilReset,
}
} catch (err) {
logger.error({ err, input }, 'Error while getting messages from Slack')
throw err
if (err && err.response && err.response.status === 429 && err.response.headers['Retry-After']) {
logger.warn('Slack API rate limit exceeded')
const rateLimitResetSeconds = parseInt(err.response.headers['Retry-After'], 10)
throw new RateLimitError(rateLimitResetSeconds, '/conversation.replies')
} else {
logger.error({ err, input }, 'Error while getting messages from Slack')
throw err
}
}
}

Expand Down
17 changes: 11 additions & 6 deletions backend/src/serverless/integrations/usecases/slack/getTeam.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import axios from 'axios'
import { SlackTeam, SlackGetChannelsInput } from '../../types/slackTypes'
import { SlackGetChannelsInput, SlackTeam } from '../../types/slackTypes'
import { Logger } from '../../../../utils/logging'
import { timeout } from '../../../../utils/timing'
import { RateLimitError } from '../../../../types/integration/rateLimitError'

async function getChannels(input: SlackGetChannelsInput, logger: Logger): Promise<SlackTeam> {
await timeout(2000)
Expand All @@ -16,12 +17,16 @@ async function getChannels(input: SlackGetChannelsInput, logger: Logger): Promis
}

const response = await axios(config)
const result: SlackTeam = response.data.team

return result
return response.data.team
} catch (err) {
logger.error({ err, input }, 'Error while getting channels from Slack')
throw err
if (err && err.response && err.response.status === 429 && err.response.headers['Retry-After']) {
logger.warn('Slack API rate limit exceeded')
const rateLimitResetSeconds = parseInt(err.response.headers['Retry-After'], 10)
throw new RateLimitError(rateLimitResetSeconds, '/team.info')
} else {
logger.error({ err, input }, 'Error while getting channels from Slack')
throw err
}
}
}

Expand Down
14 changes: 14 additions & 0 deletions backend/src/types/baseError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
export class BaseError extends Error {
public name: string

public message: string

public stack?: string

constructor(message: string) {
super(message)
this.name = this.constructor.name
this.message = message
Error.captureStackTrace(this, this.constructor)
}
}
10 changes: 10 additions & 0 deletions backend/src/types/integration/rateLimitError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { BaseError } from '../baseError'

export class RateLimitError extends BaseError {
public rateLimitResetSeconds: number

constructor(rateLimitResetSeconds: number, endpoint: string) {
super(`Endpoint: '${endpoint}' rate limit exceeded`)
this.rateLimitResetSeconds = rateLimitResetSeconds
}
}