Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { addSeconds, singleOrDefault } from '@crowd/common'
import { DbStore } from '@crowd/database'
import { INTEGRATION_SERVICES, IProcessStreamContext } from '@crowd/integrations'
import { Logger, LoggerBase, getChildLogger } from '@crowd/logging'
import { RedisCache, RedisClient } from '@crowd/redis'
import { RedisCache, RedisClient, RateLimiter } from '@crowd/redis'
import {
IntegrationDataWorkerEmitter,
IntegrationRunWorkerEmitter,
Expand Down Expand Up @@ -181,6 +181,8 @@ export default class IntegrationStreamService extends LoggerBase {
this.log,
)

const globalCache = new RedisCache(`int-global`, this.redisClient, this.log)

const nangoConfig = NANGO_CONFIG()

const context: IProcessStreamContext = {
Expand Down Expand Up @@ -209,6 +211,7 @@ export default class IntegrationStreamService extends LoggerBase {

log: this.log,
cache,
globalCache,

publishData: async (data) => {
await this.publishData(
Expand Down Expand Up @@ -241,6 +244,9 @@ export default class IntegrationStreamService extends LoggerBase {
this.log.error({ message }, 'Aborting run with error!')
await this.triggerRunError(streamInfo.runId, 'stream-run-abort', message, metadata, error)
},
// Builds a limiter whose request counter is stored through `globalCache`.
getRateLimiter: (maxRequests: number, timeWindowSeconds: number, counterKey: string) =>
  new RateLimiter(globalCache, maxRequests, timeWindowSeconds, counterKey),
}

this.log.debug('Marking stream as in progress!')
Expand Down
21 changes: 21 additions & 0 deletions services/libs/integrations/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { IProcessStreamContext } from '@/types'
import { PlatformType } from '@crowd/types'
import { RedditGetCommentsInput, RedditCommentsResponse } from '../types'
import { timeout } from '@crowd/common'
import { getRateLimiter } from './handleRateLimit'

/**
* Get the comment tree of a post.
Expand All @@ -16,12 +17,17 @@ async function getComments(
ctx: IProcessStreamContext,
): Promise<RedditCommentsResponse> {
try {
const rateLimiter = getRateLimiter(ctx)

ctx.log.info({ message: 'Fetching comments from a post in a sub-reddit', input })

// Wait for 1.5s for rate limits.
// eslint-disable-next-line no-promise-executor-return
await timeout(1500)

// Check if we can make a request - if not, it will throw a RateLimitError
await rateLimiter.checkRateLimit('getComments')

// Get an access token from Nango
const accessToken = await getNangoToken(input.nangoId, PlatformType.REDDIT, ctx)

Expand All @@ -36,6 +42,9 @@ async function getComments(
},
}

// we are going to make a request, so increment the rate limit
await rateLimiter.incrementRateLimit()

const response: RedditCommentsResponse = (await axios(config)).data
return response
} catch (err) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { IProcessStreamContext } from '@/types'
import { PlatformType } from '@crowd/types'
import { RedditMoreCommentsInput, RedditMoreCommentsResponse } from '../types'
import { timeout } from '@crowd/common'
import { getRateLimiter } from './handleRateLimit'

/**
* Expand a list of comment IDs into a comment tree.
Expand All @@ -17,12 +18,17 @@ async function getMoreComments(
ctx: IProcessStreamContext,
): Promise<RedditMoreCommentsResponse> {
try {
const rateLimiter = getRateLimiter(ctx)

ctx.log.info({ message: 'Fetching more comments from a sub-reddit', input })

// Wait for 1.5s for rate limits.
// eslint-disable-next-line no-promise-executor-return
await timeout(1500)

// Check if we can make a request - if not, it will throw a RateLimitError
await rateLimiter.checkRateLimit('getMoreComments')

// Get an access token from Nango
const accessToken = await getNangoToken(input.nangoId, PlatformType.REDDIT, ctx)

Expand All @@ -39,6 +45,9 @@ async function getMoreComments(
},
}

// we are going to make a request, so increment the rate limit
await rateLimiter.incrementRateLimit()

const response: RedditMoreCommentsResponse = (await axios(config)).data
return response
} catch (err) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { IProcessStreamContext } from '@/types'
import { PlatformType } from '@crowd/types'
import { RedditGetPostsInput, RedditPostsResponse, REDDIT_MAX_RETROSPECT_IN_HOURS } from '../types'
import { timeout } from '@crowd/common'
import { getRateLimiter } from './handleRateLimit'

/**
* Get paginated posts from a subreddit
Expand All @@ -16,12 +17,17 @@ async function getPosts(
ctx: IProcessStreamContext,
): Promise<RedditPostsResponse> {
try {
const rateLimiter = getRateLimiter(ctx)

ctx.log.info({ message: 'Fetching posts from a sub-reddit', input })

// Wait for 1.5s for rate limits.
// eslint-disable-next-line no-promise-executor-return
await timeout(1500)

// Check if we can make a request - if not, it will throw a RateLimitError
await rateLimiter.checkRateLimit('getPosts')

// Get an access token from Nango
const accessToken = await getNangoToken(input.nangoId, PlatformType.REDDIT, ctx)

Expand All @@ -41,6 +47,9 @@ async function getPosts(
config.params.after = input.after
}

// we are going to make a request, so increment the rate limit
await rateLimiter.incrementRateLimit()

const response: RedditPostsResponse = (await axios(config)).data

// If ctx.onboarding is false, check the last post's date
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { IProcessStreamContext } from '@/types'

/** Maximum number of requests counted per window. */
const REDDIT_RATE_LIMIT = 100
/** Length of the counting window, passed on as `timeWindowSeconds`. */
const REDDIT_RATE_LIMIT_TIME = 60
/** Cache key under which the Reddit request counter is kept. */
const REDIS_KEY = 'reddit-request-count'

/**
 * Returns the rate limiter used by the Reddit API helpers.
 *
 * The limiter itself is created by the stream context; this helper only
 * supplies the Reddit-specific limit, window and counter key.
 *
 * @param ctx - stream context that exposes `getRateLimiter`
 * @returns whatever `ctx.getRateLimiter` returns for the Reddit settings
 */
export const getRateLimiter = (ctx: IProcessStreamContext) =>
  ctx.getRateLimiter(
    REDDIT_RATE_LIMIT,
    REDDIT_RATE_LIMIT_TIME,
    REDIS_KEY,
  )
6 changes: 5 additions & 1 deletion services/libs/integrations/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { IMemberAttribute, IActivityData } from '@crowd/types'
import { Logger } from '@crowd/logging'
import { ICache, IIntegration, IIntegrationStream } from '@crowd/types'
import { ICache, IIntegration, IIntegrationStream, IRateLimiter } from '@crowd/types'

export interface IIntegrationContext {
onboarding: boolean
Expand All @@ -27,6 +27,10 @@ export interface IProcessStreamContext extends IIntegrationContext {
publishData: <T>(data: T) => Promise<void>

abortWithError: (message: string, metadata?: unknown, error?: Error) => Promise<void>

globalCache: ICache

getRateLimiter: (maxRequests: number, timeWindowSeconds: number, cacheKey: string) => IRateLimiter
}

export interface IProcessDataContext extends IIntegrationContext {
Expand Down
16 changes: 16 additions & 0 deletions services/libs/redis/src/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,22 @@ export class RedisCache extends LoggerBase implements ICache {
}
}

/**
 * Adds `incrementBy` to the counter stored under the prefixed `key`.
 *
 * When `ttlSeconds` is given, the increment and an `expire` call are queued
 * on the same `multi()` chain, so the TTL is re-applied on every call.
 * Without it, only the increment is issued and any existing TTL is untouched.
 *
 * @param key - cache key (prefixed via `this.prefixer` before use)
 * @param incrementBy - amount to add; defaults to 1
 * @param ttlSeconds - optional TTL to (re)apply to the counter
 * @returns the reply of the increment command (the counter value after the increment)
 */
async increment(key: string, incrementBy = 1, ttlSeconds?: number): Promise<number> {
  const redisKey = this.prefixer(key)

  // No TTL requested: a single increment is all that is needed.
  if (ttlSeconds === undefined) {
    return await this.client.incrBy(redisKey, incrementBy)
  }

  const pipeline = this.client.multi().incrBy(redisKey, incrementBy).expire(redisKey, ttlSeconds)
  const replies = await pipeline.exec()

  // The first reply belongs to the increment; the expire reply is ignored.
  return replies[0] as number
}

public setIfNotExistsAlready(key: string, value: string): Promise<boolean> {
const actualKey = this.prefixer(key)
return this.client.setNX(actualKey, value)
Expand Down
1 change: 1 addition & 0 deletions services/libs/redis/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ export * from './pubsub'
export * from './cache'

export * from './instances'
export * from './rateLimiter'
30 changes: 30 additions & 0 deletions services/libs/redis/src/rateLimiter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { ICache, IRateLimiter, RateLimitError } from '@crowd/types'

/**
 * Counter-based rate limiter backed by an `ICache`.
 *
 * Intended use is two-step: call `checkRateLimit` before a request and
 * `incrementRateLimit` when the request is about to be sent. The check and
 * the increment are separate cache calls, so they are not atomic with
 * respect to each other.
 */
export class RateLimiter implements IRateLimiter {
  /**
   * @param cache - cache holding the request counter
   * @param maxRequests - number of counted requests at which checks start failing
   * @param timeWindowSeconds - passed as the TTL when the counter is incremented
   * @param counterKey - cache key of the request counter
   */
  constructor(
    // Parameter properties declare and initialize the fields, so no
    // constructor body is needed.
    private readonly cache: ICache,
    private readonly maxRequests: number,
    private readonly timeWindowSeconds: number,
    private readonly counterKey: string,
  ) {}

  /**
   * Resolves when the stored counter is below `maxRequests`.
   *
   * @param endpoint - label forwarded to the `RateLimitError`
   * @throws RateLimitError when the counter has reached `maxRequests`
   *   (or cannot be parsed as a number)
   */
  public async checkRateLimit(endpoint: string): Promise<void> {
    const value = await this.cache.get(this.counterKey)
    // A missing key means nothing has been counted yet. The radix is explicit
    // so the stored value is always read as decimal.
    const requestCount = value === null ? 0 : parseInt(value, 10)

    if (requestCount < this.maxRequests) {
      return
    }

    // Base delay of one window plus a random offset in [0, maxRequests), so
    // callers that were rejected together do not all retry at once.
    // NOTE(review): presumably the first RateLimitError argument is in seconds — confirm.
    const sleepTime = this.timeWindowSeconds + Math.floor(Math.random() * this.maxRequests)
    throw new RateLimitError(sleepTime, endpoint)
  }

  /**
   * Adds one to the request counter, passing `timeWindowSeconds` as its TTL.
   */
  public async incrementRateLimit(): Promise<void> {
    await this.cache.increment(this.counterKey, 1, this.timeWindowSeconds)
  }
}
6 changes: 6 additions & 0 deletions services/libs/types/src/caching.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,10 @@ export interface ICache {
get(key: string): Promise<string | null>
set(key: string, value: string, ttlSeconds: number): Promise<void>
delete(key: string): Promise<number>
increment(key: string, incrementBy?: number, ttlSeconds?: number): Promise<number>
}

/**
 * Contract for a request rate limiter that is checked before a request and
 * incremented when a request is made.
 */
export interface IRateLimiter {
  /**
   * Checks whether another request may be made.
   * NOTE(review): the Redis-backed RateLimiter rejects with a RateLimitError
   * carrying `endpoint` when the limit is reached — confirm that every
   * implementation is expected to signal exhaustion the same way.
   */
  checkRateLimit(endpoint: string): Promise<void>
  /** Records that one more request is being made. */
  incrementRateLimit(): Promise<void>
}