Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 288b59d

Browse files
author
James Cori
committed
Merge branch 'develop'
2 parents ffbdc8d + f62495f commit 288b59d

File tree

12 files changed

+385
-117
lines changed

12 files changed

+385
-117
lines changed

.circleci/config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,4 @@ workflows:
7979
context : org-global
8080
filters:
8181
branches:
82-
only: master
82+
only: master

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
node_modules
2-
32
.env
3+
coverage
4+
.nyc_output

build.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,4 @@ fi
2020
if [ "$UPDATE_CACHE" == 1 ]
2121
then
2222
docker cp app:/$APP_NAME/node_modules .
23-
fi
23+
fi

config/default.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ module.exports = {
1414
// Kafka group id
1515
KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'autopilot-processor',
1616
NOTIFICATION_CREATE_TOPIC: process.env.NOTIFICATION_CREATE_TOPIC || 'challenge.notification.create',
17+
NOTIFICATION_UPDATE_TOPIC: process.env.NOTIFICATION_UPDATE_TOPIC || 'challenge.notification.update',
1718

18-
SCHEDULE_API_URL: process.env.SCHEDULE_API_URL,
19+
SCHEDULE_API_URL: process.env.SCHEDULE_API_URL || 'https://api.topcoder-dev.com/v5/schedules',
1920
CHALLENGE_API_URL: process.env.CHALLENGE_API_URL || 'https://api.topcoder-dev.com/v5/challenges',
2021

2122
AUTH0_URL: process.env.AUTH0_URL,

docker-kafka/docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
version: '3'
1+
version: "3"
22
services:
33
zookeeper:
44
image: wurstmeister/zookeeper
@@ -12,5 +12,5 @@ services:
1212
- "9092:9092"
1313
environment:
1414
KAFKA_ADVERTISED_HOST_NAME: localhost
15-
KAFKA_CREATE_TOPICS: "challenge.notification.create:1:1"
15+
KAFKA_CREATE_TOPICS: "challenge.notification.create:1:1,challenge.notification.update:1:1"
1616
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

src/app.js

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ const consumer = new Kafka.GroupConsumer(helper.getKafkaOptions())
2020
*/
2121
const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, async (m) => {
2222
const message = m.message.value.toString('utf8')
23-
logger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${
24-
m.offset}; Message: ${message}.`)
23+
logger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${m.offset}; Message: ${message}.`)
2524
let messageJSON
2625
try {
2726
messageJSON = JSON.parse(message)
@@ -43,8 +42,11 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
4342
}
4443

4544
try {
46-
await ProcessorService.processCreate(messageJSON)
47-
45+
if (topic === config.NOTIFICATION_CREATE_TOPIC) {
46+
await ProcessorService.processCreate(messageJSON)
47+
} else {
48+
await ProcessorService.processUpdate(messageJSON)
49+
}
4850
logger.debug('Successfully processed message')
4951
} catch (err) {
5052
logger.logFullError(err)
@@ -68,7 +70,7 @@ const check = () => {
6870
return connected
6971
}
7072

71-
const topics = [config.NOTIFICATION_CREATE_TOPIC]
73+
const topics = [config.NOTIFICATION_CREATE_TOPIC, config.NOTIFICATION_UPDATE_TOPIC]
7274

7375
logger.info('Starting kafka consumer')
7476
consumer

src/common/helper.js

Lines changed: 83 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ function getErrorWithStatus (message, statusCode) {
5353
/**
5454
* Get challenge by id
5555
* @param challengeId the challenge id
56+
* @returns {object} challenge
5657
*/
5758
async function getChallenge (challengeId) {
5859
const url = `${config.CHALLENGE_API_URL}/${challengeId}`
@@ -76,30 +77,64 @@ async function getChallenge (challengeId) {
7677
}
7778
}
7879

80+
/**
81+
* Get challenge by id from Schedule Api
82+
* @param challengeId the challenge id
83+
* @returns {Array} array of events
84+
*/
85+
async function getEventsFromScheduleApi (challengeId) {
86+
const url = `${config.SCHEDULE_API_URL}?externalId=${challengeId}`
87+
88+
logger.debug(`request GET ${url}`)
89+
try {
90+
const res = await axios.get(url)
91+
return res.data || []
92+
} catch (err) {
93+
return []
94+
}
95+
}
96+
7997
/**
8098
* Create events from challenge object
8199
* @param challenge the challenge object
82100
*/
83101
function getEventsFromPhases (challenge) {
84102
const events = []
85-
// for each phase, create 2 events for the scheduledStartDate and scheduledEndDate respectively
103+
const dateBasedEvents = {}
104+
86105
for (const phase of challenge.phases) {
87-
const event = {
88-
phaseId: phase.phaseId,
89-
challengeId: challenge.id
106+
if (!dateBasedEvents[phase.scheduledStartDate]) {
107+
dateBasedEvents[phase.scheduledStartDate] = []
108+
}
109+
if (!dateBasedEvents[phase.scheduledEndDate]) {
110+
dateBasedEvents[phase.scheduledEndDate] = []
111+
}
112+
if (new Date(phase.scheduledEndDate).getTime() >= Date.now() && !phase.isOpen) {
113+
dateBasedEvents[phase.scheduledStartDate].push({
114+
phaseId: phase.phaseId,
115+
isOpen: true
116+
})
117+
}
118+
if (new Date(phase.scheduledStartDate).getTime() <= Date.now() && phase.isOpen) {
119+
dateBasedEvents[phase.scheduledEndDate].push({
120+
phaseId: phase.phaseId,
121+
isOpen: false
122+
})
90123
}
91-
92-
events.push({
93-
...event,
94-
status: 'starting',
95-
scheduleTime: phase.scheduledStartDate
96-
})
97-
events.push({
98-
...event,
99-
status: 'closing',
100-
scheduleTime: phase.scheduledEndDate
101-
})
102124
}
125+
126+
_.each(dateBasedEvents, (eventData, scheduleTime) => {
127+
if (eventData.length > 0) {
128+
events.push({
129+
externalId: challenge.id,
130+
scheduleTime,
131+
payload: {
132+
phases: eventData
133+
}
134+
})
135+
}
136+
})
137+
103138
return events
104139
}
105140

@@ -111,39 +146,59 @@ async function createEventsInExecutor (events) {
111146
const url = config.SCHEDULE_API_URL
112147
const token = await getTopcoderM2Mtoken()
113148

114-
try {
115-
for (const event of events) {
149+
for (const event of events) {
150+
try {
116151
// schedule executor api payload
117152
const executorPayload = {
118-
url: `${config.CHALLENGE_API_URL}/${event.challengeId}`,
153+
url: `${config.CHALLENGE_API_URL}/${event.externalId}`,
154+
externalId: event.externalId,
119155
method: 'patch',
120156
scheduleTime: event.scheduleTime,
121157
headers: {
122158
'content-type': 'application/json',
123159
Authorization: `Bearer ${token}`
124160
},
125-
payload: JSON.stringify({
126-
phases: [{
127-
phaseId: event.phaseId,
128-
isOpen: event.status === 'starting'
129-
}]
130-
})
161+
payload: JSON.stringify(event.payload)
131162
}
132163

133164
// call executor api
134165
logger.debug(`request POST ${url}`)
135166
await axios.post(`${url}`, executorPayload)
167+
} catch (err) {
168+
logger.warn(`Failed to create event for external ID ${event.externalId}`)
169+
logger.error(err.message)
170+
}
171+
}
172+
}
173+
174+
/**
175+
* Delete events in executor app
176+
* @param events the events array
177+
*/
178+
async function deleteEventsInExecutor (events) {
179+
const url = config.SCHEDULE_API_URL
180+
for (const event of events) {
181+
// schedule executor api payload
182+
const executorPayload = {
183+
id: event.id
184+
}
185+
try {
186+
// call executor api
187+
logger.debug(`request DELETE ${url}`)
188+
await axios.delete(`${url}`, { data: executorPayload })
189+
} catch (err) {
190+
logger.warn(`Failed to delete event ${event.id}`)
191+
logger.error(err.message)
136192
}
137-
} catch (err) {
138-
logger.error(err.message)
139-
throw err
140193
}
141194
}
142195

143196
module.exports = {
144197
getKafkaOptions,
145198
getTopcoderM2Mtoken,
146199
getChallenge,
200+
getEventsFromScheduleApi,
147201
getEventsFromPhases,
148-
createEventsInExecutor
202+
createEventsInExecutor,
203+
deleteEventsInExecutor
149204
}

src/services/ProcessorService.js

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
* Processor Service
33
*/
44
const Joi = require('@hapi/joi')
5+
const _ = require('lodash')
56
const logger = require('../common/logger')
67
const helper = require('../common/helper')
78

89
Joi.id = () => Joi.string().uuid().required()
10+
const VALID_CHALLENGE_STATUSES = ['Active']
911

1012
/**
1113
* Process create entity message
@@ -15,6 +17,14 @@ Joi.id = () => Joi.string().uuid().required()
1517
async function processCreate (message) {
1618
// get challenge
1719
const challenge = await helper.getChallenge(message.payload.id)
20+
if (!VALID_CHALLENGE_STATUSES.includes(challenge.status)) {
21+
logger.info(`Not creating events for challenge status ${challenge.status}...`)
22+
return
23+
}
24+
if (!_.get(challenge, 'legacy.useSchedulingAPI')) {
25+
logger.info(`The legacy.useSchedulingAPI is not set on challenge ${challenge.id}...`)
26+
return
27+
}
1828
// create events
1929
const events = helper.getEventsFromPhases(challenge)
2030
// call the executor api
@@ -23,6 +33,30 @@ async function processCreate (message) {
2333
logger.info(`processing of the record completed, id: ${message.payload.id}`)
2434
}
2535

36+
/**
37+
* Process update entity message
38+
* @param {Object} message the kafka message
39+
* @returns {Promise}
40+
*/
41+
async function processUpdate (message) {
42+
const sourceChallenge = await helper.getChallenge(message.payload.id)
43+
if (!VALID_CHALLENGE_STATUSES.includes(sourceChallenge.status)) {
44+
logger.info(`Not creating events for challenge status ${sourceChallenge.status}...`)
45+
return
46+
}
47+
if (!_.get(sourceChallenge, 'legacy.useSchedulingAPI')) {
48+
logger.info(`The legacy.useSchedulingAPI is not set on challenge ${sourceChallenge.id}...`)
49+
return
50+
}
51+
const newEvents = helper.getEventsFromPhases(sourceChallenge)
52+
const oldEvents = await helper.getEventsFromScheduleApi(message.payload.id)
53+
logger.info(`Deleting existing events for challenge ${message.payload.id}`)
54+
await helper.deleteEventsInExecutor(oldEvents)
55+
logger.info(`Creating events for challenge ${message.payload.id}`)
56+
await helper.createEventsInExecutor(newEvents)
57+
logger.info(`processing of the record completed, id: ${message.payload.id}`)
58+
}
59+
2660
processCreate.schema = {
2761
message: Joi.object().keys({
2862
topic: Joi.string().required(),
@@ -34,9 +68,21 @@ processCreate.schema = {
3468
}).required().unknown(true)
3569
}).required()
3670
}
71+
processUpdate.schema = {
72+
message: Joi.object().keys({
73+
topic: Joi.string().required(),
74+
originator: Joi.string().required(),
75+
timestamp: Joi.date().required(),
76+
'mime-type': Joi.string().required(),
77+
payload: Joi.object().keys({
78+
id: Joi.id()
79+
}).required().unknown(true)
80+
}).required()
81+
}
3782

3883
module.exports = {
39-
processCreate
84+
processCreate,
85+
processUpdate
4086
}
4187

4288
logger.buildService(module.exports)

0 commit comments

Comments
 (0)