Skip to content

Commit

Permalink
Postgres disconnect working
Browse files Browse the repository at this point in the history
  • Loading branch information
heynemann committed Nov 8, 2016
1 parent 37fd66f commit 403eeac
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 8 deletions.
40 changes: 37 additions & 3 deletions src/api/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import Logger from '../extensions/logger'
import { AppHandler, AppsHandler } from './handlers/app'
import HealthcheckHandler from './handlers/healthcheck'
import { TemplateHandler, TemplatesHandler } from './handlers/template'
import { connect as redisConnect } from '../extensions/redis'
import { connect as pgConnect } from '../extensions/postgresql'
import { connect as kafkaClientConnect } from '../extensions/kafkaClient'
import { connect as redisConnect, disconnect as redisDisconnect } from '../extensions/redis'
import { connect as pgConnect, disconnect as pgDisconnect } from '../extensions/postgresql'
import { connect as kafkaClientConnect, disconnect as kafkaClientDisconnect } from '../extensions/kafkaClient'
import { connect as kafkaProducerConnect } from '../extensions/kafkaProducer'

process.setMaxListeners(60)
Expand Down Expand Up @@ -87,6 +87,11 @@ export default class MarathonApp {
}
}

async stopRedis() {
await redisDisconnect(this.redisClient)
this.redisClient = null
}

async configurePostgreSQL() {
try {
this.db = await pgConnect(
Expand All @@ -99,6 +104,11 @@ export default class MarathonApp {
}
}

async stopPostgreSQL() {
await pgDisconnect(this.db)
this.db = null
}

async configureKafka() {
try {
this.logger.debug('Connecting API Kafka client...')
Expand All @@ -117,6 +127,12 @@ export default class MarathonApp {
}
}

async stopKafka() {
await kafkaClientDisconnect(this.apiKafkaClient)
this.apiKafkaClient = null
this.apiKafkaProducer = null
}

async initializeServices() {
try {
this.logger.debug('Starting redis configuration...')
Expand All @@ -130,6 +146,20 @@ export default class MarathonApp {
}
}

async stopServices() {
try {
this.logger.debug('Stopping redis...')
await this.stopRedis()
this.logger.debug('Stopping PostgreSQL...')
await this.stopPostgreSQL()
this.logger.debug('Stopping Kafka...')
await this.stopKafka()
} catch (err) {
this.exit(err)
}
}


configureMiddleware() {
this.koaApp.use(koaBodyparser())
this.koaApp.use(async (ctx, next) => {
Expand Down Expand Up @@ -179,4 +209,8 @@ export default class MarathonApp {
this.logger.info(`Listening on port ${PORT}...`)
this.koaApp.listen(PORT)
}

async stop() {
await this.stopServices()
}
}
10 changes: 10 additions & 0 deletions src/extensions/kafkaClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,13 @@ export async function connect(url, clientId, logger) {
logr.info('Successfully connected to kafka.')
return kafkaClient
}

export async function disconnect(client) {
const hasDisconnected = new Promise((resolve) => {
client.close(() => {
resolve()
})
})

await hasDisconnected
}
17 changes: 15 additions & 2 deletions src/extensions/postgresql.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,19 @@ export async function connect(pgUrl, options, logger) {
}
}

export async function disconnect(client) {
await client.connectionManager.disconnect()
export async function disconnect(db) {
const hasDisconnected = new Promise((resolve) => {
const manager = db.client.connectionManager
if (manager.pool) {
manager.pool.drain(() => {
manager.pool.destroyAllNow(() => {
resolve()
})
})
} else {
resolve()
}
})

await hasDisconnected
}
25 changes: 24 additions & 1 deletion src/worker/app.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Logger from '../extensions/logger'
import { connect as redisConnect } from '../extensions/redis'
import { connect as pgConnect } from '../extensions/postgresql'
import { connect as kafkaClientConnect } from '../extensions/kafkaClient'
import { connect as kafkaClientConnect, disconnect as kafkaClientDisconnect } from '../extensions/kafkaClient'
import { connect as kafkaProducerConnect } from '../extensions/kafkaProducer'

const timeout = ms => new Promise(resolve => setTimeout(resolve, ms))
Expand Down Expand Up @@ -77,6 +77,12 @@ export default class MarathonWorkerApp {
}
}

async stopKafka() {
await kafkaClientDisconnect()
this.apiKafkaClient = null
this.apiKafkaProducer = null
}

async initializeServices() {
try {
this.logger.debug('Starting redis configuration...')
Expand All @@ -90,6 +96,19 @@ export default class MarathonWorkerApp {
}
}

async stopServices() {
try {
// this.logger.debug('Starting redis configuration...')
// await this.configureRedis()
// this.logger.debug('Starting PostgreSQL configuration...')
// await this.configurePostgreSQL()
this.logger.debug('Starting Kafka configuration...')
await this.stopKafka()
} catch (err) {
this.exit(err)
}
}

async initializeWorker() {
await this.initializeServices()
}
Expand Down Expand Up @@ -128,4 +147,8 @@ export default class MarathonWorkerApp {
if (!res) await timeout(timeoutInMs)
}
}

async stop() {
await this.stopServices()
}
}
4 changes: 4 additions & 0 deletions test/unit/api/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,7 @@ export async function beforeEachFunc(self) {
self.request = sap.agent(self.app.koaApp.listen())
await self.app.run()
}

export async function afterEachFunc(self) {
await self.app.stop()
}
6 changes: 5 additions & 1 deletion test/unit/api/handlers/appTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// http://www.opensource.org/licenses/mit-license
// Copyright © 2016 Top Free Games <backend@tfgco.com>

import { expect, beforeEachFunc } from '../common'
import { expect, beforeEachFunc, afterEachFunc } from '../common'
import uuid from 'uuid'

describe('API', () => {
Expand All @@ -14,6 +14,10 @@ describe('API', () => {
await beforeEachFunc(this)
})

afterEach(async function () {
await afterEachFunc(this)
})

describe('GET', () => {
it('should return 200 and an empty list of apps if there are no apps', async function () {
await this.app.db.App.destroy({ truncate: true, cascade: true })
Expand Down
6 changes: 5 additions & 1 deletion test/unit/api/handlers/healthcheckTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// http://www.opensource.org/licenses/mit-license
// Copyright © 2016 Top Free Games <backend@tfgco.com>

import { expect, beforeEachFunc } from '../common'
import { expect, beforeEachFunc, afterEachFunc } from '../common'

describe('API', () => {
describe('Handlers', () => {
Expand All @@ -13,6 +13,10 @@ describe('API', () => {
await beforeEachFunc(this)
})

afterEach(async function () {
await afterEachFunc(this)
})

it('should return 200 if all services up', async function () {
const res = await this.request.get('/healthcheck')
expect(res.status).to.equal(200)
Expand Down
4 changes: 4 additions & 0 deletions test/unit/worker/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ export async function beforeEachFunc(self) {
self.worker = new MarathonWorkerApp(config)
await self.worker.initializeWorker()
}

export async function afterEachFunc(self) {
await self.worker.stop()
}

0 comments on commit 403eeac

Please sign in to comment.