Skip to content

Commit

Permalink
feat(queue): implement database driver
Browse files Browse the repository at this point in the history
  • Loading branch information
jlenon7 committed May 11, 2024
1 parent 9663fb4 commit 426822f
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 77 deletions.
195 changes: 140 additions & 55 deletions src/helpers/queue.ts
Original file line number Diff line number Diff line change
@@ -1,126 +1,211 @@
import { Log } from '@athenna/logger'
import { Service } from '@athenna/ioc'
import { File, Path } from '@athenna/common'
import { Database, type DatabaseImpl } from '@athenna/database'

class VanillaQueue<T = any> {
class VanillaQueue {
private queueName = 'default'
private queues: Record<string, any[]> = {
default: [],
deadletter: []
}

public async truncate() {
Object.keys(this.queues).forEach(key => (this.queues[key] = []))
}

private getFile() {
const path = Path.storage('queues.json')
public queue(name: string) {
this.queueName = name

if (!this.queues[name]) {
this.queues[name] = []
}

return new File(path, JSON.stringify({ default: [], deadletter: [] }))
return this
}

public async truncate() {
const path = Path.storage('queues.json')
public async add(item: unknown) {
this.queues[this.queueName].push(item)

return new File(path, '').setContent(
JSON.stringify({ default: [], deadletter: [] }, null, 2)
)
return this
}

public async queue(name: string) {
const file = this.getFile()
const queues = file.getContentAsJsonSync()
public async pop() {
if (!this.queues[this.queueName].length) {
return null
}

this.queueName = name
return this.queues[this.queueName].shift()
}

if (!queues[name]) {
queues[name] = []
public async peek() {
if (!this.queues[this.queueName].length) {
return null
}

file.setContentSync(JSON.stringify(queues))
return this.queues[this.queueName][0]
}

return this
public async length() {
return this.queues[this.queueName].length
}

public async add(item: T) {
const file = this.getFile()
const queues = file.getContentAsJsonSync()
public async isEmpty() {
return !this.queues[this.queueName].length
}

queues[this.queueName].push(item)
public async process(processor: (item: unknown) => any | Promise<any>) {
const data = await this.pop()

file.setContentSync(JSON.stringify(queues))
try {
await processor(data)
} catch (err) {
Log.error(
`Adding data of ({yellow} "${this.queueName}") to deadletter queue due to:`,
err
)

this.queues.deadletter.push({ queue: this.queueName, data })
}
}
}

class DatabaseQueue {
private DB: DatabaseImpl
private dbConnection: string

private table: string
private queueName: string
private connection: string
private deadLetterQueueName: string

public constructor(connection: string) {
const {
table,
queue,
deadletter,
connection: dbConnection
} = Config.get(`database.connections.${connection}`)

this.table = table
this.queueName = queue
this.connection = connection
this.dbConnection = dbConnection
this.deadLetterQueueName = deadletter

this.DB = Database.connection(this.dbConnection)
}

public async truncate() {
await this.DB.truncate(this.table)
}

public queue(name: string) {
this.queueName = name

return this
}

public async add(item: unknown) {
await this.DB.table(this.table).create({
queue: this.queueName,
item
})

return this
}

public async pop() {
const file = this.getFile()
const queues = file.getContentAsJsonSync()
const data = await this.DB.table(this.table)
.where('queue', this.queueName)
.orderBy('id', 'DESC')
.find()

if (!queues[this.queueName].length) {
return null
if (!data) {
return
}

const item = queues[this.queueName].shift()
await this.DB.table(this.table)
.where('id', data.id)
.where('queue', this.queueName)
.delete()

file.setContentSync(JSON.stringify(queues))

return item
return data.item
}

public async peek() {
const file = this.getFile()
const queues = file.getContentAsJsonSync()
const data = await this.DB.table(this.table)
.where('queue', this.queueName)
.orderBy('id', 'DESC')
.find()

if (!queues[this.queueName].length) {
if (!data) {
return null
}

return queues[this.queueName][0]
return data.item
}

public async length() {
const file = this.getFile()
const queues = file.getContentAsJsonSync()

return queues[this.queueName].length
public length() {
return this.DB.table(this.table).where('queue', this.queueName).count()
}

public async process(processor: (item: T) => any | Promise<any>) {
public async process(processor: (item: unknown) => any | Promise<any>) {
const data = await this.pop()

try {
await processor(data)
} catch (err) {
console.log(err)
Log.error(
`Adding data of ({yellow} "${this.queueName}") to deadletter queue due to:`,
err
)

const queue = await new QueueImpl().queue('deadletter')

await queue.add({ queue: this.queueName, data })
await this.DB.table(this.table).create({
queue: this.deadLetterQueueName,
formerQueue: this.queueName,
item: data
})
}
}

public async isEmpty() {
const file = this.getFile()
const queues = file.getContentAsJsonSync()
const count = await this.DB.table(this.table)
.where('queue', this.queueName)
.count()

return !queues[this.queueName].length
return count === '0'
}
}

@Service({ alias: 'App/Helpers/Queue' })
export class QueueImpl<T = any> {
public driver = new VanillaQueue<T>()
@Service({ alias: 'App/Helpers/Queue', type: 'singleton' })
export class QueueImpl {
public driver: any = new VanillaQueue()

public connection(name: string) {
if (name === 'vanilla') {
this.driver = new VanillaQueue()
}

if (name === 'database') {
this.driver = new DatabaseQueue('queue')
}

return this
}

public async truncate() {
await this.driver.truncate()

return this
}

public async queue(name: string) {
await this.driver.queue(name)
public queue(name: string) {
this.driver.queue(name)

return this
}

public async add(item: T) {
public async add(item: unknown) {
await this.driver.add(item)
}

Expand All @@ -136,7 +221,7 @@ export class QueueImpl<T = any> {
return this.driver.length()
}

public async process(cb: (item: T) => any | Promise<any>) {
public async process(cb: (item: unknown) => any | Promise<any>) {
return this.driver.process(cb)
}

Expand Down
6 changes: 3 additions & 3 deletions src/providers/queueworker.provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,17 @@ export default class QueueWorkerProvider extends ServiceProvider {
processor: (data: any) => any | Promise<any>
) {
const interval = setInterval(async () => {
const queue = await Queue.queue(queueName)
const queue = Queue.queue(queueName)

if (await queue.isEmpty()) {
if (queue.isEmpty()) {
return
}

Log.info(`Processing jobs of ({yellow} "${queueName}") queue`)

await queue.process(processor)

const jobsLength = await queue.length()
const jobsLength = queue.length()

if (jobsLength) {
Log.info(
Expand Down
2 changes: 1 addition & 1 deletion src/services/auth.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class AuthService {

const user = await this.userService.create(data)

await Queue.queue('user:confirm').then(q => q.add(user))
await Queue.queue('user:confirm').add(user)

return user
}
Expand Down
21 changes: 12 additions & 9 deletions src/services/user.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,26 @@ export class UserService {

switch (`${isEmailEqual}:${isPasswordEqual}`) {
case 'false:true':
await Queue.queue('user:email').then(q =>
q.add({ user, token, email: data.email })
)
await Queue.queue('user:email').add({ user, token, email: data.email })
break
case 'true:false':
data.password = await bcrypt.hash(data.password, 10)

await Queue.queue('user:password').then(q =>
q.add({ user, token, password: data.password })
)
await Queue.queue('user:password').add({
user,
token,
password: data.password
})
break
case 'false:false':
data.password = await bcrypt.hash(data.password, 10)

await Queue.queue('user:email:password').then(q =>
q.add({ user, token, email: data.email, password: data.password })
)
await Queue.queue('user:email:password').add({
user,
token,
email: data.email,
password: data.password
})
}

data = Json.omit(data, ['email', 'password'])
Expand Down
4 changes: 0 additions & 4 deletions storage/queues.json

This file was deleted.

2 changes: 1 addition & 1 deletion tests/e2e/auth.controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ export default class AuthControllerTest extends BaseE2ETest {
}
})

const queue = await Queue.queue('user:confirm')
const queue = Queue.queue('user:confirm')

assert.deepEqual(await queue.length(), 1)
assert.isTrue(await User.exists({ email: 'test@athenna.io' }))
Expand Down
6 changes: 3 additions & 3 deletions tests/e2e/user.controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ export default class UserControllerTest extends BaseE2ETest {

await user.refresh()

const queue = await Queue.queue('user:email')
const queue = Queue.queue('user:email')

assert.deepEqual(await queue.length(), 1)
assert.deepEqual(user.name, 'Customer Updated')
Expand All @@ -177,7 +177,7 @@ export default class UserControllerTest extends BaseE2ETest {

await user.refresh()

const queue = await Queue.queue('user:password')
const queue = Queue.queue('user:password')

assert.deepEqual(await queue.length(), 1)
assert.deepEqual(user.name, 'Customer Updated')
Expand All @@ -204,7 +204,7 @@ export default class UserControllerTest extends BaseE2ETest {

await user.refresh()

const queue = await Queue.queue('user:email:password')
const queue = Queue.queue('user:email:password')

assert.deepEqual(await queue.length(), 1)
assert.deepEqual(user.name, 'Customer Updated')
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/auth.service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ export default class AuthServiceTest {
}

Mail.when('send').resolve(undefined)
Queue.when('queue').resolve({ add: () => {} })
Queue.when('queue').return({ add: () => {} })
Mock.when(this.userService, 'create').resolve(userToRegister)

const authService = new AuthService(this.userService)
Expand Down

0 comments on commit 426822f

Please sign in to comment.