Skip to content

Commit

Permalink
feat: initial python package (#1673)
Browse files Browse the repository at this point in the history
* feat: initial python package

* chore: correct python actions

* style: delete white spaces

* feat(python): add isPaused method

* chore: add missing async

* feat(python): add more features to the python package

* chore: avoid trigger npm releases for python changes

* chore(python): better module handling

* fix(python): some lint errors

---------

Co-authored-by: rogger andré valverde flores <rogger.valverde@uni.pe>
  • Loading branch information
manast and roggervalf committed Feb 15, 2023
1 parent af77b92 commit a97b22f
Show file tree
Hide file tree
Showing 21 changed files with 1,042 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Expand Up @@ -30,4 +30,4 @@ jobs:
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
NPM_TOKEN: ${{ secrets.NPM_TOKEN }}
run: npx semantic-release
run: npx multi-semantic-release --ignore-packages=python/**
51 changes: 50 additions & 1 deletion .github/workflows/test.yml
@@ -1,7 +1,7 @@
# This workflow will do a clean install of node dependencies, build the source code and run tests across different versions of node
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-nodejs-with-github-actions

name: Node.js Tests
name: Tests

on:
push:
Expand Down Expand Up @@ -86,3 +86,52 @@ jobs:
- run: yarn install --ignore-engines --frozen-lockfile --non-interactive
- run: yarn build
- run: yarn test

python:
runs-on: ubuntu-latest

name: testing python@${{ matrix.python-version }}, redis@${{ matrix.redis-version }}

strategy:
matrix:
node-version: [lts/*]
redis-version: [7-alpine]
python-version: ['3.10']

steps:
- name: Checkout repository
uses: actions/checkout@ac593985615ec2ede58e132d2e21d2b1cbd6127c # v3

- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@8c91899e586c5b171469028077307d293428b516 # tag=v3
with:
node-version: ${{ matrix.node-version }}
cache: 'yarn'
- name: Start Redis
uses: supercharge/redis-github-action@4b67a313c69bc7a90f162e8d810392fffe10d3b5 # tag=1.4.0
with:
redis-version: ${{ matrix.redis-version }}
- run: yarn install --ignore-engines --frozen-lockfile --non-interactive
- run: yarn build
- run: yarn copy:lua:python

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install flake8 mypy types-redis
pip install -r python/requirements.txt
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 ./python --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 ./python --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Test with pytest
run: |
cd python
./run_tests.sh
29 changes: 29 additions & 0 deletions .npmignore
@@ -0,0 +1,29 @@
.DS_Store
node_modules
/dist
/rawScripts
/src/scripts
# Log files
npm-debug.log*
yarn-debug.log*
yarn-error.log*

# Coverage files
.nyc_output
coverage
coverage/*

# Editor directories and files
.idea
.vscode
*.suo
*.ntvs*
*.njsproj
*.sln
*.sw*
temp
docs/gitbook/api
package-lock.json

# Ignore python code from npm
python
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -25,6 +25,7 @@
"clean:temp:files": "rimraf dist/cjs/bullmq.d.ts dist/esm/bullmq.d.ts dist/tsdoc-metadata.json",
"copy:includes:lua": "copyfiles -f ./src/commands/includes/*.lua ./dist/cjs/commands/includes && copyfiles -f ./src/commands/includes/*.lua ./dist/esm/commands/includes",
"copy:lua": "copyfiles -f ./src/commands/*.lua ./dist/cjs/commands && copyfiles -f ./src/commands/*.lua ./dist/esm/commands",
"copy:lua:python": "copyfiles -f ./rawScripts/*.lua ./python/bullmq/commands",
"copy:master:type": "copyfiles -f ./dist/esm/classes/master.d.ts ./dist/cjs/classes",
"coverage": "nyc --reporter=text --reporter=lcovonly yarn test",
"cm": "git cz",
Expand Down
5 changes: 5 additions & 0 deletions python/.gitignore
@@ -0,0 +1,5 @@
__pycache__
bullmq/commands
build
bullmq.egg-info
dist
53 changes: 53 additions & 0 deletions python/README.md
@@ -0,0 +1,53 @@
# BullMQ For Python

This is the official BullMQ Python library. It is a close port of the NodeJS version of the library.
Python Queues are interoperable with NodeJS Queues, as both libraries use the same .lua scripts that
power all the functionality.

## Features

Currently, the library does not support all the features available in the NodeJS version. The following
have been ported so far:

- [ ] Add jobs to queues.

- [x] Regular jobs.
- [ ] Delayed jobs.
- [ ] Job priority.
- [ ] Repeatable.

- [ ] Workers
- [ ] Job events.
- [ ] Job progress.
- [ ] Job retries.
- [ ] Job backoff.
- [ ] Getters.

## Installation

```bash
pip install bullmq
```

## Usage

```python
from bullmq import Queue

queue = Queue('my-queue')

job = await queue.add('my-job', {'foo': 'bar'})

```

## Documentation

The documentation is available at [https://docs.bullmq.io](https://docs.bullmq.io)

## License

MIT

## Copyright

Copyright (c) 2018-2023, Taskforce.sh Inc. and other contributors.
12 changes: 12 additions & 0 deletions python/bullmq/__init__.py
@@ -0,0 +1,12 @@
"""
BullMQ
A background job processor and message queue for Python based on Redis.
"""
__version__ = "0.1.0"
__author__ = 'Taskforce.sh Inc.'
__credits__ = 'Taskforce.sh Inc.'

from bullmq.queue import Queue
from bullmq.job import Job
from bullmq.worker import Worker
9 changes: 9 additions & 0 deletions python/bullmq/error_code.py
@@ -0,0 +1,9 @@
from enum import Enum

class ErrorCode(Enum):
JobNotExist = -1
JobLockNotExist = -2
JobNotInState = -3
JobPendingDependencies = -4
ParentJobNotExist = -5
JobLockMismatch = -6
18 changes: 18 additions & 0 deletions python/bullmq/event_emitter.py
@@ -0,0 +1,18 @@
# Credits: https://gist.github.com/marc-x-andre/1c55b3fafd1d00cfdaa205ec53a08cf3
from typing import Dict


class EventEmitter:

def __init__(self):
self._callbacks: Dict[str, callable] = {}

def on(self, event_name, function):
self._callbacks[event_name] = self._callbacks.get(event_name, []) + [function]
return function

def emit(self, event_name, *args, **kwargs):
[function(*args, **kwargs) for function in self._callbacks.get(event_name, [])]

def off(self, event_name, function):
self._callbacks.get(event_name, []).remove(function)
100 changes: 100 additions & 0 deletions python/bullmq/job.py
@@ -0,0 +1,100 @@
from typing import Any
import json
import time

from redis import Redis
from typing import Dict, List, Union, Any

optsDecodeMap = {
'fpof': 'failParentOnFailure',
'kl': 'keepLogs',
}

optsEncodeMap = {v: k for k, v in optsDecodeMap.items()}

class Job:
"""
Instantiate a Queue object
"""
def __init__(self, client: Redis, name: str, data: Any, opts = {}):
self.name = name
self.id = opts.get("jobId", None)
self.progress = 0
self.timestamp = opts.get("timestamp", round(time.time() * 1000))
self.opts = opts
self.delay = opts.get("delay", 0)
self.attempts = opts.get("attempts", 1)
self.attemptsMade = 0
self.data = data
self.removeOnComplete = opts.get("removeOnComplete", True)
self.removeOnFail = opts.get("removeOnFail", False)
self.processedOn = 0
self.finishedOn = 0
self.returnvalue = None
self.failedReason = None
self.repeatJobKey = None
self.stacktrace: List[str] = []

def fromJSON(client: Redis, rawData, jobId = None):
data = json.loads(rawData.get("data", '{}'))
opts = optsFromJSON(json.loads(rawData.get("opts", '{}')))

job = Job(client, rawData.get("name"), data, opts)
job.id = jobId or rawData.get("id", b'').decode("utf-8")

job.progress = json.loads(rawData.get("progress", '0'))
job.delay = int(rawData.get("delay", "0"))
job.timestamp = int(rawData.get("timestamp", "0"))

if rawData.get("finishedOn"):
job.finishedOn = int(rawData.get("finishedOn"))

if rawData.get("processedOn"):
job.processedOn = int(rawData.get("processedOn"))

if rawData.get("rjk"):
job.repeatJobKey = rawData.get("rjk")

job.failedReason = rawData.get("failedReason")
job.attemptsMade = int(rawData.get("attemptsMade", "0"))

returnvalue = rawData.get("returnvalue")
if type(returnvalue) == str:
job.returnvalue = getReturnValue(returnvalue)

job.stacktrace = json.loads(rawData.get("stacktrace", "[]"))

# if (json.parentKey) {
# job.parentKey = json.parentKey;
# }

# if (json.parent) {
# job.parent = JSON.parse(json.parent);
# }

return job

Job.fromJSON = staticmethod(fromJSON)

def optsFromJSON(rawOpts):
# opts = json.loads(rawOpts)
opts = rawOpts

optionEntries = opts.items()

options = {}
for item in optionEntries:
attributeName = item[0]
value = item[1]
if attributeName in optsDecodeMap:
options[optsDecodeMap[attributeName]] = value
else:
options[attributeName] = value

return options

def getReturnValue(value: Any):
try:
json.loads(value)
except Exception as err:
return value
72 changes: 72 additions & 0 deletions python/bullmq/queue.py
@@ -0,0 +1,72 @@
import redis.asyncio as redis

from bullmq.scripts import Scripts
from bullmq.job import Job
from bullmq.redis_connection import RedisConnection

class Queue:
"""
Instantiate a Queue object
"""
def __init__(self, name: str, redisOpts={}, opts={}):
""" "Initialize a connection" """

self.name = name
self.redisConnection = RedisConnection(redisOpts)
self.client = self.redisConnection.conn
self.opts = opts

self.prefix = opts.get("prefix") or "bull"

self.scripts = Scripts(self.prefix, name, self.redisConnection.conn)

"""
Add an item to the queue.
@param name: The name of the queue
@param data: The data to add to the queue (must be JSON serializable)
"""
async def add(self, name: str, data, opts = {}):
""" Add an item to the queue """
job = Job(self.client, name, data, opts)
jobId = await self.scripts.addJob(job)
job.id = jobId
return job

"""
Pauses the processing of this queue globally
"""
def pause(self):
return self.scripts.pause(True)

def resume(self):
return self.scripts.pause(False)

async def isPaused(self):
pausedKeyExists = await self.conn.hexists(self.opts.get("prefix") or "bull" + ":" + self.name + ":meta", "paused")
return pausedKeyExists == 1

"""
Remove everything from the queue.
"""
async def obliterate(self, force: bool = False):
""" "Obliterate the queue" """
await self.pause()
while True:
cursor = await self.scripts.obliterate(1000, force)
if cursor == 0 or cursor == None or cursor == "0":
break

"""
Closes the queue and the underlying connection to Redis.
"""
def close(self):
""" "Close the connection" """
return self.redisConnection.close()

async def fromId(queue: Queue, jobId: str):
key = queue.prefix + ":" + queue.name + ":" + jobId
rawData = await queue.client.hgetall(key)
return Job.fromJSON(queue.client, rawData, jobId)

Job.fromId = staticmethod(fromId)

0 comments on commit a97b22f

Please sign in to comment.