Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add payment queue and jobs. #1165

Merged
merged 11 commits into from
Jun 16, 2020
10 changes: 10 additions & 0 deletions src/common/enums.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,13 @@ export const QUEUE_JOB = {

// Migration
migration: 'migration',

// Payment
payout: 'payout',
payTo: 'payTo',

// Appreciation
appreciation: 'appreciation',
}

export const QUEUE_NAME = {
Expand All @@ -339,6 +346,9 @@ export const QUEUE_NAME = {
likecoin: 'likecoin',
user: 'user',
migration: 'migration',
payout: 'payout',
payTo: 'payTo',
appreciation: 'appreciation',
}

export const QUEUE_CONCURRENCY = {
Expand Down
9 changes: 9 additions & 0 deletions src/common/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -377,3 +377,12 @@ export class PaymentPasswordNotSetError extends ApolloError {
})
}
}

export class PaymentQueueJobDataError extends ApolloError {
constructor(message: string) {
super(message, 'PAYMENT_QUEUE_JOB_DATA_ERROR')
Object.defineProperty(this, 'name', {
value: 'PaymentQueueJobDataError',
})
}
}
2 changes: 1 addition & 1 deletion src/connectors/paymentService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ export class PaymentService extends BaseService {
if (!result || !result[0]) {
return 0
}
return Math.max(parseInt(result[0].amount || 0, 10), 0)
return parseInt(result[0].amount || 0, 10)
}

countPendingPayouts = async ({ userId }: { userId: string }) => {
Expand Down
154 changes: 154 additions & 0 deletions src/connectors/queue/appreciation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import Queue from 'bull'

import {
APPRECIATION_TYPES,
NODE_TYPES,
QUEUE_JOB,
QUEUE_NAME,
QUEUE_PRIORITY,
} from 'common/enums'
import { environment } from 'common/environment'
import {
ActionLimitExceededError,
ArticleNotFoundError,
ForbiddenError,
UserNotFoundError,
} from 'common/errors'
import logger from 'common/logger'
import { toGlobalId } from 'common/utils'

import { BaseQueue } from './baseQueue'
import { likeCoinQueue } from './likecoin'

interface AppreciationParams {
amount: number
articleId: string
senderId: string
}

class AppreciationQueue extends BaseQueue {
constructor() {
super(QUEUE_NAME.appreciation)
this.addConsumers()
}

/**
* Producer for appreciation.
*
*/
appreciate = ({ amount, articleId, senderId }: AppreciationParams) => {
return this.q.add(
QUEUE_JOB.appreciation,
{ amount, articleId, senderId },
{
priority: QUEUE_PRIORITY.NORMAL,
removeOnComplete: true,
}
)
}

/**
* Consumers. Process a job at a time, so concurrency set as 1.
*
* @see https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queueprocess
*/
private addConsumers = () => {
this.q.process(QUEUE_JOB.appreciation, 1, this.handleAppreciation)
}

/**
* Appreciation handler.
*
*/
private handleAppreciation: Queue.ProcessCallbackFunction<unknown> = async (
job,
done
) => {
try {
const { amount, articleId, senderId } = job.data as AppreciationParams

if (!articleId || !senderId) {
throw new Error('appreciation job has no required data')
}

const article = await this.articleService.baseFindById(articleId)
if (!article) {
throw new ArticleNotFoundError('article does not exist')
}
if (article.authorId === senderId) {
throw new ForbiddenError('cannot appreciate your own article')
}

// check appreciate left
const appreciateLeft = await this.articleService.appreciateLeftByUser({
articleId,
userId: senderId,
})
if (appreciateLeft <= 0) {
throw new ActionLimitExceededError('too many appreciations')
}

// check if amount exceeded limit. if yes, then use the left amount.
const validAmount = Math.min(amount, appreciateLeft)

const [author, sender] = await Promise.all([
this.userService.baseFindById(article.authorId),
this.userService.baseFindById(senderId),
])

if (!author || !sender) {
throw new UserNotFoundError('user not found')
}

if (!author.likerId || !sender.likerId) {
throw new ForbiddenError('user has no liker id')
}

// insert appreciation record
await this.articleService.appreciate({
articleId: article.id,
senderId,
recipientId: article.authorId,
amount: validAmount,
type: APPRECIATION_TYPES.like,
})

// insert record to LikeCoin
likeCoinQueue.like({
likerId: sender.likerId,
likerIp: sender.ip,
authorLikerId: author.likerId,
url: `${environment.siteDomain}/@${author.userName}/${article.slug}-${article.mediaHash}`,
amount: validAmount,
})

// trigger notifications
this.notificationService.trigger({
event: 'article_new_appreciation',
actorId: sender.id,
recipientId: author.id,
entities: [
{
type: 'target',
entityTable: 'article',
entity: article,
},
],
})

// invalidate cache
if (this.cacheService) {
this.cacheService.invalidateFQC(NODE_TYPES.article, article.id)
this.cacheService.invalidateFQC(NODE_TYPES.user, article.authorId)
}

job.progress(100)
done(null, job.data)
} catch (error) {
logger.error(error)
done(error)
}
}
}

export const appreciationQueue = new AppreciationQueue()
3 changes: 3 additions & 0 deletions src/connectors/queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@ export * from './likecoin'
export * from './user'
export * from './refreshViews'
export * from './emails'
export * from './payout'
export * from './payTo'
export * from './appreciation'
Loading