/
testHelper.js
131 lines (105 loc) · 3.11 KB
/
testHelper.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
const Db = require('../src/db')
const PgBoss = require('../')
const plans = require('../src/plans')
const { COMPLETION_JOB_PREFIX } = plans
const crypto = require('crypto')
const sha1 = (value) => crypto.createHash('sha1').update(value).digest('hex')
module.exports = {
dropSchema,
start,
stop,
getDb,
getArchivedJobById,
countJobs,
findJobs,
COMPLETION_JOB_PREFIX,
getConfig,
getConnectionString,
tryCreateDb,
init
}
function getConnectionString () {
const config = getConfig()
return `postgres://${config.user}:${config.password}@${config.host}:${config.port}/${config.database}`
}
function getConfig (options = {}) {
const config = require('./config.json')
config.host = process.env.POSTGRES_HOST || config.host
config.port = process.env.POSTGRES_PORT || config.port
config.password = process.env.POSTGRES_PASSWORD || config.password
if (options.testKey) {
config.schema = `pgboss${sha1(options.testKey).slice(-10)}`
}
config.schema = config.schema || 'pgboss'
const result = { ...config }
return Object.assign(result, options)
}
async function init () {
const { database } = getConfig()
await tryCreateDb(database)
await createPgCrypto(database)
}
async function getDb (database) {
const config = getConfig()
config.database = database || config.database
const db = new Db(config)
await db.open()
return db
}
async function createPgCrypto (database) {
const db = await getDb(database)
await db.executeSql('create extension if not exists pgcrypto')
await db.close()
}
async function dropSchema (schema) {
const db = await getDb()
await db.executeSql(`DROP SCHEMA IF EXISTS ${schema} CASCADE`)
await db.close()
}
async function findJobs (schema, where, values) {
const db = await getDb()
const jobs = await db.executeSql(`select * from ${schema}.job where ${where}`, values)
await db.close()
return jobs
}
async function getArchivedJobById (schema, id) {
const response = await findArchivedJobs(schema, 'id = $1', [id])
return response.rows.length ? response.rows[0] : null
}
async function findArchivedJobs (schema, where, values) {
const db = await getDb()
const result = await db.executeSql(`select * from ${schema}.archive where ${where}`, values)
await db.close()
return result
}
async function countJobs (schema, where, values) {
const db = await getDb()
const result = await db.executeSql(`select count(*) as count from ${schema}.job where ${where}`, values)
await db.close()
return parseFloat(result.rows[0].count)
}
async function tryCreateDb (database) {
const db = await getDb('postgres')
try {
await db.executeSql(`CREATE DATABASE ${database}`)
} catch {} finally {
await db.close()
}
}
async function start (options) {
try {
options = getConfig(options)
const boss = new PgBoss(options)
boss.on('error', err => console.log({ schema: options.schema, message: err.message }))
await boss.start()
return boss
} catch (err) {
// this is nice for occaisional debugging, Mr. Linter
if (err) {
throw err
}
}
}
async function stop (boss, timeout = 4000) {
await boss.stop({ timeout })
}