diff --git a/api/index.html b/api/index.html
index abc1aa4..4739a56 100644
--- a/api/index.html
+++ b/api/index.html
@@ -483,29 +483,31 @@
In this example each task will be executed one after the other: first task A will
be executed, then task B, and finally task C.
Celery provides a scheduler used to periodically execute some tasks. This scheduler is named
the Celery beat.
-Director allows you to periodically schedule a whole workflow using a simple YAML syntax :
+Director allows you to periodically schedule a whole workflow using a simple YAML syntax.
+When a Celery worker executes this code, an issue will be created in Sentry with the ZeroDivisionError:
In order to group the issues by workflow's name or by project, Director associates some tags with the event:
diff --git a/guides/run-workflows/index.html b/guides/run-workflows/index.html
index 8d04a42..622054b 100644
--- a/guides/run-workflows/index.html
+++ b/guides/run-workflows/index.html
@@ -471,16 +471,18 @@ You can run a workflow using a POST request on the Director API. This is very
convenient if your applications are based on webservices.
This way the whole list of users will be updated every hour, and a manual update
can be done on a specific user:
The following workflows present different use cases and the output of the C task (see the
Build Workflows guide to understand the YAML format):
director init [path]
- Create a new project.
diff --git a/quickstart/index.html b/quickstart/index.html
index 53b43ad..8bab586 100644
--- a/quickstart/index.html
+++ b/quickstart/index.html
@@ -500,6 +500,7 @@ Generate the project
You can now export the DIRECTOR_HOME environment variable
+
Info
Because you can have several projects on your machine, Director needs to know what
@@ -515,6 +516,7 @@
Generate the project
... # Other files containing other tasks.
+
The Python files in the tasks
folder will contain your Celery tasks:
from director import task
@@ -524,6 +526,7 @@ Generate the project
print("Extracting data")
+
While the workflows.yml
file will be used to combine them into workflows:
example.ETL:
tasks:
@@ -532,6 +535,7 @@ Generate the project
- LOAD
+
Director needs a connection to a relational database to store the dependencies
between your tasks.
@@ -542,6 +546,7 @@
DIRECTOR_DATABASE_URI="sqlite:////path/to/your/database.db" # Using SQLite
+
Generate the database:
$ director db upgrade
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
@@ -549,6 +554,7 @@
INFO [alembic.runtime.migration] Running upgrade -> 70631f8bcff3, Init database
+
Run the workflow
The workflow
command can be used to manage your workflows.
List
@@ -567,20 +573,24 @@ Run the workflow
+-----------------+----------+----------------------+
+
Run
$ director workflow run example.ETL
+
Execute the tasks
A Celery worker instance needs to be started to consume the broker and execute the tasks:
$ director celery worker --loglevel=INFO
+
Display the result
You can finally start the webserver to track the tasks' evolution:
+
The WebUI is available by default on http://127.0.0.1:8000 :
diff --git a/search/search_index.json b/search/search_index.json
index 5b02dad..f18bbd6 100644
--- a/search/search_index.json
+++ b/search/search_index.json
@@ -1 +1 @@
-{"config":{"lang":["en"],"prebuild_index":false,"separator":"[\\s\\-]+"},"docs":[{"location":"","text":"Celery Director Documentation Director is a simple and rapid framework used to manage tasks and build workflows using Celery. Features The objective is to make Celery easier to use by providing : a WebUI to track the tasks states, an API and a CLI to manage and execute the workflows, a YAML syntax used to combine tasks into workflows, the ability to periodically launch a whole workflow, and many others. Info Director is built on top of the excellent Celery library . All the orchestration engine has not been changed : we didn't want to reinvent the wheel but provide an easy tool to use Celery. It means that all your existing tasks can easily be migrated to Director. Furthermore the documentation of the tasks and all the features powered by Celery like the rate limiting , the task exception retrying or even the queue routing stay the same. Installation Install the latest version of Director with pip (requires Python 3.6 at least): pip install celery-director Usage Write your code in Python # tasks/orders.py from director import task from .utils import Order , Mail @task ( name = \"ORDER_PRODUCT\" ) def order_product ( * args , ** kwargs ): order = Order ( user = kwargs [ \"payload\" ][ \"user\" ], product = kwargs [ \"payload\" ][ \"product\" ] ) . save () return { \"id\" : order . id } @task ( name = \"SEND_MAIL\" ) def send_mail ( * args , ** kwargs ): order_id = args [ 0 ][ \"id\" ] mail = Mail ( title = f \"Your order # { order_id } has been received\" , user = kwargs [ \"payload\" ][ \"user\" ] ) mail . 
send () Build your workflows in YAML # workflows.yml product.ORDER : tasks : - ORDER_PRODUCT - SEND_MAIL Run it You can simply test your workflow in local : $ director workflow run product.ORDER '{\"user\": 1234, \"product\": 1000}' And run it in production using the director API : $ curl --header \"Content-Type: application/json\" \\ --request POST \\ --data '{\"project\": \"product\", \"name\": \"ORDER\", \"payload\": {\"user\": 1234, \"product\": 1000}}' \\ http://localhost:8000/api/workflows Project layout .env # The configuration file. workflows.yml # The workflows definition. tasks / example.py # A file containing some tasks. ... # Other files containing other tasks. Commands director init [path] - Create a new project. director celery [worker|beat|flower] - Start Celery daemons. director webserver - Start the webserver. director workflow [list|show|run] - Manage your project workflows.","title":"Introduction"},{"location":"#celery-director-documentation","text":"Director is a simple and rapid framework used to manage tasks and build workflows using Celery.","title":"Celery Director Documentation"},{"location":"#features","text":"The objective is to make Celery easier to use by providing : a WebUI to track the tasks states, an API and a CLI to manage and execute the workflows, a YAML syntax used to combine tasks into workflows, the ability to periodically launch a whole workflow, and many others. Info Director is built on top of the excellent Celery library . All the orchestration engine has not been changed : we didn't want to reinvent the wheel but provide an easy tool to use Celery. It means that all your existing tasks can easily be migrated to Director. 
Furthermore the documentation of the tasks and all the features powered by Celery like the rate limiting , the task exception retrying or even the queue routing stay the same.","title":"Features"},{"location":"#installation","text":"Install the latest version of Director with pip (requires Python 3.6 at least): pip install celery-director","title":"Installation"},{"location":"#usage","text":"","title":"Usage"},{"location":"#write-your-code-in-python","text":"# tasks/orders.py from director import task from .utils import Order , Mail @task ( name = \"ORDER_PRODUCT\" ) def order_product ( * args , ** kwargs ): order = Order ( user = kwargs [ \"payload\" ][ \"user\" ], product = kwargs [ \"payload\" ][ \"product\" ] ) . save () return { \"id\" : order . id } @task ( name = \"SEND_MAIL\" ) def send_mail ( * args , ** kwargs ): order_id = args [ 0 ][ \"id\" ] mail = Mail ( title = f \"Your order # { order_id } has been received\" , user = kwargs [ \"payload\" ][ \"user\" ] ) mail . send ()","title":"Write your code in Python"},{"location":"#build-your-workflows-in-yaml","text":"# workflows.yml product.ORDER : tasks : - ORDER_PRODUCT - SEND_MAIL","title":"Build your workflows in YAML"},{"location":"#run-it","text":"You can simply test your workflow in local : $ director workflow run product.ORDER '{\"user\": 1234, \"product\": 1000}' And run it in production using the director API : $ curl --header \"Content-Type: application/json\" \\ --request POST \\ --data '{\"project\": \"product\", \"name\": \"ORDER\", \"payload\": {\"user\": 1234, \"product\": 1000}}' \\ http://localhost:8000/api/workflows","title":"Run it"},{"location":"#project-layout","text":".env # The configuration file. workflows.yml # The workflows definition. tasks / example.py # A file containing some tasks. ... # Other files containing other tasks.","title":"Project layout"},{"location":"#commands","text":"director init [path] - Create a new project. 
director celery [worker|beat|flower] - Start Celery daemons. director webserver - Start the webserver. director workflow [list|show|run] - Manage your project workflows.","title":"Commands"},{"location":"api/","text":"API Documentation GET /api/workflows List the workflows instances. Example request: GET /api/workflows HTTP / 1.1 Host : example.com Accept : application/json Parameters: per_page (optional, default: 1000): the number of workflows to return page (optional, default: 1): the page to start Example response: HTTP / 1.1 200 OK [ { \"created\" : \"2020-02-06T13:56:51\" , \"fullname\" : \"example.ETL\" , \"id\" : \"29e7ef80-fa1b-4b91-8ccb-ef01a91601db\" , \"name\" : \"ETL\" , \"payload\" : { \"foo\" : \"bar\" }, \"periodic\" : false , \"project\" : \"example\" , \"status\" : \"pending\" , \"updated\" : \"2020-02-06T13:56:51\" } ] GET /api/workflows/ Get the details of a specific workflow instance, including its tasks. Example request: GET /api/workflows/29e7ef80-fa1b-4b91-8ccb-ef01a91601db HTTP / 1.1 Host : example.com Accept : application/json Example response: HTTP / 1.1 200 OK { \"created\" : \"2020-02-06T13:56:51\" , \"fullname\" : \"example.ETL\" , \"id\" : \"29e7ef80-fa1b-4b91-8ccb-ef01a91601db\" , \"name\" : \"ETL\" , \"payload\" : {}, \"periodic\" : false , \"project\" : \"example\" , \"status\" : \"pending\" , \"tasks\" : [ { \"created\" : \"2020-02-06T13:56:51\" , \"id\" : \"c8606f67-9923-4c84-bc41-69efacb0c7cb\" , \"key\" : \"EXTRACT\" , \"previous\" : [], \"status\" : \"pending\" , \"task\" : \"c8606f67-9923-4c84-bc41-69efacb0c7cb\" , \"updated\" : \"2020-02-06T13:56:51 }, { \"created\" : \"2020-02-06T13:56:51\" , \"id\" : \"35a2d47b-8105-4d03-becb-7eb48f8c062e\" , \"key\" : \"TRANSFORM\" , \"previous\" : [ \"c8606f67-9923-4c84-bc41-69efacb0c7cb\" ], \"status\" : \"pending\" , \"task\" : \"35a2d47b-8105-4d03-becb-7eb48f8c062e\" , \"updated\" : \"2020-02-06T13:56:51\" }, { \"created\" : \"2020-02-06T13:56:51\" , \"id\" : 
\"e5a8eb49-0a8c-4063-ad08-a5e9e7bd49d2\" , \"key\" : \"LOAD\" , \"previous\" : [ \"35a2d47b-8105-4d03-becb-7eb48f8c062e\" ], \"status\" : \"pending\" , \"task\" : \"e5a8eb49-0a8c-4063-ad08-a5e9e7bd49d2\" , \"updated\" : \"2020-02-06T13:56:51\" } ], \"updated\" : \"2020-02-06T13:56:51\" } POST /api/workflows Execute a new workflow. Example request: POST /api/workflows HTTP / 1.1 Host : example.com Accept : application/json { \"project\": \"example\", \"name\": \"ETL\", \"payload\": {} } Example response: HTTP / 1.1 201 CREATED { \"created\" : \"2020-02-06T14:01:02\" , \"fullname\" : \"example.ETL\" , \"id\" : \"43e70707-b661-42e1-a7df-5b98851ae340\" , \"name\" : \"ETL\" , \"payload\" : {}, \"periodic\" : false , \"project\" : \"example\" , \"status\" : \"pending\" , \"updated\" : \"2020-02-06T14:01:02\" } GET /api/ping Health endpoint used to monitor the Director API. Example request: GET /api/ping HTTP / 1.1 Host : example.com Accept : application/json Example response: HTTP / 1.1 200 OK { \"message\" : \"pong\" }","title":"API"},{"location":"api/#api-documentation","text":"","title":"API Documentation"},{"location":"api/#get-apiworkflows","text":"List the workflow instances. Example request: GET /api/workflows HTTP / 1.1 Host : example.com Accept : application/json Parameters: per_page (optional, default: 1000): the number of workflows to return page (optional, default: 1): the page to start Example response: HTTP / 1.1 200 OK [ { \"created\" : \"2020-02-06T13:56:51\" , \"fullname\" : \"example.ETL\" , \"id\" : \"29e7ef80-fa1b-4b91-8ccb-ef01a91601db\" , \"name\" : \"ETL\" , \"payload\" : { \"foo\" : \"bar\" }, \"periodic\" : false , \"project\" : \"example\" , \"status\" : \"pending\" , \"updated\" : \"2020-02-06T13:56:51\" } ]","title":"GET /api/workflows"},{"location":"api/#get-apiworkflowsid","text":"Get the details of a specific workflow instance, including its tasks. 
Example request: GET /api/workflows/29e7ef80-fa1b-4b91-8ccb-ef01a91601db HTTP / 1.1 Host : example.com Accept : application/json Example response: HTTP / 1.1 200 OK { \"created\" : \"2020-02-06T13:56:51\" , \"fullname\" : \"example.ETL\" , \"id\" : \"29e7ef80-fa1b-4b91-8ccb-ef01a91601db\" , \"name\" : \"ETL\" , \"payload\" : {}, \"periodic\" : false , \"project\" : \"example\" , \"status\" : \"pending\" , \"tasks\" : [ { \"created\" : \"2020-02-06T13:56:51\" , \"id\" : \"c8606f67-9923-4c84-bc41-69efacb0c7cb\" , \"key\" : \"EXTRACT\" , \"previous\" : [], \"status\" : \"pending\" , \"task\" : \"c8606f67-9923-4c84-bc41-69efacb0c7cb\" , \"updated\" : \"2020-02-06T13:56:51 }, { \"created\" : \"2020-02-06T13:56:51\" , \"id\" : \"35a2d47b-8105-4d03-becb-7eb48f8c062e\" , \"key\" : \"TRANSFORM\" , \"previous\" : [ \"c8606f67-9923-4c84-bc41-69efacb0c7cb\" ], \"status\" : \"pending\" , \"task\" : \"35a2d47b-8105-4d03-becb-7eb48f8c062e\" , \"updated\" : \"2020-02-06T13:56:51\" }, { \"created\" : \"2020-02-06T13:56:51\" , \"id\" : \"e5a8eb49-0a8c-4063-ad08-a5e9e7bd49d2\" , \"key\" : \"LOAD\" , \"previous\" : [ \"35a2d47b-8105-4d03-becb-7eb48f8c062e\" ], \"status\" : \"pending\" , \"task\" : \"e5a8eb49-0a8c-4063-ad08-a5e9e7bd49d2\" , \"updated\" : \"2020-02-06T13:56:51\" } ], \"updated\" : \"2020-02-06T13:56:51\" }","title":"GET /api/workflows/<id>"},{"location":"api/#post-apiworkflows","text":"Execute a new workflow. 
Example request: POST /api/workflows HTTP / 1.1 Host : example.com Accept : application/json { \"project\": \"example\", \"name\": \"ETL\", \"payload\": {} } Example response: HTTP / 1.1 201 CREATED { \"created\" : \"2020-02-06T14:01:02\" , \"fullname\" : \"example.ETL\" , \"id\" : \"43e70707-b661-42e1-a7df-5b98851ae340\" , \"name\" : \"ETL\" , \"payload\" : {}, \"periodic\" : false , \"project\" : \"example\" , \"status\" : \"pending\" , \"updated\" : \"2020-02-06T14:01:02\" }","title":"POST /api/workflows"},{"location":"api/#get-apiping","text":"Health endpoint used to monitor the Director API. Example request: GET /api/ping HTTP / 1.1 Host : example.com Accept : application/json Example response: HTTP / 1.1 200 OK { \"message\" : \"pong\" }","title":"GET /api/ping"},{"location":"quickstart/","text":"Quickstart Generate the project One of the Director goals is to facilitate the usage of Celery. For that we wanted to simplify the initialization of a Celery project by removing all the long and boring boilerplate steps (application creation, broker configuration...). So the first thing to do is to generate a project using the director init command: $ director init workflows [*] Project created in /home/director/workflows [*] Do not forget to initialize the database You can now export the DIRECTOR_HOME environment variable Info Because you can have several projects on your machine, Director needs to know what project you want to target when using the director command. So don't forget to set your DIRECTOR_HOME environment variable: $ export DIRECTOR_HOME=\"/home/director/workflows\" Director created the following structure for you containing a simple example: .env # The configuration file. 
The Python files in the tasks folder will contain your Celery tasks : from director import task @task ( name = \"EXTRACT\" ) def extract ( * args , ** kwargs ): print ( \"Extracting data\" ) While the workflows.yml file will be used to combine them into workflows : example.ETL : tasks : - EXTRACT - TRANSFORM - LOAD Configure the database Director needs a connection to a relational database to store the dependencies between your tasks. Adjust the DIRECTOR_DATABASE_URI variable in the .env file and generate the database : Possible values for the DIRECTOR_DATABASE_URI variable: DIRECTOR_DATABASE_URI = \"postgresql://user:password@hostname:port/database\" # Using PostgreSQL DIRECTOR_DATABASE_URI = \"mysql+mysqlconnector://user:password@hostname:port/database\" # Using MySQL DIRECTOR_DATABASE_URI = \"sqlite:////path/to/your/database.db\" # Using SQLite Generate the database: $ director db upgrade INFO [ alembic.runtime.migration ] Context impl SQLiteImpl. INFO [ alembic.runtime.migration ] Will assume non-transactional DDL. INFO [ alembic.runtime.migration ] Running upgrade -> 70631f8bcff3, Init database Run the workflow The workflow command can be used to manage your workflows. 
List $ director workflow list +-----------------+----------+----------------------+ | Workflows ( 2 ) | Periodic | Tasks | +-----------------+----------+----------------------+ | example.ETL | -- | EXTRACT | | | | TRANSFORM | | | | LOAD | +-----------------+----------+----------------------+ | example.RANDOMS | -- | Group GROUP_RANDOMS: | | | | \u2514 RANDOM | | | | \u2514 RANDOM | | | | ADD | +-----------------+----------+----------------------+ Run $ director workflow run example.ETL Execute the tasks A Celery worker instance needs to be started to consume the broker and execute the tasks : $ director celery worker --loglevel = INFO Display the result You can finally start the webserver to track the tasks evolution : $ director webserver The WebUI is available by default on http://127.0.0.1:8000 : Change default parameters The webserver command forwards the arguments to gunicorn. For instance the binding can be easily changed : director webserver -b 0.0.0.0:5000 .","title":"Quickstart"},{"location":"quickstart/#quickstart","text":"","title":"Quickstart"},{"location":"quickstart/#generate-the-project","text":"One of the Director goals is to facilitate the usage of Celery. For that we wanted to simplify the initialization of a Celery project by removing all the long an boring boilerplate steps (application creation, broker configuration...). So the first thing to do is to generate a project using the director init command : $ director init workflows [*] Project created in /home/director/workflows [*] Do not forget to initialize the database You can now export the DIRECTOR_HOME environment variable Info Because you can have several projects in your machine, Director needs to know what project you want to target when using the director command. So don't forget to set your DIRECTOR_HOME environment variable : $ export DIRECTOR_HOME=\"/home/director/workflows\" Director created the following structure for you containing a simple example : .env # The configuration file. 
workflows.yml # The workflows definition. tasks / etl.py # A file containing some tasks. ... # Other files containing other tasks. The Python files in the tasks folder will contain your Celery tasks : from director import task @task ( name = \"EXTRACT\" ) def extract ( * args , ** kwargs ): print ( \"Extracting data\" ) While the workflows.yml file will be used to combine them into workflows : example.ETL : tasks : - EXTRACT - TRANSFORM - LOAD","title":"Generate the project"},{"location":"quickstart/#configure-the-database","text":"Director needs a connection to a relational database to store the dependencies between your tasks. Adjust the DIRECTOR_DATABASE_URI variable in the .env file and generate the database : Possible values for the DIRECTOR_DATABASE_URI variable: DIRECTOR_DATABASE_URI = \"postgresql://user:password@hostname:port/database\" # Using PostgreSQL DIRECTOR_DATABASE_URI = \"mysql+mysqlconnector://user:password@hostname:port/database\" # Using MySQL DIRECTOR_DATABASE_URI = \"sqlite:////path/to/your/database.db\" # Using SQLite Generate the database: $ director db upgrade INFO [ alembic.runtime.migration ] Context impl SQLiteImpl. INFO [ alembic.runtime.migration ] Will assume non-transactional DDL. INFO [ alembic.runtime.migration ] Running upgrade -> 70631f8bcff3, Init database","title":"Configure the database"},{"location":"quickstart/#run-the-workflow","text":"The workflow command can be used to manage your workflows. 
List $ director workflow list +-----------------+----------+----------------------+ | Workflows ( 2 ) | Periodic | Tasks | +-----------------+----------+----------------------+ | example.ETL | -- | EXTRACT | | | | TRANSFORM | | | | LOAD | +-----------------+----------+----------------------+ | example.RANDOMS | -- | Group GROUP_RANDOMS: | | | | \u2514 RANDOM | | | | \u2514 RANDOM | | | | ADD | +-----------------+----------+----------------------+ Run $ director workflow run example.ETL","title":"Run the workflow"},{"location":"quickstart/#execute-the-tasks","text":"A Celery worker instance needs to be started to consume the broker and execute the tasks : $ director celery worker --loglevel = INFO","title":"Execute the tasks"},{"location":"quickstart/#display-the-result","text":"You can finally start the webserver to track the tasks evolution : $ director webserver The WebUI is available by default on http://127.0.0.1:8000 : Change default parameters The webserver command forwards the arguments to gunicorn. For instance the binding can be easily changed : director webserver -b 0.0.0.0:5000 .","title":"Display the result"},{"location":"guides/build-workflows/","text":"Build Workflows Director separates the tasks logic from the workflows definition by providing a simple YAML syntax. Let's imagine the following tasks : # tasks/example.py from director import task @task ( name = \"A\" ) def a ( * args , ** kwargs ): pass @task ( name = \"B\" ) def b ( * args , ** kwargs ): pass @task ( name = \"C\" ) def c ( * args , ** kwargs ): pass Chaining multiple tasks Chaining these tasks in the workflows.yml file is pretty simple : # Chain example # # +-------+ +-------+ +-------+ # | A +----->+ B +----->+ C | # +-------+ +-------+ +-------+ # example.CHAIN : tasks : - A - B - C In this example each task will be executed one after the other : first the task A will be executed, then the task B and finally the task C. 
Launch tasks in parallel Sometimes you need to execute some tasks in parallel to improve your workflow performance. The type: group keyword can be used to handle this canvas: # Group example # +-------+ # +-->+ B | # +-------+ | +-------+ # + A +--+ # +-------+ | +-------+ # +-->+ C | # +-------+ example.GROUP : tasks : - A - GROUP_1 : type : group tasks : - B - C In this example the group is named GROUP_1 but it can be anything. The important thing is to keep unique names in case of multiple groups in your workflow. Periodic workflows Celery provides a scheduler used to periodically execute some tasks. This scheduler is named the Celery beat . Director allows you to periodically schedule a whole workflow using a simple YAML syntax: example.CHAIN : tasks : - A - B - C periodic : schedule : 60 The periodic > schedule key takes an integer argument (the unit is seconds). So in this example the example.CHAIN workflow will be executed every 60 seconds . Please note that the scheduler must be started to handle periodic workflows: $ director celery beat Tip Celery also accepts the -B option when launching a worker: $ director celery worker --loglevel=INFO -B This way you can start your worker and scheduler instances using a single command. Please note this option should only be used during development, otherwise use the celery beat command. Use of queues in Workflows With Director, you can set queues for workflows. All the workflow's tasks will use the same queue: example.ETL : tasks : - A - B - C queue : q1 You need to start the Celery worker instance with the --queues option: $ director celery worker --loglevel = INFO --queues = q1","title":"Build Workflows"},{"location":"guides/build-workflows/#build-workflows","text":"Director separates the tasks logic from the workflows definition by providing a simple YAML syntax. 
Let's imagine the following tasks : # tasks/example.py from director import task @task ( name = \"A\" ) def a ( * args , ** kwargs ): pass @task ( name = \"B\" ) def b ( * args , ** kwargs ): pass @task ( name = \"C\" ) def c ( * args , ** kwargs ): pass","title":"Build Workflows"},{"location":"guides/build-workflows/#chaining-multiple-tasks","text":"Chaining these tasks in the workflows.yml file is pretty simple : # Chain example # # +-------+ +-------+ +-------+ # | A +----->+ B +----->+ C | # +-------+ +-------+ +-------+ # example.CHAIN : tasks : - A - B - C In this example each task will be executed one after the other : first the task A will be executed, then the task B and finally the task C.","title":"Chaining multiple tasks"},{"location":"guides/build-workflows/#launch-tasks-in-parallel","text":"Sometimes you need to execute some tasks in parallel to improve your workflow performance. The type: group keywords can be used to handle this canvas : # Group example # +-------+ # +-->+ B | # +-------+ | +-------+ # + A +--+ # +-------+ | +-------+ # +-->+ C | # +-------+ example.GROUP : tasks : - A - GROUP_1 : type : group tasks : - B - C In this example the group is named GROUP_1 but it can be anything. The important is to keep unique names in case of multiple groups in your workflow.","title":"Launch tasks in parallel"},{"location":"guides/build-workflows/#periodic-workflows","text":"Celery provides a scheduler used to periodically execute some tasks. This scheduler is named the Celery beat . Director allows you to periodically schedule a whole workflow using a simple YAML syntax : example.CHAIN : tasks : - A - B - C periodic : schedule : 60 The periodic > schedule key takes an integer argument (unity is the second). So in this example the example.CHAIN worflow will be executed every 60 seconds . 
Please note that the scheduler must be started to handle periodic workflows : $ director celery beat Tip Celery also accepts the -B option when launching a worker : $ director celery worker --loglevel=INFO -B This way you can start your worker and scheduler instances using a single command. Please note this option is only to use during your development, otherwise use the celery beat command.","title":"Periodic workflows"},{"location":"guides/build-workflows/#use-of-queues-in-workflows","text":"With director, you can set queues for workflows. All workflow's tasks will use the same queue: example.ETL : tasks : - A - B - C queue : q1 You need the start Celery worker instance with the --queues option: $ director celery worker --loglevel = INFO --queues = q1","title":"Use of queues in Workflows"},{"location":"guides/enable-authentication/","text":"Enable authentication Director provide basic authentication. To enable it, you have to create users and set DIRECTOR_AUTH_ENABLED variable to true in the .env file. Manage user You can manage users using the CLI. $ director user [ create | list | update | delete ] Create user example: $ director user create john","title":"Enable Authentication"},{"location":"guides/enable-authentication/#enable-authentication","text":"Director provide basic authentication. To enable it, you have to create users and set DIRECTOR_AUTH_ENABLED variable to true in the .env file.","title":"Enable authentication"},{"location":"guides/enable-authentication/#manage-user","text":"You can manage users using the CLI. $ director user [ create | list | update | delete ] Create user example: $ director user create john","title":"Manage user"},{"location":"guides/error-tracking/","text":"Error Tracking Director can send errors to Sentry . 
You can enable this feature by adding your SENTRY_DSN value in the DIRECTOR_SENTRY_DSN variable of the .env file : DIRECTOR_SENTRY_DSN = \"https://xyz@sentry.example.com/0\" Let's imagine the following workflow : # workflows.yml --- demo.SENTRY_ALERT : tasks : - WORKING_TASK - ERROR_TASK With the associated tasks : # tasks/example.py from director import task @task ( name = \"WORKING_TASK\" ) def working_task ( * args , ** kwargs ): return { \"hello\" : \"world\" } @task ( name = \"ERROR_TASK\" ) def error_task ( * args , ** kwargs ): print ( 1 / 0 ) When a Celery worker will execute this code, an issue will be created in Sentry with the ZeroDivisionError : In order to group the issues by workflow's name or by project, Director associated some tags to the event : Each event also contains additional data to better dig into the problem :","title":"Error Tracking"},{"location":"guides/error-tracking/#error-tracking","text":"Director can send errors to Sentry . You can enable this feature by adding your SENTRY_DSN value in the DIRECTOR_SENTRY_DSN variable of the .env file : DIRECTOR_SENTRY_DSN = \"https://xyz@sentry.example.com/0\" Let's imagine the following workflow : # workflows.yml --- demo.SENTRY_ALERT : tasks : - WORKING_TASK - ERROR_TASK With the associated tasks : # tasks/example.py from director import task @task ( name = \"WORKING_TASK\" ) def working_task ( * args , ** kwargs ): return { \"hello\" : \"world\" } @task ( name = \"ERROR_TASK\" ) def error_task ( * args , ** kwargs ): print ( 1 / 0 ) When a Celery worker will execute this code, an issue will be created in Sentry with the ZeroDivisionError : In order to group the issues by workflow's name or by project, Director associated some tags to the event : Each event also contains additional data to better dig into the problem :","title":"Error Tracking"},{"location":"guides/run-workflows/","text":"Run Workflows The next step after building your workflows is of course to execute them, and Director 
provides several methods for that. Using the CLI This method can be useful if you want to test your tasks and workflows when you are developing them: $ director workflow run ovh.MY_WORKFLOW Using the API You can run a workflow using a POST request on the Director API. This is very convenient if your applications are based on webservices. The request is a POST on the /api/workflows endpoint: $ curl -- header \"Content-Type: application/json\" \\ -- request POST \\ -- data '{\"project\":\"ovh\", \"name\": \"MY_WORKFLOW\", \"payload\": {}}' \\ http : // localhost : 8000 / api / workflows Technical explanation To really understand this feature it's important to know how native Celery works. Concretely Celery is constantly exchanging messages through a broker. Producers are sending tasks in it and workers, consuming the queue, are really executing the Python code. To make it work both sides must be able to import the Python code (it means the producers and the consumers must be able to do from tasks import my_task ). Sometimes it's difficult to allow that in a distributed environment. Director solves this problem by providing an endpoint used to execute a workflow and its tasks, so there's no more need to have the modules available in the producer's PYTHONPATH. Using the periodic scheduling A workflow can also be executed periodically without any manual action from the user. 
Director provides a simple YAML syntax for that.","title":"Run Workflows"},{"location":"guides/run-workflows/#run-workflows","text":"The next step after building your workflows is of course to execute them, and Director provides several methods for that.","title":"Run Workflows"},{"location":"guides/run-workflows/#using-the-cli","text":"This method can be useful if you want to test your tasks and workflows when you are developing them: $ director workflow run ovh.MY_WORKFLOW","title":"Using the CLI"},{"location":"guides/run-workflows/#using-the-api","text":"You can run a workflow using a POST request on the Director API. This is very convenient if your applications are based on webservices. The request is a POST on the /api/workflows endpoint: $ curl -- header \"Content-Type: application/json\" \\ -- request POST \\ -- data '{\"project\":\"ovh\", \"name\": \"MY_WORKFLOW\", \"payload\": {}}' \\ http : // localhost : 8000 / api / workflows Technical explanation To really understand this feature it's important to know how native Celery works. Concretely Celery is constantly exchanging messages through a broker. Producers are sending tasks in it and workers, consuming the queue, are really executing the Python code. To make it work both sides must be able to import the Python code (it means the producers and the consumers must be able to do from tasks import my_task ). Sometimes it's difficult to allow that in a distributed environment. Director solves this problem by providing an endpoint used to execute a workflow and its tasks, so there's no more need to have the modules available in the producer's PYTHONPATH.","title":"Using the API"},{"location":"guides/run-workflows/#using-the-periodic-scheduling","text":"A workflow can also be executed periodically without any manual action from the user. 
Director provides a simple YAML syntax for that.","title":"Using the periodic scheduling"},{"location":"guides/use-payload/","text":"Use Payload Most of the time your workflows and their tasks will not be static but will depend on some payload to work. For example you can have the following workflow : product.ORDER : tasks : - ORDER_PRODUCT - SEND_MAIL This usecase is simple : the first task creates an order for a specific product, then an email is sent to the customer with the order details. Send payload Of course the tasks need some data to work (the product and the user IDs for example). This is possible in Director using the payload field : $ director workflow run product.ORDER '{\"user\": 1234, \"product\": 1000}' or $ curl --header \"Content-Type: application/json\" \\ --request POST \\ --data '{\"project\": \"product\", \"name\": \"ORDER\", \"payload\": {\"user\": 1234, \"product\": 1000}}' \\ http://localhost:8000/api/workflows Handle payload You can handle the payload in the code using the kwargs dictionary : @task ( name = \"ORDER_PRODUCT\" ) def order_product ( * args , ** kwargs ): order = Order ( user = kwargs [ \"payload\" ][ \"user\" ], product = kwargs [ \"payload\" ][ \"product\" ] ) . save () return { \"id\" : order . id } @task ( name = \"SEND_MAIL\" ) def send_mail ( * args , ** kwargs ): order_id = args [ 0 ][ \"id\" ] mail = Mail ( title = f \"Your order # { order_id } has been received\" , user = kwargs [ \"payload\" ][ \"user\" ] ) mail . send () As you can see the payload is forwarded to all the tasks contained in your workflow. Create the schema The previous example executes the workflow without validating its payload. Director provides a way to validate it using JsonSchema . 
Your schema needs to be stored in a schemas folder inside your DIRECTOR_HOME (you have to create the folder if it doesn't exist yet): $ cat schemas/order.json { \"type\" : \"object\" , \"properties\" : { \"user\" : { \"type\" : \"integer\" } , \"product\" : { \"type\" : \"integer\" } } , \"required\" : [ \"user\" , \"product\" ] } Then you can reference it in your workflow using the schema keyword : product.ORDER : tasks : - ORDER_PRODUCT - SEND_MAIL schema : order Tip You can store your schemas in subfolders (i.e. $DIRECTOR_HOME/schemas/foo/bar/baz.json ) and reference it in your YAML file with : schema: foo/bar/baz . From now on the execution will be blocked if the payload is not valid : $ director workflow run product.ORDER '{\"user\": \"john\", \"product\": 1000}' Error: Payload is not valid - 'john' is not of type 'integer' Aborted! The API returns a 400 Bad Request error. Periodic workflows Celery Director provides a YAML syntax to periodically schedule a workflow . If your workflow needs a payload to work, you can give it default values : users.UPDATE_CACHE : tasks : - UPDATE_CACHE periodic : schedule : 3600 payload : { \"user\" : False } The corresponding task can easily handle this default value : @task ( name = \"UPDATE_CACHE\" ) def update_cache ( * args , ** kwargs ): user = kwargs [ \"payload\" ][ \"user\" ] if not user : return update_all_users () return update_user ( user ) This way the whole list of users will be updated every hour, and a manual update can be done on a specific user : $ director workflow run users.UPDATE_CACHE '{\"user\": \"john.doe\"}'","title":"Use Payload"},{"location":"guides/use-payload/#use-payload","text":"Most of the time your workflows and their tasks will not be static but will depend on some payload to work. 
For example you can have the following workflow : product.ORDER : tasks : - ORDER_PRODUCT - SEND_MAIL This usecase is simple : the first task creates an order for a specific product, then an email is sent to the customer with the order details.","title":"Use Payload"},{"location":"guides/use-payload/#send-payload","text":"Of course the tasks need some data to work (the product and the user IDs for example). This is possible in Director using the payload field : $ director workflow run product.ORDER '{\"user\": 1234, \"product\": 1000}' or $ curl --header \"Content-Type: application/json\" \\ --request POST \\ --data '{\"project\": \"product\", \"name\": \"ORDER\", \"payload\": {\"user\": 1234, \"product\": 1000}}' \\ http://localhost:8000/api/workflows","title":"Send payload"},{"location":"guides/use-payload/#handle-payload","text":"You can handle the payload in the code using the kwargs dictionary : @task ( name = \"ORDER_PRODUCT\" ) def order_product ( * args , ** kwargs ): order = Order ( user = kwargs [ \"payload\" ][ \"user\" ], product = kwargs [ \"payload\" ][ \"product\" ] ) . save () return { \"id\" : order . id } @task ( name = \"SEND_MAIL\" ) def send_mail ( * args , ** kwargs ): order_id = args [ 0 ][ \"id\" ] mail = Mail ( title = f \"Your order # { order_id } has been received\" , user = kwargs [ \"payload\" ][ \"user\" ] ) mail . send () As you can see the payload is forwarded to all the tasks contained in your workflow.","title":"Handle payload"},{"location":"guides/use-payload/#create-the-schema","text":"The previous example executes the workflow without validating its payload. Director provides a way to validate it using JsonSchema . 
Your schema needs to be stored in a schemas folder inside your DIRECTOR_HOME (you have to create the folder if it doesn't exist yet): $ cat schemas/order.json { \"type\" : \"object\" , \"properties\" : { \"user\" : { \"type\" : \"integer\" } , \"product\" : { \"type\" : \"integer\" } } , \"required\" : [ \"user\" , \"product\" ] } Then you can reference it in your workflow using the schema keyword : product.ORDER : tasks : - ORDER_PRODUCT - SEND_MAIL schema : order Tip You can store your schemas in subfolders (i.e. $DIRECTOR_HOME/schemas/foo/bar/baz.json ) and reference it in your YAML file with : schema: foo/bar/baz . From now on the execution will be blocked if the payload is not valid : $ director workflow run product.ORDER '{\"user\": \"john\", \"product\": 1000}' Error: Payload is not valid - 'john' is not of type 'integer' Aborted! The API returns a 400 Bad Request error.","title":"Create the schema"},{"location":"guides/use-payload/#periodic-workflows","text":"Celery Director provides a YAML syntax to periodically schedule a workflow . If your workflow needs a payload to work, you can give it default values : users.UPDATE_CACHE : tasks : - UPDATE_CACHE periodic : schedule : 3600 payload : { \"user\" : False } The corresponding task can easily handle this default value : @task ( name = \"UPDATE_CACHE\" ) def update_cache ( * args , ** kwargs ): user = kwargs [ \"payload\" ][ \"user\" ] if not user : return update_all_users () return update_user ( user ) This way the whole list of users will be updated every hour, and a manual update can be done on a specific user : $ director workflow run users.UPDATE_CACHE '{\"user\": \"john.doe\"}'","title":"Periodic workflows"},{"location":"guides/write-tasks/","text":"Write Tasks Director is a wrapper around Celery, so creating tasks with it is almost the same as creating tasks for pure Celery. Create a task In pure Celery you had to create a Celery application object ( app = Celery(...) 
) and use the app.task() decorator to transform Python functions into Celery tasks. This work has already been done for you in Director, so you just have to transform your function using the director.task decorator : # tasks/example.py from director import task @task ( name = \"TASK_EXAMPLE\" ) def my_task ( * args , ** kwargs ): pass Warning The name parameter in the task decorator is mandatory. Because it will be used in the YAML file to combine tasks into workflows , this name must be unique . Task signature To simplify task creation, and to allow multiple workflows to reuse the same task, the signature is always the same : (*args, **kwargs) . The kwargs dictionary can be used to handle the payload while args contains the results of the task's parents (of course args is empty if your task is at the beginning of a workflow). Technical explanation In Celery the developer can decide whether or not a task receives the results of its parents using 2 methods : s() and si() . The i means immutability and is intended to ignore the parents' results. So normally, as a developer, you have to be careful about which method to use and you also have to create your task signatures accordingly. But Director has been created to simplify that ! As we decided to receive the results of the parents in the args parameter we always use the s() method. 
Here are some concrete examples based on the following tasks : # tasks/example.py from director import task @task ( name = \"A\" ) def a ( * args , ** kwargs ): return { \"result\" : \"a_data\" } @task ( name = \"B\" ) def b ( * args , ** kwargs ): return { \"result\" : \"b_data\" } @task ( name = \"C\" ) def c ( * args , ** kwargs ): print ( args ) The following workflows present different usecases and the output of the C task (see the Build Workflows guide to understand the YAML format) : example.NO_PARENT : tasks : - C # Result : (None,) example.ONE_PARENT : tasks : - A - B - C # Result : ({'result': 'b_data'},) example.MULTIPLE_PARENT : tasks : - GROUP_1 : type : group tasks : - A - B - C # Result : ([{'result': 'a_data'}, {'result': 'b_data'}],) Bound Tasks Celery allows you to bind a task , providing the task instance itself as the first parameter. In this case the signature must contain a first parameter just before args and kwargs : # tasks/example.py from director import task @task ( bind = True , name = \"BOUND_TASK\" ) def bound_task ( self , * args , ** kwargs ): print ( self . name ) Celery Task Options The task() decorator provided by Director is just a wrapper of the native app.task() decorator provided by Celery, so all the original options are still available. You can for example apply a rate_limit or even configure the max number of retries.","title":"Write Tasks"},{"location":"guides/write-tasks/#write-tasks","text":"Director is a wrapper around Celery, so creating tasks with it is almost the same as creating tasks for pure Celery.","title":"Write Tasks"},{"location":"guides/write-tasks/#create-a-task","text":"In pure Celery you had to create a Celery application object ( app = Celery(...) ) and use the app.task() decorator to transform Python functions into Celery tasks. 
This work has already been done for you in Director, so you just have to transform your function using the director.task decorator : # tasks/example.py from director import task @task ( name = \"TASK_EXAMPLE\" ) def my_task ( * args , ** kwargs ): pass Warning The name parameter in the task decorator is mandatory. Because it will be used in the YAML file to combine tasks into workflows , this name must be unique .","title":"Create a task"},{"location":"guides/write-tasks/#task-signature","text":"To simplify task creation, and to allow multiple workflows to reuse the same task, the signature is always the same : (*args, **kwargs) . The kwargs dictionary can be used to handle the payload while args contains the results of the task's parents (of course args is empty if your task is at the beginning of a workflow). Technical explanation In Celery the developer can decide whether or not a task receives the results of its parents using 2 methods : s() and si() . The i means immutability and is intended to ignore the parents' results. So normally, as a developer, you have to be careful about which method to use and you also have to create your task signatures accordingly. But Director has been created to simplify that ! As we decided to receive the results of the parents in the args parameter we always use the s() method. 
Here are some concrete examples based on the following tasks : # tasks/example.py from director import task @task ( name = \"A\" ) def a ( * args , ** kwargs ): return { \"result\" : \"a_data\" } @task ( name = \"B\" ) def b ( * args , ** kwargs ): return { \"result\" : \"b_data\" } @task ( name = \"C\" ) def c ( * args , ** kwargs ): print ( args ) The following workflows present different usecases and the output of the C task (see the Build Workflows guide to understand the YAML format) : example.NO_PARENT : tasks : - C # Result : (None,) example.ONE_PARENT : tasks : - A - B - C # Result : ({'result': 'b_data'},) example.MULTIPLE_PARENT : tasks : - GROUP_1 : type : group tasks : - A - B - C # Result : ([{'result': 'a_data'}, {'result': 'b_data'}],)","title":"Task signature"},{"location":"guides/write-tasks/#bound-tasks","text":"Celery allows you to bind a task , providing the task instance itself as the first parameter. In this case the signature must contain a first parameter just before args and kwargs : # tasks/example.py from director import task @task ( bind = True , name = \"BOUND_TASK\" ) def bound_task ( self , * args , ** kwargs ): print ( self . name )","title":"Bound Tasks"},{"location":"guides/write-tasks/#celery-task-options","text":"The task() decorator provided by Director is just a wrapper of the native app.task() decorator provided by Celery, so all the original options are still available. You can for example apply a rate_limit or even configure the max number of retries.","title":"Celery Task Options"}]}
\ No newline at end of file
+{"config":{"lang":["en"],"prebuild_index":false,"separator":"[\\s\\-]+"},"docs":[{"location":"","text":"Celery Director Documentation Director is a simple and rapid framework used to manage tasks and build workflows using Celery. Features The objective is to make Celery easier to use by providing : a WebUI to track the tasks' states, an API and a CLI to manage and execute the workflows, a YAML syntax used to combine tasks into workflows, the ability to periodically launch a whole workflow, and many others. Info Director is built on top of the excellent Celery library . The orchestration engine has not been changed at all : we didn't want to reinvent the wheel but to provide an easy tool to use Celery. It means that all your existing tasks can easily be migrated to Director. Furthermore the documentation of the tasks and all the features powered by Celery like the rate limiting , the task exception retrying or even the queue routing stay the same. Installation Install the latest version of Director with pip (requires Python 3.6 at least): pip install celery-director Usage Write your code in Python # tasks/orders.py from director import task from .utils import Order , Mail @task ( name = \"ORDER_PRODUCT\" ) def order_product ( * args , ** kwargs ): order = Order ( user = kwargs [ \"payload\" ][ \"user\" ], product = kwargs [ \"payload\" ][ \"product\" ] ) . save () return { \"id\" : order . id } @task ( name = \"SEND_MAIL\" ) def send_mail ( * args , ** kwargs ): order_id = args [ 0 ][ \"id\" ] mail = Mail ( title = f \"Your order # { order_id } has been received\" , user = kwargs [ \"payload\" ][ \"user\" ] ) mail . 
send () Build your workflows in YAML # workflows.yml product.ORDER : tasks : - ORDER_PRODUCT - SEND_MAIL Run it You can simply test your workflow locally : $ director workflow run product.ORDER '{\"user\": 1234, \"product\": 1000}' And run it in production using the director API : $ curl --header \"Content-Type: application/json\" \\ --request POST \\ --data '{\"project\": \"product\", \"name\": \"ORDER\", \"payload\": {\"user\": 1234, \"product\": 1000}}' \\ http://localhost:8000/api/workflows Project layout .env # The configuration file. workflows.yml # The workflows definition. tasks / example.py # A file containing some tasks. ... # Other files containing other tasks. Commands director init [path] - Create a new project. director celery [worker|beat|flower] - Start Celery daemons. director webserver - Start the webserver. director workflow [list|show|run] - Manage your project workflows.","title":"Introduction"},{"location":"#celery-director-documentation","text":"Director is a simple and rapid framework used to manage tasks and build workflows using Celery.","title":"Celery Director Documentation"},{"location":"#features","text":"The objective is to make Celery easier to use by providing : a WebUI to track the tasks' states, an API and a CLI to manage and execute the workflows, a YAML syntax used to combine tasks into workflows, the ability to periodically launch a whole workflow, and many others. Info Director is built on top of the excellent Celery library . The orchestration engine has not been changed at all : we didn't want to reinvent the wheel but to provide an easy tool to use Celery. It means that all your existing tasks can easily be migrated to Director. 
Furthermore the documentation of the tasks and all the features powered by Celery like the rate limiting , the task exception retrying or even the queue routing stay the same.","title":"Features"},{"location":"#installation","text":"Install the latest version of Director with pip (requires Python 3.6 at least): pip install celery-director","title":"Installation"},{"location":"#usage","text":"","title":"Usage"},{"location":"#write-your-code-in-python","text":"# tasks/orders.py from director import task from .utils import Order , Mail @task ( name = \"ORDER_PRODUCT\" ) def order_product ( * args , ** kwargs ): order = Order ( user = kwargs [ \"payload\" ][ \"user\" ], product = kwargs [ \"payload\" ][ \"product\" ] ) . save () return { \"id\" : order . id } @task ( name = \"SEND_MAIL\" ) def send_mail ( * args , ** kwargs ): order_id = args [ 0 ][ \"id\" ] mail = Mail ( title = f \"Your order # { order_id } has been received\" , user = kwargs [ \"payload\" ][ \"user\" ] ) mail . send ()","title":"Write your code in Python"},{"location":"#build-your-workflows-in-yaml","text":"# workflows.yml product.ORDER : tasks : - ORDER_PRODUCT - SEND_MAIL","title":"Build your workflows in YAML"},{"location":"#run-it","text":"You can simply test your workflow locally : $ director workflow run product.ORDER '{\"user\": 1234, \"product\": 1000}' And run it in production using the director API : $ curl --header \"Content-Type: application/json\" \\ --request POST \\ --data '{\"project\": \"product\", \"name\": \"ORDER\", \"payload\": {\"user\": 1234, \"product\": 1000}}' \\ http://localhost:8000/api/workflows","title":"Run it"},{"location":"#project-layout","text":".env # The configuration file. workflows.yml # The workflows definition. tasks / example.py # A file containing some tasks. ... # Other files containing other tasks.","title":"Project layout"},{"location":"#commands","text":"director init [path] - Create a new project. 
director celery [worker|beat|flower] - Start Celery daemons. director webserver - Start the webserver. director workflow [list|show|run] - Manage your project workflows.","title":"Commands"},{"location":"api/","text":"API Documentation GET /api/workflows List the workflow instances. Example request: GET /api/workflows HTTP / 1.1 Host : example.com Accept : application/json Parameters: per_page (optional, default: 1000): the number of workflows to return page (optional, default: 1): the page to start from Example response: HTTP / 1 . 1 200 OK [ { \"created\" : \"2020-02-06T13:56:51\" , \"fullname\" : \"example.ETL\" , \"id\" : \"29e7ef80-fa1b-4b91-8ccb-ef01a91601db\" , \"name\" : \"ETL\" , \"payload\" : { \"foo\" : \"bar\" } , \"periodic\" : false , \"project\" : \"example\" , \"status\" : \"pending\" , \"updated\" : \"2020-02-06T13:56:51\" } ] GET /api/workflows/ Get the details of a specific workflow instance, including its tasks. Example request: GET /api/workflows/29e7ef80-fa1b-4b91-8ccb-ef01a91601db HTTP / 1.1 Host : example.com Accept : application/json Example response: HTTP / 1 . 
1 200 OK { \"created\" : \"2020-02-06T13:56:51\" , \"fullname\" : \"example.ETL\" , \"id\" : \"29e7ef80-fa1b-4b91-8ccb-ef01a91601db\" , \"name\" : \"ETL\" , \"payload\" : {} , \"periodic\" : false , \"project\" : \"example\" , \"status\" : \"pending\" , \"tasks\" : [ { \"created\" : \"2020-02-06T13:56:51\" , \"id\" : \"c8606f67-9923-4c84-bc41-69efacb0c7cb\" , \"key\" : \"EXTRACT\" , \"previous\" : [], \"status\" : \"pending\" , \"task\" : \"c8606f67-9923-4c84-bc41-69efacb0c7cb\" , \"updated\" : \"2020-02-06T13:56:51\" }, { \"created\" : \"2020-02-06T13:56:51\" , \"id\" : \"35a2d47b-8105-4d03-becb-7eb48f8c062e\" , \"key\" : \"TRANSFORM\" , \"previous\" : [ \"c8606f67-9923-4c84-bc41-69efacb0c7cb\" ], \"status\" : \"pending\" , \"task\" : \"35a2d47b-8105-4d03-becb-7eb48f8c062e\" , \"updated\" : \"2020-02-06T13:56:51\" }, { \"created\" : \"2020-02-06T13:56:51\" , \"id\" : \"e5a8eb49-0a8c-4063-ad08-a5e9e7bd49d2\" , \"key\" : \"LOAD\" , \"previous\" : [ \"35a2d47b-8105-4d03-becb-7eb48f8c062e\" ], \"status\" : \"pending\" , \"task\" : \"e5a8eb49-0a8c-4063-ad08-a5e9e7bd49d2\" , \"updated\" : \"2020-02-06T13:56:51\" } ], \"updated\" : \"2020-02-06T13:56:51\" } POST /api/workflows Execute a new workflow. Example request: POST /api/workflows HTTP / 1.1 Host : example.com Accept : application/json { \"project\": \"example\", \"name\": \"ETL\", \"payload\": {} } Example response: HTTP / 1 . 1 201 CREATED { \"created\" : \"2020-02-06T14:01:02\" , \"fullname\" : \"example.ETL\" , \"id\" : \"43e70707-b661-42e1-a7df-5b98851ae340\" , \"name\" : \"ETL\" , \"payload\" : {} , \"periodic\" : false , \"project\" : \"example\" , \"status\" : \"pending\" , \"updated\" : \"2020-02-06T14:01:02\" } GET /api/ping Health endpoint used to monitor Director API. 
Example request: GET /api/ping HTTP / 1.1 Host : example.com Accept : application/json Example response: HTTP / 1 . 1 200 OK { \"message\" : \"pong\" }","title":"API"},{"location":"api/#api-documentation","text":"","title":"API Documentation"},{"location":"api/#get-apiworkflows","text":"List the workflow instances. Example request: GET /api/workflows HTTP / 1.1 Host : example.com Accept : application/json Parameters: per_page (optional, default: 1000): the number of workflows to return page (optional, default: 1): the page to start from Example response: HTTP / 1 . 1 200 OK [ { \"created\" : \"2020-02-06T13:56:51\" , \"fullname\" : \"example.ETL\" , \"id\" : \"29e7ef80-fa1b-4b91-8ccb-ef01a91601db\" , \"name\" : \"ETL\" , \"payload\" : { \"foo\" : \"bar\" } , \"periodic\" : false , \"project\" : \"example\" , \"status\" : \"pending\" , \"updated\" : \"2020-02-06T13:56:51\" } ]","title":"GET /api/workflows"},{"location":"api/#get-apiworkflowsid","text":"Get the details of a specific workflow instance, including its tasks. Example request: GET /api/workflows/29e7ef80-fa1b-4b91-8ccb-ef01a91601db HTTP / 1.1 Host : example.com Accept : application/json Example response: HTTP / 1 . 
1 200 OK { \"created\" : \"2020-02-06T13:56:51\" , \"fullname\" : \"example.ETL\" , \"id\" : \"29e7ef80-fa1b-4b91-8ccb-ef01a91601db\" , \"name\" : \"ETL\" , \"payload\" : {} , \"periodic\" : false , \"project\" : \"example\" , \"status\" : \"pending\" , \"tasks\" : [ { \"created\" : \"2020-02-06T13:56:51\" , \"id\" : \"c8606f67-9923-4c84-bc41-69efacb0c7cb\" , \"key\" : \"EXTRACT\" , \"previous\" : [], \"status\" : \"pending\" , \"task\" : \"c8606f67-9923-4c84-bc41-69efacb0c7cb\" , \"updated\" : \"2020-02-06T13:56:51\" }, { \"created\" : \"2020-02-06T13:56:51\" , \"id\" : \"35a2d47b-8105-4d03-becb-7eb48f8c062e\" , \"key\" : \"TRANSFORM\" , \"previous\" : [ \"c8606f67-9923-4c84-bc41-69efacb0c7cb\" ], \"status\" : \"pending\" , \"task\" : \"35a2d47b-8105-4d03-becb-7eb48f8c062e\" , \"updated\" : \"2020-02-06T13:56:51\" }, { \"created\" : \"2020-02-06T13:56:51\" , \"id\" : \"e5a8eb49-0a8c-4063-ad08-a5e9e7bd49d2\" , \"key\" : \"LOAD\" , \"previous\" : [ \"35a2d47b-8105-4d03-becb-7eb48f8c062e\" ], \"status\" : \"pending\" , \"task\" : \"e5a8eb49-0a8c-4063-ad08-a5e9e7bd49d2\" , \"updated\" : \"2020-02-06T13:56:51\" } ], \"updated\" : \"2020-02-06T13:56:51\" }","title":"GET /api/workflows/<id>"},{"location":"api/#post-apiworkflows","text":"Execute a new workflow. Example request: POST /api/workflows HTTP / 1.1 Host : example.com Accept : application/json { \"project\": \"example\", \"name\": \"ETL\", \"payload\": {} } Example response: HTTP / 1 . 
1 201 CREATED { \"created\" : \"2020-02-06T14:01:02\" , \"fullname\" : \"example.ETL\" , \"id\" : \"43e70707-b661-42e1-a7df-5b98851ae340\" , \"name\" : \"ETL\" , \"payload\" : {} , \"periodic\" : false , \"project\" : \"example\" , \"status\" : \"pending\" , \"updated\" : \"2020-02-06T14:01:02\" }","title":"POST /api/workflows"},{"location":"api/#get-apiping","text":"Health endpoint used to monitor Director API. Example request: GET /api/ping HTTP / 1.1 Host : example.com Accept : application/json Example response: HTTP / 1 . 1 200 OK { \"message\" : \"pong\" }","title":"GET /api/ping"},{"location":"quickstart/","text":"Quickstart Generate the project One of the Director goals is to facilitate the usage of Celery. For that we wanted to simplify the initialization of a Celery project by removing all the long and boring boilerplate steps (application creation, broker configuration...). So the first thing to do is to generate a project using the director init command : $ director init workflows [*] Project created in /home/director/workflows [*] Do not forget to initialize the database You can now export the DIRECTOR_HOME environment variable Info Because you can have several projects on your machine, Director needs to know what project you want to target when using the director command. So don't forget to set your DIRECTOR_HOME environment variable : $ export DIRECTOR_HOME=\"/home/director/workflows\" Director created the following structure for you containing a simple example : .env # The configuration file. workflows.yml # The workflows definition. tasks / etl.py # A file containing some tasks. ... # Other files containing other tasks. 
The Python files in the tasks folder will contain your Celery tasks : from director import task @task ( name = \"EXTRACT\" ) def extract ( * args , ** kwargs ): print ( \"Extracting data\" ) While the workflows.yml file will be used to combine them into workflows : example.ETL : tasks : - EXTRACT - TRANSFORM - LOAD Configure the database Director needs a connection to a relational database to store the dependencies between your tasks. Adjust the DIRECTOR_DATABASE_URI variable in the .env file and generate the database : Possible values for the DIRECTOR_DATABASE_URI variable: DIRECTOR_DATABASE_URI = \"postgresql://user:password@hostname:port/database\" # Using PostgreSQL DIRECTOR_DATABASE_URI = \"mysql+mysqlconnector://user:password@hostname:port/database\" # Using MySQL DIRECTOR_DATABASE_URI = \"sqlite:////path/to/your/database.db\" # Using SQLite Generate the database: $ director db upgrade INFO [ alembic.runtime.migration ] Context impl SQLiteImpl. INFO [ alembic.runtime.migration ] Will assume non-transactional DDL. INFO [ alembic.runtime.migration ] Running upgrade -> 70631f8bcff3, Init database Run the workflow The workflow command can be used to manage your workflows. 
List $ director workflow list +-----------------+----------+----------------------+ | Workflows ( 2 ) | Periodic | Tasks | +-----------------+----------+----------------------+ | example.ETL | -- | EXTRACT | | | | TRANSFORM | | | | LOAD | +-----------------+----------+----------------------+ | example.RANDOMS | -- | Group GROUP_RANDOMS: | | | | \u2514 RANDOM | | | | \u2514 RANDOM | | | | ADD | +-----------------+----------+----------------------+ Run $ director workflow run example.ETL Execute the tasks A Celery worker instance needs to be started to consume the broker and execute the tasks : $ director celery worker --loglevel = INFO Display the result You can finally start the webserver to track the tasks' evolution : $ director webserver The WebUI is available by default on http://127.0.0.1:8000 : Change default parameters The webserver command forwards the arguments to gunicorn. For instance the binding can be easily changed : director webserver -b 0.0.0.0:5000 .","title":"Quickstart"},{"location":"quickstart/#quickstart","text":"","title":"Quickstart"},{"location":"quickstart/#generate-the-project","text":"One of the Director goals is to facilitate the usage of Celery. For that we wanted to simplify the initialization of a Celery project by removing all the long and boring boilerplate steps (application creation, broker configuration...). So the first thing to do is to generate a project using the director init command : $ director init workflows [*] Project created in /home/director/workflows [*] Do not forget to initialize the database You can now export the DIRECTOR_HOME environment variable Info Because you can have several projects on your machine, Director needs to know what project you want to target when using the director command. So don't forget to set your DIRECTOR_HOME environment variable : $ export DIRECTOR_HOME=\"/home/director/workflows\" Director created the following structure for you containing a simple example : .env # The configuration file. 
workflows.yml # The workflows definition. tasks / etl.py # A file containing some tasks. ... # Other files containing other tasks. The Python files in the tasks folder will contain your Celery tasks : from director import task @task ( name = \"EXTRACT\" ) def extract ( * args , ** kwargs ): print ( \"Extracting data\" ) While the workflows.yml file will be used to combine them into workflows : example.ETL : tasks : - EXTRACT - TRANSFORM - LOAD","title":"Generate the project"},{"location":"quickstart/#configure-the-database","text":"Director needs a connection to a relational database to store the dependencies between your tasks. Adjust the DIRECTOR_DATABASE_URI variable in the .env file and generate the database : Possible values for the DIRECTOR_DATABASE_URI variable: DIRECTOR_DATABASE_URI = \"postgresql://user:password@hostname:port/database\" # Using PostgreSQL DIRECTOR_DATABASE_URI = \"mysql+mysqlconnector://user:password@hostname:port/database\" # Using MySQL DIRECTOR_DATABASE_URI = \"sqlite:////path/to/your/database.db\" # Using SQLite Generate the database: $ director db upgrade INFO [ alembic.runtime.migration ] Context impl SQLiteImpl. INFO [ alembic.runtime.migration ] Will assume non-transactional DDL. INFO [ alembic.runtime.migration ] Running upgrade -> 70631f8bcff3, Init database","title":"Configure the database"},{"location":"quickstart/#run-the-workflow","text":"The workflow command can be used to manage your workflows. 
List $ director workflow list +-----------------+----------+----------------------+ | Workflows ( 2 ) | Periodic | Tasks | +-----------------+----------+----------------------+ | example.ETL | -- | EXTRACT | | | | TRANSFORM | | | | LOAD | +-----------------+----------+----------------------+ | example.RANDOMS | -- | Group GROUP_RANDOMS: | | | | \u2514 RANDOM | | | | \u2514 RANDOM | | | | ADD | +-----------------+----------+----------------------+ Run $ director workflow run example.ETL","title":"Run the workflow"},{"location":"quickstart/#execute-the-tasks","text":"A Celery worker instance needs to be started to consume the broker and execute the tasks : $ director celery worker --loglevel = INFO","title":"Execute the tasks"},{"location":"quickstart/#display-the-result","text":"You can finally start the webserver to track the tasks evolution : $ director webserver The WebUI is available by default on http://127.0.0.1:8000 : Change default parameters The webserver command forwards the arguments to gunicorn. For instance the binding can be easily changed : director webserver -b 0.0.0.0:5000 .","title":"Display the result"},{"location":"guides/build-workflows/","text":"Build Workflows Director separates the tasks logic from the workflows definition by providing a simple YAML syntax. Let's imagine the following tasks : # tasks/example.py from director import task @task ( name = \"A\" ) def a ( * args , ** kwargs ): pass @task ( name = \"B\" ) def b ( * args , ** kwargs ): pass @task ( name = \"C\" ) def c ( * args , ** kwargs ): pass Chaining multiple tasks Chaining these tasks in the workflows.yml file is pretty simple : # Chain example # # +-------+ +-------+ +-------+ # | A +----->+ B +----->+ C | # +-------+ +-------+ +-------+ # example.CHAIN : tasks : - A - B - C In this example each task will be executed one after the other : first the task A will be executed, then the task B and finally the task C. 
Launch tasks in parallel Sometimes you need to execute some tasks in parallel to improve your workflow performance. The type: group keyword can be used to handle this canvas : # Group example # +-------+ # +-->+ B | # +-------+ | +-------+ # + A +--+ # +-------+ | +-------+ # +-->+ C | # +-------+ example.GROUP : tasks : - A - GROUP_1 : type : group tasks : - B - C In this example the group is named GROUP_1 but it can be anything. The important thing is to keep the names unique if there are multiple groups in your workflow. Periodic workflows Celery provides a scheduler used to periodically execute some tasks. This scheduler is named the Celery beat . Director allows you to periodically schedule a whole workflow using a simple YAML syntax. First example: example.CHAIN : tasks : - A - B - C periodic : schedule : 60 Second example: example.CHAIN_CRONTAB : tasks : - A - B - C periodic : schedule : \"0 */3 * * *\" The periodic > schedule key takes an integer (in seconds) or a string argument ( crontab syntax). So in the first example, the example.CHAIN workflow will be executed every 60 seconds and the second one, example.CHAIN_CRONTAB , every three hours . Please note that the scheduler must be started to handle periodic workflows : $ director celery beat Tip Celery also accepts the -B option when launching a worker : $ director celery worker --loglevel=INFO -B This way you can start your worker and scheduler instances using a single command. Please note this option should only be used during development, otherwise use the celery beat command. Use of queues in Workflows With Director, you can set queues for workflows. 
All workflow's tasks will use the same queue: example.ETL : tasks : - A - B - C queue : q1 You need to start the Celery worker instance with the --queues option: $ director celery worker --loglevel = INFO --queues = q1","title":"Build Workflows"},{"location":"guides/build-workflows/#build-workflows","text":"Director separates the tasks logic from the workflows definition by providing a simple YAML syntax. Let's imagine the following tasks : # tasks/example.py from director import task @task ( name = \"A\" ) def a ( * args , ** kwargs ): pass @task ( name = \"B\" ) def b ( * args , ** kwargs ): pass @task ( name = \"C\" ) def c ( * args , ** kwargs ): pass","title":"Build Workflows"},{"location":"guides/build-workflows/#chaining-multiple-tasks","text":"Chaining these tasks in the workflows.yml file is pretty simple : # Chain example # # +-------+ +-------+ +-------+ # | A +----->+ B +----->+ C | # +-------+ +-------+ +-------+ # example.CHAIN : tasks : - A - B - C In this example each task will be executed one after the other : first the task A will be executed, then the task B and finally the task C.","title":"Chaining multiple tasks"},{"location":"guides/build-workflows/#launch-tasks-in-parallel","text":"Sometimes you need to execute some tasks in parallel to improve your workflow performance. The type: group keyword can be used to handle this canvas : # Group example # +-------+ # +-->+ B | # +-------+ | +-------+ # + A +--+ # +-------+ | +-------+ # +-->+ C | # +-------+ example.GROUP : tasks : - A - GROUP_1 : type : group tasks : - B - C In this example the group is named GROUP_1 but it can be anything. The important thing is to keep the names unique if there are multiple groups in your workflow.","title":"Launch tasks in parallel"},{"location":"guides/build-workflows/#periodic-workflows","text":"Celery provides a scheduler used to periodically execute some tasks. This scheduler is named the Celery beat . 
Director allows you to periodically schedule a whole workflow using a simple YAML syntax. First example: example.CHAIN : tasks : - A - B - C periodic : schedule : 60 Second example: example.CHAIN_CRONTAB : tasks : - A - B - C periodic : schedule : \"0 */3 * * *\" The periodic > schedule key takes an integer (in seconds) or a string argument ( crontab syntax). So in the first example, the example.CHAIN workflow will be executed every 60 seconds and the second one, example.CHAIN_CRONTAB , every three hours . Please note that the scheduler must be started to handle periodic workflows : $ director celery beat Tip Celery also accepts the -B option when launching a worker : $ director celery worker --loglevel=INFO -B This way you can start your worker and scheduler instances using a single command. Please note this option should only be used during development, otherwise use the celery beat command.","title":"Periodic workflows"},{"location":"guides/build-workflows/#use-of-queues-in-workflows","text":"With Director, you can set queues for workflows. All workflow's tasks will use the same queue: example.ETL : tasks : - A - B - C queue : q1 You need to start the Celery worker instance with the --queues option: $ director celery worker --loglevel = INFO --queues = q1","title":"Use of queues in Workflows"},{"location":"guides/enable-authentication/","text":"Enable authentication Director provides basic authentication. To enable it, you have to create users and set the DIRECTOR_AUTH_ENABLED variable to true in the .env file. Manage user You can manage users using the CLI. $ director user [ create | list | update | delete ] Create user example: $ director user create john","title":"Enable Authentication"},{"location":"guides/enable-authentication/#enable-authentication","text":"Director provides basic authentication. 
To enable it, you have to create users and set the DIRECTOR_AUTH_ENABLED variable to true in the .env file.","title":"Enable authentication"},{"location":"guides/enable-authentication/#manage-user","text":"You can manage users using the CLI. $ director user [ create | list | update | delete ] Create user example: $ director user create john","title":"Manage user"},{"location":"guides/error-tracking/","text":"Error Tracking Director can send errors to Sentry . You can enable this feature by adding your SENTRY_DSN value in the DIRECTOR_SENTRY_DSN variable of the .env file : DIRECTOR_SENTRY_DSN = \"https://xyz@sentry.example.com/0\" Let's imagine the following workflow : # workflows.yml --- demo.SENTRY_ALERT : tasks : - WORKING_TASK - ERROR_TASK With the associated tasks : # tasks/example.py from director import task @task ( name = \"WORKING_TASK\" ) def working_task ( * args , ** kwargs ): return { \"hello\" : \"world\" } @task ( name = \"ERROR_TASK\" ) def error_task ( * args , ** kwargs ): print ( 1 / 0 ) When a Celery worker executes this code, an issue will be created in Sentry with the ZeroDivisionError : In order to group the issues by workflow name or by project, Director associates some tags with the event : Each event also contains additional data to better dig into the problem :","title":"Error Tracking"},{"location":"guides/error-tracking/#error-tracking","text":"Director can send errors to Sentry . 
You can enable this feature by adding your SENTRY_DSN value in the DIRECTOR_SENTRY_DSN variable of the .env file : DIRECTOR_SENTRY_DSN = \"https://xyz@sentry.example.com/0\" Let's imagine the following workflow : # workflows.yml --- demo.SENTRY_ALERT : tasks : - WORKING_TASK - ERROR_TASK With the associated tasks : # tasks/example.py from director import task @task ( name = \"WORKING_TASK\" ) def working_task ( * args , ** kwargs ): return { \"hello\" : \"world\" } @task ( name = \"ERROR_TASK\" ) def error_task ( * args , ** kwargs ): print ( 1 / 0 ) When a Celery worker executes this code, an issue will be created in Sentry with the ZeroDivisionError : In order to group the issues by workflow name or by project, Director associates some tags with the event : Each event also contains additional data to better dig into the problem :","title":"Error Tracking"},{"location":"guides/run-workflows/","text":"Run Workflows The next step after building your workflows is of course to execute them, and Director provides several methods for that. Using the CLI This method can be useful if you want to test your tasks and workflows when you are developing them : $ director workflow run ovh.MY_WORKFLOW Using the API You can run a workflow using a POST request on the Director API. This is very convenient if your applications are based on webservices. The request is a POST on the /api/workflows endpoint : $ curl --header \"Content-Type: application/json\" \\ --request POST \\ --data '{\"project\":\"ovh\", \"name\": \"MY_WORKFLOW\", \"payload\": {}}' \\ http://localhost:8000/api/workflows Technical explanation To really understand this feature it's important to know how native Celery works. Concretely Celery is constantly exchanging messages through a broker. Producers send tasks into it and workers, consuming the queue, actually execute the Python code. 
To make it work both sides must be able to import the Python code (meaning the producers and the consumers must be able to do from tasks import my_task ). Sometimes it's difficult to allow that in a distributed environment. Director solves this problem by providing an endpoint used to execute a workflow and its tasks, so there's no more need to have the modules available in the producer's PYTHONPATH. Using the periodic scheduling A workflow can also be executed periodically without any manual action from the user. Director provides a simple YAML syntax for that.","title":"Run Workflows"},{"location":"guides/run-workflows/#run-workflows","text":"The next step after building your workflows is of course to execute them, and Director provides several methods for that.","title":"Run Workflows"},{"location":"guides/run-workflows/#using-the-cli","text":"This method can be useful if you want to test your tasks and workflows when you are developing them : $ director workflow run ovh.MY_WORKFLOW","title":"Using the CLI"},{"location":"guides/run-workflows/#using-the-api","text":"You can run a workflow using a POST request on the Director API. This is very convenient if your applications are based on webservices. The request is a POST on the /api/workflows endpoint : $ curl --header \"Content-Type: application/json\" \\ --request POST \\ --data '{\"project\":\"ovh\", \"name\": \"MY_WORKFLOW\", \"payload\": {}}' \\ http://localhost:8000/api/workflows Technical explanation To really understand this feature it's important to know how native Celery works. Concretely Celery is constantly exchanging messages through a broker. Producers send tasks into it and workers, consuming the queue, actually execute the Python code. To make it work both sides must be able to import the Python code (meaning the producers and the consumers must be able to do from tasks import my_task ). Sometimes it's difficult to allow that in a distributed environment. 
Director solves this problem by providing an endpoint used to execute a workflow and its tasks, so there's no more need to have the modules available in the producer's PYTHONPATH.","title":"Using the API"},{"location":"guides/run-workflows/#using-the-periodic-scheduling","text":"A workflow can also be executed periodically without any manual action from the user. Director provides a simple YAML syntax for that.","title":"Using the periodic scheduling"},{"location":"guides/use-payload/","text":"Use Payload Most of the time your workflows and their tasks will not be static but will depend on some payload to work. For example you can have the following workflow : product.ORDER : tasks : - ORDER_PRODUCT - SEND_MAIL This use case is simple : the first task creates an order for a specific product, then an email is sent to the customer with the order details. Send payload Of course the tasks need some data to work (the product and the user IDs for example). This is possible in Director using the payload field : $ director workflow run product.ORDER '{\"user\": 1234, \"product\": 1000}' or $ curl --header \"Content-Type: application/json\" \\ --request POST \\ --data '{\"project\": \"product\", \"name\": \"ORDER\", \"payload\": {\"user\": 1234, \"product\": 1000}}' \\ http://localhost:8000/api/workflows Handle payload You can handle the payload in the code using the kwargs dictionary : @task ( name = \"ORDER_PRODUCT\" ) def order_product ( * args , ** kwargs ): order = Order ( user = kwargs [ \"payload\" ][ \"user\" ], product = kwargs [ \"payload\" ][ \"product\" ] ) . save () return { \"id\" : order . id } @task ( name = \"SEND_MAIL\" ) def send_mail ( * args , ** kwargs ): order_id = args [ 0 ][ \"id\" ] mail = Mail ( title = f \"Your order # { order_id } has been received\" , user = kwargs [ \"payload\" ][ \"user\" ] ) mail . send () As you can see the payload is forwarded to all the tasks contained in your workflow. 
Create the schema The previous example executes the workflow without validating its payload. Director provides a way to validate it using JsonSchema . Your schema needs to be stored in a schemas folder inside your DIRECTOR_HOME (you have to create the folder if it doesn't exist yet): $ cat schemas/order.json { \"type\" : \"object\" , \"properties\" : { \"user\" : { \"type\" : \"integer\" } , \"product\" : { \"type\" : \"integer\" } } , \"required\" : [ \"user\" , \"product\" ] } Then you can reference it in your workflow using the schema keyword : product.ORDER : tasks : - ORDER_PRODUCT - SEND_MAIL schema : order Tip You can host your schemas in subfolders (i.e. $DIRECTOR_HOME/schemas/foo/bar/baz.json ) and reference them in your YAML file with : schema: foo/bar/baz . From now on, the execution will be blocked if the payload is not valid : $ director workflow run product.ORDER '{\"user\": \"john\", \"product\": 1000}' Error: Payload is not valid - 'john' is not of type 'integer' Aborted! The API returns a 400 Bad Request error. Periodic workflows Celery Director provides a YAML syntax to periodically schedule a workflow . If your workflow needs a payload to work, you can send it default values : users.UPDATE_CACHE : tasks : - UPDATE_CACHE periodic : schedule : 3600 payload : { \"user\" : False } The corresponding task can easily handle this default value : @task ( name = \"UPDATE_CACHE\" ) def update_cache ( * args , ** kwargs ): user = kwargs [ \"payload\" ][ \"user\" ] if not user : return update_all_users () return update_user ( user ) This way the whole list of users will be updated every hour, and a manual update can be done on a specific user : $ director workflow run users.UPDATE_CACHE '{\"user\": \"john.doe\"}'","title":"Use Payload"},{"location":"guides/use-payload/#use-payload","text":"Most of the time your workflows and their tasks will not be static but will depend on some payload to work. 
For example you can have the following workflow : product.ORDER : tasks : - ORDER_PRODUCT - SEND_MAIL This use case is simple : the first task creates an order for a specific product, then an email is sent to the customer with the order details.","title":"Use Payload"},{"location":"guides/use-payload/#send-payload","text":"Of course the tasks need some data to work (the product and the user IDs for example). This is possible in Director using the payload field : $ director workflow run product.ORDER '{\"user\": 1234, \"product\": 1000}' or $ curl --header \"Content-Type: application/json\" \\ --request POST \\ --data '{\"project\": \"product\", \"name\": \"ORDER\", \"payload\": {\"user\": 1234, \"product\": 1000}}' \\ http://localhost:8000/api/workflows","title":"Send payload"},{"location":"guides/use-payload/#handle-payload","text":"You can handle the payload in the code using the kwargs dictionary : @task ( name = \"ORDER_PRODUCT\" ) def order_product ( * args , ** kwargs ): order = Order ( user = kwargs [ \"payload\" ][ \"user\" ], product = kwargs [ \"payload\" ][ \"product\" ] ) . save () return { \"id\" : order . id } @task ( name = \"SEND_MAIL\" ) def send_mail ( * args , ** kwargs ): order_id = args [ 0 ][ \"id\" ] mail = Mail ( title = f \"Your order # { order_id } has been received\" , user = kwargs [ \"payload\" ][ \"user\" ] ) mail . send () As you can see the payload is forwarded to all the tasks contained in your workflow.","title":"Handle payload"},{"location":"guides/use-payload/#create-the-schema","text":"The previous example executes the workflow without validating its payload. Director provides a way to validate it using JsonSchema . 
Your schema needs to be stored in a schemas folder inside your DIRECTOR_HOME (you have to create the folder if it doesn't exist yet): $ cat schemas/order.json { \"type\" : \"object\" , \"properties\" : { \"user\" : { \"type\" : \"integer\" } , \"product\" : { \"type\" : \"integer\" } } , \"required\" : [ \"user\" , \"product\" ] } Then you can reference it in your workflow using the schema keyword : product.ORDER : tasks : - ORDER_PRODUCT - SEND_MAIL schema : order Tip You can host your schemas in subfolders (i.e. $DIRECTOR_HOME/schemas/foo/bar/baz.json ) and reference them in your YAML file with : schema: foo/bar/baz . From now on, the execution will be blocked if the payload is not valid : $ director workflow run product.ORDER '{\"user\": \"john\", \"product\": 1000}' Error: Payload is not valid - 'john' is not of type 'integer' Aborted! The API returns a 400 Bad Request error.","title":"Create the schema"},{"location":"guides/use-payload/#periodic-workflows","text":"Celery Director provides a YAML syntax to periodically schedule a workflow . If your workflow needs a payload to work, you can send it default values : users.UPDATE_CACHE : tasks : - UPDATE_CACHE periodic : schedule : 3600 payload : { \"user\" : False } The corresponding task can easily handle this default value : @task ( name = \"UPDATE_CACHE\" ) def update_cache ( * args , ** kwargs ): user = kwargs [ \"payload\" ][ \"user\" ] if not user : return update_all_users () return update_user ( user ) This way the whole list of users will be updated every hour, and a manual update can be done on a specific user : $ director workflow run users.UPDATE_CACHE '{\"user\": \"john.doe\"}'","title":"Periodic workflows"},{"location":"guides/write-tasks/","text":"Write Tasks Director is a wrapper around Celery, so creating tasks with it is almost the same as creating tasks for pure Celery. Create a task In pure Celery you had to create a Celery application object ( app = Celery(...) 
) and use the app.task() decorator to transform Python functions into Celery tasks. This work has already been done for you in Director, so you just have to transform your functions using the director.task decorator : # tasks/example.py from director import task @task ( name = \"TASK_EXAMPLE\" ) def my_task ( * args , ** kwargs ): pass Warning The name parameter in the task decorator is mandatory. Because it will be used in the YAML file to combine tasks into workflows , this name must be unique . Task signature To simplify the tasks creation, and to allow multiple workflows to reuse the same task, the signature is always the same : (*args, **kwargs) . The kwargs dictionary can be used to handle the payload while args contains the results of the task parents (of course args is empty if your task is at the beginning of a workflow). Technical explanation In Celery the developer can decide whether or not a task receives the results of its parents, using 2 methods : s() and si() . The i means immutability and is intended to ignore the parents' results. So normally, as a developer, you have to be careful about which method to use and design your task signatures accordingly. But Director has been created to simplify that ! As we decided to receive the parents' results in the args parameter, we always use the s() method. 
Here are some concrete examples based on the following tasks : # tasks/example.py from director import task @task ( name = \"A\" ) def a ( * args , ** kwargs ): return { \"result\" : \"a_data\" } @task ( name = \"B\" ) def b ( * args , ** kwargs ): return { \"result\" : \"b_data\" } @task ( name = \"C\" ) def c ( * args , ** kwargs ): print ( args ) The following workflows present different use cases and the output of the C task (see the Build Workflows guide to understand the YAML format) : example.NO_PARENT : tasks : - C # Result : (None,) example.ONE_PARENT : tasks : - A - B - C # Result : ({'result': 'b_data'},) example.MULTIPLE_PARENT : tasks : - GROUP_1 : type : group tasks : - A - B - C # Result : ([{'result': 'a_data'}, {'result': 'b_data'}],) Bound Tasks Celery allows you to bind a task , providing the task instance itself as the first parameter. In this case the signature must contain a first parameter just before args and kwargs : # tasks/example.py from director import task @task ( bind = True , name = \"BOUND_TASK\" ) def bound_task ( self , * args , ** kwargs ): print ( self . name ) Celery Task Options The task() decorator provided by Director is just a wrapper around the native app.task() decorator provided by Celery, so all the original options are still available. You can for example apply a rate_limit or even configure the max number of retries.","title":"Write Tasks"},{"location":"guides/write-tasks/#write-tasks","text":"Director is a wrapper around Celery, so creating tasks with it is almost the same as creating tasks for pure Celery.","title":"Write Tasks"},{"location":"guides/write-tasks/#create-a-task","text":"In pure Celery you had to create a Celery application object ( app = Celery(...) 
This work has already be done for you in Director, so you just have to transform your function using the director.task decorator : # tasks/example.py from director import task @task ( name = \"TASK_EXAMPLE\" ) def my_task ( * args , ** kwargs ): pass Warning The name parameter in the task decorator is mandatory. Because it will be used in the YAML file to combine tasks into workflows , this name must be unique .","title":"Create a task"},{"location":"guides/write-tasks/#task-signature","text":"To simplify the tasks creation, and to allow multiple workflows to reuse the same task, the signature is always the same : (*args, **kwargs) . The kwargs dictionnary can be used to handle the payload while args contains the results of the task parents (of course args is empty if your task is at the beginning of a workflow). Technical explanation In Celery the developer can decide if a task is able to receive or not the result of its parents with 2 methods : s() and si() . The i means immutability and is intended to ignore the parents results. So normally, as a developer, you have to be carefull about the method to use and you also have to create your tasks signatures consequently. But Director has been created to simplify that ! As we decided to received the results of the parents in the args parameter we always use the s() method. 
Here are some concrete examples based on the following tasks : # tasks/example.py from director import task @task ( name = \"A\" ) def a ( * args , ** kwargs ): return { \"result\" : \"a_data\" } @task ( name = \"B\" ) def b ( * args , ** kwargs ): return { \"result\" : \"b_data\" } @task ( name = \"C\" ) def c ( * args , ** kwargs ): print ( args ) The following workflows present different use cases and the output of the C task (see the Build Workflows guide to understand the YAML format) : example.NO_PARENT : tasks : - C # Result : (None,) example.ONE_PARENT : tasks : - A - B - C # Result : ({'result': 'b_data'},) example.MULTIPLE_PARENT : tasks : - GROUP_1 : type : group tasks : - A - B - C # Result : ([{'result': 'a_data'}, {'result': 'b_data'}],)","title":"Task signature"},{"location":"guides/write-tasks/#bound-tasks","text":"Celery allows you to bind a task , providing the task instance itself as the first parameter. In this case the signature must contain a first parameter just before args and kwargs : # tasks/example.py from director import task @task ( bind = True , name = \"BOUND_TASK\" ) def bound_task ( self , * args , ** kwargs ): print ( self . name )","title":"Bound Tasks"},{"location":"guides/write-tasks/#celery-task-options","text":"The task() decorator provided by Director is just a wrapper around the native app.task() decorator provided by Celery, so all the original options are still available. You can for example apply a rate_limit or even configure the max number of retries.","title":"Celery Task Options"}]}
\ No newline at end of file
diff --git a/sitemap.xml b/sitemap.xml
index 4d82a55..283ef0e 100644
--- a/sitemap.xml
+++ b/sitemap.xml
@@ -2,47 +2,47 @@
None
- 2020-10-16
+ 2020-11-26
daily
None
- 2020-10-16
+ 2020-11-26
daily
None
- 2020-10-16
+ 2020-11-26
daily
None
- 2020-10-16
+ 2020-11-26
daily
None
- 2020-10-16
+ 2020-11-26
daily
None
- 2020-10-16
+ 2020-11-26
daily
None
- 2020-10-16
+ 2020-11-26
daily
None
- 2020-10-16
+ 2020-11-26
daily
None
- 2020-10-16
+ 2020-11-26
daily
\ No newline at end of file
diff --git a/sitemap.xml.gz b/sitemap.xml.gz
index bcaa46b..c7e42a4 100644
Binary files a/sitemap.xml.gz and b/sitemap.xml.gz differ