Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Identity - Data Processor

Creates user in u-bahn when they sign up on Topcoder. Also updates their availability status in u-bahn based on their account activation status in Topcoder

## Dependencies

- Nodejs(v12+)
Expand Down Expand Up @@ -33,6 +35,7 @@ The following parameters can be set in config files or in env variables:
- ATTRIBUTE_GROUP_NAME: The attribute group name
- SKILL_PROVIDER_NAME: The skill provider name. Not in use anymore. Retained for any future use
- ORGANIZATION_NAME: The organization name
- MEMBER_PROFILE_URL_PREFIX: The member's profile url prefix. Defaults to `'https://www.topcoder.com/members/'` - don't forget the `/` at the end

There is a `/health` endpoint that checks for the health of the app. This sets up an expressjs server and listens on the environment variable `PORT`. It's not part of the configuration file and needs to be passed as an environment variable

Expand Down
68 changes: 29 additions & 39 deletions VERIFICATION.md
Original file line number Diff line number Diff line change
@@ -1,41 +1,6 @@
# Verification

```
{
"topic":"identity.notification.create",
"originator":"u-bahn-api",
"timestamp":"2019-07-08T00:00:00.000Z",
"mime-type":"application/json",
"payload":{
"id":"90064000",
"modifiedBy":null,
"modifiedAt":"2021-01-05T14:01:40.336Z",
"createdBy":null,
"createdAt":"2021-01-05T14:01:40.336Z",
"handle":"theuserhandle",
"email":"foo@bar.com",
"firstName":"theuserfirstname",
"lastName":"theuserlastname",
"credential":{"activationCode":"FOOBAR2","resetToken":null,"hasPassword":false},
"profiles":null,
"status":"A",
"country":{
"isoAlpha3Code": "IND"
},
"regSource":"null",
"utmSource":"null",
"utmMedium":"null",
"utmCampaign":"null",
"roles":null,
"ssoLogin":false,
"active":true,
"profile":null,
"emailActive":true
}
}
```

Additionally, you will be entering the messages into only one topic:
You will be entering the messages into only one topic:

```
docker exec -it identity-data-processor_kafka /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic identity.notification.create
Expand All @@ -45,6 +10,31 @@ docker exec -it identity-data-processor_kafka /opt/kafka/bin/kafka-console-produ
2. write message:
`{"recipients":[],"notificationType":"useractivation"}`
3. Watch the app console, It will show error message.
4. write message:
`{"topic":"identity.notification.create","originator":"u-bahn-api","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"id":"90064000","modifiedBy":null,"modifiedAt":"2021-01-05T14:01:40.336Z","createdBy":null,"createdAt":"2021-01-05T14:01:40.336Z","handle":"theuserhandle","email":"foo@bar.com","firstName":"theuserfirstname","lastName":"theuserlastname","credential":{"activationCode":"FOOBAR2","resetToken":null,"hasPassword":false},"profiles":null,"status":"A","country":{"isoAlpha3Code":"IND"},"regSource":"null","utmSource":"null","utmMedium":"null","utmCampaign":"null","roles":null,"ssoLogin":false,"active":true,"profile":null,"emailActive":true}}`
5. Watch the app console, It will show message successfully handled.
4. Write message: `{"topic":"identity.notification.create","originator":"identity-service","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"id":"10000001","modifiedBy":null,"modifiedAt":null,"createdBy":null,"createdAt":null,"handle":"theuserhandle","email":"theuserhandle@gmail.com","firstName":"User","lastName":"Member","credential":{"activationCode":"ABCDEFGHIJK","resetToken":null,"hasPassword":true},"profiles":null,"status":"U","country":{"code":"040","name":"Austria","isoAlpha2Code":"AT","isoAlpha3Code":"AUT"},"regSource":null,"utmSource":null,"utmMedium":null,"utmCampaign":null,"roles":null,"ssoLogin":false,"active":false,"profile":null,"emailActive":false}}`
5. Watch the app console. It will show message successfully handled. The log should look like:
```
info: user: theuserhandle created
debug: Sleeping for 1000 ms
info: external profile: 36ed815b-3da1-49f1-a043-aaed0a4e81ad created
debug: Sleeping for 1000 ms
info: user attribute: isAvailable created
debug: Sleeping for 1000 ms
info: user attribute: company created
debug: Sleeping for 1000 ms
info: user attribute: title created
debug: Sleeping for 1000 ms
info: user attribute: location created
debug: Sleeping for 1000 ms
info: user attribute: email created
debug: EXIT handle
debug: Successfully processed message
debug: Commiting offset after processing message
```
6. Now, write the following message: `{"topic":"identity.notification.create","originator":"identity-service","timestamp":"2019-07-08T00:00:00.000Z","mime-type":"application/json","payload":{"id":"10000001","modifiedBy":null,"modifiedAt":"2021-04-07T15:02:18.72Z","createdBy":null,"createdAt":"2021-04-07T15:02:18.72Z","handle":"theuserhandle","email":"theuserhandle@gmail.com","firstName":"User","lastName":"Member","credential":{"activationCode":"ABCDEFGHIJK","resetToken":null,"hasPassword":true},"profiles":null,"status":"A","country":null,"regSource":null,"utmSource":null,"utmMedium":null,"utmCampaign":null,"roles":null,"ssoLogin":false,"active":true,"profile":null,"emailActive":true}}`
7. Watch the app console. It will show message successfully handled. The log should look like:
```
info: user attribute: isAvailable updated
debug: EXIT handle
debug: Successfully processed message
debug: Commiting offset after processing message
```
4 changes: 3 additions & 1 deletion config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ module.exports = {
ORGANIZATION_NAME: process.env.ORGANIZATION_NAME,
ATTRIBUTE_GROUP_NAME: process.env.ATTRIBUTE_GROUP_NAME,
SKILL_PROVIDER_NAME: process.env.SKILL_PROVIDER_NAME,
MEMBERS_API_URL: process.env.MEMBERS_API_URL || 'https://api.topcoder-dev.com/v5/members'
MEMBERS_API_URL: process.env.MEMBERS_API_URL || 'https://api.topcoder-dev.com/v5/members',

MEMBER_PROFILE_URL_PREFIX: process.env.MEMBER_PROFILE_URL_PREFIX || 'https://www.topcoder.com/members/'
}
2 changes: 1 addition & 1 deletion src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, a
return
}
try {
await ProcessorService.processCreate(messageJSON)
await ProcessorService.handle(messageJSON)
logger.debug('Successfully processed message')
} catch (err) {
logger.logFullError(err)
Expand Down
51 changes: 50 additions & 1 deletion src/common/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,30 @@ async function getTopcoderToken () {
return topcoderM2M.getMachineToken(config.AUTH0_CLIENT_ID, config.AUTH0_CLIENT_SECRET)
}

/**
* Retrieves the user from u-bahn using their handle
* Returns null if no user exists, the user id otherwise
* @param {String} handle The member handle
* @param {String} token The auth token
*/
async function getUserId (handle, token) {
const res = await axios.get(`${config.UBAHN_API_URL}/users`, {
headers: {
Authorization: `Bearer ${token}`
},
params: {
handle
}
})
const user = res.data.filter(u => u.handle === handle)[0]

if (user) {
return user.id
}

return null
}

/**
* Create a new User
* @param {Object} body
Expand Down Expand Up @@ -178,6 +202,17 @@ async function createUserAttribute (userId, attributeId, value, token) {
await axios.post(`${config.UBAHN_API_URL}/users/${userId}/attributes`, { attributeId, value }, { headers: { Authorization: `Bearer ${token}` } })
}

/**
* Create user attribute
* @param {String} userId
* @param {String} attributeId
* @param {String} value
* @param {String} token
*/
async function updateUserAttribute (userId, attributeId, value, token) {
await axios.patch(`${config.UBAHN_API_URL}/users/${userId}/attributes/${attributeId}`, { value }, { headers: { Authorization: `Bearer ${token}` } })
}

/**
* Create external profile
* @param {String} userId
Expand All @@ -188,6 +223,17 @@ async function createExternalProfile (userId, body, token) {
await axios.post(`${config.UBAHN_API_URL}/users/${userId}/externalProfiles`, body, { headers: { Authorization: `Bearer ${token}` } })
}

/**
* Update external profile
* @param {String} userId
* @param {String} organizationId
* @param {Object} body
* @param {String} token
*/
async function updateExternalProfile (userId, organizationId, body, token) {
await axios.patch(`${config.UBAHN_API_URL}/users/${userId}/externalProfiles/${organizationId}`, body, { headers: { Authorization: `Bearer ${token}` } })
}

/**
* Create user skill
* * Unused for now. Retained for any future use
Expand All @@ -214,5 +260,8 @@ module.exports = {
createUser,
createUserAttribute,
createExternalProfile,
createUserSkill
createUserSkill,
getUserId,
updateUserAttribute,
updateExternalProfile
}
90 changes: 70 additions & 20 deletions src/services/ProcessorService.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,68 @@ const _ = require('lodash')
const Joi = require('@hapi/joi')
const logger = require('../common/logger')
const helper = require('../common/helper')
const config = require('config')

/**
* Process identity create entity message
* @param {Object} message the kafka message
*/
async function processCreate (message) {
async function handle (message) {
// Check if the user already exists in u-bahn
// If yes, then proceed to only update the availability status
// If not, then proceed to create the user and other associated data in u-bahn
const ubahnToken = await helper.getUbahnToken()
const userId = await helper.getUserId(message.payload.handle, ubahnToken)

if (userId) {
await processUpdate(message, userId, ubahnToken)
} else {
await processCreate(message, ubahnToken)
}
}

handle.schema = {
message: Joi.object().keys({
topic: Joi.string().required(),
originator: Joi.string().required(),
timestamp: Joi.date().required(),
'mime-type': Joi.string().required(),
payload: Joi.object().keys({
id: Joi.string().required(),
handle: Joi.string().required(),
firstName: Joi.string().required(),
lastName: Joi.string().required(),
email: Joi.string().email().required(),
country: Joi.object().keys({
isoAlpha3Code: Joi.string().required()
}).unknown(true).allow(null),
active: Joi.boolean()
}).required().unknown(true)
}).required().unknown(true)
}

/**
* Create the user and associated data in u-bahn
* @param {Object} message the kafka message
* @param {String} ubahnToken the auth token
*/
async function processCreate (message, ubahnToken) {
const organizationId = await helper.getOrganizationId(ubahnToken)
const attributes = await helper.getAttributes(ubahnToken)
const location = message.payload.country.isoAlpha3Code

const userId = await helper.createUser(_.pick(message.payload, 'handle', 'firstName', 'lastName'), ubahnToken)
logger.info(`user: ${message.payload.handle} created`)
helper.sleep()
await helper.createExternalProfile(userId, { organizationId, uri: 'uri', externalId: message.payload.id, isInactive: false }, ubahnToken)
await helper.createExternalProfile(userId, {
organizationId,
uri: `${config.MEMBER_PROFILE_URL_PREFIX}${message.payload.handle}`,
externalId: message.payload.id,
isInactive: !message.payload.active
}, ubahnToken)
logger.info(`external profile: ${organizationId} created`)
helper.sleep()
await helper.createUserAttribute(userId, _.get(attributes, 'isAvailable'), 'true', ubahnToken)
await helper.createUserAttribute(userId, _.get(attributes, 'isAvailable'), message.payload.active.toString(), ubahnToken)
logger.info('user attribute: isAvailable created')
helper.sleep()
await helper.createUserAttribute(userId, _.get(attributes, 'company'), 'Topcoder', ubahnToken)
Expand All @@ -34,28 +78,34 @@ async function processCreate (message) {
helper.sleep()
await helper.createUserAttribute(userId, _.get(attributes, 'location'), location, ubahnToken)
logger.info('user attribute: location created')

// Custom attribute. May or may not exist
if (_.get(attributes, 'email')) {
helper.sleep()
await helper.createUserAttribute(userId, _.get(attributes, 'email'), message.payload.email, ubahnToken)
logger.info('user attribute: email created')
}
}

processCreate.schema = {
message: Joi.object().keys({
topic: Joi.string().required(),
originator: Joi.string().required(),
timestamp: Joi.date().required(),
'mime-type': Joi.string().required(),
payload: Joi.object().keys({
id: Joi.string().required(),
handle: Joi.string().required(),
firstName: Joi.string().required(),
lastName: Joi.string().required(),
country: Joi.object().keys({
isoAlpha3Code: Joi.string().required()
}).required().unknown(true)
}).required().unknown(true)
}).required().unknown(true)
/**
* Updates the user's availability status in u-bahn
* @param {Object} message the kafka message
* @param {String} userId the u-bahn user id
* @param {String} ubahnToken the auth token
*/
async function processUpdate (message, userId, ubahnToken) {
const organizationId = await helper.getOrganizationId(ubahnToken)
const attributes = await helper.getAttributes(ubahnToken)

await helper.updateExternalProfile(userId, organizationId, { isInactive: !message.payload.active }, ubahnToken)
logger.info('user attribute: isAvailable updated')

await helper.updateUserAttribute(userId, _.get(attributes, 'isAvailable'), message.payload.active.toString(), ubahnToken)
logger.info('user attribute: isAvailable updated')
}

module.exports = {
processCreate
handle
}

logger.buildService(module.exports)