# Celery Director

## Installation

```
$ pip install celery-director
```

## Init

```
$ director init workflows
[*] Project created in /[...]/workflows
You can now export the DIRECTOR_HOME environment variable

$ export DIRECTOR_HOME="/[...]/workflows"

$ cd workflows/
$ ls
tasks workflows.yml
$ ls tasks/
etl.py
```
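The contents of the generated `tasks/etl.py` are not shown in this walkthrough. As a minimal sketch only, a module of this kind could look like the following, assuming it uses the same `@task` decorator as `tasks/error.py` further down and prints the three messages that appear in the worker output. The function names and the fallback decorator are hypothetical; the fallback exists only so the sketch also runs where Director is not installed.

```python
try:
    # Same import as tasks/error.py in this README.
    from director import task
except ImportError:
    # Hypothetical stand-in, NOT part of Director: lets the sketch run
    # without the package by returning the function unchanged.
    def task(name):
        def decorator(func):
            return func
        return decorator


# Task names match the ones listed by `director workflow list`.
@task(name="EXTRACT")
def extract(*args, **kwargs):
    print("Extracting data")


@task(name="TRANSFORM")
def transform(*args, **kwargs):
    print("Transforming data")


@task(name="LOAD")
def load(*args, **kwargs):
    print("Loading data")
```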

## Database

```
$ director upgradedb
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> 70631f8bcff3, Init database

$ ls
director.db  tasks  workflows.yml
```

## List workflows

```
$ director workflow list
+----------------+----------+-----------+
| Workflows (1)  | Periodic | Tasks     |
+----------------+----------+-----------+
| ovh.SIMPLE_ETL |    --    | EXTRACT   |
|                |          | TRANSFORM |
|                |          | LOAD      |
+----------------+----------+-----------+
```

## Execute the workflow

```
$ director workflow run ovh.SIMPLE_ETL '{}'
```

## Display the pending tasks

```
$ director webserver
```

## Execute the tasks and re-display them

```
$ director celery worker
[...]
[2020-01-29 16:12:02,454: WARNING/ForkPoolWorker-1] Extracting data
[2020-01-29 16:12:02,494: WARNING/ForkPoolWorker-3] Transforming data
[2020-01-29 16:12:02,547: WARNING/ForkPoolWorker-1] Loading data

$ director webserver
```

## Create an error task

```
$ cat tasks/error.py
from director import task


@task(name="ERROR")
def error(*args, **kwargs):
    print(1/0)

$ cat workflows.yml
ovh.WILLFAIL:
  tasks:
    - EXTRACT
    - GROUP_1:
        type: group
        tasks:
          - TRANSFORM
          - ERROR
    - LOAD
```
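To make the expected outcome of `ovh.WILLFAIL` easier to follow, here is a plain-Python sketch with no Celery or Director involved. It assumes a simplified sequential model: `EXTRACT` runs first, the members of `GROUP_1` run next (in a real worker they run in parallel), and `LOAD` runs only if every member of the group succeeded. The `run_workflow` helper is hypothetical and exists only for this illustration.

```python
def extract():
    print("Extracting data")


def transform():
    print("Transforming data")


def error():
    # Same failing expression as tasks/error.py.
    print(1 / 0)


def load():
    print("Loading data")


def run_workflow():
    """Simplified model: EXTRACT, then GROUP_1, then LOAD if the group succeeded."""
    executed = []
    failures = []

    extract()
    executed.append("EXTRACT")

    # Every member of the group is attempted, even if a sibling fails.
    for name, func in (("TRANSFORM", transform), ("ERROR", error)):
        try:
            func()
            executed.append(name)
        except ZeroDivisionError as exc:
            failures.append((name, exc))

    # A failed group member blocks the task that follows the group.
    if failures:
        return executed, failures

    load()
    executed.append("LOAD")
    return executed, failures


if __name__ == "__main__":
    done, failed = run_workflow()
    print("executed:", done)
    print("failed:", [name for name, _ in failed])
```

In this model `LOAD` is never reached, which matches the `ChordError` reported by the worker in the next step.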

## Execute and display it

```
$ director workflow run ovh.WILLFAIL '{}'

$ director celery worker
[...]
celery.exceptions.ChordError: Dependency 84077038-258f-4d64-9686-0c1462dc6da8 raised ZeroDivisionError('division by zero',)

$ director webserver
```