Skip to content

Commit

Permalink
feat: support proxy concurrency (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Mar 20, 2024
1 parent 3729513 commit d7ad35a
Show file tree
Hide file tree
Showing 11 changed files with 236 additions and 54 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}

cache-from: type=gha
cache-to: type=gha,mode=max
# cache-from: type=gha
# cache-to: type=gha,mode=max
- name: Image digest
run: echo ${{ steps.docker_build.outputs.digest }}
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ can add jobs with any options you like and instantiate workers, also with any Bu

- [x] Initial support for adding and processing jobs for any queue.
- [x] Queue getters (retrieve jobs in any status from any queue).
- [ ] Support redundancy (multiple proxies running in parallel).
- [ ] Queue actions: Pause, Resume, Clean and Obliterate.
- [x] Support redundancy (multiple proxies running in parallel).
- [x] Job processing actions: update progress, add logs.
- [ ] Queue actions: Pause, Resume, Clean and Obliterate.
- [ ] Job actions: promote, retry, remove.
- [ ] Support for adding flows.
- [ ] Dynamic rate-limit.
- [ ] Manually consume jobs.
- [ ] Listen to global queue events.

Although the service is not yet feature complete, you are very welcome to try it out and give us
feedback and report any issues you may find.
Expand Down
Binary file modified bun.lockb
Binary file not shown.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
},
"dependencies": {
"@sinclair/typebox": "^0.31.17",
"bullmq": "^5.3.2",
"bullmq": "^5.4.3",
"chalk": "^5.3.0",
"ioredis": "^5.3.2",
"semver": "^7.6.0"
Expand Down
5 changes: 4 additions & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,8 @@ export const config = {
debugEnabled: process.env.DEBUG === "true",
minQueueNameLength: process.env.MIN_QUEUE_NAME_LENGTH ? parseInt(process.env.MIN_QUEUE_NAME_LENGTH) : 3,
maxQueueNameLength: process.env.MAX_QUEUE_NAME_LENGTH ? parseInt(process.env.MAX_QUEUE_NAME_LENGTH) : 100,
workerMetadataKey: process.env.WORKER_METADATA_KEY || "bullmq-proxy:workers",
workerMetadataKey: process.env.WORKER_METADATA_KEY || "bmqp:w:meta",
workerMetadataStream: process.env.WORKER_METADATA_KEY || "bmpq:w:stream",
maxLenWorkerMetadataStream:
process.env.MAX_LEN_WORKER_METADATA_STREAM ? parseInt(process.env.MAX_LEN_WORKER_METADATA_STREAM) : 100,
}
31 changes: 27 additions & 4 deletions src/controllers/http/worker-http-controller.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Redis } from 'ioredis';
import { describe, it, jest, mock, expect, beforeAll, afterAll } from "bun:test";
import { WorkerHttpController } from './worker-http-controller';
import { config } from '../../config';

const fakeAddValidReq = {
json: () => Promise.resolve({
Expand All @@ -16,7 +17,6 @@ const fakeAddValidReq = {
describe('WorkerHttpController.init', () => {

it('should initialize workers from Redis metadata', async () => {

await expect(WorkerHttpController.init(new Redis(), new Redis({
maxRetriesPerRequest: null,
}))).resolves.toBeUndefined;
Expand All @@ -32,18 +32,18 @@ mock.module('node-fetch', () => jest.fn(() => Promise.resolve({
describe('WorkerHttpController.addWorker', () => {

let redisClient: Redis;
beforeAll(() => {
beforeAll(async () => {
redisClient = new Redis({
maxRetriesPerRequest: null
});
await WorkerHttpController.loadScripts(redisClient);
});

afterAll(async () => {
await redisClient.quit();
});

it('should add a worker with valid metadata', async () => {

const response = await WorkerHttpController.addWorker({
req: fakeAddValidReq,
redisClient,
Expand All @@ -53,6 +53,17 @@ describe('WorkerHttpController.addWorker', () => {
expect(response).toBeDefined();
expect(await response.text()).toBe("OK");
expect(response!.status).toBe(200); // Assuming 200 is the success status code

// Verify worker was added in Redis
const workerMetadataKey = config.workerMetadataKey;
const workerMetadata = await redisClient.hgetall(workerMetadataKey);
expect(workerMetadata).toBeDefined();
expect(workerMetadata.validQueue).toBeDefined();

// Verify event was added in Redis
const workerMetadataStream = config.workerMetadataStream;
const streamLength = await redisClient.xlen(workerMetadataStream);
expect(streamLength).toBeGreaterThan(0);
});

it('should return a 400 response for invalid metadata', async () => {
Expand All @@ -73,10 +84,11 @@ describe('WorkerHttpController.addWorker', () => {

describe('WorkerHttpController.removeWorker', () => {
let redisClient: Redis;
beforeAll(() => {
beforeAll(async () => {
redisClient = new Redis({
maxRetriesPerRequest: null
});
await WorkerHttpController.loadScripts(redisClient);
});

it('should remove a worker successfully', async () => {
Expand All @@ -98,6 +110,17 @@ describe('WorkerHttpController.removeWorker', () => {
const responseRemove = await WorkerHttpController.removeWorker(opts);
expect(responseRemove).toBeDefined();
expect(responseRemove!.status).toBe(200); // Assuming 200 indicates success

// Verify worker was removed from Redis
const workerMetadataKey = config.workerMetadataKey;
const workerMetadata = await redisClient.hgetall(workerMetadataKey);
expect(workerMetadata).toBeDefined();
expect(workerMetadata.validQueue).toBeUndefined();

// Verify event was added in Redis
const workerMetadataStream = config.workerMetadataStream;
const streamLength = await redisClient.xlen(workerMetadataStream);
expect(streamLength).toBeGreaterThan(0);
});

it('should return 404 for non existing workers', async () => {
Expand Down
183 changes: 156 additions & 27 deletions src/controllers/http/worker-http-controller.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { createHash } from "crypto";

import { Job, Worker } from "bullmq";
import { Redis, Cluster } from "ioredis";
Expand All @@ -10,12 +11,16 @@ import { config } from "../../config";
const debugEnabled = config.debugEnabled;

const workers: { [queueName: string]: Worker } = {};
const metadatasShas: { [queueName: string]: string } = {};

const workerMetadataKey = config.workerMetadataKey;
const workerMetadataStream = config.workerMetadataStream;
const workerStreamBlockingTime = 5000;

const abortController = new AbortController();
export const gracefulShutdownWorkers = async () => {
info(`Closing workers...`);

abortController.abort();
const closingWorkers = Object.keys(workers).map(async (queueName) => workers[queueName].close());
await Promise.all(closingWorkers);
info('Workers closed');
Expand All @@ -24,7 +29,7 @@ export const gracefulShutdownWorkers = async () => {
const workerFromMetadata = (queueName: string, workerMetadata: WorkerMetadata, connection: Redis | Cluster): Worker => {
const { endpoint: workerEndpoint, opts: workerOptions } = workerMetadata;

debugEnabled && debug(`Starting worker for queue ${queueName} with endpoint ${workerMetadata.endpoint.url} and options ${workerMetadata.opts || 'default'}`);
debugEnabled && debug(`Starting worker for queue ${queueName} with endpoint ${workerMetadata.endpoint.url} and options ${JSON.stringify(workerMetadata.opts) || 'default'}`);

const worker = new Worker(queueName, async (job: Job, token?: string) => {
debugEnabled && debug(`Processing job ${job.id} from queue ${queueName} with endpoint ${workerEndpoint.url}`);
Expand Down Expand Up @@ -72,10 +77,111 @@ const workerFromMetadata = (queueName: string, workerMetadata: WorkerMetadata, c
return worker;
};

let lastEventId: string | undefined;

export const workerStreamListener = async (redisClient: Redis | Cluster, abortSignal: AbortSignal) => {
const streamBlockingClient = redisClient.duplicate();
let running = true;

abortSignal.addEventListener('abort', () => {
running = false;
streamBlockingClient.disconnect();
});

while (running) {
const streams = await streamBlockingClient.xread('BLOCK', workerStreamBlockingTime, 'STREAMS', workerMetadataStream, lastEventId || '0');

// If we got no events, continue to the next iteration
if (!streams || streams.length === 0) {
continue;
}

const stream = streams[0];

debugEnabled && debug(`Received ${streams.length} event${streams.length > 1 ? "s" : ""} from stream ${workerMetadataStream}`);

const [_streamName, events] = stream;

for (const [eventId, fields] of events) {

lastEventId = eventId;
const queueName = fields[1];
const existingWorker = workers[queueName];
const existingSha = metadatasShas[queueName];

const workerMetadataRaw = await redisClient.hget(workerMetadataKey, queueName);

// If workerMetadatadaVersion is older than the event id, we need to update the worker
if (workerMetadataRaw) {
const workerMetadataSha256 = createHash('sha256').update(workerMetadataRaw).digest('hex');

if ((existingSha !== workerMetadataSha256)) {
const workerMetadata = JSON.parse(workerMetadataRaw);
workers[queueName] = workerFromMetadata(queueName, workerMetadata, redisClient);
metadatasShas[queueName] = workerMetadataSha256;
if (existingWorker) {
await existingWorker.close();
}
}
} else {
// worker has been removed
debugEnabled && debug(`Worker for queue ${queueName} has been removed`);

if (existingWorker) {
await existingWorker.close();
delete workers[queueName];
delete metadatasShas[queueName];
}
}
}
}
}

export const WorkerHttpController = {
init: (redisClient: Redis | Cluster, workersRedisClient: Redis | Cluster) => {
loadScripts: async (redisClient: Redis | Cluster) => {
const luaScripts = {
updateWorkerMetadata: `
local workerMetadataKey = KEYS[1]
local workerMetadataStream = KEYS[2]
local queueName = ARGV[1]
local workerMetadata = ARGV[2]
local streamMaxLen = ARGV[3]
redis.call('HSET', workerMetadataKey, queueName, workerMetadata)
local eventId = redis.call('XADD', workerMetadataStream, 'MAXLEN', streamMaxLen, '*', 'worker', queueName)
return eventId
`,
removeWorkerMetadata: `
local workerMetadataKey = KEYS[1]
local workerMetadataStream = KEYS[2]
local queueName = ARGV[1]
local streamMaxLen = ARGV[2]
local removedWorker = redis.call('HDEL', workerMetadataKey, queueName)
if removedWorker == 1 then
local eventId = redis.call('XADD', workerMetadataStream, 'MAXLEN', streamMaxLen, '*', 'worker', queueName)
return { removedWorker, eventId }
end
`
}

for (const [scriptName, script] of Object.entries(luaScripts)) {
redisClient.defineCommand(scriptName, { numberOfKeys: 2, lua: script });
}
},

/**
* Load workers from Redis and start them.
*
* @param redisClient
* @param workersRedisClient
*/
loadWorkers: async (redisClient: Redis | Cluster, workersRedisClient: Redis | Cluster) => {
// Load workers from Redis and start them
debugEnabled && debug('Loading workers from Redis...');
const result = await redisClient.xrevrange(config.workerMetadataStream, '+', '-', 'COUNT', 1);
if (result.length > 0) {
[[lastEventId]] = result
}
const stream = redisClient.hscanStream(workerMetadataKey, { count: 10 });
stream.on('data', (result: string[]) => {
for (let i = 0; i < result.length; i += 2) {
Expand All @@ -84,10 +190,11 @@ export const WorkerHttpController = {

const workerMetadata = JSON.parse(value) as WorkerMetadata;
workers[queueName] = workerFromMetadata(queueName, workerMetadata, workersRedisClient);
metadatasShas[queueName] = createHash('sha256').update(value).digest('hex');
}
});

return new Promise<void>((resolve, reject) => {
await new Promise<void>((resolve, reject) => {
stream.on('end', () => {
debugEnabled && debug('Workers loaded');
resolve();
Expand All @@ -98,6 +205,11 @@ export const WorkerHttpController = {
});
});
},
init: async (redisClient: Redis | Cluster, workersRedisClient: Redis | Cluster) => {
await WorkerHttpController.loadScripts(redisClient);
await WorkerHttpController.loadWorkers(redisClient, workersRedisClient);
workerStreamListener(workersRedisClient, abortController.signal);
},

/**
* Add a new worker to the system. A worker is a BullMQ worker that processes
Expand All @@ -123,23 +235,25 @@ export const WorkerHttpController = {
}

const { queue: queueName } = workerMetadata;
const { redisClient, workersRedisClient } = opts;

// Replace worker if it already exists
const existingWorker = workers[queueName];
const worker = workerFromMetadata(queueName, workerMetadata, workersRedisClient);
workers[queueName] = worker;
const { redisClient } = opts;

// Upsert worker metadata in Redis for the worker to be able to reconnect after a restart
// Upsert worker metadata and notify all listeners about the change.
try {
await redisClient.hset(workerMetadataKey, queueName, JSON.stringify(workerMetadata));
const eventId = await (<any>redisClient)['updateWorkerMetadata'](
workerMetadataKey,
workerMetadataStream,
queueName,
JSON.stringify(workerMetadata),
config.maxLenWorkerMetadataStream
);

lastEventId = eventId as string;

return new Response('OK', { status: 200 });
} catch (err) {
return new Response('Failed to store worker metadata in Redis', { status: 500 });
} finally {
if (existingWorker) {
await existingWorker.close();
}
const errMsg = `Failed to store worker metadata in Redis: ${err}`;
debugEnabled && debug(errMsg);
return new Response(errMsg, { status: 500 });
}
},

Expand Down Expand Up @@ -172,21 +286,36 @@ export const WorkerHttpController = {
const { queueName } = opts.params;
const { redisClient } = opts;

const worker = workers[queueName];
delete workers[queueName];
try {
if (worker) {
await worker.close();
}

const removedWorker = await redisClient.hdel(workerMetadataKey, queueName);
if (removedWorker === 0 && !worker) {
const result = await (<any>redisClient)['removeWorkerMetadata'](
workerMetadataKey,
workerMetadataStream,
queueName,
config.maxLenWorkerMetadataStream
);
if (!result && !workers[queueName]) {
return new Response('Worker not found', { status: 404 });
}

lastEventId = result[1];

return new Response('OK', { status: 200 });
} catch (err) {
return new Response('Failed to remove worker', { status: 500 });
} catch (_err) {
const err = _err as Error;
debugEnabled && debug(`Failed to remove worker: ${err}`);
return new Response(`Failed to remove worker ${err.toString()}`, { status: 500 });
}
},

/**
* Cleans the proxy metadata from the Redis host.
* @param redisClient
* @returns
*/
cleanMetadata: async (redisClient: Redis | Cluster) => {
const multi = redisClient.multi();
multi.del(workerMetadataKey);
multi.del(workerMetadataStream);
return multi.exec();
}
}

0 comments on commit d7ad35a

Please sign in to comment.