Skip to content

Commit

Permalink
feat(queue): add mail send operation to a queue
Browse files Browse the repository at this point in the history
  • Loading branch information
jlenon7 committed May 2, 2024
1 parent d85ffc3 commit c8c180b
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 9 deletions.
1 change: 1 addition & 0 deletions bin/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ await Runner.setTsEnv()
.addAssertPlugin()
.addPlugin(request())
.addPlugin(command())
.setForceExit()
.addPath('tests/e2e/**/*.ts')
.addPath('tests/unit/**/*.ts')
.setCliArgs(process.argv.slice(2))
Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
},
"athenna": {
"services": [
"#src/helpers/queue",
"#src/services/user.service",
"#src/services/auth.service"
],
Expand All @@ -146,7 +147,8 @@
"@athenna/view/providers/ViewProvider",
"@athenna/mail/providers/MailProvider",
"@athenna/mail/providers/SmtpServerProvider",
"@athenna/view/providers/ViewProvider"
"@athenna/view/providers/ViewProvider",
"#src/providers/queueworker.provider"
],
"controllers": [
"#src/controllers/user.controller",
Expand Down
4 changes: 4 additions & 0 deletions src/config/mail.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ export default {
driver: 'smtp',
host: Env('MAIL_HOST', 'localhost'),
port: Env('MAIL_PORT', 587),
auth: {
user: Env('MAIL_USERNAME'),
pass: Env('MAIL_PASSWORD')
},
tls: {
rejectUnauthorized: false
}
Expand Down
146 changes: 146 additions & 0 deletions src/helpers/queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import { Log } from '@athenna/logger'
import { Service } from '@athenna/ioc'
import { File, Path } from '@athenna/common'

class VanillaQueue<T = any> {
private queueName = 'default'

private getFile() {
const path = Path.storage('queues.json')

return new File(path, JSON.stringify({ default: [], deadletter: [] }))
}

public async truncate() {
const path = Path.storage('queues.json')

return new File(path, '').setContent(
JSON.stringify({ default: [], deadletter: [] })
)
}

public async queue(name: string) {
const file = this.getFile()
const queues = file.getContentAsJsonSync()

this.queueName = name

if (!queues[name]) {
queues[name] = []
}

file.setContentSync(JSON.stringify(queues))

return this
}

public async add(item: T) {
const file = this.getFile()
const queues = file.getContentAsJsonSync()

queues[this.queueName].push(item)

file.setContentSync(JSON.stringify(queues))

return this
}

public async pop() {
const file = this.getFile()
const queues = file.getContentAsJsonSync()

if (!queues[this.queueName].length) {
return null
}

const item = queues[this.queueName].shift()

file.setContentSync(JSON.stringify(queues))

return item
}

public async peek() {
const file = this.getFile()
const queues = file.getContentAsJsonSync()

if (!queues[this.queueName].length) {
return null
}

return queues[this.queueName][0]
}

public async length() {
const file = this.getFile()
const queues = file.getContentAsJsonSync()

return queues[this.queueName].length
}

public async process(processor: (item: T) => any | Promise<any>) {
const data = await this.pop()

try {
await processor(data)
} catch (err) {
console.log(err)
Log.error(
`Adding data of ({yellow} "${this.queueName}") to deadletter queue due to:`,
err
)

const queue = await new QueueImpl().queue('deadletter')

await queue.add({ queue: this.queueName, data })
}
}

public async isEmpty() {
const file = this.getFile()
const queues = file.getContentAsJsonSync()

return !queues[this.queueName].length
}
}

@Service({ alias: 'App/Helpers/Queue' })
export class QueueImpl<T = any> {
public driver = new VanillaQueue<T>()

public async truncate() {
await this.driver.truncate()

return this
}

public async queue(name: string) {
await this.driver.queue(name)

return this
}

public async add(item: T) {
await this.driver.add(item)
}

public async pop() {
return this.driver.pop()
}

public async peek() {
return this.driver.peek()
}

public async length() {
return this.driver.length()
}

public async process(cb: (item: T) => any | Promise<any>) {
return this.driver.process(cb)
}

public async isEmpty() {
return this.driver.isEmpty()
}
}
4 changes: 4 additions & 0 deletions src/providers/facades/queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import { Facade } from '@athenna/ioc'
import type { QueueImpl } from '#src/helpers/queue'

export const Queue = Facade.createFor<QueueImpl>('App/Helpers/Queue')
46 changes: 46 additions & 0 deletions src/providers/queueworker.provider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { Mail } from '@athenna/mail'
import { Log } from '@athenna/logger'
import { ServiceProvider } from '@athenna/ioc'
import { Queue } from '#src/providers/facades/queue'

export default class QueueWorkerProvider extends ServiceProvider {
public intervals = []

public async boot() {
this.processByQueue('user:register', async user => {
return Mail.from('noreply@athenna.io')
.to(user.email)
.subject('Athenna Account Activation')
.view('mail/register', { user })
.send()
})
}

public async shutdown() {
this.intervals.forEach(interval => clearInterval(interval))
}

public processByQueue(queueName: string, processor: any) {
const interval = setInterval(async () => {
const queue = await Queue.queue(queueName)

if (await queue.isEmpty()) {
return
}

Log.info(`Processing jobs of ({yellow} "${queueName}") queue`)

await queue.process(processor)

const jobsLength = await queue.length()

if (jobsLength) {
Log.info(
`Still has ({yellow} ${jobsLength}) jobs to process on ({yellow} "${queueName}")`
)
}
}, 5000)

this.intervals.push(interval)
}
}
9 changes: 2 additions & 7 deletions src/services/auth.service.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import bcrypt from 'bcrypt'
import jwt from 'jsonwebtoken'
import { Mail } from '@athenna/mail'
import { Log } from '@athenna/logger'
import { Uuid } from '@athenna/common'
import { Service } from '@athenna/ioc'
import { Config } from '@athenna/config'
import type { User } from '#src/models/user'
import { Queue } from '#src/providers/facades/queue'
import { UnauthorizedException } from '@athenna/http'
import type { UserService } from '#src/services/user.service'

Expand Down Expand Up @@ -47,12 +47,7 @@ export class AuthService {

const user = await this.userService.create(data)

// TODO Move this to a queue
Mail.from('noreply@athenna.io')
.to(user.email)
.subject('Athenna Account Activation')
.view('mail/register', { user })
.send()
await Queue.queue('user:register').then(q => q.add(user))

return user
}
Expand Down
1 change: 1 addition & 0 deletions storage/queues.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"default":[],"deadletter":[],"user:register":[]}
5 changes: 5 additions & 0 deletions tests/e2e/auth.controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Config } from '@athenna/config'
import { SmtpServer } from '@athenna/mail'
import { Database } from '@athenna/database'
import { RoleUser } from '#src/models/roleuser'
import { Queue } from '#src/providers/facades/queue'
import { BaseHttpTest } from '@athenna/core/testing/BaseHttpTest'
import { Test, type Context, AfterAll, BeforeAll } from '@athenna/test'

Expand All @@ -18,6 +19,7 @@ export default class AuthControllerTest extends BaseHttpTest {

@AfterAll()
public async afterAll() {
await Queue.truncate()
await SmtpServer.close()
await User.truncate()
await Role.truncate()
Expand Down Expand Up @@ -131,6 +133,9 @@ export default class AuthControllerTest extends BaseHttpTest {
}
})

const queue = await Queue.queue('user:register')

assert.deepEqual(await queue.length(), 1)
assert.isTrue(await User.exists({ email: 'test@athenna.io' }))
response.assertStatusCode(201)
response.assertBodyContains({
Expand Down
4 changes: 3 additions & 1 deletion tests/unit/auth.service.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import bcrypt from 'bcrypt'
import { Mail } from '@athenna/mail'
import { Uuid } from '@athenna/common'
import { Queue } from '#src/providers/facades/queue'
import { UserService } from '#src/services/user.service'
import { AuthService } from '#src/services/auth.service'
import { NotFoundException, UnauthorizedException } from '@athenna/http'
import { Test, type Context, Mock, AfterEach, BeforeEach } from '@athenna/test'
import { Mail } from '@athenna/mail'

export default class AuthServiceTest {
private userService: UserService
Expand Down Expand Up @@ -83,6 +84,7 @@ export default class AuthServiceTest {
}

Mail.when('send').resolve(undefined)
Queue.when('queue').resolve({ add: () => {} })
Mock.when(this.userService, 'create').resolve(userToRegister)

const authService = new AuthService(this.userService)
Expand Down

0 comments on commit c8c180b

Please sign in to comment.