Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ export class IntegrationProcessor extends LoggingBase {

if (streams.length > 0 || failedStreams.length > 0) {
logger.warn(
`${failedStreams.length} streams have not been successfully processed or are remaining - retrying them with delay! We have ${streams.length} streams left to process in total!`,
`${failedStreams.length} streams have not been successfully processed - retrying them with delay! We also have ${streams.length} remaining streams left to process!`,
)

const existingRetryStreams = req.retryStreams || []
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import moment from 'moment/moment'
import lodash from 'lodash'
import { DiscordMessages, DiscordMembers, DiscordMention } from '../../types/discordTypes'
import {
DiscordMessages,
DiscordMembers,
DiscordMention,
DiscordStreamProcessResult,
} from '../../types/discordTypes'
import { DISCORD_CONFIG } from '../../../../config'
import { DiscordMemberAttributes } from '../../../../database/attributes/member/discord'
import { MemberAttributeName } from '../../../../database/attributes/member/enums'
Expand All @@ -15,15 +20,15 @@ import { IntegrationType, PlatformType } from '../../../../types/integrationEnum
import { timeout } from '../../../../utils/timing'
import Operations from '../../../dbOperations/operations'
import { DiscordGrid } from '../../grid/discordGrid'
import { AddActivitiesSingle } from '../../types/messageTypes'
import { Channels } from '../../types/regularTypes'
import getChannels from '../../usecases/discord/getChannels'
import getMembers from '../../usecases/discord/getMembers'
import getMessages from '../../usecases/discord/getMessages'
import getThreads from '../../usecases/discord/getThreads'
import { IntegrationServiceBase } from '../integrationServiceBase'
import { sendNodeWorkerMessage } from '../../../utils/nodeWorkerSQS'
import { NodeWorkerIntegrationProcessMessage } from '../../../../types/mq/nodeWorkerIntegrationProcessMessage'
import { AddActivitiesSingle } from '../../types/messageTypes'
import { singleOrDefault } from '../../../../utils/arrays'

/* eslint class-methods-use-this: 0 */

Expand Down Expand Up @@ -78,13 +83,6 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
async preprocess(context: IStepContext): Promise<void> {
const guildId = context.integration.integrationIdentifier

const threads: Channels = await getThreads(
{
guildId,
token: this.getToken(context),
},
this.logger(context),
)
let channelsFromDiscordAPI: Channels = await getChannels(
{
guildId,
Expand All @@ -105,15 +103,12 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
return c
})

const channelsWithThreads = channelsFromDiscordAPI.concat(threads)

context.pipelineData = {
channelsFromDiscordAPI,
channels: channelsWithThreads,
channelsInfo: channelsWithThreads.reduce((acc, channel) => {
settingsChannels: channels,
channels: channelsFromDiscordAPI,
channelsInfo: channelsFromDiscordAPI.reduce((acc, channel) => {
acc[channel.id] = {
name: channel.name,
thread: !!channel.thread,
new: !!channel.new,
}
return acc
Expand All @@ -139,8 +134,9 @@ export class DiscordIntegrationService extends IntegrationServiceBase {

return predefined.concat(
context.pipelineData.channels.map((c) => ({
value: c.id,
value: 'channel',
metadata: {
id: c.id,
page: '',
},
})),
Expand All @@ -162,7 +158,7 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
) {
try {
const { fn, arg } = DiscordIntegrationService.getUsecase(
stream.value,
stream,
context.pipelineData.guildId,
)
const { records, nextPage, limit, timeUntilReset } = await fn(
Expand All @@ -176,7 +172,7 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
)

const nextPageStream = nextPage
? { value: stream.value, metadata: { page: nextPage } }
? { value: stream.value, metadata: { ...stream.metadata, page: nextPage } }
: undefined

const sleep = limit <= 1 ? timeUntilReset : undefined
Expand All @@ -189,7 +185,7 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
}
}

const activities = this.parseActivities(stream, context, records)
const { activities, newStreams } = this.parseActivities(stream, context, records)

const lastRecord = activities.length > 0 ? activities[activities.length - 1] : undefined
return {
Expand All @@ -202,6 +198,7 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
lastRecord,
lastRecordTimestamp: lastRecord ? lastRecord.timestamp.getTime() : undefined,
nextPageStream,
newStreams,
sleep,
}
} catch (err) {
Expand Down Expand Up @@ -244,7 +241,7 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
)

default:
if (context.pipelineData.channelsInfo[currentStream.value].new) return false
if (context.pipelineData.channelsInfo[currentStream.metadata.id].new) return false

return IntegrationServiceBase.isRetrospectOver(
lastRecordTimestamp,
Expand All @@ -259,44 +256,36 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
failedStreams?: IIntegrationStream[],
remainingStreams?: IIntegrationStream[],
): Promise<void> {
context.integration.settings.channels = context.pipelineData.channelsFromDiscordAPI.map(
(ch) => {
const { new: _, ...raw } = ch
return raw
},
)
context.integration.settings.channels = context.pipelineData.channels.map((ch) => {
const { new: _, ...raw } = ch
return raw
})
}

parseActivities(
stream: IIntegrationStream,
context: IStepContext,
records: DiscordMessages | DiscordMembers,
): AddActivitiesSingle[] {
): DiscordStreamProcessResult {
switch (stream.value) {
case 'members':
return this.parseMembers(context.integration.tenantId, records as DiscordMembers)
return this.parseMembers(context, records as DiscordMembers)
default:
return this.parseMessages(
context.pipelineData.guildId,
context.integration.tenantId,
context.pipelineData.channelsInfo,
records as DiscordMessages,
stream,
)
return this.parseMessages(context, records as DiscordMessages, stream)
}
}

parseMembers(tenantId: string, records: Array<any>): Array<AddActivitiesSingle> {
parseMembers(context: IStepContext, records: Array<any>): DiscordStreamProcessResult {
// We only need the members if they are not bots
return records.reduce((acc, record) => {
const activities: AddActivitiesSingle[] = records.reduce((acc, record) => {
if (!record.user.bot) {
let avatarUrl: string | boolean = false

if (record.user.avatar !== null && record.user.avatar !== undefined) {
avatarUrl = `https://cdn.discordapp.com/avatars/${record.user.id}/${record.user.avatar}.png`
}
acc.push({
tenant: tenantId,
tenant: context.integration.tenantId,
platform: PlatformType.DISCORD,
type: 'joined_guild',
sourceId: IntegrationServiceBase.generateSourceIdHash(
Expand Down Expand Up @@ -325,26 +314,49 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
}
return acc
}, [])

return {
activities,
newStreams: [],
}
}

parseMessages(
guildId: string,
tenantId: string,
channelsInfo: any,
context: IStepContext,
records: DiscordMessages,
stream: IIntegrationStream,
): Array<AddActivitiesSingle> {
return records.reduce((acc, record) => {
): DiscordStreamProcessResult {
const newStreams: IIntegrationStream[] = []
const activities: AddActivitiesSingle[] = records.reduce((acc, record) => {
let parent = ''

// if we're parsing a thread, mark each message as a child
const channelInfo = channelsInfo[stream.value]
if (channelInfo.thread) {
parent = stream.value
}
const channelInfo = context.pipelineData.channelsInfo[stream.metadata.id]

// is the message starting a thread?
if (record.thread) {
parent = record.thread.id
newStreams.push({
value: 'thread',
metadata: {
id: record.thread.id,
},
})

context.pipelineData.channelsInfo[record.thread.id] = {
name: context.pipelineData.channelsInfo[record.channel_id].name,
new:
singleOrDefault(
context.pipelineData.settingsChannels,
(c) => c.id === record.thread.id,
) === undefined,
}
}
// if we're parsing a thread, mark each message as a child of this thread
else if (stream.value === 'thread') {
parent = stream.metadata.id
}
// record.parentId means that it's a reply
if (record.message_reference && record.message_reference.message_id) {
else if (record.message_reference && record.message_reference.message_id) {
parent = record.message_reference.message_id
}

Expand All @@ -355,7 +367,7 @@ export class DiscordIntegrationService extends IntegrationServiceBase {

if (!record.author.bot) {
const activityObject = {
tenant: tenantId,
tenant: context.integration.tenantId,
platform: PlatformType.DISCORD,
type: 'message',
sourceId: record.id,
Expand All @@ -364,10 +376,10 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
body: record.content
? DiscordIntegrationService.replaceMentions(record.content, record.mentions)
: '',
url: `https://discordapp.com/channels/${guildId}/${stream.value}/${record.id}`,
url: `https://discordapp.com/channels/${context.pipelineData.guildId}/${stream.metadata.id}/${record.id}`,
channel: channelInfo.name,
attributes: {
thread: channelInfo.thread ? channelInfo.name : false,
thread: record.thread !== undefined || stream.value === 'thread',
reactions: record.reactions ? record.reactions : [],
attachments: record.attachments ? record.attachments : [],
},
Expand All @@ -392,11 +404,17 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
}
return acc
}, [])

return {
activities,
newStreams,
}
}

/**
* Parse mentions
* @param text Message text
* @param mentions
* @returns Message text, swapping mention IDs by mentions
*/
private static replaceMentions(text: string, mentions: [DiscordMention] | undefined): string {
Expand All @@ -423,17 +441,20 @@ export class DiscordIntegrationService extends IntegrationServiceBase {
* @returns The function to call, as well as its main argument
*/
private static getUsecase(
stream: string,
stream: IIntegrationStream,
guildId: string,
): {
fn: Function
arg: any
} {
switch (stream) {
switch (stream.value) {
case 'members':
return { fn: getMembers, arg: { guildId } }
case 'channel':
case 'thread':
return { fn: getMessages, arg: { channelId: stream.metadata.id } }
default:
return { fn: getMessages, arg: { channelId: stream } }
throw new Error(`Unknown stream ${stream.value}!`)
}
}
}
11 changes: 11 additions & 0 deletions backend/src/serverless/integrations/types/discordTypes.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import { AddActivitiesSingle } from './messageTypes'
import { IIntegrationStream } from '../../../types/integration/stepResult'

export interface DiscordGetChannelsInput {
guildId: string
token: string
Expand Down Expand Up @@ -53,10 +56,18 @@ export interface DiscordMessage {
guild_id: string
channel_id: string
}
thread: {
id: string
}
}

export type DiscordMessages = DiscordMessage[]

export interface DiscordStreamProcessResult {
activities: AddActivitiesSingle[]
newStreams: IIntegrationStream[]
}

export interface DiscordMember {
user: DiscordAuthor
joined_at: string
Expand Down