Skip to content

Commit

Permalink
Copy over bull connector source code
Browse files Browse the repository at this point in the history
  • Loading branch information
jasrusable committed Jun 16, 2021
1 parent b42f120 commit aedf05c
Show file tree
Hide file tree
Showing 13 changed files with 6,392 additions and 1 deletion.
2 changes: 2 additions & 0 deletions .eslintignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
dist
node_modules
58 changes: 58 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
module.exports = {
env: {
es6: true,
node: true,
},
extends: [
'eslint:recommended',
'plugin:@typescript-eslint/recommended',
'plugin:@typescript-eslint/recommended-requiring-type-checking',
'airbnb-typescript/base',
'plugin:unicorn/recommended',
'plugin:sonarjs/recommended',
'plugin:prettier/recommended',
'prettier/@typescript-eslint',
],
globals: {
Atomics: 'readonly',
SharedArrayBuffer: 'readonly',
},
parser: '@typescript-eslint/parser',
parserOptions: {
ecmaVersion: 2018,
sourceType: 'module',
project: './tsconfig.json',
createDefaultProgram: true,
tsconfigRootDir: __dirname,
},
plugins: ['prettier', '@typescript-eslint', 'unicorn', 'sonarjs'],
settings: {
'import/resolver': {
typescript: {},
},
},
rules: {
'@typescript-eslint/explicit-function-return-type': 'off',
'@typescript-eslint/explicit-module-boundary-types': 'off',
'@typescript-eslint/no-explicit-any': 'off',
'@typescript-eslint/no-unsafe-assignment': 'off',
'@typescript-eslint/no-unsafe-member-access': 'off',
'@typescript-eslint/no-unsafe-return': 'off',
'import/prefer-default-export': 'off',
'no-console': 'off',
'prettier/prettier': 'error',
'sonarjs/prefer-immediate-return': 'off',
'unicorn/prevent-abbreviations': 'off',
'@typescript-eslint/restrict-template-expressions': 'off',
'@typescript-eslint/no-unsafe-call': 'off',
'unicorn/no-reduce': 'off',
'@typescript-eslint/no-throw-literal': 'off',
'import/no-cycle': 'off',
'sonarjs/cognitive-complexity': 'off',
'unicorn/no-null': 'off',
'@typescript-eslint/no-misused-promises': 'off',
'sonarjs/no-duplicate-string': 'off',
'unicorn/no-nested-ternary': 'off',
'no-nested-ternary': 'off',
},
};
7 changes: 7 additions & 0 deletions .npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
coverage
docs
docker-compose.yaml
.eslintrc.js
.github
.prettierignore
.prettierrc.js
7 changes: 7 additions & 0 deletions .prettierrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module.exports = {
trailingComma: 'all',
tabWidth: 2,
semi: true,
singleQuote: true,
printWidth: 80,
};
43 changes: 42 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,42 @@
# queuemetrics-connector-bullmq
# QueueMetrics.io BullMQ Connector

This is a small service which enables you to connect your [BullMQ](https://github.com/taskforcesh/bullmq) queues to [QueueMetrics.io](https://queuemetrics.io). It acts as a proxy between your Bull queues and the [QueueMetrics.io](https://queuemetrics.io) API, providing bi-directional data transfer which enables advanced queue insights and management with [QueueMetrics.io](https://queuemetrics.io).

This connector is designed to be used in production-grade environments and does not require the sharing of passwords or SSH tunnels to function. It is also very useful to use the connector in local development environments for rapid queue development and debugging.

The connector is very lightweight and efficient, and uses minimal resources.

## Installation

```sh
npm install queuemetrics-connector-bullmq
```

## Usage

```sh
Usage: queuemetrics-connector-bullmq [options]

Options:
-V, --version output the version number
-n, --connector-name <connection-name> Connector name. Defaults to 'Default connector'. (default: "Default connector")
-a, --api-key <api-key> QueueMetrics.io organization API key. Get this from https://dashboard.queuemetrics.io
-h, --host <host> Redis host. Defaults to localhost. (default: "localhost")
-p, --port <port> Redis port. Defaults to 6379. (default: "6379")
-d, --database <database> Redis database. Defaults to 0. (default: "0")
-w, --password <password> Redis password, can also be supplied by setting REDIS_PASSWORD environment variable.
--tls [tls] Activate secured TLS connection to Redis
-u, --uri [uri] Redis URI.
-s, --sentinels [host:port] Comma-separated list of sentinel host/port pairs
-m, --master [name] Name of master node used in sentinel configuration
-b, --backend <backend> QueueMetrics backend. Defaults to wss://api.queuemetrics.io (default: "wss://api.queuemetrics.io")
--help display help for command
```

### Example usage

```sh
npx queuemetrics-connector-bullmq -u redis://<your-redis-host>:<your-redis-port>/<your-redis-db> -a <your-queuemetrics-api-key>
```

Your API key can be obtained from your [Queuemetrics.io Dashboard](https://dashboard.queuemetrics.io/)
145 changes: 145 additions & 0 deletions bin/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
#!/usr/bin/env node
/* eslint-disable @typescript-eslint/no-var-requires */
/* eslint-disable import/no-dynamic-require */
/* eslint-disable @typescript-eslint/no-floating-promises */
/* eslint-disable @typescript-eslint/ban-ts-comment */
import program from 'commander';
import Redis from 'ioredis';
import { io } from 'socket.io-client';
import {
setIntervalAsync,
SetIntervalAsyncTimer,
clearIntervalAsync,
} from 'set-interval-async/dynamic';
// @ts-ignore
import redisUrlParse from 'redis-url-parse';
import { debug, RedisConfig } from '../src/utils';
import { updateQueuesCache } from '../src/queues';
import { registerRequestHandlers } from '../src/request-handlers';

const pkg = require(`../package.json`);

program.version(pkg.version);

program
.requiredOption(
'-n, --connector-name <connection-name>',
"Connector name. Defaults to 'Default connector'.",
process.env.CONNECTOR_NAME || 'Default connector',
)
.requiredOption(
'-a, --api-key <api-key>',
'QueueMetrics.io organization API key. Get this from https://dashboard.queuemetrics.io',
process.env.API_KEY,
)
.option(
'-h, --host <host>',
'Redis host. Defaults to localhost.',
process.env.REDIS_HOST || 'localhost',
)
.option(
'-p, --port <port>',
'Redis port. Defaults to 6379.',
process.env.REDIS_PORT || '6379',
)
.option(
'-d, --database <database>',
'Redis database. Defaults to 0.',
process.env.REDIS_DB || '0',
)
.option(
'-w, --password <password>',
'Redis password, can also be supplied by setting REDIS_PASSWORD environment variable.',
process.env.REDIS_PASSWORD,
)
.option('--tls [tls]', 'Activate secured TLS connection to Redis')
.option('-u, --uri [uri]', 'Redis URI.', process.env.REDIS_URI)
.option(
'-s, --sentinels [host:port]',
'Comma-separated list of sentinel host/port pairs',
process.env.REDIS_SENTINELS,
)
.option(
'-m, --master [name]',
'Name of master node used in sentinel configuration',
process.env.REDIS_MASTER,
)
.option(
'-b, --backend <backend>',
'QueueMetrics backend. Defaults to wss://api.queuemetrics.io',
process.env.BACKEND || 'wss://api.queuemetrics.io',
)
.parse(process.argv);

(() => {
const opts = program.opts();

const { connectorName } = opts;
const { apiKey } = opts;

const redisConfigFromUri: any = (opts.uri as string | undefined)
? (redisUrlParse(opts.uri) as Record<string, unknown>)
: undefined;

const redisConfig: RedisConfig = {
host: redisConfigFromUri.host || opts.host,
port: Number(String(redisConfigFromUri.port) || opts.port),
db: Number(redisConfigFromUri.database || opts.database),
password: redisConfigFromUri.password || opts.password,
tls:
program.tls || opts.uri?.startsWith('rediss://')
? {
rejectUnauthorized: false,
requestCert: true,
agent: false,
}
: undefined,
};

const redis = new Redis({
host: redisConfig.host,
port: redisConfig.port,
db: redisConfig.db,
password: redisConfig.password,
tls: redisConfig.tls,
});

const websocketUri = `${program.backend}`;

console.log(`Attempting to connect to ${websocketUri}`);

const socket = io(websocketUri, { reconnectionDelayMax: 1000 });

let timer: SetIntervalAsyncTimer;

registerRequestHandlers({ redis, socket });

socket.on('connect', () => {
console.log(`Socket connected to ${websocketUri}`);

socket.on('disconnect', () => {
console.log(`Socket disconnected from ${websocketUri}`);
if (timer) {
clearIntervalAsync(timer);
}
});

debug('Emitting initialize-connector-connection');
socket.emit(
'initialize-connector-connection',
{
apiKey,
connectorType: 'bullmq',
connectorName,
connectorVersion: pkg.version,
},
async () => {
debug('Acknowledged initialize-connector-connection');
await updateQueuesCache({ redis, redisConfig, socket, apiKey });
timer = setIntervalAsync(() => {
return updateQueuesCache({ redis, redisConfig, socket, apiKey });
}, 1000);
},
);
});
})();
Loading

0 comments on commit aedf05c

Please sign in to comment.