Skip to content

Commit

Permalink
Merge pull request #20 from VirtueMe/namedjobs
Browse files Browse the repository at this point in the history
add bull named jobs
  • Loading branch information
icebob committed Mar 27, 2018
2 parents 0ba5f18 + 3f7a8c8 commit 01ac785
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 13 deletions.
17 changes: 15 additions & 2 deletions packages/moleculer-bull/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,21 @@ module.exports = function createService(url, queueOpts) {
_.forIn(this.schema.queues, (fn, name) => {
if(typeof fn === "function")
this.getQueue(name).process(fn.bind(this));
else
this.getQueue(name).process(fn.concurrency, fn.process.bind(this));
else {
let args = [];

if (fn.name) {
args.push(fn.name);
}

if (fn.concurrency) {
args.push(fn.concurrency);
}

args.push(fn.process.bind(this));

this.getQueue(name).process.apply(null, args);
}
});
}

Expand Down
41 changes: 30 additions & 11 deletions packages/moleculer-bull/test/unit/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,35 +30,55 @@ describe("Test BullService created handler", () => {
const opts = { a: 5 };
const url = "redis://localhost";

const concurrency = {
concurrency: 100,
process: jest.fn(),
};
const named = {
name: "name",
process: jest.fn(),
};

const namedconcurrency = {
name: "name",
concurrency: 100,
process: jest.fn(),
}

const broker = new ServiceBroker();
const service = broker.createService({
mixins: [BullService(url, opts)],

queues: {
"task.first": jest.fn(),
"task.second": jest.fn(),
"task.concurrency": {
concurrency: 100,
process(job) {
return jest.fn();
},
},
"task.concurrency": concurrency,
"task.name": named,
"task.name.concurrency": namedconcurrency,
}
});

it("should be created queues", () => {
expect(service).toBeDefined();
expect(Object.keys(service.$queues).length).toBe(3);
expect(Object.keys(service.$queues).length).toBe(5);
expect(service.$queues["task.first"]).toBeDefined();
expect(service.$queues["task.second"]).toBeDefined();
expect(service.$queues["task.concurrency"]).toBeDefined();
expect(service.$queues["task.name"]).toBeDefined();

expect(Queue).toHaveBeenCalledTimes(3);
expect(Queue).toHaveBeenCalledTimes(5);
expect(Queue).toHaveBeenCalledWith("task.first", url, opts);
expect(Queue).toHaveBeenCalledWith("task.second", url, opts);
expect(Queue).toHaveBeenCalledWith("task.concurrency", url, opts);

expect(processCB).toHaveBeenCalledTimes(3);
expect(Queue).toHaveBeenCalledWith("task.name", url, opts);
expect(Queue).toHaveBeenCalledWith("task.name.concurrency", url, opts);

expect(processCB).toHaveBeenCalledTimes(5);
expect(processCB).toHaveBeenCalledWith(expect.anything());
expect(processCB).toHaveBeenCalledWith(expect.anything());
expect(processCB).toHaveBeenCalledWith(concurrency.concurrency, expect.anything());
expect(processCB).toHaveBeenCalledWith(named.name, expect.anything());
expect(processCB).toHaveBeenCalledWith(namedconcurrency.name, namedconcurrency.concurrency, expect.anything());
});

});
Expand Down Expand Up @@ -163,4 +183,3 @@ describe("Test BullService createJob return a promise", () => {
});

});

0 comments on commit 01ac785

Please sign in to comment.