From d178e6a94333baa1231873eefa310275fb6276e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Mon, 27 Feb 2023 11:42:32 +0100 Subject: [PATCH 1/7] wip --- .../integrations/slackIntegrationService.ts | 365 ++++++++++++------ .../integrations/types/slackTypes.ts | 2 + .../usecases/slack/getChannels.ts | 3 +- .../integrations/usecases/slack/getMember.ts | 21 +- .../integrations/usecases/slack/getProfile.ts | 49 +++ 5 files changed, 317 insertions(+), 123 deletions(-) create mode 100644 backend/src/serverless/integrations/usecases/slack/getProfile.ts diff --git a/backend/src/serverless/integrations/services/integrations/slackIntegrationService.ts b/backend/src/serverless/integrations/services/integrations/slackIntegrationService.ts index 36896a4bc1..9067a3d93a 100644 --- a/backend/src/serverless/integrations/services/integrations/slackIntegrationService.ts +++ b/backend/src/serverless/integrations/services/integrations/slackIntegrationService.ts @@ -18,12 +18,15 @@ import getMessagesThreads from '../../usecases/slack/getMessagesInThreads' import getMessages from '../../usecases/slack/getMessages' import getTeam from '../../usecases/slack/getTeam' import { timeout } from '../../../../utils/timing' -import { AddActivitiesSingle } from '../../types/messageTypes' +import { AddActivitiesSingle, Member } from '../../types/messageTypes' import { MemberAttributeName } from '../../../../database/attributes/member/enums' import { SlackGrid } from '../../grid/slackGrid' import IntegrationRepository from '../../../../database/repositories/integrationRepository' import Operations from '../../../dbOperations/operations' import getMember from '../../usecases/slack/getMember' +import getMembers from '../../usecases/slack/getMembers' +import { createRedisClient } from '../../../../utils/redis' +import { RedisCache } from '../../../../utils/redis/redisCache' /* eslint class-methods-use-this: 0 */ @@ -41,6 +44,9 @@ export class SlackIntegrationService extends IntegrationServiceBase { } async preprocess(context: IStepContext): Promise { + const redis = await createRedisClient(true) + const membersCache = new RedisCache('slack-members', redis) + let channelsFromSlackAPI = await getChannels( { token: context.integration.token }, context.logger, @@ -60,11 +66,10 @@ export class SlackIntegrationService extends IntegrationServiceBase { const team = await getTeam({ token: context.integration.token }, context.logger) const teamUrl = team.url - const members = context.integration.settings.members ? context.integration.settings.members : {} - context.pipelineData = { - members, + membersCache, channels: channelsFromSlackAPI, + team, teamUrl, channelsInfo: channelsFromSlackAPI.reduce((acc, channel) => { acc[channel.id] = { @@ -82,12 +87,19 @@ export class SlackIntegrationService extends IntegrationServiceBase { } async getStreams(context: IStepContext): Promise { - return context.pipelineData.channels - .map((c) => c.id) - .map((c) => ({ - value: c, + const streams = context.pipelineData.channels.map((c) => ({ + value: 'channel', + metadata: { channelId: c.id, page: '', general: c.general }, + })) + + if (context.onboarding) { + streams.push({ + value: 'members', metadata: { page: '' }, - })) + }) + } + + return streams } async processStream( @@ -96,47 +108,109 @@ export class SlackIntegrationService extends IntegrationServiceBase { ): Promise { await timeout(1000) - const { fn, arg } = this.getUsecase(stream) + const operations: IStreamResultOperation[] = [] + let nextPage: string + let newStreams: IIntegrationStream[] + let lastRecord - const { records, nextPage, limit, timeUntilReset } = await fn( - { - token: context.integration.token, - ...arg, - page: stream.metadata.page, - perPage: 200, - }, - context.logger, - ) + switch (stream.value) { + case 'channel': { + const result = await getMessages( + { + channelId: stream.metadata.channelId, + page: stream.metadata.page, + perPage: 200, + token: context.integration.token, + }, + context.logger, + ) - const nextPageStream: IIntegrationStream = nextPage - ? { value: stream.value, metadata: { ...(stream.metadata || {}), page: nextPage } } - : undefined + nextPage = result.nextPage + + if (result.records.length > 0) { + const { activities, additionalStreams } = await this.parseActivities( + result.records, + stream, + context, + ) + + operations.push({ + type: Operations.UPSERT_ACTIVITIES_WITH_MEMBERS, + records: activities, + }) + newStreams = additionalStreams + lastRecord = activities.length > 0 ? activities[activities.length - 1] : undefined + } + + break + } + case 'threads': { + const result = await getMessagesThreads( + { + token: context.integration.token, + channelId: stream.metadata.channelId, + page: stream.metadata.page, + perPage: 200, + threadId: stream.metadata.threadId, + }, + context.logger, + ) + + nextPage = result.nextPage + + if (result.records.length > 0) { + const { activities, additionalStreams } = await this.parseActivities( + result.records, + stream, + context, + ) + + operations.push({ + type: Operations.UPSERT_ACTIVITIES_WITH_MEMBERS, + records: activities, + }) + newStreams = additionalStreams + lastRecord = activities.length > 0 ? activities[activities.length - 1] : undefined + } + break + } + case 'members': { + const result = await getMembers( + { + token: context.integration.token, + page: stream.metadata.page, + perPage: 200, + teamId: context.pipelineData.team.id, + }, + context.logger, + ) - const sleep = limit <= 1 ? timeUntilReset : undefined + nextPage = result.nextPage + if (result.records.length > 0) { + const { activities } = await this.parseActivities(result.records, stream, context) - if (records.length === 0) { - return { - operations: [], - nextPageStream, - sleep, + operations.push({ + type: Operations.UPSERT_ACTIVITIES_WITH_MEMBERS, + records: activities, + }) + } + break } + + default: + throw new Error(`Unknown stream value ${stream.value}!`) } - const { activities, additionalStreams } = await this.parseActivities(records, stream, context) + const nextPageStream: IIntegrationStream = nextPage + ? { value: stream.value, metadata: { ...(stream.metadata || {}), page: nextPage } } + : undefined - const lastRecord = activities.length > 0 ? activities[activities.length - 1] : undefined return { - operations: [ - { - type: Operations.UPSERT_ACTIVITIES_WITH_MEMBERS, - records: activities, - }, - ], + operations, lastRecord, lastRecordTimestamp: lastRecord ? lastRecord.timestamp.getTime() : undefined, - newStreams: additionalStreams, + newStreams, nextPageStream, - sleep, } } @@ -158,6 +232,13 @@ export class SlackIntegrationService extends IntegrationServiceBase { context: IStepContext, ): Promise<{ activities: AddActivitiesSingle[]; additionalStreams: IIntegrationStream[] }> { switch (stream.value) { + case 'members': { + const members = await this.parseMembers(records, context) + return { + activities: members, + additionalStreams: [], + } + } case 'threads': const parseMessagesInThreadsResult = await this.parseMessagesInThreads( records, @@ -185,57 +266,80 @@ export class SlackIntegrationService extends IntegrationServiceBase { * @returns Return the url: workspaceUrl + channelUrl + messageUrl */ private static getUrl(stream, pipelineData, record) { - const channelId = stream.value === 'threads' ? stream.metadata.channelId : stream.value + const channelId = stream.metadata.channelId return `${pipelineData.teamUrl}archives/${channelId}/p${record.ts.replace('.', '')}` } - private async parseMemberAndUpdateContext(context, userId): Promise { + private static parseMember(record: any): Member { + const member: Member = { + displayName: record.profile.real_name, + username: record.name, + email: record.profile.email, + attributes: { + [MemberAttributeName.SOURCE_ID]: { + [PlatformType.SLACK]: record.id, + }, + ...(record.profile.image_72 && { + [MemberAttributeName.AVATAR_URL]: { + [PlatformType.SLACK]: record.profile.image_72, + }, + }), + ...(record.tz_label && { + [MemberAttributeName.TIMEZONE]: { + [PlatformType.SLACK]: record.tz_label, + }, + }), + ...(record.profile.title && { + [MemberAttributeName.JOB_TITLE]: { + [PlatformType.SLACK]: record.profile.title, + }, + }), + }, + } + + ;(member as any).platform = PlatformType.SLACK + + return member + } + + private static async getMember(userId: string, context: IStepContext): Promise { + const membersCache: RedisCache = context.pipelineData.membersCache + + const cached = await membersCache.getValue(userId) + if (cached) { + if (cached === 'null') { + return undefined + } + + return JSON.parse(cached) + } + const result = await getMember({ token: context.integration.token, userId }, context.logger) + + const member = result.records + + if (member) { + await membersCache.setValue(userId, JSON.stringify(member), 24 * 60 * 60) + + return member + } + + await membersCache.setValue(userId, 'null', 24 * 60 * 60) + return undefined + } + + private async parseMemberAndUpdateContext(context: IStepContext, userId: string): Promise { try { if (userId === undefined) { - return { member: undefined, context } - } - if (context.pipelineData.members[userId]) { - if (context.pipelineData.members[userId] === 'bot') { - return { member: undefined, context } - } - return { member: { username: context.pipelineData.members[userId] }, context } - } - const memberResponse = await getMember( - { token: context.integration.token, userId }, - context.logger, - ) - const record = memberResponse.records - const member = { - displayName: record.profile.real_name, - username: record.name, - email: record.profile.email, - attributes: { - [MemberAttributeName.SOURCE_ID]: { - [PlatformType.SLACK]: record.id, - }, - ...(record.profile.image_72 && { - [MemberAttributeName.AVATAR_URL]: { - [PlatformType.SLACK]: record.profile.image_72, - }, - }), - ...(record.tz_label && { - [MemberAttributeName.TIMEZONE]: { - [PlatformType.SLACK]: record.tz_label, - }, - }), - ...(record.profile.title && { - [MemberAttributeName.JOB_TITLE]: { - [PlatformType.SLACK]: record.profile.title, - }, - }), - }, + return undefined } - context.pipelineData.members[userId] = record.is_bot ? 'bot' : member.username - return { - member: record.is_bot ? undefined : member, - context, + const record = await SlackIntegrationService.getMember(userId, context) + + if (!record || record.is_bot) { + return undefined } + + return SlackIntegrationService.parseMember(record) } catch (e) { context.logger.error('Error getting member in Slack', { userId }) throw e @@ -256,20 +360,32 @@ export class SlackIntegrationService extends IntegrationServiceBase { ): Promise<{ activities: AddActivitiesSingle[]; additionalStreams: IIntegrationStream[] }> { const newStreams: IIntegrationStream[] = [] const activities: AddActivitiesSingle[] = [] + + const subtypesToIgnore = context.onboarding + ? ['channel_join', 'channel_leave', 'group_join', 'group_leave'] + : [] for (const record of records) { - const newMemberContext = await this.parseMemberAndUpdateContext(context, record.user) - const member = newMemberContext.member + if (context.onboarding && subtypesToIgnore.includes(record.subtype)) { + // check if general channel + // eslint-disable-next-line no-continue + continue + } + + const member = await this.parseMemberAndUpdateContext(context, record.user) if (member !== undefined) { - context = newMemberContext.context let body = record.text - ? SlackIntegrationService.removeMentions(record.text, context.pipelineData) + ? await SlackIntegrationService.removeMentions(record.text, context) : '' let activityType let score let isKeyAction if (record.subtype === 'channel_join') { + if (stream.metadata.general !== true) { + // eslint-disable-next-line no-continue + continue + } activityType = 'channel_joined' score = SlackGrid.join.score isKeyAction = SlackGrid.join.isKeyAction @@ -290,7 +406,7 @@ export class SlackIntegrationService extends IntegrationServiceBase { .toDate(), body, url: SlackIntegrationService.getUrl(stream, context.pipelineData, record), - channel: context.pipelineData.channelsInfo[stream.value].name, + channel: context.pipelineData.channelsInfo[stream.metadata.channelId].name, attributes: { thread: false, reactions: record.reactions ? record.reactions : [], @@ -306,10 +422,10 @@ export class SlackIntegrationService extends IntegrationServiceBase { metadata: { page: '', threadId: record.thread_ts, - channel: context.pipelineData.channelsInfo[stream.value].name, - channelId: stream.value, + channel: context.pipelineData.channelsInfo[stream.metadata.channelId].name, + channelId: stream.metadata.channelId, placeholder: body, - new: context.pipelineData.channelsInfo[stream.value].new, + new: context.pipelineData.channelsInfo[stream.metadata.channelId].new, }, }) } @@ -322,6 +438,40 @@ export class SlackIntegrationService extends IntegrationServiceBase { } } + private async parseMembers( + records: any[], + context: IStepContext, + ): Promise { + const activities: AddActivitiesSingle[] = [] + for (const record of records) { + if (record.is_bot) { + // eslint-disable-next-line no-continue + continue + } + + const member = SlackIntegrationService.parseMember(record) + + activities.push({ + tenant: context.integration.tenantId, + platform: PlatformType.SLACK, + type: 'channel_joined', + sourceId: record.id, + sourceParentId: '', + timestamp: moment('1970-01-01T00:00:00+00:00').utc().toDate(), + body: undefined, + attributes: { + thread: false, + reactions: record.reactions ? record.reactions : [], + attachments: record.attachments ? record.attachments : [], + }, + score: SlackGrid.join.score, + isKeyAction: SlackGrid.join.isKeyAction, + member, + }) + } + return activities + } + /** * Map the messages coming from Slack to activities and members records to the format of the message to add activities and members * @param records List of records coming from the API @@ -337,12 +487,10 @@ export class SlackIntegrationService extends IntegrationServiceBase { const threadInfo = stream.metadata const activities: AddActivitiesSingle[] = [] for (const record of records) { - const newMemberContext = await this.parseMemberAndUpdateContext(context, record.user) - const member = newMemberContext.member - context = newMemberContext.context + const member = await this.parseMemberAndUpdateContext(context, record.user) if (member !== undefined) { const body = record.text - ? SlackIntegrationService.removeMentions(record.text, context.pipelineData) + ? await SlackIntegrationService.removeMentions(record.text, context) : '' activities.push({ tenant: context.integration.tenantId, @@ -380,17 +528,20 @@ export class SlackIntegrationService extends IntegrationServiceBase { /** * Parse mentions * @param text Message text - * @param pipelineData + * @param context * @returns Message text, swapping mention IDs by mentions */ - private static removeMentions(text: string, pipelineData?: any): string { + private static async removeMentions(text: string, context: IStepContext): Promise { const regex = /<@!?[^>]*>/ const globalRegex = /<@!?[^>]*>/g const matches = text.match(globalRegex) if (matches) { for (let match of matches) { match = match.replace('<', '').replace('>', '').replace('@', '').replace('!', '') - text = text.replace(regex, `@${pipelineData.members[match] || 'mention'}`) + + const user = await SlackIntegrationService.getMember(match, context) + const username = user ? user.name : 'mention' + text = text.replace(regex, `@${username}`) } } @@ -439,7 +590,7 @@ export class SlackIntegrationService extends IntegrationServiceBase { ) default: - if (context.pipelineData.channelsInfo[currentStream.value].new) { + if (context.pipelineData.channelsInfo[currentStream.metadata.channelId].new) { return false } @@ -452,24 +603,4 @@ export class SlackIntegrationService extends IntegrationServiceBase { ) } } - - /** - * Get the usecase for the given endpoint with its main argument - * @param stream The stream we are currently targeting - * @returns The function to call, as well as its main argument - */ - private getUsecase(stream: IIntegrationStream): { - fn: Function - arg: any - } { - switch (stream.value) { - case 'threads': - return { - fn: getMessagesThreads, - arg: { threadId: stream.metadata.threadId, channelId: stream.metadata.channelId }, - } - default: - return { fn: getMessages, arg: { channelId: stream.value } } - } - } } diff --git a/backend/src/serverless/integrations/types/slackTypes.ts b/backend/src/serverless/integrations/types/slackTypes.ts index cfa50dabbd..327cb8a45d 100644 --- a/backend/src/serverless/integrations/types/slackTypes.ts +++ b/backend/src/serverless/integrations/types/slackTypes.ts @@ -19,6 +19,7 @@ export interface SlackGetMessagesInThreadsInput { export interface SlackGetMembersInput { token: string + teamId: string page: string | undefined perPage: number | 100 } @@ -32,6 +33,7 @@ export interface SlackChannel { id: string name: string is_member?: boolean + is_general: boolean } export interface SlackTeam { diff --git a/backend/src/serverless/integrations/usecases/slack/getChannels.ts b/backend/src/serverless/integrations/usecases/slack/getChannels.ts index 137b4e6177..0b279146b8 100644 --- a/backend/src/serverless/integrations/usecases/slack/getChannels.ts +++ b/backend/src/serverless/integrations/usecases/slack/getChannels.ts @@ -4,7 +4,7 @@ import { SlackChannels, SlackGetChannelsInput } from '../../types/slackTypes' import { Logger } from '../../../../utils/logging' import { timeout } from '../../../../utils/timing' -async function getChannels(input: SlackGetChannelsInput, logger: Logger): Promise { +async function getChannels(input: SlackGetChannelsInput, logger: Logger): Promise { await timeout(2000) const config: AxiosRequestConfig = { @@ -27,6 +27,7 @@ async function getChannels(input: SlackGetChannelsInput, logger: Logger): Promis .map((c) => ({ name: c.name, id: c.id, + general: c.is_general, })) } catch (err) { const newErr = handleSlackError(err, config, input, logger) diff --git a/backend/src/serverless/integrations/usecases/slack/getMember.ts b/backend/src/serverless/integrations/usecases/slack/getMember.ts index f67e4d0820..24f57f851b 100644 --- a/backend/src/serverless/integrations/usecases/slack/getMember.ts +++ b/backend/src/serverless/integrations/usecases/slack/getMember.ts @@ -23,12 +23,23 @@ async function getMembers( try { const response = await axios(config) - const member = response.data.user - const nextPage = response.data.response_metadata?.next_cursor || '' - return { - records: member, - nextPage, + + if (response.data.ok === true) { + const member = response.data.user + return { + records: member, + nextPage: '', + } + } + + if (response.data.error === 'user_not_found' || response.data.error === 'user_not_visible') { + return { + records: undefined, + nextPage: '', + } } + + throw new Error(`Slack API error ${response.data.error}!`) } catch (err) { const newErr = handleSlackError(err, config, input, logger) throw newErr diff --git a/backend/src/serverless/integrations/usecases/slack/getProfile.ts b/backend/src/serverless/integrations/usecases/slack/getProfile.ts new file mode 100644 index 0000000000..e55e88d9c4 --- /dev/null +++ b/backend/src/serverless/integrations/usecases/slack/getProfile.ts @@ -0,0 +1,49 @@ +import axios, { AxiosRequestConfig } from 'axios' +import { SlackGetMemberInput, SlackGetMemberOutput } from '../../types/slackTypes' +import { Logger } from '../../../../utils/logging' +import { timeout } from '../../../../utils/timing' +import { handleSlackError } from './errorHandler' + +async function getMembers( + input: SlackGetMemberInput, + logger: Logger, +): Promise { + await timeout(2000) + + const config: AxiosRequestConfig = { + method: 'get', + url: `https://slack.com/api/users.profile.get`, + params: { + user: input.userId, + }, + headers: { + Authorization: `Bearer ${input.token}`, + }, + } + + try { + const response = await axios(config) + + if (response.data.ok === true) { + const profile = response.data.profile + return { + records: profile, + nextPage: '', + } + } + + if (response.data.error === 'user_not_found' || response.data.error === 'account_inactive') { + return { + records: undefined, + nextPage: '', + } + } + + throw new Error(`Slack API error ${response.data.error}!`) + } catch (err) { + const newErr = handleSlackError(err, config, input, logger) + throw newErr + } +} + +export default getMembers From 419abeb0598993e7f76445cadb73d6f9f258e688 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Mon, 27 Feb 2023 12:43:23 +0100 Subject: [PATCH 2/7] fixed script when doing restore db --- scripts/cli | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/scripts/cli b/scripts/cli index b2ebcf0fd8..713030ffa1 100755 --- a/scripts/cli +++ b/scripts/cli @@ -134,6 +134,7 @@ function restore_db_backup() { # docker exec -t ${PROJECT_NAME}_db_1 bash -c "PGPASSWORD=example dropdb -U postgres crowd-web && PGPASSWORD=example createdb -U postgres crowd-web" docker exec -t ${PROJECT_NAME}_db_1 bash -c "PGPASSWORD=example pg_restore --clean -U postgres -d crowd-web backup.dump && rm -f backup.dump" + post_up_scaffold say "All done!" } @@ -146,6 +147,7 @@ function scaffold() { do case "$1" in up) up_scaffold + post_up_scaffold exit; ;; down) down_scaffold @@ -157,9 +159,6 @@ function scaffold() { reset) scaffold_reset exit; ;; - clear-data) scaffold_clear - exit; - ;; create-migration) create_migration $2 exit; ;; @@ -352,10 +351,11 @@ function init_unleash() { function up_scaffold() { scaffold_set_up_network "$PROJECT_NAME-bridge" $DOCKER_NETWORK_SUBNET $DOCKER_NETWORK_GATEWAY $_DC --compatibility -p $PROJECT_NAME -f $CLI_HOME/scaffold.yaml up -d --build - wait_for_db +} + +function post_up_scaffold() { migrate_local bash pizzly-integrations.sh - check_init_premium } @@ -377,11 +377,7 @@ function scaffold_destroy() { function scaffold_reset() { scaffold_destroy up_scaffold -} - -function scaffold_clear() { - scaffold_destroy - up_scaffold + post_up_scaffold } function kill_all_containers() { @@ -439,6 +435,7 @@ function wait_for_db() { function start() { if [[ -z "$CLEAN_START" ]]; then up_scaffold + post_up_scaffold else scaffold_reset fi From 5c7e75b19fb208ee35216d8f24a1ef8f97d985ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Mon, 27 Feb 2023 14:50:46 +0100 Subject: [PATCH 3/7] Slack integration fixes for missing/duplicated members and activities. --- .../U1677503916__activity-source-index.sql | 1 + .../V1677503916__activity-source-index.sql | 1 + .../integrations/slackIntegrationService.ts | 58 ++++++------------- 3 files changed, 20 insertions(+), 40 deletions(-) create mode 100644 backend/src/database/migrations/U1677503916__activity-source-index.sql create mode 100644 backend/src/database/migrations/V1677503916__activity-source-index.sql diff --git a/backend/src/database/migrations/U1677503916__activity-source-index.sql b/backend/src/database/migrations/U1677503916__activity-source-index.sql new file mode 100644 index 0000000000..4f6190858c --- /dev/null +++ b/backend/src/database/migrations/U1677503916__activity-source-index.sql @@ -0,0 +1 @@ +drop index create unique index ix_unique_activities_tenantId_platform_type_sourceId; \ No newline at end of file diff --git a/backend/src/database/migrations/V1677503916__activity-source-index.sql b/backend/src/database/migrations/V1677503916__activity-source-index.sql new file mode 100644 index 0000000000..7344a2c818 --- /dev/null +++ b/backend/src/database/migrations/V1677503916__activity-source-index.sql @@ -0,0 +1 @@ +create unique index ix_unique_activities_tenantId_platform_type_sourceId on activities ("tenantId", platform, type, "sourceId"); \ No newline at end of file diff --git a/backend/src/serverless/integrations/services/integrations/slackIntegrationService.ts b/backend/src/serverless/integrations/services/integrations/slackIntegrationService.ts index 9067a3d93a..2de84e5568 100644 --- a/backend/src/serverless/integrations/services/integrations/slackIntegrationService.ts +++ b/backend/src/serverless/integrations/services/integrations/slackIntegrationService.ts @@ -87,10 +87,7 @@ export class SlackIntegrationService extends IntegrationServiceBase { } async getStreams(context: IStepContext): Promise { - const streams = context.pipelineData.channels.map((c) => ({ - value: 'channel', - metadata: { channelId: c.id, page: '', general: c.general }, - })) + const streams = [] if (context.onboarding) { streams.push({ @@ -99,6 +96,14 @@ export class SlackIntegrationService extends IntegrationServiceBase { }) } + const channelStreams = context.pipelineData.channels.map((c) => ({ + value: 'channel', + metadata: { channelId: c.id, page: '', general: c.general }, + })) + if (channelStreams.length > 0) { + streams.push(...channelStreams) + } + return streams } @@ -327,7 +332,7 @@ export class SlackIntegrationService extends IntegrationServiceBase { return undefined } - private async parseMemberAndUpdateContext(context: IStepContext, userId: string): Promise { + private async fetchAndParseMember(context: IStepContext, userId: string): Promise { try { if (userId === undefined) { return undefined @@ -361,17 +366,8 @@ export class SlackIntegrationService extends IntegrationServiceBase { const newStreams: IIntegrationStream[] = [] const activities: AddActivitiesSingle[] = [] - const subtypesToIgnore = context.onboarding - ? ['channel_join', 'channel_leave', 'group_join', 'group_leave'] - : [] for (const record of records) { - if (context.onboarding && subtypesToIgnore.includes(record.subtype)) { - // check if general channel - // eslint-disable-next-line no-continue - continue - } - - const member = await this.parseMemberAndUpdateContext(context, record.user) + const member = await this.fetchAndParseMember(context, record.user) if (member !== undefined) { let body = record.text @@ -381,25 +377,24 @@ export class SlackIntegrationService extends IntegrationServiceBase { let activityType let score let isKeyAction + let sourceId if (record.subtype === 'channel_join') { - if (stream.metadata.general !== true) { - // eslint-disable-next-line no-continue - continue - } activityType = 'channel_joined' score = SlackGrid.join.score isKeyAction = SlackGrid.join.isKeyAction body = undefined + sourceId = record.user } else { activityType = 'message' score = SlackGrid.message.score isKeyAction = SlackGrid.message.isKeyAction + sourceId = record.ts } activities.push({ tenant: context.integration.tenantId, platform: PlatformType.SLACK, type: activityType, - sourceId: record.ts, + sourceId, sourceParentId: '', timestamp: moment(parseInt(record.ts, 10) * 1000) .utc() @@ -431,7 +426,7 @@ export class SlackIntegrationService extends IntegrationServiceBase { } } } - await SlackIntegrationService.updateMembers(context) + return { activities, additionalStreams: newStreams, @@ -456,7 +451,6 @@ export class SlackIntegrationService extends IntegrationServiceBase { platform: PlatformType.SLACK, type: 'channel_joined', sourceId: record.id, - sourceParentId: '', timestamp: moment('1970-01-01T00:00:00+00:00').utc().toDate(), body: undefined, attributes: { @@ -487,7 +481,7 @@ export class SlackIntegrationService extends IntegrationServiceBase { const threadInfo = stream.metadata const activities: AddActivitiesSingle[] = [] for (const record of records) { - const member = await this.parseMemberAndUpdateContext(context, record.user) + const member = await this.fetchAndParseMember(context, record.user) if (member !== undefined) { const body = record.text ? await SlackIntegrationService.removeMentions(record.text, context) @@ -518,7 +512,7 @@ export class SlackIntegrationService extends IntegrationServiceBase { }) } } - await SlackIntegrationService.updateMembers(context) + return { activities, additionalStreams: [], @@ -548,22 +542,6 @@ export class SlackIntegrationService extends IntegrationServiceBase { return text } - /** - * Update members for an integration. - * This update needs to happen synchronously, since the message endpoints need these members - */ - private static async updateMembers(context: IStepContext) { - const integration = await IntegrationRepository.findById( - context.integration.id, - context.repoContext, - ) - const settings = { - members: context.pipelineData.members, - channels: integration.settings.channels || [], - } - await IntegrationRepository.update(context.integration.id, { settings }, context.repoContext) - } - async isProcessingFinished( context: IStepContext, currentStream: IIntegrationStream, From 15e2cded0b1af495da0b5782a31cb09acea5b167 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Mon, 27 Feb 2023 14:58:15 +0100 Subject: [PATCH 4/7] fixed linting issue --- .../services/integrations/slackIntegrationService.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/src/serverless/integrations/services/integrations/slackIntegrationService.ts b/backend/src/serverless/integrations/services/integrations/slackIntegrationService.ts index 2de84e5568..cb7650e019 100644 --- a/backend/src/serverless/integrations/services/integrations/slackIntegrationService.ts +++ b/backend/src/serverless/integrations/services/integrations/slackIntegrationService.ts @@ -21,7 +21,6 @@ import { timeout } from '../../../../utils/timing' import { AddActivitiesSingle, Member } from '../../types/messageTypes' import { MemberAttributeName } from '../../../../database/attributes/member/enums' import { SlackGrid } from '../../grid/slackGrid' -import IntegrationRepository from '../../../../database/repositories/integrationRepository' import Operations from '../../../dbOperations/operations' import getMember from '../../usecases/slack/getMember' import getMembers from '../../usecases/slack/getMembers' From ba21b593ca0723c73be938d7a3363cac1d2ba5db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Tue, 28 Feb 2023 15:10:50 +0100 Subject: [PATCH 5/7] fixing tests --- .../__tests__/memberRepository.test.ts | 8 ++--- .../__tests__/organizationRepository.test.ts | 2 +- .../__tests__/taskRepository.test.ts | 31 ++++++++++++++++--- .../__tests__/conversationService.test.ts | 2 +- .../services/__tests__/memberService.test.ts | 7 ++--- 5 files changed, 35 insertions(+), 15 deletions(-) diff --git a/backend/src/database/repositories/__tests__/memberRepository.test.ts b/backend/src/database/repositories/__tests__/memberRepository.test.ts index 539c23fe78..8f7e2c8170 100644 --- a/backend/src/database/repositories/__tests__/memberRepository.test.ts +++ b/backend/src/database/repositories/__tests__/memberRepository.test.ts @@ -297,8 +297,8 @@ describe('MemberRepository tests', () => { const memberCreated = await MemberRepository.create(member2add, mockIRepositoryOptions) expect(memberCreated.tasks).toHaveLength(2) - expect(memberCreated.tasks[0].id).toEqual(tasks1.id) - expect(memberCreated.tasks[1].id).toEqual(task2.id) + expect(memberCreated.tasks.find((t) => t.id === tasks1.id)).not.toBeUndefined() + expect(memberCreated.tasks.find((t) => t.id === task2.id)).not.toBeUndefined() }) }) @@ -1655,8 +1655,8 @@ describe('MemberRepository tests', () => { mockIRepositoryOptions, ) expect(memberUpdated.tasks).toHaveLength(2) - expect(memberUpdated.tasks[0].id).toEqual(tasks1.id) - expect(memberUpdated.tasks[1].id).toEqual(task2.id) + expect(memberUpdated.tasks.find((t) => t.id === tasks1.id)).not.toBeUndefined() + expect(memberUpdated.tasks.find((t) => t.id === task2.id)).not.toBeUndefined() }) it('Should throw 404 error when trying to update non existent member', async () => { diff --git a/backend/src/database/repositories/__tests__/organizationRepository.test.ts b/backend/src/database/repositories/__tests__/organizationRepository.test.ts index e26013b533..16bf3fbe3c 100644 --- a/backend/src/database/repositories/__tests__/organizationRepository.test.ts +++ b/backend/src/database/repositories/__tests__/organizationRepository.test.ts @@ -1015,7 +1015,7 @@ describe('OrganizationRepository tests', () => { type: 'activity', timestamp: '2022-02-27T15:13:30Z', platform: PlatformType.DEVTO, - sourceId: '#sourceId4', + sourceId: '#sourceId5', }, ], }, diff --git a/backend/src/database/repositories/__tests__/taskRepository.test.ts b/backend/src/database/repositories/__tests__/taskRepository.test.ts index aee87d5aa6..b55d242377 100644 --- a/backend/src/database/repositories/__tests__/taskRepository.test.ts +++ b/backend/src/database/repositories/__tests__/taskRepository.test.ts @@ -136,8 +136,12 @@ describe('TaskRepository tests', () => { createdById: mockIRepositoryOptions.currentUser.id, updatedById: mockIRepositoryOptions.currentUser.id, } - expect(createdTask).toStrictEqual(expectedTaskCreated) - expect(createdTask.members.length).toBe(sampleMembers.length) + const clone1 = { ...createdTask } + const clone2 = { ...expectedTaskCreated } + delete clone1.members + delete clone2.members + expect(clone1).toStrictEqual(clone2) + expect(createdTask.members.sort()).toEqual(expectedTaskCreated.members.sort()) // Make sure the task exists in the member for (const memberId of createdTask.members) { @@ -210,7 +214,13 @@ describe('TaskRepository tests', () => { createdById: mockIRepositoryOptions.currentUser.id, updatedById: mockIRepositoryOptions.currentUser.id, } - expect(createdTask).toStrictEqual(expectedTaskCreated) + + const clone1 = { ...createdTask } + const clone2 = { ...expectedTaskCreated } + delete clone1.members + delete clone2.members + expect(clone1).toStrictEqual(clone2) + expect(createdTask.members.sort()).toEqual(expectedTaskCreated.members.sort()) expect(createdTask.activities.length).toBe(sampleActivities.length) expect(createdTask.members.length).toBe(sampleMembers.length) }) @@ -315,7 +325,12 @@ describe('TaskRepository tests', () => { createdById: mockIRepositoryOptions.currentUser.id, updatedById: mockIRepositoryOptions.currentUser.id, } - expect(createdTask).toStrictEqual(expectedTaskCreated) + const clone1 = { ...createdTask } + const clone2 = { ...expectedTaskCreated } + delete clone1.members + delete clone2.members + expect(clone1).toStrictEqual(clone2) + expect(createdTask.members.sort()).toEqual(expectedTaskCreated.members.sort()) expect(createdTask.members.length).toBe(sampleMembers.length) // Make sure the task exists in the member @@ -389,7 +404,13 @@ describe('TaskRepository tests', () => { createdById: mockIRepositoryOptions.currentUser.id, updatedById: mockIRepositoryOptions.currentUser.id, } - expect(createdTask).toStrictEqual(expectedTaskCreated) + + const clone1 = { ...createdTask } + const clone2 = { ...expectedTaskCreated } + delete clone1.members + delete clone2.members + expect(clone1).toStrictEqual(clone2) + expect(createdTask.members.sort()).toEqual(expectedTaskCreated.members.sort()) expect(createdTask.activities.length).toBe(sampleActivities.length) expect(createdTask.members.length).toBe(sampleMembers.length) }) diff --git a/backend/src/services/__tests__/conversationService.test.ts b/backend/src/services/__tests__/conversationService.test.ts index a1ef5c8ae9..a4ca24b708 100644 --- a/backend/src/services/__tests__/conversationService.test.ts +++ b/backend/src/services/__tests__/conversationService.test.ts @@ -1161,7 +1161,7 @@ describe('ConversationService tests', () => { member: memberCreated.id, score: 1, parent: discordActivityParentCreated.id, - sourceId: '#discordSourceId1', + sourceId: '#discordSourceId2', } const discordActivityChildCreated = await ActivityRepository.create( diff --git a/backend/src/services/__tests__/memberService.test.ts b/backend/src/services/__tests__/memberService.test.ts index 5a3fe01c7e..d293f82fa7 100644 --- a/backend/src/services/__tests__/memberService.test.ts +++ b/backend/src/services/__tests__/memberService.test.ts @@ -1767,6 +1767,9 @@ describe('MemberService tests', () => { lastActivity: activityCreated, } + expect(mergedMember.tasks.sort()).toEqual(expectedMember.tasks.sort()) + delete mergedMember.tasks + delete expectedMember.tasks expect(mergedMember).toStrictEqual(expectedMember) }) @@ -1837,8 +1840,6 @@ describe('MemberService tests', () => { a1.member = createdMember2.id - await ActivityRepository.create(a1, mockIRepositoryOptions) - const a3Created = await ActivityRepository.create( { timestamp: '2019-12-27T15:14:30Z', @@ -1912,8 +1913,6 @@ describe('MemberService tests', () => { aRepeated.member = createdMember2.id - await ActivityRepository.create(aRepeated, mockIRepositoryOptions) - const aSameTsDifferentType = await ActivityRepository.create( { timestamp: moment(0).utc().toString(), From 5ea13695cca9ad2434e1a7772d330ccaed1a0a19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Wed, 1 Mar 2023 08:08:41 +0100 Subject: [PATCH 6/7] removed deprecated tests --- .../services/__tests__/memberService.test.ts | 136 ------------------ 1 file changed, 136 deletions(-) diff --git a/backend/src/services/__tests__/memberService.test.ts b/backend/src/services/__tests__/memberService.test.ts index d293f82fa7..df22b2639a 100644 --- a/backend/src/services/__tests__/memberService.test.ts +++ b/backend/src/services/__tests__/memberService.test.ts @@ -1798,142 +1798,6 @@ describe('MemberService tests', () => { const found = await memberService.findById(memberCreated.id) expect(found).toStrictEqual(memberCreated) }) - - it('Should not duplicate activities - by sourceId', async () => { - const mockIRepositoryOptions = await SequelizeTestUtils.getTestIRepositoryOptions(db) - const mas = new MemberAttributeSettingsService(mockIRepositoryOptions) - - await mas.createPredefined(GithubMemberAttributes) - - const memberService = new MemberService(mockIRepositoryOptions) - - const member1 = { - username: { - [PlatformType.GITHUB]: 'anil', - }, - displayName: 'Anil', - joinedAt: '2021-05-27T15:14:30Z', - attributes: {}, - } - - const createdMember = await MemberRepository.create(member1, mockIRepositoryOptions) - - const a1 = { - timestamp: '2021-05-27T15:14:30Z', - type: 'activity', - member: createdMember.id, - platform: PlatformType.GITHUB, - sourceId: '#sourceId1', - } - - const a1Created = await ActivityRepository.create(a1, mockIRepositoryOptions) - - const member2 = { - username: { - [PlatformType.GITHUB]: 'anil', - }, - displayName: 'Anil', - joinedAt: '2021-05-27T15:14:30Z', - } - - const createdMember2 = await MemberRepository.create(member2, mockIRepositoryOptions) - - a1.member = createdMember2.id - - const a3Created = await ActivityRepository.create( - { - timestamp: '2019-12-27T15:14:30Z', - type: 'activity', - member: createdMember2.id, - platform: PlatformType.GITHUB, - sourceId: '#sourceId3', - }, - mockIRepositoryOptions, - ) - - // Merge - await memberService.merge(createdMember.id, createdMember2.id) - - const foundMergedActivities = (await memberService.findById(createdMember.id)).activities - .map((a) => a.get({ plain: true }).id) - .sort() - - const expected = [a1Created.id, a3Created.id].sort() - expect(foundMergedActivities).toStrictEqual(expected) - }) - - it('Duplication of activities - one matching timestamp is duplicated because platform is different', async () => { - const mockIRepositoryOptions = await SequelizeTestUtils.getTestIRepositoryOptions(db) - const mas = new MemberAttributeSettingsService(mockIRepositoryOptions) - - await mas.createPredefined(GithubMemberAttributes) - - const memberService = new MemberService(mockIRepositoryOptions) - - const member1 = { - username: { - [PlatformType.GITHUB]: 'anil', - }, - displayName: 'Anil', - joinedAt: '2021-05-27T15:14:30Z', - attributes: {}, - } - - const createdMember = await MemberRepository.create(member1, mockIRepositoryOptions) - - const a1 = { - timestamp: moment(0).utc().toString(), - type: 'activity', - member: createdMember.id, - platform: PlatformType.GITHUB, - sourceId: '#sourceId1', - } - - const aRepeated = { - timestamp: '2021-06-27T15:14:30Z', - type: 'activity', - member: createdMember.id, - platform: PlatformType.GITHUB, - sourceId: '#sourceId2', - } - - const a1Created = await ActivityRepository.create(a1, mockIRepositoryOptions) - - const aCreatedRepeated1 = await ActivityRepository.create(aRepeated, mockIRepositoryOptions) - - const member2 = { - username: { - [PlatformType.GITHUB]: 'anil', - }, - displayName: 'Anil', - joinedAt: '2021-05-27T15:14:30Z', - } - - const createdMember2 = await MemberRepository.create(member2, mockIRepositoryOptions) - - aRepeated.member = createdMember2.id - - const aSameTsDifferentType = await ActivityRepository.create( - { - timestamp: moment(0).utc().toString(), - type: 'activity', - member: createdMember2.id, - platform: 'different', - sourceId: '#sourceId3', - }, - mockIRepositoryOptions, - ) - - // Merge - await memberService.merge(createdMember.id, createdMember2.id) - - const foundMergedActivities = (await memberService.findById(createdMember.id)).activities - .map((a) => a.get({ plain: true }).id) - .sort() - - const expected = [aCreatedRepeated1.id, a1Created.id, aSameTsDifferentType.id].sort() - expect(foundMergedActivities).toStrictEqual(expected) - }) }) describe('addToNoMerge method', () => { From b5cc46d8f787d12e52f8ae565d27a9756072bfd9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Wed, 1 Mar 2023 08:15:38 +0100 Subject: [PATCH 7/7] fixed linting --- backend/src/services/__tests__/memberService.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/src/services/__tests__/memberService.test.ts b/backend/src/services/__tests__/memberService.test.ts index df22b2639a..bb67d00dce 100644 --- a/backend/src/services/__tests__/memberService.test.ts +++ b/backend/src/services/__tests__/memberService.test.ts @@ -1,4 +1,3 @@ -import moment from 'moment' import SequelizeTestUtils from '../../database/utils/sequelizeTestUtils' import MemberService from '../memberService' import MemberRepository from '../../database/repositories/memberRepository'