From a97b22f518a9f6c5d9c30a77bfd03cafdcbc57ff Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Wed, 15 Feb 2023 23:20:55 +0100 Subject: [PATCH] feat: initial python package (#1673) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: initial python package * chore: correct python actions * style: delete white spaces * feat(python): add isPaused method * chore: add missing async * feat(python): add more features to the python package * chore: avoid trigger npm releases for python changes * chore(python): better module handling * fix(python): some lint errors --------- Co-authored-by: rogger andré valverde flores --- .github/workflows/release.yml | 2 +- .github/workflows/test.yml | 51 ++++++- .npmignore | 29 ++++ package.json | 1 + python/.gitignore | 5 + python/README.md | 53 ++++++++ python/bullmq/__init__.py | 12 ++ python/bullmq/error_code.py | 9 ++ python/bullmq/event_emitter.py | 18 +++ python/bullmq/job.py | 100 ++++++++++++++ python/bullmq/queue.py | 72 ++++++++++ python/bullmq/redis_connection.py | 20 +++ python/bullmq/scripts.py | 219 ++++++++++++++++++++++++++++++ python/bullmq/timer.py | 23 ++++ python/bullmq/worker.py | 175 ++++++++++++++++++++++++ python/requirements.txt | 11 ++ python/run_tests.sh | 4 + python/setup.py | 37 +++++ python/tests/__init__.py | 0 python/tests/queue_tests.py | 46 +++++++ python/tests/worker_tests.py | 157 +++++++++++++++++++++ 21 files changed, 1042 insertions(+), 2 deletions(-) create mode 100644 .npmignore create mode 100644 python/.gitignore create mode 100644 python/README.md create mode 100644 python/bullmq/__init__.py create mode 100644 python/bullmq/error_code.py create mode 100644 python/bullmq/event_emitter.py create mode 100644 python/bullmq/job.py create mode 100644 python/bullmq/queue.py create mode 100644 python/bullmq/redis_connection.py create mode 100644 python/bullmq/scripts.py create mode 100644 python/bullmq/timer.py create mode 100644 
python/bullmq/worker.py create mode 100644 python/requirements.txt create mode 100755 python/run_tests.sh create mode 100644 python/setup.py create mode 100644 python/tests/__init__.py create mode 100644 python/tests/queue_tests.py create mode 100644 python/tests/worker_tests.py diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 203f908e41..f2e06eb8a9 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -30,4 +30,4 @@ jobs: env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} NPM_TOKEN: ${{ secrets.NPM_TOKEN }} - run: npx semantic-release + run: npx multi-semantic-release --ignore-packages=python/** diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 13192b8551..154aca3ed0 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -1,7 +1,7 @@ # This workflow will do a clean install of node dependencies, build the source code and run tests across different versions of node # For more information see: https://help.github.com/actions/language-and-framework-guides/using-nodejs-with-github-actions -name: Node.js Tests +name: Tests on: push: @@ -86,3 +86,52 @@ jobs: - run: yarn install --ignore-engines --frozen-lockfile --non-interactive - run: yarn build - run: yarn test + + python: + runs-on: ubuntu-latest + + name: testing python@${{ matrix.python-version }}, redis@${{ matrix.redis-version }} + + strategy: + matrix: + node-version: [lts/*] + redis-version: [7-alpine] + python-version: ['3.10'] + + steps: + - name: Checkout repository + uses: actions/checkout@ac593985615ec2ede58e132d2e21d2b1cbd6127c # v3 + + - name: Use Node.js ${{ matrix.node-version }} + uses: actions/setup-node@8c91899e586c5b171469028077307d293428b516 # tag=v3 + with: + node-version: ${{ matrix.node-version }} + cache: 'yarn' + - name: Start Redis + uses: supercharge/redis-github-action@4b67a313c69bc7a90f162e8d810392fffe10d3b5 # tag=1.4.0 + with: + redis-version: ${{ matrix.redis-version }} + - run: yarn 
install --ignore-engines --frozen-lockfile --non-interactive + - run: yarn build + - run: yarn copy:lua:python + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install flake8 mypy types-redis + pip install -r python/requirements.txt + - name: Lint with flake8 + run: | + # stop the build if there are Python syntax errors or undefined names + flake8 ./python --count --select=E9,F63,F7,F82 --show-source --statistics + # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide + flake8 ./python --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics + - name: Test with pytest + run: | + cd python + ./run_tests.sh diff --git a/.npmignore b/.npmignore new file mode 100644 index 0000000000..05e215cc7a --- /dev/null +++ b/.npmignore @@ -0,0 +1,29 @@ +.DS_Store +node_modules +/dist +/rawScripts +/src/scripts +# Log files +npm-debug.log* +yarn-debug.log* +yarn-error.log* + +# Coverage files +.nyc_output +coverage +coverage/* + +# Editor directories and files +.idea +.vscode +*.suo +*.ntvs* +*.njsproj +*.sln +*.sw* +temp +docs/gitbook/api +package-lock.json + +# Ignore python code from npm +python diff --git a/package.json b/package.json index dc56e3e60f..7d627c202d 100644 --- a/package.json +++ b/package.json @@ -25,6 +25,7 @@ "clean:temp:files": "rimraf dist/cjs/bullmq.d.ts dist/esm/bullmq.d.ts dist/tsdoc-metadata.json", "copy:includes:lua": "copyfiles -f ./src/commands/includes/*.lua ./dist/cjs/commands/includes && copyfiles -f ./src/commands/includes/*.lua ./dist/esm/commands/includes", "copy:lua": "copyfiles -f ./src/commands/*.lua ./dist/cjs/commands && copyfiles -f ./src/commands/*.lua ./dist/esm/commands", + "copy:lua:python": "copyfiles -f ./rawScripts/*.lua ./python/bullmq/commands", "copy:master:type": "copyfiles -f ./dist/esm/classes/master.d.ts 
./dist/cjs/classes", "coverage": "nyc --reporter=text --reporter=lcovonly yarn test", "cm": "git cz", diff --git a/python/.gitignore b/python/.gitignore new file mode 100644 index 0000000000..9cf086de63 --- /dev/null +++ b/python/.gitignore @@ -0,0 +1,5 @@ +__pycache__ +bullmq/commands +build +bullmq.egg-info +dist \ No newline at end of file diff --git a/python/README.md b/python/README.md new file mode 100644 index 0000000000..3b6177a93c --- /dev/null +++ b/python/README.md @@ -0,0 +1,53 @@ +# BullMQ For Python + +This is the official BullMQ Python library. It is a close port of the NodeJS version of the library. +Python Queues are interoperable with NodeJS Queues, as both libraries use the same .lua scripts that +power all the functionality. + +## Features + +Currently, the library does not support all the features available in the NodeJS version. The following +have been ported so far: + +- [ ] Add jobs to queues. + + - [x] Regular jobs. + - [ ] Delayed jobs. + - [ ] Job priority. + - [ ] Repeatable. + +- [ ] Workers +- [ ] Job events. +- [ ] Job progress. +- [ ] Job retries. +- [ ] Job backoff. +- [ ] Getters. + +## Installation + +```bash +pip install bullmq +``` + +## Usage + +```python +from bullmq import Queue + +queue = Queue('my-queue') + +job = await queue.add('my-job', {'foo': 'bar'}) + +``` + +## Documentation + +The documentation is available at [https://docs.bullmq.io](https://docs.bullmq.io) + +## License + +MIT + +## Copyright + +Copyright (c) 2018-2023, Taskforce.sh Inc. and other contributors. diff --git a/python/bullmq/__init__.py b/python/bullmq/__init__.py new file mode 100644 index 0000000000..e541674d64 --- /dev/null +++ b/python/bullmq/__init__.py @@ -0,0 +1,12 @@ +""" +BullMQ + +A background job processor and message queue for Python based on Redis. +""" +__version__ = "0.1.0" +__author__ = 'Taskforce.sh Inc.' +__credits__ = 'Taskforce.sh Inc.' 
+ +from bullmq.queue import Queue +from bullmq.job import Job +from bullmq.worker import Worker \ No newline at end of file diff --git a/python/bullmq/error_code.py b/python/bullmq/error_code.py new file mode 100644 index 0000000000..007df08db2 --- /dev/null +++ b/python/bullmq/error_code.py @@ -0,0 +1,9 @@ +from enum import Enum + +class ErrorCode(Enum): + JobNotExist = -1 + JobLockNotExist = -2 + JobNotInState = -3 + JobPendingDependencies = -4 + ParentJobNotExist = -5 + JobLockMismatch = -6 diff --git a/python/bullmq/event_emitter.py b/python/bullmq/event_emitter.py new file mode 100644 index 0000000000..6cb80f4043 --- /dev/null +++ b/python/bullmq/event_emitter.py @@ -0,0 +1,18 @@ +# Credits: https://gist.github.com/marc-x-andre/1c55b3fafd1d00cfdaa205ec53a08cf3 +from typing import Dict + + +class EventEmitter: + + def __init__(self): + self._callbacks: Dict[str, callable] = {} + + def on(self, event_name, function): + self._callbacks[event_name] = self._callbacks.get(event_name, []) + [function] + return function + + def emit(self, event_name, *args, **kwargs): + [function(*args, **kwargs) for function in self._callbacks.get(event_name, [])] + + def off(self, event_name, function): + self._callbacks.get(event_name, []).remove(function) diff --git a/python/bullmq/job.py b/python/bullmq/job.py new file mode 100644 index 0000000000..9c2dc3eef3 --- /dev/null +++ b/python/bullmq/job.py @@ -0,0 +1,100 @@ +from typing import Any +import json +import time + +from redis import Redis +from typing import Dict, List, Union, Any + +optsDecodeMap = { + 'fpof': 'failParentOnFailure', + 'kl': 'keepLogs', +} + +optsEncodeMap = {v: k for k, v in optsDecodeMap.items()} + +class Job: + """ + Instantiate a Queue object + """ + def __init__(self, client: Redis, name: str, data: Any, opts = {}): + self.name = name + self.id = opts.get("jobId", None) + self.progress = 0 + self.timestamp = opts.get("timestamp", round(time.time() * 1000)) + self.opts = opts + self.delay = 
opts.get("delay", 0) + self.attempts = opts.get("attempts", 1) + self.attemptsMade = 0 + self.data = data + self.removeOnComplete = opts.get("removeOnComplete", True) + self.removeOnFail = opts.get("removeOnFail", False) + self.processedOn = 0 + self.finishedOn = 0 + self.returnvalue = None + self.failedReason = None + self.repeatJobKey = None + self.stacktrace: List[str] = [] + +def fromJSON(client: Redis, rawData, jobId = None): + data = json.loads(rawData.get("data", '{}')) + opts = optsFromJSON(json.loads(rawData.get("opts", '{}'))) + + job = Job(client, rawData.get("name"), data, opts) + job.id = jobId or rawData.get("id", b'').decode("utf-8") + + job.progress = json.loads(rawData.get("progress", '0')) + job.delay = int(rawData.get("delay", "0")) + job.timestamp = int(rawData.get("timestamp", "0")) + + if rawData.get("finishedOn"): + job.finishedOn = int(rawData.get("finishedOn")) + + if rawData.get("processedOn"): + job.processedOn = int(rawData.get("processedOn")) + + if rawData.get("rjk"): + job.repeatJobKey = rawData.get("rjk") + + job.failedReason = rawData.get("failedReason") + job.attemptsMade = int(rawData.get("attemptsMade", "0")) + + returnvalue = rawData.get("returnvalue") + if type(returnvalue) == str: + job.returnvalue = getReturnValue(returnvalue) + + job.stacktrace = json.loads(rawData.get("stacktrace", "[]")) + + # if (json.parentKey) { + # job.parentKey = json.parentKey; + # } + + # if (json.parent) { + # job.parent = JSON.parse(json.parent); + # } + + return job + +Job.fromJSON = staticmethod(fromJSON) + +def optsFromJSON(rawOpts): + # opts = json.loads(rawOpts) + opts = rawOpts + + optionEntries = opts.items() + + options = {} + for item in optionEntries: + attributeName = item[0] + value = item[1] + if attributeName in optsDecodeMap: + options[optsDecodeMap[attributeName]] = value + else: + options[attributeName] = value + + return options + +def getReturnValue(value: Any): + try: + json.loads(value) + except Exception as err: + return 
value diff --git a/python/bullmq/queue.py b/python/bullmq/queue.py new file mode 100644 index 0000000000..592467a2df --- /dev/null +++ b/python/bullmq/queue.py @@ -0,0 +1,72 @@ +import redis.asyncio as redis + +from bullmq.scripts import Scripts +from bullmq.job import Job +from bullmq.redis_connection import RedisConnection + +class Queue: + """ + Instantiate a Queue object + """ + def __init__(self, name: str, redisOpts={}, opts={}): + """ "Initialize a connection" """ + + self.name = name + self.redisConnection = RedisConnection(redisOpts) + self.client = self.redisConnection.conn + self.opts = opts + + self.prefix = opts.get("prefix") or "bull" + + self.scripts = Scripts(self.prefix, name, self.redisConnection.conn) + + """ + Add an item to the queue. + + @param name: The name of the queue + @param data: The data to add to the queue (must be JSON serializable) + """ + async def add(self, name: str, data, opts = {}): + """ Add an item to the queue """ + job = Job(self.client, name, data, opts) + jobId = await self.scripts.addJob(job) + job.id = jobId + return job + + """ + Pauses the processing of this queue globally + """ + def pause(self): + return self.scripts.pause(True) + + def resume(self): + return self.scripts.pause(False) + + async def isPaused(self): + pausedKeyExists = await self.conn.hexists(self.opts.get("prefix") or "bull" + ":" + self.name + ":meta", "paused") + return pausedKeyExists == 1 + + """ + Remove everything from the queue. + """ + async def obliterate(self, force: bool = False): + """ "Obliterate the queue" """ + await self.pause() + while True: + cursor = await self.scripts.obliterate(1000, force) + if cursor == 0 or cursor == None or cursor == "0": + break + + """ + Closes the queue and the underlying connection to Redis. 
+ """ + def close(self): + """ "Close the connection" """ + return self.redisConnection.close() + +async def fromId(queue: Queue, jobId: str): + key = queue.prefix + ":" + queue.name + ":" + jobId + rawData = await queue.client.hgetall(key) + return Job.fromJSON(queue.client, rawData, jobId) + +Job.fromId = staticmethod(fromId) diff --git a/python/bullmq/redis_connection.py b/python/bullmq/redis_connection.py new file mode 100644 index 0000000000..2a3c39239b --- /dev/null +++ b/python/bullmq/redis_connection.py @@ -0,0 +1,20 @@ +import redis.asyncio as redis + +""" + RedisConnection class +""" +class RedisConnection: + def __init__(self, redisOpts = {}): + host = redisOpts.get("host") or "localhost" + port = redisOpts.get("port") or 6379 + db = redisOpts.get("db") or 0 + password = redisOpts.get("password") or None + + self.conn = redis.Redis(host=host, port=port, db=db, password=password, decode_responses=True) + + def disconnect(self): + """ "Disconnect from Redis" """ + return self.conn.disconnect() + def close(self): + """ "Close the connection" """ + return self.conn.close() diff --git a/python/bullmq/scripts.py b/python/bullmq/scripts.py new file mode 100644 index 0000000000..2204ca907e --- /dev/null +++ b/python/bullmq/scripts.py @@ -0,0 +1,219 @@ +""" + This class is used to load and execute Lua scripts. + It is a wrapper around the Redis client. 
+""" +from typing import Any, Dict, List, Union +import time +import json +import msgpack +import os + +from redis import Redis + +from bullmq.job import Job +from bullmq.error_code import ErrorCode + +basePath = os.path.dirname(os.path.realpath(__file__)) + +class Scripts: + + def __init__(self, prefix: str, queueName: str, redisClient): + self.prefix = prefix + self.queueName = queueName + self.keys = {} + self.redisClient = redisClient + self.commands = { + "addJob": redisClient.register_script(self.getScript("addJob-8.lua")), + "obliterate": redisClient.register_script(self.getScript("obliterate-2.lua")), + "pause": redisClient.register_script(self.getScript("pause-4.lua")), + "moveToActive": redisClient.register_script(self.getScript("moveToActive-9.lua")), + "moveToFinished": redisClient.register_script(self.getScript("moveToFinished-12.lua")), + "extendLock": redisClient.register_script(self.getScript("extendLock-2.lua")), + "moveStalledJobsToWait": redisClient.register_script(self.getScript("moveStalledJobsToWait-8.lua")), + } + + # loop all the names and add them to the keys object + names = ["", "active", "wait", "paused", "completed", "failed", "delayed", "stalled", "limiter", "priority", "id", "stalled-check", "meta", "events"] + for name in names: + self.keys[name] = self.toKey(name) + + def toKey(self, name): + return self.prefix + ":" + self.queueName + ":" + name + + def getScript(self, name): + "Get a script by name" + file = open(basePath + "/commands/" + name, "r") + data = file.read() + file.close() + return data + + def getKeys(self, keys: list): + def mapKey(key): + return self.keys[key] + return list(map(mapKey, keys)) + + def addJob(self, job: Job): + "Add an item to the queue" + + packedArgs = msgpack.packb([self.keys[""], job.id or "", job.name, job.timestamp], use_bin_type=True) + # We are still lacking some arguments here: + # ARGV[1] msgpacked arguments array + # [1] key prefix, + # [2] custom id (will not generate one automatically) + 
# [3] name + # [4] timestamp + # [5] parentKey? + # [6] waitChildrenKey key. + # [7] parent dependencies key. + # [8] parent? {id, queueKey} + # [9] repeat job key + + jsonData = json.dumps(job.data, separators=(',', ':')) + packedOpts = msgpack.packb(job.opts) + + keys = self.getKeys(['wait', 'paused', 'meta', 'id', 'delayed', 'priority', 'completed', 'events']) + + return self.commands["addJob"](keys=keys, args=[packedArgs, jsonData, packedOpts]) + + def pause(self, pause: bool = True): + "Pause or resume a queue" + + src = "wait" if pause else "paused" + dst = "paused" if pause else "wait" + + keys = self.getKeys([src, dst, 'meta', 'events']) + return self.commands["pause"](keys, args=["paused" if pause else "resumed"]) + + async def obliterate(self, count: int, force: bool = False): + "Remove a queue completely" + keys = self.getKeys(['meta', '']) + result = await self.commands["obliterate"](keys, args=[count, force or ""]) + if (result < 0): + if (result == -1): + raise Exception("Cannot obliterate non-paused queue") + if (result == -2): + raise Exception("Cannot obliterate queue with active jobs") + return result + + async def moveToActive(self, token: str, opts: dict, jobId: str = "") -> list[Any]: + "Add an item to the queue" + + timestamp = round(time.time() * 1000) + + lockDuration = opts.get("lockDuration", 0) + limiter = opts.get("limiter", None) + + keys = self.getKeys(['wait', 'active', 'priority', 'events', 'stalled', 'limiter', 'delayed', 'paused', 'meta']) + packedOpts = msgpack.packb({"token": token, "lockDuration": lockDuration, "limiter": limiter }, use_bin_type=True) + args = [self.keys[''], timestamp, jobId or "", packedOpts] + + result = await self.commands["moveToActive"](keys=keys, args=args) + + # Todo: up to 4 results in tuple (only 2 now) + return raw2NextJobData(result) + + + def moveToCompleted(self, job: Job, val: Any, removeOnComplete, token: str, opts: dict, fetchNext = True): + return self.moveToFinished(job, val, "returnvalue", 
removeOnComplete, "completed", token, opts, fetchNext) + + def moveToFailed(self, job: Job, failedReason: str, removeOnFailed, token: str, opts: dict, fetchNext = True): + return self.moveToFinished(job, failedReason, "failedReason", removeOnFailed, "failed", token, opts, fetchNext) + + async def moveToFinished(self, job: Job, val: Any, propVal: str, shouldRemove, target, token: str, opts: dict, fetchNext = True ) -> list[Any] | None: + timestamp = round(time.time() * 1000) + metricsKey = self.toKey('metrics:' + target); + + keys = self.getKeys(['wait', 'active', 'priority', 'events', 'stalled', 'limiter', 'delayed', 'paused', target]) + keys.append(self.toKey(job.id)) + keys.append(self.keys['meta']) + keys.append(metricsKey) + + def getKeepJobs(shouldRemove): + if shouldRemove == True: + return { "count": 0 } + + if type(shouldRemove) == int: + return { "count": shouldRemove } + + if type(shouldRemove) == dict: + return shouldRemove + + if shouldRemove == False or shouldRemove == None: + return { "count": -1 } + + def getMetricsSize(opts): + metrics = opts.get("metrics") + if metrics != None: + return metrics.get("maxDataPoints", "") + return None + + def getFailParentOnFailure(job): + opts = job.opts + if opts != None: + return opts.get("failParentOnFailure", False) + + keepJobs = getKeepJobs(shouldRemove) + + packedOpts = msgpack.packb({ + "token": token, + "keepJobs": keepJobs, + "limiter": opts.get("limiter"), + "lockDuration": opts.get("lockDuration"), + "attempts": job.attempts, + "attemptsMade": job.attemptsMade, + "maxMetricsSize": getMetricsSize(opts), + "fpof": getFailParentOnFailure(job), + }, use_bin_type=True) + + args = [job.id, timestamp, propVal, val or "", target, "", fetchNext and "fetch" or "" , self.keys[''], packedOpts] + result = await self.commands["moveToFinished"](keys=keys, args=args) + + if result != None: + if result < 0: + raise finishedErrors(result, job.id, 'finished', 'active'); + else: + # I do not like this as it is using a 
sideeffect + job.finishedOn = timestamp + return raw2NextJobData(result) + return None + + def extendLock(self, jobId: str, token: str, duration: int, client: Redis = None): + keys = [self.toKey(jobId) + ":lock", self.keys['stalled']] + args = [token, duration, jobId] + return self.commands["extendLock"](keys, args, client) + + def moveStalledJobsToWait(self, maxStalledCount: int, stalledInterval: int): + keys = self.getKeys(['stalled', 'wait', 'active', 'failed', 'stalled-check', 'meta', 'paused', 'events']) + args = [maxStalledCount, self.keys[''], round(time.time() * 1000), stalledInterval] + return self.commands["moveStalledJobsToWait"](keys, args) + +def finishedErrors(code: int, jobId: str, command: str, state: str) -> TypeError: + if code == ErrorCode.JobNotExist.value: + return TypeError("Missing key for job " + jobId + "." + command) + elif code == ErrorCode.JobLockNotExist.value: + return TypeError("Missing lock for job " + jobId + "." + command) + elif code == ErrorCode.JobNotInState.value: + return TypeError("Job " + jobId + " is not in the state" + state + "." + command) + elif code == ErrorCode.JobPendingDependencies.value: + return TypeError("Job " + jobId + " has pending dependencies. " + command) + elif code == ErrorCode.ParentJobNotExist.value: + return TypeError("Missing key for parent job " + jobId + "." + command) + elif code == ErrorCode.JobLockMismatch.value: + return TypeError("Lock mismatch for job " + jobId + ". Cmd "+ command + " from " + state) + else: + return TypeError("Unknown code " + str(code) + " error for " + jobId + "." 
+ command) + +def raw2NextJobData(raw: list[Any]) -> list[Any] | None: + if raw: + # TODO: return all the raw datas (up to 4) + if raw[0]: + return (array2obj(raw[0]), raw[1]) + else: + return (None, raw[1]) + return None + +def array2obj(arr: [str]) -> {str: str}: + obj = {} + for i in range(0, len(arr), 2): + obj[arr[i]] = arr[i + 1] + return obj diff --git a/python/bullmq/timer.py b/python/bullmq/timer.py new file mode 100644 index 0000000000..4030697a04 --- /dev/null +++ b/python/bullmq/timer.py @@ -0,0 +1,23 @@ +import asyncio + +# Credits: https://stackoverflow.com/questions/45419723/python-timer-with-asyncio-coroutine +class Timer: + def __init__(self, interval, callback, *args, **kwargs): + self.interval = interval + self.args = args + self.kwargs = kwargs + self.callback = callback + self._ok = True + self._task = asyncio.ensure_future(self._job()) + + async def _job(self): + try: + while self._ok: + await asyncio.sleep(self.interval) + await self.callback(*self.args, **self.kwargs) + except Exception as ex: + print(ex) + + def stop(self): + self._ok = False + self._task.cancel() diff --git a/python/bullmq/worker.py b/python/bullmq/worker.py new file mode 100644 index 0000000000..f44835c252 --- /dev/null +++ b/python/bullmq/worker.py @@ -0,0 +1,175 @@ +from typing import Callable +from uuid import uuid4 +import asyncio +import traceback +import time + +from redis import Redis + +from bullmq.scripts import Scripts +from bullmq.redis_connection import RedisConnection +from bullmq.event_emitter import EventEmitter +from bullmq.job import Job +from bullmq.timer import Timer + +class Worker(EventEmitter): + def __init__(self, name: str, processor: Callable[[Job, str], asyncio.Future], opts = {}): + super().__init__() + self.name = name + self.processor = processor + self.opts = { + "concurrency": opts.get("concurrency", 1), + "lockDuration": opts.get("lockDuration", 30000), + "maxStalledCount": opts.get("maxStalledCount", 1), + "stalledInterval": 
opts.get("stalledInterval", 30000), + } + + redisOpts = opts.get("connection") or {} + + self.redisConnection = RedisConnection(redisOpts) + self.client = self.redisConnection.conn + + self.blockingRedisConnection = RedisConnection(redisOpts) + self.bclient = self.blockingRedisConnection.conn + + self.scripts = Scripts(opts.get("prefix") or "bull", name, self.client) + + self.closing = False + self.forceClosing = False + self.closed = False + self.running = False + self.processing = set() + self.jobs = set() + + if opts.get("autorun", True): + asyncio.ensure_future(self.run()) + + async def run(self): + if self.running: + raise Exception("Worker is already running") + + self.timer = Timer((self.opts.get("lockDuration") / 2) / 1000, self.extendLocks) + self.stalledCheckTimer = Timer(self.opts.get("stalledInterval") / 1000, self.runStalledJobsCheck) + self.running = True + job = None + + token = uuid4().hex + while not self.closed: + if not job and len(self.processing) < self.opts.get("concurrency") and not self.closing: + waitingJob = asyncio.ensure_future(self.getNextJob(token)) + self.processing.add(waitingJob) + + if job: + processingJob = asyncio.ensure_future(self.processJob(job, token)) + self.processing.add(processingJob) + + try: + job, pending = await getFirstCompleted(self.processing) + self.processing = pending + + if job == None or len(self.processing) == 0 and self.closing: + # We are done processing so we can close the queue + break + + except Exception as e: + # This should never happen or we will have an endless loop + print("ERROR:", e) + traceback.print_exc() + return + + self.running = False + self.timer.stop() + self.stalledCheckTimer.stop() + + async def getNextJob(self, token: str): + # First try to move a job from the waiting list to the active list + result = await self.scripts.moveToActive(token, self.opts) + job = None + jobId = None + delayUntil = None + if result: + job, jobId = result + + # If there are no jobs in the waiting list we 
keep waiting with BRPOPLPUSH + if job == None: + timeout = min(delayUntil - int(time.time() * 1000) if delayUntil else 5000, 5000) / 1000 + jobId = await self.bclient.brpoplpush(self.scripts.keys["wait"], self.scripts.keys["active"], timeout) + if jobId: + job, jobId = await self.scripts.moveToActive(token, self.opts, jobId) + + if job and jobId: + return Job.fromJSON(self.client, job, jobId) + + async def processJob(self, job: Job, token: str): + try: + self.jobs.add((job, token)) + result = await self.processor(job, token) + if not self.forceClosing: + await self.scripts.moveToCompleted(job, result, job.opts.get("removeOnComplete", True), token, self.opts, fetchNext=not self.closing) + self.emit("completed", job, result) + except Exception as err: + try: + print("Error processing job", err) + stacktrace = traceback.format_exc() + + if not self.forceClosing: + await self.scripts.moveToFailed(job, str(err), job.opts.get("removeOnFail", False), token, self.opts, fetchNext=not self.closing) + + # TODO: Store the stacktrace in the job + + self.emit("failed", job, err) + except Exception as err: + print("Error moving job to failed", err) + self.emit("error", err, job) + finally: + self.jobs.remove((job, token)) + + async def extendLocks(self): + # Renew all the locks for the jobs that are still active + try: + multi = self.client.pipeline() + for job, token in self.jobs: + await self.scripts.extendLock(job.id, token, self.opts.get("lockDuration"), multi) + result = await multi.execute() + + # result includes an object with locks that may not have been renewed. + # We should emit an error for each of those jobs. 
+ # for jobId, err in result.items(): + # self.emit("error", "could not renew lock for job " + jobId) + + except Exception as e: + print("Error renewing locks", e) + traceback.print_exc() + + async def runStalledJobsCheck(self): + try: + failed, stalled = await self.scripts.moveStalledJobsToWait(self.opts.get("maxStalledCount"), self.opts.get("stalledInterval")) + for jobId in failed: + self.emit("failed", jobId, "job stalled more than allowable limit") + for jobId in stalled: + self.emit("stalled", jobId) + + except Exception as e: + print("Error checking stalled jobs", e) + self.emit('error', e) + + async def close(self, force: bool = False): + """ "Close the worker" """ + if force: + self.forceClosing = True + + self.closing = True + + await self.blockingRedisConnection.close() + await self.redisConnection.close() + +async def getFirstCompleted( taskSet: set): + jobSet, pending = await asyncio.wait(taskSet, return_when=asyncio.FIRST_COMPLETED) + for jobTask in jobSet: + try: + job = jobTask.result() + return (job, pending) + except Exception as e: + print("ERROR:", e) + traceback.print_exc() + return pending diff --git a/python/requirements.txt b/python/requirements.txt new file mode 100644 index 0000000000..5bf6023f15 --- /dev/null +++ b/python/requirements.txt @@ -0,0 +1,11 @@ +async-timeout==4.0.2 +certifi==2022.12.7 +distlib==0.3.6 +filelock==3.9.0 +msgpack==1.0.4 +pipenv==2023.2.4 +platformdirs==2.6.2 +redis==4.5.1 +six==1.16.0 +virtualenv==20.18.0 +virtualenv-clone==0.5.7 diff --git a/python/run_tests.sh b/python/run_tests.sh new file mode 100755 index 0000000000..057ef24345 --- /dev/null +++ b/python/run_tests.sh @@ -0,0 +1,4 @@ +#!/bin/bash +redis-cli flushall +python3 -m unittest -v tests.queue_tests +python3 -m unittest -v tests.worker_tests \ No newline at end of file diff --git a/python/setup.py b/python/setup.py new file mode 100644 index 0000000000..265a10a8cd --- /dev/null +++ b/python/setup.py @@ -0,0 +1,37 @@ +from setuptools import setup + +# 
To use a consistent encoding +from codecs import open +from os import path + +# Get the long description from the README file +with open(path.join(".", 'README.md'), encoding='utf-8') as f: + long_description = f.read() + +setup( + name='bullmq', + version='0.1.0', + description='BullMQ for Python', + long_description=long_description, + long_description_content_type="text/markdown", + url='https://bullmq.io', + author='Taskforce.sh Inc.', + author_email='manast@taskforce.sh', + license='MIT', + packages=['bullmq'], + package_data={'bullmq': ['commands/*.lua']}, + install_requires=['redis', + 'msgpack', + ], + classifiers=[ + 'Development Status :: 3 - Alpha', + 'Intended Audience :: Developers', + 'License :: OSI Approved :: MIT License', + 'Operating System :: POSIX :: Linux', + 'Programming Language :: Python :: 2', + 'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.4', + 'Programming Language :: Python :: 3.5', + ], +) diff --git a/python/tests/__init__.py b/python/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/python/tests/queue_tests.py b/python/tests/queue_tests.py new file mode 100644 index 0000000000..d0a768a0f1 --- /dev/null +++ b/python/tests/queue_tests.py @@ -0,0 +1,46 @@ +""" +Tests for the queue class. 
+ +https://bbc.github.io/cloudfit-public-docs/asyncio/testing.html +""" + +import asyncio +import unittest + +from bullmq import Queue; + +queueName = "__bullmq_test_queue__" + +class TestQueue(unittest.IsolatedAsyncioTestCase): + + async def asyncSetUp(self): + print("Setting up test queue") + # Delete test queue + queue = Queue(queueName) + await queue.pause() + await queue.obliterate() + await queue.close() + + async def test_add_job(self): + queue = Queue(queueName) + job = await queue.add("test-job", {"foo": "bar"}, {}) + + self.assertEqual(job.id, "1") + await queue.close() + + async def test_add_job_with_options(self): + queue = Queue(queueName) + data = {"foo": "bar"} + attempts = 3, + delay = 1000 + job = await queue.add("test-job", data=data , opts={"attempts": attempts, "delay": delay}) + + self.assertEqual(job.id, "1") + self.assertEqual(job.attempts, attempts) + self.assertEqual(job.delay, delay) + self.assertEqual(job.data, data) + + await queue.close() + +if __name__ == '__main__': + unittest.main() diff --git a/python/tests/worker_tests.py b/python/tests/worker_tests.py new file mode 100644 index 0000000000..9078383891 --- /dev/null +++ b/python/tests/worker_tests.py @@ -0,0 +1,157 @@ +""" +Tests for the worker class. 
"""
Tests for the worker class.

https://bbc.github.io/cloudfit-public-docs/asyncio/testing.html
"""

import asyncio
import unittest
from asyncio import Future

from bullmq import Queue, Worker, Job

queueName = "__test_queue__"


class TestWorker(unittest.IsolatedAsyncioTestCase):
    """Integration tests for Worker; requires a local Redis instance."""

    async def asyncSetUp(self):
        print("Setting up test queue")
        # Start from a clean queue so job ids and attempt counters are
        # predictable in every test.
        queue = Queue(queueName)
        await queue.pause()
        await queue.obliterate()
        await queue.close()

    async def test_process_jobs(self):
        """A worker picks up a queued job and records its return value."""
        queue = Queue(queueName)
        data = {"foo": "bar"}
        job = await queue.add("test-job", data, {"removeOnComplete": False})

        async def process(job: Job, token: str):
            print("Processing job", job)
            return "done"

        worker = Worker(queueName, process)

        processing = Future()
        worker.on("completed", lambda job, result: processing.set_result(None))

        await processing

        completedJob = await Job.fromId(queue, job.id)

        self.assertEqual(completedJob.id, job.id)
        self.assertEqual(completedJob.attemptsMade, 1)
        self.assertEqual(completedJob.data, data)
        self.assertEqual(completedJob.returnvalue, "done")
        self.assertNotEqual(completedJob.finishedOn, None)

        await worker.close()
        await queue.close()

    async def test_process_jobs_fail(self):
        """A processor that raises moves the job to the failed state."""
        queue = Queue(queueName)
        data = {"foo": "bar"}
        job = await queue.add("test-job", data, {"removeOnComplete": False})

        failedReason = "Failed"

        async def process(job: Job, token: str):
            print("Processing job", job)
            raise Exception(failedReason)

        worker = Worker(queueName, process)

        processing = Future()
        worker.on("failed", lambda job, result: processing.set_result(None))

        await processing

        failedJob = await Job.fromId(queue, job.id)

        self.assertEqual(failedJob.id, job.id)
        self.assertEqual(failedJob.attemptsMade, 1)
        self.assertEqual(failedJob.data, data)
        self.assertEqual(failedJob.failedReason, failedReason)
        self.assertEqual(failedJob.stacktrace, [])
        # The original asserted returnvalue twice in a row; once is enough.
        self.assertEqual(failedJob.returnvalue, None)
        self.assertNotEqual(failedJob.finishedOn, None)

        await worker.close()
        await queue.close()

    async def test_process_renews_lock(self):
        """A job that outlives lockDuration still completes (lock is renewed)."""
        queue = Queue(queueName)
        data = {"foo": "bar"}
        job = await queue.add("test-job", data, {"removeOnComplete": False})

        async def process(job: Job, token: str):
            # Sleep well past lockDuration; the worker must extend the lock
            # so the job is not treated as stalled.
            await asyncio.sleep(3)
            return "done"

        worker = Worker(queueName, process, {"lockDuration": 1000})

        processing = Future()
        worker.on("completed", lambda job, result: processing.set_result(None))

        await processing

        completedJob = await Job.fromId(queue, job.id)

        self.assertEqual(completedJob.id, job.id)
        self.assertEqual(completedJob.attemptsMade, 1)
        self.assertEqual(completedJob.data, data)
        self.assertEqual(completedJob.returnvalue, "done")
        self.assertNotEqual(completedJob.finishedOn, None)

        await worker.close()
        await queue.close()

    async def test_process_stalled_jobs(self):
        """A job abandoned by a killed worker is retried by a second worker."""
        queue = Queue(queueName)
        data = {"foo": "bar"}
        job = await queue.add("test-job", data, {"removeOnComplete": False})

        startProcessing = Future()

        async def process1(job: Job, token: str):
            await asyncio.sleep(2)
            startProcessing.set_result(None)
            await asyncio.sleep(2)
            return "done1"

        worker = Worker(queueName, process1, {"lockDuration": 1000})

        # Force-close the first worker mid-job so its lock expires and the
        # job is left stalled.
        await startProcessing
        await worker.close(force=True)

        async def process2(job: Job, token: str):
            return "done2"

        worker2 = Worker(queueName, process2, {"lockDuration": 1000, "stalledInterval": 1000})

        processing = Future()
        worker2.on("completed", lambda job, result: processing.set_result(None))

        stalled = Future()
        worker2.on("stalled", lambda jobId: stalled.set_result(None))

        await stalled
        await processing

        completedJob = await Job.fromId(queue, job.id)

        self.assertEqual(completedJob.id, job.id)
        # Two attempts: the stalled first attempt plus the successful retry.
        self.assertEqual(completedJob.attemptsMade, 2)
        self.assertEqual(completedJob.data, data)
        self.assertEqual(completedJob.returnvalue, "done2")
        self.assertNotEqual(completedJob.finishedOn, None)

        await worker2.close()
        await queue.close()


if __name__ == '__main__':
    unittest.main()