Skip to content

Commit

Permalink
Merge pull request #1 from wlatanowicz/wl/refactor-custom-backends
Browse files Browse the repository at this point in the history
Custom backends
  • Loading branch information
wlatanowicz committed Aug 7, 2023
2 parents 0e8a2b3 + acce2dc commit 3e3571f
Show file tree
Hide file tree
Showing 14 changed files with 376 additions and 100 deletions.
60 changes: 60 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,63 @@ The Mule
===

[![tests](https://github.com/wlatanowicz/themule/actions/workflows/tests.yml/badge.svg)](https://github.com/wlatanowicz/themule/actions/workflows/tests.yml)

TheMule is an async task runner for python.

About
---

The purpose of this library is to run long running, heavy tasks in the background using cloud-native solutions. It's modular design allows you to use different backends for individual background tasks.


TheMule vs. Celery
---

TheMule is not intended to replace Celery. They play well together: Celery used to run small and fast tasks, while TheMule is used to run heavy and slow task.


Design principal | TheMule | Celery
---------------------|---------------------------------|--------
Worker lifecycle | One docker container per task | Long living worker process
Application codebase | Main application and worker can run from different docker images, but the syntax is simpler when they share the codebase | Main application and worker share codebase
Task assignment | Tasks can run on different backends; Tasks assignment is done in backend | Tasks can be assigned to workers with routing
Task queueing | Backend-specific | External queueing service (Redis, RabbitMQ)


Available Backends
===

TheMule comes with pre-made backends and you can write your own.


AWS Batch
---

Installation: `pip install themule[aws_batch]`

Class path: `themule.backends.AwsBatchBackend`

Configuration:

Job parameter | Env variable | Required | Default | Description
---|---|---|--|--
aws_batch_queue_name | THEMULE_AWS_BATCH_QUEUE_NAME | Yes | - | The name of AWS Batch queue
aws_batch_job_definition | THEMULE_AWS_BATCH_JOB_DEFINITION | Yes | - | The name of AWS Batch job definition


Local Docker
---

Installation: `pip install themule[docker]`

Class path: `themule.backends.LocalDockerBackend`

Configuration:

Job parameter | Env variable | Required | Default | Description
---|---|---|--|--
docker_image | THEMULE_DOCKER_IMAGE | Yes | - | The name of the Docker image
docker_entrypoint | THEMULE_DOCKER_ENTRYPOINT | No | None | Override of the container's entrypoint
docker_pass_environment | THEMULE_DOCKER_PASS_ENVIRONMENT | No | True | Passes application's env to worker container if true
docker_environment | THEMULE_DOCKER_ENVIRONMENT | No | None | Additional container environment variables
docker_auto_remove | THEMULE_DOCKER_AUTO_REMOVE | No | True | Removes the container after worker exit if true
10 changes: 10 additions & 0 deletions examples/app/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from time import sleep

from themule import job


@job()
def do_something(job_number):
print(f"Starting job {job_number}")
sleep(60)
print(f"Job finished {job_number}")
9 changes: 9 additions & 0 deletions examples/app/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from time import sleep

from jobs import do_something

if __name__ == "__main__":
for i in range(5):
print(f"Submitting job {i}...")
do_something.submit(job_number=i)
sleep(30)
13 changes: 13 additions & 0 deletions examples/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
FROM python:3.10

COPY . /themule

RUN pip install -e "/themule/[docker]"

COPY ./examples/app/ /app/

WORKDIR /app

ENV PYTHONUNBUFFERED=1

CMD ["python3", "/app/main.py"]
15 changes: 15 additions & 0 deletions examples/docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
version: '3.0'
services:
main:
build:
context: ../..
dockerfile: examples/docker/Dockerfile
image: themule-example-image
volumes:
- /var/run/docker.sock:/var/run/docker.sock
environment:
- THEMULE_BACKEND=themule.backends.LocalDockerBackend
- THEMULE_DOCKER_IMAGE=themule-example-image
- THEMULE_DOCKER_AUTO_REMOVE=false
- DOCKER_HOST=unix:///var/run/docker.sock
- PYTHONPATH=/app
2 changes: 1 addition & 1 deletion requirements/base.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
boto3>=1.23.5
click>=8.1.3
django-environ>=0.9.0
1 change: 1 addition & 0 deletions requirements/extras/aws_batch.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
boto3>=1.23.5
1 change: 1 addition & 0 deletions requirements/extras/docker.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
docker>=6.1.3
16 changes: 16 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,21 @@ def get_about():
about = get_about()


BUNDLES = (
"aws_batch",
"docker",
)


def extras(*p):
"""Parse requirement in the requirements/extras/ directory."""
return reqs("extras", *p)

def extras_require():
"""Get map of all extra requirements."""
return {x: extras(x + ".txt") for x in BUNDLES}


setup(
name="themule",
version=about["__version__"],
Expand All @@ -64,6 +79,7 @@ def get_about():
packages=find_packages(exclude=["tests*",]),
zip_safe=False,
install_requires=reqs("base.txt"),
extras_require=extras_require(),
tests_require=reqs("tests.txt"),
classifiers=[
"Programming Language :: Python :: 3.9",
Expand Down
161 changes: 161 additions & 0 deletions themule/backends.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
from __future__ import annotations

import os
from typing import TYPE_CHECKING

from .conf import NOTSET, settings
from .job import StartedJob

if TYPE_CHECKING:
from .job import Job
from .serializers import BaseSerializer


DEFAULT_BACKEND = "themule.backends.AwsBatchBackend"


class BaseBackend:
OPTION_PREFIX = "base"

def __init__(self, **options) -> None:
pass

def submit_job(self, job: Job, serializer: BaseSerializer) -> StartedJob:
raise NotImplementedError()

def get_path(self):
return f"{self.__module__}.{self.__class__.__name__}"

def get_option_value(self, options, option, default=NOTSET, cast=None):
return settings.get_value_for_job(
options,
self.OPTION_PREFIX,
option,
default=default,
cast=cast,
)


class AwsBatchBackend(BaseBackend):
OPTION_PREFIX = "aws_batch"

def __init__(self, **options) -> None:
self.queue_name = self.get_option_value(options, "queue_name")
self.job_definition = self.get_option_value(options, "job_definition")

def submit_job(self, job: Job, serializer: BaseSerializer) -> StartedJob:
try:
import boto3
except ImportError:
raise ValueError("AWS support not installed")

serialized_job = serializer.serialize(job)

client = boto3.client("batch")
response = client.submit_job(
jobName=str(job.id),
jobQueue=self.queue_name,
jobDefinition=self.job_definition,
containerOverrides={
"command": [
"themule",
"execute-job",
"--serializer",
serializer.get_path(),
serialized_job,
],
},
)

job_id = str(response.get("jobId"))

return StartedJob(
self.get_path(),
job,
job_id,
)


class LocalDockerBackend(BaseBackend):
OPTION_PREFIX = "docker"

def __init__(self, **options) -> None:
self.docker_image = self.get_option_value(options, "image")
self.entrypoint = self.get_option_value(
options, "entrypoint", default=None, cast=list
)
self.environment = self.get_option_value(
options, "environment", default={}, cast=dict
)
self.pass_environment = self.get_option_value(
options, "pass_environment", default=True, cast=bool
)
self.auto_remove = self.get_option_value(
options, "auto_remove", default=True, cast=bool
)

def submit_job(self, job: Job, serializer: BaseSerializer) -> StartedJob:
try:
import docker
except ImportError:
raise ValueError("Docker support not installed")

serialized_job = serializer.serialize(job)

environment = self.environment
if self.pass_environment:
environment = {
**os.environ,
**environment,
}

docker_command = [
"themule",
"execute-job",
"--serializer",
serializer.get_path(),
serialized_job,
]

client = docker.from_env()
container = client.containers.run(
self.docker_image,
docker_command,
entrypoint=self.entrypoint,
environment=environment,
auto_remove=self.auto_remove,
detach=True,
)

job_id = container.id

return StartedJob(
self.get_path(),
job,
job_id,
)


class LocalProcess(BaseBackend):
def submit_job(self, job: Job, serializer: BaseSerializer) -> StartedJob:
import subprocess

serialized_job = serializer.serialize(job)

command = [
"themule",
"execute-job",
"--serializer",
serializer.get_path(),
serialized_job,
]

process = subprocess.Popen(command)

job_id = process.pid

return StartedJob(
self.get_path(),
job,
job_id,
)
34 changes: 0 additions & 34 deletions themule/boto_client.py

This file was deleted.

Loading

0 comments on commit 3e3571f

Please sign in to comment.