Skip to content

Commit

Permalink
feat(queue): improve jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
jlenon7 committed May 12, 2024
1 parent 842bf92 commit 58e7746
Show file tree
Hide file tree
Showing 13 changed files with 89 additions and 108 deletions.
5 changes: 1 addition & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,7 @@
"#src/validators/update.validator"
],
"jobs": [
"#src/jobs/userconfirm.job",
"#src/jobs/useremail.job",
"#src/jobs/useremailpassword.job",
"#src/jobs/userpassword.job"
"#src/jobs/mail.job"
]
}
}
11 changes: 8 additions & 3 deletions src/helpers/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ import { Service } from '@athenna/ioc'
import { Database, type DatabaseImpl } from '@athenna/database'

class VanillaQueue {
public connection: string
private queueName = 'default'
private queues: Record<string, any[]> = {
default: [],
deadletter: []
}

public constructor(connection: string) {
this.connection = connection
}

public async truncate() {
Object.keys(this.queues).forEach(key => (this.queues[key] = []))
}
Expand Down Expand Up @@ -179,11 +184,11 @@ class DatabaseQueue {

@Service({ alias: 'App/Helpers/Queue', type: 'singleton' })
export class QueueImpl {
public driver: any = new VanillaQueue()
public driver: any = new VanillaQueue('vanilla')

public connection(name: string) {
if (name === 'vanilla') {
this.driver = new VanillaQueue()
if (name === 'vanilla' || name === 'default') {
this.driver = new VanillaQueue('vanilla')
}

if (name === 'database') {
Expand Down
19 changes: 19 additions & 0 deletions src/jobs/base.job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { Queue } from '#src/providers/facades/queue'

export class BaseJob {
public static connection() {
return 'default'
}

public static queue() {
const connection = this.connection()

return Config.get(`queue.connections.${connection}.queue`)
}

public queue() {
const Job = this.constructor as typeof BaseJob

return Queue.connection(Job.connection()).queue(Job.queue())
}
}
31 changes: 31 additions & 0 deletions src/jobs/mail.job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { Mail } from '@athenna/mail'
import type { User } from '#src/models/user'
import { BaseJob } from '#src/jobs/base.job'

type Item = {
view: string
subject: string
user: User
email: string
password: string
token: string
}

export class MailJob extends BaseJob {
public static queue() {
return 'mail'
}

public async handle(item: Item) {
await Mail.from('noreply@athenna.io')
.to(item.user.email)
.subject(item.subject)
.view(item.view, {
user: item.user,
email: item.email,
password: item.password,
token: item.token
})
.send()
}
}
20 changes: 0 additions & 20 deletions src/jobs/userconfirm.job.ts

This file was deleted.

22 changes: 0 additions & 22 deletions src/jobs/useremail.job.ts

This file was deleted.

23 changes: 0 additions & 23 deletions src/jobs/useremailpassword.job.ts

This file was deleted.

22 changes: 0 additions & 22 deletions src/jobs/userpassword.job.ts

This file was deleted.

3 changes: 1 addition & 2 deletions src/providers/queueworker.provider.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Log } from '@athenna/logger'
import { ServiceProvider } from '@athenna/ioc'
import { Exec, Module } from '@athenna/common'
import { Queue } from '#src/providers/facades/queue'

export default class QueueWorkerProvider extends ServiceProvider {
public intervals = []
Expand All @@ -17,7 +16,7 @@ export default class QueueWorkerProvider extends ServiceProvider {
const job = this.container.transient(Job, alias).use(alias)

const interval = setInterval(async () => {
const queue = Queue.queue(queueName)
const queue = job.queue()

if (queue.isEmpty()) {
return
Expand Down
8 changes: 6 additions & 2 deletions src/services/auth.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import { Uuid } from '@athenna/common'
import { Service } from '@athenna/ioc'
import { Config } from '@athenna/config'
import { User } from '#src/models/user'
import { Queue } from '#src/providers/facades/queue'
import { UnauthorizedException } from '@athenna/http'
import type { UserService } from '#src/services/user.service'
import { Queue } from '#src/providers/facades/queue'

@Service()
export class AuthService {
Expand Down Expand Up @@ -47,7 +47,11 @@ export class AuthService {

const user = await this.userService.create(data)

await Queue.queue('user:confirm').add(user)
await Queue.connection('vanilla').queue('mail').add({
user,
view: 'mail/confirm',
subject: 'Athenna Account Confirmation'
})

return user
}
Expand Down
25 changes: 19 additions & 6 deletions src/services/user.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import { Role } from '#src/models/role'
import { RoleUser } from '#src/models/roleuser'
import { RoleEnum } from '#src/enums/role.enum'
import { NotFoundException } from '@athenna/http'
import { Queue } from '#src/providers/facades/queue'
import { Json, type PaginationOptions } from '@athenna/common'
import { Queue } from '#src/providers/facades/queue'

@Service()
export class UserService {
Expand Down Expand Up @@ -62,25 +62,38 @@ export class UserService {

switch (`${isEmailEqual}:${isPasswordEqual}`) {
case 'false:true':
await Queue.queue('user:email').add({ user, token, email: data.email })
await Queue.connection('vanilla').queue('mail').add({
user,
token,
email: data.email,
view: 'mail/change-email',
subject: 'Athenna Email Change'
})

break
case 'true:false':
// TODO create a password_resets table to save the password
data.password = await bcrypt.hash(data.password, 10)

await Queue.queue('user:password').add({
await Queue.connection('vanilla').queue('mail').add({
user,
token,
password: data.password
password: data.password,
view: 'mail/change-password',
subject: 'Athenna Email Change'
})
break
case 'false:false':
// TODO create a password_resets table to save the password
data.password = await bcrypt.hash(data.password, 10)

await Queue.queue('user:email:password').add({
await Queue.connection('vanilla').queue('mail').add({
user,
token,
email: data.email,
password: data.password
password: data.password,
view: 'mail/change-email-password',
subject: 'Athenna Email & Password Change'
})
}

Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/auth.controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ export default class AuthControllerTest extends BaseE2ETest {
}
})

const queue = Queue.queue('user:confirm')
const queue = Queue.queue('mail')

assert.deepEqual(await queue.length(), 1)
assert.isTrue(await User.exists({ email: 'test@athenna.io' }))
Expand Down
6 changes: 3 additions & 3 deletions tests/e2e/user.controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ export default class UserControllerTest extends BaseE2ETest {

await user.refresh()

const queue = Queue.queue('user:email')
const queue = Queue.queue('mail')

assert.deepEqual(await queue.length(), 1)
assert.deepEqual(user.name, 'Customer Updated')
Expand All @@ -177,7 +177,7 @@ export default class UserControllerTest extends BaseE2ETest {

await user.refresh()

const queue = Queue.queue('user:password')
const queue = Queue.queue('mail')

assert.deepEqual(await queue.length(), 1)
assert.deepEqual(user.name, 'Customer Updated')
Expand All @@ -204,7 +204,7 @@ export default class UserControllerTest extends BaseE2ETest {

await user.refresh()

const queue = Queue.queue('user:email:password')
const queue = Queue.queue('mail')

assert.deepEqual(await queue.length(), 1)
assert.deepEqual(user.name, 'Customer Updated')
Expand Down

0 comments on commit 58e7746

Please sign in to comment.