Skip to content

Commit

Permalink
Remove obsolete and useless code (#194)
Browse files Browse the repository at this point in the history
* Remove obsolete and useless code

* Fix documentation of sleep function
  • Loading branch information
jsone-studios committed Sep 16, 2020
1 parent 2155ec2 commit 92a45f4
Show file tree
Hide file tree
Showing 27 changed files with 106 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,5 +267,5 @@ const runtimeParameters = {
}

function sleep (ms) {
return new Promise((resolve) => setTimeout(resolve, ms))
return new Promise(resolve => setTimeout(resolve, ms))
}
15 changes: 3 additions & 12 deletions notification/src/api/amqp/amqpHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
AMQP_PIPELINE_EXECUTION_TOPIC,
AMQP_PIPELINE_EXECUTION_SUCCESS_TOPIC
} from '../../env'
import { sleep } from '../../sleep'

/**
* This class handles the communication with the AMQP service (rabbitmq)
Expand Down Expand Up @@ -43,20 +44,11 @@ export class AmqpHandler {
return await this.initChannel(connection)
} catch (e) {
retry++
await this.backOff(ms)
await sleep(ms)
}
}
console.error('Could not connect to AMQP Broker')
return Promise.reject(new Error('Could not connect to AMQP Broker'))
}

/**
* Waits for a specific time period.
*
* @param ms Period to wait in seconds
*/
private backOff (ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms))
throw new Error('Could not connect to AMQP Broker')
}

private async initChannel (connection: AMQP.Connection): Promise<void> {
Expand All @@ -76,7 +68,6 @@ export class AmqpHandler {
`Successfully initialized pipeline-executed queue "${AMQP_PIPELINE_EXECUTION_QUEUE}" ` +
`on topic "${AMQP_PIPELINE_EXECUTION_TOPIC}"`
)
return Promise.resolve()
}

private async handleEvent (msg: AMQP.ConsumeMessage | null): Promise<void> {
Expand Down
3 changes: 1 addition & 2 deletions notification/src/api/triggerEventHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class TriggerEventHandler {
*/
public async handleEvent (transformationEvent: TransformationEvent): Promise<void> {
if (!this.isValidTransformationEvent(transformationEvent)) {
return Promise.reject(new Error('Trigger event is not valid'))
throw new Error('Trigger event is not valid')
}

const dataLocation = `${NOTIFICATION_DATA_LOCATION_URL}/${transformationEvent.pipelineId}`
Expand All @@ -46,7 +46,6 @@ export class TriggerEventHandler {
}

await Promise.all(notificationJobs)
return Promise.resolve()
}

/**
Expand Down
69 changes: 20 additions & 49 deletions notification/src/notification-config/storageHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { NotificationRepository } from './notificationRepository'
import { NotificationSummary } from './notificationSummary'
import { SlackConfig, WebhookConfig, FirebaseConfig } from './notificationConfig'
import { POSTGRES_HOST, POSTGRES_PORT, POSTGRES_USER, POSTGRES_PW, POSTGRES_DB } from '../env'
import { sleep } from '../sleep'

/**
* This class handles Requests to the notification database
Expand Down Expand Up @@ -47,7 +48,7 @@ export class StorageHandler implements NotificationRepository {

if (!this.dbConnection) {
console.error('Could not initialize StorageHandler')
return Promise.reject(new Error('Could not initialize StorageHandler'))
throw new Error('Could not initialize StorageHandler')
}

this.slackRepository = this.dbConnection.getRepository(SlackConfig)
Expand Down Expand Up @@ -79,55 +80,46 @@ export class StorageHandler implements NotificationRepository {
dbCon = await createConnection(this.connectionOptions).catch(() => { return null })
if (!dbCon) {
console.info(`Initializing database connection (${i}/${retries})`)
await this.backOff(ms)
await sleep(ms)
} else {
connected = true
break
}
}

if (!connected) {
return Promise.reject(new Error('Connection to database could not be established.'))
throw new Error('Connection to database could not be established.')
}

console.info('Connected to notification config database successful.')
return Promise.resolve(dbCon)
}

/**
* Waits for a specific time period (in seconds)
*
* @param ms Period to wait in seconds
*/
private backOff (ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms))
return dbCon
}

public async getSlackConfig (id: number): Promise<SlackConfig> {
const config = await this.slackRepository.findOne(id)
if (!config) {
return Promise.reject(new Error(`Could not find slack config with id ${id}`))
throw new Error(`Could not find slack config with id ${id}`)
}

return Promise.resolve(config)
return config
}

public async getWebhookConfig (id: number): Promise<WebhookConfig> {
const config = await this.webhookRepository.findOne(id)
if (!config) {
return Promise.reject(new Error(`Could not find webhook config with id ${id}`))
throw new Error(`Could not find webhook config with id ${id}`)
}

return Promise.resolve(config)
return config
}

public async getFirebaseConfig (id: number): Promise<FirebaseConfig> {
const config = await this.firebaseRepository.findOne(id)
if (!config) {
return Promise.reject(new Error(`Could not find firebase config with id ${id}`))
throw new Error(`Could not find firebase config with id ${id}`)
}

return Promise.resolve(config)
return config
}

/**
Expand Down Expand Up @@ -175,10 +167,8 @@ export class StorageHandler implements NotificationRepository {
await transactionalEntityManager.delete(FirebaseConfig, condition)
}).catch(error => {
console.error(`Could not delete configs with pipelineId ${pipelineId}: ${error}`)
return Promise.reject(error)
throw error
})

return Promise.resolve()
}

/**
Expand All @@ -190,13 +180,8 @@ export class StorageHandler implements NotificationRepository {
public async getSlackConfigs (pipelineId: number): Promise<SlackConfig[]> {
console.debug(`Getting slack configs with pipelineId ${pipelineId} from database`)
await this.checkClassInvariant()
let slackConfigList: SlackConfig[] = []

try {
slackConfigList = await this.slackRepository.find({ pipelineId: pipelineId })
} catch (error) {
return Promise.reject(error)
}
const slackConfigList = await this.slackRepository.find({ pipelineId: pipelineId })

console.debug(`Sucessfully got ${slackConfigList.length} Slack config(s) from Database`)
return slackConfigList
Expand All @@ -212,13 +197,8 @@ export class StorageHandler implements NotificationRepository {
console.debug(`Getting webhook configs with pipelineId ${pipelineId} from database`)

await this.checkClassInvariant()
let webhookConfigs: WebhookConfig[] = []

try {
webhookConfigs = await this.webhookRepository.find({ pipelineId: pipelineId })
} catch (error) {
return Promise.reject(error)
}
const webhookConfigs = await this.webhookRepository.find({ pipelineId: pipelineId })

console.debug(`Successfully got ${webhookConfigs.length} webhookConfigs from Database`)
return webhookConfigs
Expand All @@ -233,13 +213,8 @@ export class StorageHandler implements NotificationRepository {
public async getFirebaseConfigs (pipelineId: number): Promise<FirebaseConfig[]> {
console.debug(`Getting firebase configs with pipelineId ${pipelineId} from database`)
await this.checkClassInvariant()
let firebaseConfigs: FirebaseConfig[] = []

try {
firebaseConfigs = await this.firebaseRepository.find({ pipelineId: pipelineId })
} catch (error) {
return Promise.reject(error)
}
const firebaseConfigs = await this.firebaseRepository.find({ pipelineId: pipelineId })

console.debug(`Successfully got ${firebaseConfigs.length} firebase configs from database`)
return firebaseConfigs
Expand Down Expand Up @@ -362,10 +337,9 @@ export class StorageHandler implements NotificationRepository {

const deleteResult = await this.slackRepository.delete(id)
if (!deleteResult.affected) {
return Promise.reject(new Error(`Something went wrong deleting slack config with id ${id}`))
throw new Error(`Something went wrong deleting slack config with id ${id}`)
}
console.debug(`Successfully deleted slack config with id ${id}`)
return Promise.resolve()
}

/**
Expand All @@ -381,10 +355,9 @@ export class StorageHandler implements NotificationRepository {

const deleteResult = await this.webhookRepository.delete(id)
if (!deleteResult.affected) {
return Promise.reject(new Error(`Something went wrong deleting webhook config with id ${id}`))
throw new Error(`Something went wrong deleting webhook config with id ${id}`)
}
console.debug(`Successfully deleted webhook config with id ${id}`)
return Promise.resolve()
}

/**
Expand All @@ -400,10 +373,9 @@ export class StorageHandler implements NotificationRepository {

const deleteResult = await this.firebaseRepository.delete(id)
if (!deleteResult.affected) {
return Promise.reject(new Error(`Something went wrong deleting firebase config with id ${id}`))
throw new Error(`Something went wrong deleting firebase config with id ${id}`)
}
console.debug(`Successfully deleted firebase config with id ${id}`)
return Promise.resolve()
}

/**
Expand All @@ -412,7 +384,7 @@ export class StorageHandler implements NotificationRepository {
*
* @returns true, if invariant correct, else false
*/
private checkClassInvariant (): Promise<void> {
private async checkClassInvariant (): Promise<void> {
let validState = true
const msg: string[] = []

Expand All @@ -438,8 +410,7 @@ export class StorageHandler implements NotificationRepository {

if (!validState) {
console.error(`Error the following member variables are not set: ${msg}`)
return Promise.reject(new Error(msg.join()))
throw new Error(msg.join())
}
return Promise.resolve()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export default class NotificationExecutor {
const conditionHolds = this.executor.evaluate(webhook.condition, data)
console.log('Condition is ' + conditionHolds)
if (!conditionHolds) { // no need to trigger notification
return Promise.resolve()
return
}

const callbackObject: WebhookCallback = {
Expand All @@ -49,7 +49,7 @@ export default class NotificationExecutor {
const conditionHolds = this.executor.evaluate(slack.condition, data)
console.log('Condition is ' + conditionHolds)
if (!conditionHolds) { // no need to trigger notification
return Promise.resolve()
return
}

let slackBaseUri = 'https://hooks.slack.com/services'
Expand All @@ -70,7 +70,7 @@ export default class NotificationExecutor {
const conditionHolds = this.executor.evaluate(firebaseConfig.condition, data)
console.log('Condition is ' + conditionHolds)
if (!conditionHolds) { // no need to trigger notification
return Promise.resolve()
return
}

let app: App
Expand Down
7 changes: 7 additions & 0 deletions notification/src/sleep.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/**
* Returns a Promise that waits for a specific time period (in milliseconds) and resolves afterwards
* @param ms Period to wait in milliseconds
*/
export function sleep (ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms))
}
2 changes: 1 addition & 1 deletion scheduler/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"main": "dist/index.js",
"scripts": {
"start": "npm run transpile && npm run start:transpiled",
"start:transpiled": "node --enable-source-maps --max-http-header-size 10000000 dist/index.js",
"start:transpiled": "node --enable-source-maps dist/index.js",
"transpile": "tsc",
"lint": "./node_modules/.bin/eslint src --ext .ts,.js --fix",
"lint-ci": "./node_modules/.bin/eslint src --ext .ts,.js --max-warnings=0",
Expand Down
5 changes: 1 addition & 4 deletions scheduler/src/scheduling.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
} from './clients/adapter-client'
import { EventType } from './interfaces/datasource-event'
import DatasourceConfig from './interfaces/datasource-config'
import { sleep } from './sleep'

jest.mock('./clients/adapter-client')
// Type assertion is ok here, because we have mocked the whole './clients/adapter-client' module
Expand Down Expand Up @@ -215,10 +216,6 @@ describe('Scheduler', () => {
})
})

function sleep (ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms))
}

function generateConfig (periodic: boolean, firstExecution: Date, interval: number): DatasourceConfig {
return {
id: 123,
Expand Down
7 changes: 2 additions & 5 deletions scheduler/src/scheduling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import * as Scheduling from './scheduling'
import DatasourceConfig from './interfaces/datasource-config'
import DatasourceEvent, { EventType } from './interfaces/datasource-event'

import { sleep } from './sleep'
import { MAX_TRIGGER_RETRIES } from './env'

const allJobs: Map<number, ExecutionJob> = new Map() // datasourceId -> job
Expand All @@ -31,7 +32,7 @@ export async function initializeJobs (retries = 30, retryBackoff = 3000): Promis
}
} catch (e) {
if (retries === 0) {
return Promise.reject(new Error('Failed to initialize datasource/pipeline scheduler.'))
throw new Error('Failed to initialize datasource/pipeline scheduler.')
}
if (e.code === 'ECONNREFUSED' || e.code === 'ENOTFOUND') {
console.error(`Failed to sync with Adapter Service on init (${retries}) . Retrying after ${retryBackoff}ms... `)
Expand Down Expand Up @@ -212,7 +213,3 @@ export function cancelJob (jobId: number): void {
job.scheduleJob.cancel()
}
}

function sleep (ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms))
}
7 changes: 7 additions & 0 deletions scheduler/src/sleep.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/**
* Returns a Promise that waits for a specific time period (in milliseconds) and resolves afterwards
* @param ms Period to wait in milliseconds
*/
export function sleep (ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms))
}
5 changes: 2 additions & 3 deletions storage/integration-test/src/storage-mq.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,14 @@ const amqpConnect = async (amqpUrl, retries, backoff) => {
try {
const connection = await AMQP.connect(amqpUrl)
console.log(`Successfully establish connection to AMQP broker (${amqpUrl})`)
return Promise.resolve(connection)
return connection
} catch (error) {
console.info(`Error connecting to RabbitMQ: ${error}. Retrying in ${backoff} seconds`)
console.info(`Connecting to Amqp broker (${i}/${retries})`)
await sleep(backoff)
continue
}
}
return Promise.reject(new Error(`Could not establish connection to AMQP broker (${amqpUrl})`))
throw new Error(`Could not establish connection to AMQP broker (${amqpUrl})`)
}

const sleep = (ms) => {
Expand Down
5 changes: 2 additions & 3 deletions storage/integration-test/src/storage.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,14 @@ const amqpConnect = async (amqpUrl, retries, backoff) => {
try {
const connection = await AMQP.connect(amqpUrl)
console.log(`Successfully establish connection to AMQP broker (${amqpUrl})`)
return Promise.resolve(connection)
return connection
} catch (error) {
console.info(`Error connecting to RabbitMQ: ${error}. Retrying in ${backoff} seconds`)
console.info(`Connecting to Amqp broker (${i}/${retries})`)
await sleep(backoff)
continue
}
}
Promise.reject(new Error(`Could not establish connection to AMQP broker (${amqpUrl})`))
throw new Error(`Could not establish connection to AMQP broker (${amqpUrl})`)
}

const sleep = (ms) => {
Expand Down
2 changes: 1 addition & 1 deletion storage/storage-mq/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"main": "dis/index.js",
"scripts": {
"start": "npm run transpile && npm run start:transpiled",
"start:transpiled": "node --enable-source-maps --max-http-header-size 10000000 dist/index.js",
"start:transpiled": "node --enable-source-maps dist/index.js",
"transpile": "tsc",
"lint": "./node_modules/.bin/eslint src --ext .ts,.js --fix",
"lint-ci": "./node_modules/.bin/eslint src --ext .ts,.js --max-warnings=0",
Expand Down
Loading

0 comments on commit 92a45f4

Please sign in to comment.