Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion backend/src/bin/scripts/continue-run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ const options = [
description:
'The unique ID of integration run that you would like to continue processing. Use comma delimiter when sending multiple integration runs.',
},
{
name: 'disableFiringCrowdWebhooks',
alias: 'd',
typeLabel: '{underline disableFiringCrowdWebhooks}',
type: Boolean,
defaultOption: false,
description: 'Should it disable firing outgoing crowd webhooks?',
},
{
name: 'help',
alias: 'h',
Expand Down Expand Up @@ -55,6 +63,8 @@ if (parameters.help && !parameters.run) {
setImmediate(async () => {
const options = await SequelizeRepository.getDefaultIRepositoryOptions()

const fireCrowdWebhooks = !parameters.disableFiringCrowdWebhooks

const runRepo = new IntegrationRunRepository(options)

const runIds = parameters.run.split(',')
Expand All @@ -75,7 +85,16 @@ if (parameters.help && !parameters.run) {
await runRepo.restart(run.id)
}

await sendNodeWorkerMessage(run.tenantId, new NodeWorkerIntegrationProcessMessage(run.id))
if (!fireCrowdWebhooks) {
log.info(
'fireCrowdWebhooks is false - This continue-run will not trigger outgoing crowd webhooks!',
)
}

await sendNodeWorkerMessage(
run.tenantId,
new NodeWorkerIntegrationProcessMessage(run.id, null, fireCrowdWebhooks),
)
}
}

Expand Down
14 changes: 12 additions & 2 deletions backend/src/bin/scripts/process-integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ const options = [
type: Boolean,
defaultValue: false,
},
{
name: 'disableFiringCrowdWebhooks',
alias: 'd',
typeLabel: '{underline disableFiringCrowdWebhooks}',
type: Boolean,
defaultOption: false,
description: 'Should it disable firing outgoing crowd webhooks?',
},
{
name: 'platform',
alias: 'p',
Expand Down Expand Up @@ -70,6 +78,8 @@ if (parameters.help || (!parameters.integration && !parameters.platform)) {
const onboarding = parameters.onboarding
const options = await SequelizeRepository.getDefaultIRepositoryOptions()

const fireCrowdWebhooks = !parameters.disableFiringCrowdWebhooks

const runRepo = new IntegrationRunRepository(options)

if (parameters.platform) {
Expand All @@ -96,7 +106,7 @@ if (parameters.help || (!parameters.integration && !parameters.platform)) {

await sendNodeWorkerMessage(
integration.tenantId,
new NodeWorkerIntegrationProcessMessage(run.id),
new NodeWorkerIntegrationProcessMessage(run.id, null, fireCrowdWebhooks),
)
}
},
Expand Down Expand Up @@ -128,7 +138,7 @@ if (parameters.help || (!parameters.integration && !parameters.platform)) {

await sendNodeWorkerMessage(
integration.tenantId,
new NodeWorkerIntegrationProcessMessage(run.id),
new NodeWorkerIntegrationProcessMessage(run.id, null, fireCrowdWebhooks),
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,12 @@ export class IntegrationRunProcessor extends LoggingBase {
`Processing bulk operation with ${operation.records.length} records!`,
)
stepContext.limitCount += operation.records.length
await bulkOperations(operation.type, operation.records, userContext)
await bulkOperations(
operation.type,
operation.records,
userContext,
req.fireCrowdWebhooks ?? true,
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,70 +380,35 @@ export class GithubIntegrationService extends IntegrationServiceBase {
}

case 'pull_request': {
// handle case of multiple reviewers (either by assigning a team to it, or with multiple selection in one go)
if (
payload.action === 'review_requested' &&
(payload.pull_request.requested_reviewers.length > 0 ||
payload.pull_request.requested_teams.length > 0)
) {
if (payload.pull_request.requested_reviewers.length > 0) {
// multiple reviewers sent in one payload
for (const reviewer of payload.pull_request.requested_reviewers) {
const reviewRequestActivity = await GithubIntegrationService.parseWebhookPullRequest(
{ ...payload, requested_reviewer: reviewer },
context,
)

if (reviewRequestActivity) {
records.push(reviewRequestActivity)
}
}
} else if (payload.pull_request.requested_teams.length > 0) {
// a team sent as reviewer, first we need to find members in this team
for (const team of payload.pull_request.requested_teams as GithubWebhookTeam[]) {
const teamMembers = await new TeamsQuery(
team.node_id,
context.integration.token,
).getSinglePage('')

for (const teamMember of teamMembers.data) {
const reviewRequestActivity =
await GithubIntegrationService.parseWebhookPullRequest(
{ ...payload, requested_reviewer: teamMember },
context,
)

if (reviewRequestActivity) {
records.push(reviewRequestActivity)
}
}
}
}

break
}

// handle case of multiple assignees
if (payload.action === 'assigned' && payload.pull_request.assignees.length > 0) {
for (const assignee of payload.pull_request.assignees) {
payload.pull_request.assignee = assignee

const assignedActivity = await GithubIntegrationService.parseWebhookPullRequest(
payload,
// handle case of multiple reviewers (by assigning a team as a reviewer)
if (payload.action === 'review_requested' && payload.requested_team) {
// a team sent as reviewer, first we need to find members in this team
const team: GithubWebhookTeam = payload.requested_team
const teamMembers = await new TeamsQuery(
team.node_id,
context.integration.token,
).getSinglePage('')

for (const teamMember of teamMembers.data) {
const reviewRequestActivity = await GithubIntegrationService.parseWebhookPullRequest(
{ ...payload, requested_reviewer: teamMember },
context,
)

if (assignedActivity) {
records.push(assignedActivity)
if (reviewRequestActivity) {
records.push(reviewRequestActivity)
}
}

break
}

if (payload.action === 'closed' && payload.pull_request.merged) {
const revisedPayload = { ...payload, action: 'merged' }
revisedPayload.pull_request.state = 'merged'

const prMergedRecord = await GithubIntegrationService.parseWebhookPullRequest(
{ ...payload, action: 'merged' },
revisedPayload,
context,
)
if (prMergedRecord) {
Expand Down Expand Up @@ -831,8 +796,8 @@ export class GithubIntegrationService extends IntegrationServiceBase {
timestamp = payload.review.submitted_at
sourceParentId = payload.pull_request.node_id.toString()
sourceId = `gen-PRR_${payload.pull_request.node_id.toString()}_${
payload.review.user.login
}_${payload.review.submitted_at}`
payload.sender.login
}_${moment(payload.review.submitted_at).utc().toISOString()}`
body = payload.review.body
break
}
Expand All @@ -843,7 +808,7 @@ export class GithubIntegrationService extends IntegrationServiceBase {

const review = payload.review
const pull = payload.pull_request
const member = await GithubIntegrationService.parseWebhookMember(review.user.login, context)
const member = await GithubIntegrationService.parseWebhookMember(payload.sender.login, context)

if (member) {
return {
Expand All @@ -862,6 +827,7 @@ export class GithubIntegrationService extends IntegrationServiceBase {
score: scoreGrid.score,
isContribution: scoreGrid.isContribution,
attributes: {
reviewState: (payload.review?.state as string).toUpperCase(),
state: pull.state,
authorAssociation: pull.author_association,
labels: pull.labels.map((l) => l.name),
Expand Down Expand Up @@ -906,7 +872,7 @@ export class GithubIntegrationService extends IntegrationServiceBase {
timestamp = payload.pull_request.closed_at
sourceParentId = payload.pull_request.node_id.toString()
sourceId = `gen-CE_${payload.pull_request.node_id.toString()}_${
payload.pull_request.user.login
payload.sender.login
}_${moment(payload.pull_request.closed_at).utc().toISOString()}`
break
}
Expand All @@ -916,13 +882,11 @@ export class GithubIntegrationService extends IntegrationServiceBase {
scoreGrid = GitHubGrid.pullRequestAssigned
timestamp = payload.pull_request.updated_at
sourceParentId = payload.pull_request.node_id.toString()
sourceId = `gen-AE_${payload.pull_request.node_id.toString()}_${
payload.pull_request.user.login
}_${payload.pull_request.assignee.login}_${moment(payload.pull_request.updated_at)
.utc()
.toISOString()}`
sourceId = `gen-AE_${payload.pull_request.node_id.toString()}_${payload.sender.login}_${
payload.assignee.login
}_${moment(payload.pull_request.updated_at).utc().toISOString()}`
objectMember = await GithubIntegrationService.parseWebhookMember(
payload.pull_request.assignee.login,
payload.assignee.login,
context,
)
objectMemberUsername = objectMember.username[PlatformType.GITHUB].username
Expand All @@ -934,11 +898,9 @@ export class GithubIntegrationService extends IntegrationServiceBase {
scoreGrid = GitHubGrid.pullRequestReviewRequested
timestamp = payload.pull_request.updated_at
sourceParentId = payload.pull_request.node_id.toString()
sourceId = `gen-RRE_${payload.pull_request.node_id.toString()}_${
payload.pull_request.user.login
}_${payload.requested_reviewer.login}_${moment(payload.pull_request.updated_at)
.utc()
.toISOString()}`
sourceId = `gen-RRE_${payload.pull_request.node_id.toString()}_${payload.sender.login}_${
payload.requested_reviewer.login
}_${moment(payload.pull_request.updated_at).utc().toISOString()}`
objectMember = await GithubIntegrationService.parseWebhookMember(
payload.requested_reviewer.login,
context,
Expand Down Expand Up @@ -1146,6 +1108,7 @@ export class GithubIntegrationService extends IntegrationServiceBase {
channel: pullRequest.channel,
title: '',
attributes: {
reviewState: record.state,
state: (pullRequest.attributes as any).state,
additions: (pullRequest.attributes as any).additions,
deletions: (pullRequest.attributes as any).deletions,
Expand Down Expand Up @@ -1256,7 +1219,7 @@ export class GithubIntegrationService extends IntegrationServiceBase {
timestamp: moment(record.createdAt).utc().toDate(),
body: record.bodyText,
url: record.url,
channel: record.pullRequest.url,
channel: repo.url,
title: '',
attributes: {
state: record.pullRequest.state.toLowerCase(),
Expand Down Expand Up @@ -1425,6 +1388,10 @@ export class GithubIntegrationService extends IntegrationServiceBase {
let type: GithubActivityType
let scoreGrid: gridEntry
let timestamp: string
let sourceId: string
let sourceParentId: string
let body: string = ''
let title: string = ''

switch (payload.action) {
case 'edited':
Expand All @@ -1433,20 +1400,30 @@ export class GithubIntegrationService extends IntegrationServiceBase {
type = GithubActivityType.ISSUE_OPENED
scoreGrid = GitHubGrid.issueOpened
timestamp = payload.issue.created_at
sourceParentId = null
sourceId = payload.issue.node_id.toString()
body = payload.issue.body
title = payload.issue.title
break

case 'closed':
type = GithubActivityType.ISSUE_CLOSED
scoreGrid = GitHubGrid.issueClosed
timestamp = payload.issue.closed_at
sourceParentId = payload.issue.node_id.toString()
sourceId = `gen-CE_${payload.issue.node_id.toString()}_${payload.sender.login}_${moment(
payload.issue.closed_at,
)
.utc()
.toISOString()}`
break

default:
return undefined
}

const issue = payload.issue
const member = await GithubIntegrationService.parseWebhookMember(issue.user.login, context)
const member = await GithubIntegrationService.parseWebhookMember(payload.sender.login, context)

if (member) {
return {
Expand All @@ -1456,12 +1433,12 @@ export class GithubIntegrationService extends IntegrationServiceBase {
timestamp: moment(timestamp).utc().toDate(),
platform: PlatformType.GITHUB,
tenant: context.integration.tenantId,
sourceId: issue.node_id.toString(),
sourceParentId: null,
sourceId,
sourceParentId,
url: issue.html_url,
title: issue.title,
title,
channel: payload.repository.html_url,
body: issue.body,
body,
attributes: {
state: issue.state,
},
Expand Down Expand Up @@ -1501,11 +1478,65 @@ export class GithubIntegrationService extends IntegrationServiceBase {
score: GitHubGrid.issueOpened.score,
isContribution: GitHubGrid.issueOpened.isContribution,
})

// parse issue events
out.push(
...(await GithubIntegrationService.parseIssueEvents(
record.timelineItems.nodes,
out[out.length - 1],
context,
)),
)
}

return out
}

private static async parseIssueEvents(
records: any[],
issue: AddActivitiesSingle,
context: IStepContext,
): Promise<AddActivitiesSingle[]> {
const out: AddActivitiesSingle[] = []

for (const record of records) {
switch (record.__typename) {
case GithubPullRequestEvents.CLOSE:
if (record.actor.login) {
const member = await GithubIntegrationService.parseMember(record.actor, context)
out.push({
username: member.username[PlatformType.GITHUB].username,
tenant: context.integration.tenantId,
platform: PlatformType.GITHUB,
type: GithubActivityType.ISSUE_CLOSED,
sourceId: `gen-CE_${issue.sourceId}_${record.actor.login}_${moment(record.createdAt)
.utc()
.toISOString()}`,
sourceParentId: issue.sourceId,
timestamp: moment(record.createdAt).utc().toDate(),
body: '',
url: issue.url,
channel: issue.channel,
title: '',
attributes: {
state: (issue.attributes as any).state,
},
member,
score: GitHubGrid.issueClosed.score,
isContribution: GitHubGrid.issueClosed.isContribution,
})
}

break
default:
context.logger.warn(
`Unsupported issue event: ${record.__typename}. This event will not be parsed.`,
)
}
}
return out
}

private static async parseIssueComments(
records: any[],
repo: Repo,
Expand Down
Loading