# Essential: Core concepts


## Ploomber's core: Tasks, Products, DAG and Clients

To get started with ploomber you only have to learn four concepts:

1. Task. A unit of work that takes some input and produces a persistent change
2. Product. A persistent change *produced* by a Task (e.g. a file in the local filesystem, a table in a remote database)
3. DAG. A collection of Tasks, used to specify dependencies among them (use output from Task A as input for Task B)
4. Client. An object that communicates with an external system (e.g. a database)

There is a standard [Task API](../api.rst#ploomber.tasks.Task) defined by an abstract class; the same is true for [Products](../api.rst#ploomber.products.Product) and [Clients](../api.rst#ploomber.clients.Client). This means you only have to learn each concept once: all concrete classes share the same API.
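To illustrate why a shared abstract API helps, here is a minimal sketch using only the standard library. The names `ToyProduct`, `ToyFile`, `ToyInMemoryTable` and `clean_up` are hypothetical and are not part of ploomber; the sketch only shows the general pattern of writing code once against an abstract class.

```python
from abc import ABC, abstractmethod
from pathlib import Path
import tempfile


class ToyProduct(ABC):
    """Hypothetical abstract interface (not ploomber's actual Product class)."""

    @abstractmethod
    def exists(self) -> bool:
        """Return True if the persistent change is already in place."""

    @abstractmethod
    def delete(self) -> None:
        """Remove the persistent change."""


class ToyFile(ToyProduct):
    """A product backed by a path in the local filesystem."""

    def __init__(self, path):
        self._path = Path(path)

    def exists(self) -> bool:
        return self._path.exists()

    def delete(self) -> None:
        self._path.unlink()


class ToyInMemoryTable(ToyProduct):
    """A product backed by a dict that plays the role of a database."""

    def __init__(self, name, database):
        self._name = name
        self._database = database

    def exists(self) -> bool:
        return self._name in self._database

    def delete(self) -> None:
        del self._database[self._name]


def clean_up(product: ToyProduct) -> bool:
    """Works with any concrete product because it only uses the shared API."""
    if product.exists():
        product.delete()
        return True
    return False


# Set up one product of each kind
tmp_path = Path(tempfile.mkdtemp()) / 'data.csv'
tmp_path.write_text('a,b\n1,2\n')
database = {'my_table': [(1, 2)]}

results = [clean_up(ToyFile(tmp_path)),
           clean_up(ToyInMemoryTable('my_table', database))]
print(results)
```

`clean_up` never needs to know which concrete class it receives, which is the benefit the paragraph above describes.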


## The DAG lifecycle: Declare, render, build

A DAG goes through three steps before being executed:

1. Declaration. A DAG is created and Tasks are added to it
2. Rendering. Placeholders are resolved and validation is performed on Task inputs
3. Building. All *outdated* Tasks are executed in the appropriate order (run upstream task dependencies first)
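The ordering rule in the build step (upstream dependencies first, only outdated tasks) can be sketched with the standard library's `graphlib` module (Python 3.9+). This is an illustration of the idea only, not a description of how ploomber schedules tasks internally; the task names and the `outdated` set are made up for the example.

```python
from graphlib import TopologicalSorter

# Maps each (hypothetical) task name to the set of tasks it depends on
dependencies = {
    'raw': set(),
    'clean': {'raw'},
    'features': {'clean'},
    'report': {'clean', 'features'},
}

# A topological order guarantees every task appears after its upstream tasks
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)  # ['raw', 'clean', 'features', 'report']

# Assume only these tasks are outdated; the rest are skipped
outdated = {'features', 'report'}
to_run = [name for name in execution_order if name in outdated]
print(to_run)  # ['features', 'report']
```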

### Declaration

In [1]:
import tempfile
import os
tmp_dir = tempfile.mkdtemp()
os.chdir(tmp_dir)

In [2]:
from pathlib import Path
import pandas as pd
from ploomber import DAG
from ploomber.tasks import PythonCallable, SQLUpload, SQLScript
from ploomber.clients import SQLAlchemyClient
from ploomber.products import File, SQLiteRelation

The simplest Task is `PythonCallable`, which takes a callable (e.g. a function) as its first argument. The only requirement is that the function has a `product` argument; if the task has dependencies, it must have an `upstream` argument as well.

In [3]:
def _one_task(product):
    pd.DataFrame({'one_column': [1, 2, 3]}).to_csv(str(product))

def _another_task(upstream, product):
    df = pd.read_csv(str(upstream['one']))
    df['another_column'] = df['one_column'] + 1
    df.to_csv(str(product))

In [4]:
# instantiate our DAG
dag = DAG()

# instantiate two tasks and add them to the DAG
one_task = PythonCallable(_one_task, File('one_file.csv'), dag, name='one')
another_task = PythonCallable(_another_task, File('another_file.csv'), dag, name='another')

# declare dependencies: another_task depends on one_task
one_task >> another_task

PythonCallable: another -> File(another_file.csv)

Note that the previous function definitions use `str(product)`. Products are custom objects, so they will not work directly as arguments to functions such as `DataFrame.to_csv()`. Since our products are `File` objects, calling `str` on them returns the path as a string. Other products implement different logic; for example, a `SQLRelation` returns a `schema.name` string.

In [5]:
f = File('/path/to/some/file')
print('* {} to str: "{}"'.format(repr(f), str(f)))

# SQLiteRelation takes a ('schema', 'name', 'kind') or a ('name', 'kind') tuple where kind is 'table' or 'view'
t = SQLiteRelation(('my_table', 'table'))
print('* {} to str: "{}"'.format(repr(t), str(t)))

* File(/path/to/some/file) to str: "/path/to/some/file"
* SQLiteRelation(my_table) to str: "my_table"


### Rendering

To generate a Product, Tasks use a combination of inputs and a `source`. The kind of source depends on the kind of Task: `PythonCallable` uses a Python function, `SQLScript` uses a string with SQL code, and `SQLUpload` uses a string with the path to a file. Rendering is the process where any necessary preparation and validation of the source takes place.

One use case for this is avoiding redundant code. If a Task is declared to have an upstream dependency, it takes the upstream Product as input. Instead of declaring the Product twice, we can refer to it in the downstream task using a placeholder. Let's see an example using `SQLUpload`:

In [6]:
# Clients are objects that communicate with external systems, such as databases
client = SQLAlchemyClient('sqlite:///my_db.db')

# Tasks that use clients have a client argument, but you can also define DAG-level clients
dag.clients[SQLUpload] = client
dag.clients[SQLiteRelation] = client
dag.clients[SQLScript] = client

# Source is defined as a placeholder: take the product from the upstream task
# named "another" and use it as source
my_table = SQLUpload(source='{{upstream["another"]}}',
                     product=SQLiteRelation(('my_table', 'table')),
                     dag=dag,
                     name='my_table')

another_task >> my_table

SQLUpload: my_table -> SQLiteRelation(my_table)

In [7]:
# resolve placeholders by rendering
dag.render()

# let's see the rendered value:
print('my_table.source as string: "{}"'.format(str(my_table.source)))

my_table.source as string: "another_file.csv"


Another important use case for placeholders is parametrized SQL queries. `SQLScript` runs SQL code in a database to create a table or a view. Since ploomber requires sources (SQL code) and products (a table or view) to be declared separately, we can use placeholders to declare the product only once:

In [8]:
source = """
-- product is a placeholder
DROP TABLE IF EXISTS {{product}};

CREATE TABLE {{product}}
AS SELECT * FROM {{upstream["my_table"]}}
WHERE one_column = 1
"""

# instead of declaring "second_table" twice, we declare it in product and refer to it in source
second_table = SQLScript(source=source,
                         product=SQLiteRelation(('second_table', 'table')),
                         dag=dag,
                         name='second_table')

my_table >> second_table

SQLScript: second_table -> SQLiteRelation(second_table)

In [9]:
dag.render()

DAG("No name")

In [10]:
print('second_table.source:\n{}'.format(str(second_table.source)))

second_table.source:

-- product is a placeholder
DROP TABLE IF EXISTS second_table;

CREATE TABLE second_table
AS SELECT * FROM my_table
WHERE one_column = 1


ploomber uses [jinja2](https://jinja.palletsprojects.com/en/2.11.x/api/) for rendering, which opens a wide range of possibilities for rendering SQL source code. Note that this time we didn't call `str` explicitly as we did for `PythonCallable`; this is because jinja automatically casts objects to strings.
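The string conversion described above can be illustrated with a small standard-library stand-in. This is not jinja2 and not ploomber's rendering code: `toy_render` and `ToyRelation` are hypothetical names, and the function only handles plain `{{name}}` placeholders, whereas jinja2 supports full expressions such as `upstream["my_table"]`.

```python
import re


class ToyRelation:
    """Hypothetical product whose string form is the relation name."""

    def __init__(self, name):
        self.name = name

    def __str__(self):
        return self.name


def toy_render(template, **context):
    """Replace each {{name}} placeholder with str(context[name])."""

    def substitute(match):
        # The object is converted with str() at substitution time,
        # so callers can pass product objects directly
        return str(context[match.group(1)])

    return re.sub(r'\{\{\s*(\w+)\s*\}\}', substitute, template)


rendered = toy_render(
    'CREATE TABLE {{product}} AS SELECT * FROM {{source_table}}',
    product=ToyRelation('second_table'),
    source_table=ToyRelation('my_table'),
)
print(rendered)  # CREATE TABLE second_table AS SELECT * FROM my_table
```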

Before building our dag, let's take a look at the current status:

In [11]:
dag.status()

| name | Last updated | Outdated dependencies | Outdated code | Product | Doc (short) | Location |
|---|---|---|---|---|---|---|
| one | Has not been run | False | True | one_file.csv | | :1 |
| another | Has not been run | True | True | another_file.csv | | :4 |
| my_table | Has not been run | True | True | my_table | | |
| second_table | Has not been run | True | True | second_table | | |


### Build

Once rendering is done, we can build our DAG. 

In [12]:
dag.build()

| name | Ran? | Elapsed (s) | Percentage |
|---|---|---|---|
| one | True | 0.123032 | 46.1594 |
| another | True | 0.117866 | 44.2213 |
| my_table | True | 0.022522 | 8.44986 |
| second_table | True | 0.003117 | 1.16944 |


The first time we run our pipeline, all Tasks are executed, but the real power of ploomber is running builds over and over again. Ploomber keeps track of each Task's status and only executes outdated ones. Since we just built our pipeline, nothing will run:

In [13]:
dag.build()

| name | Ran? | Elapsed (s) | Percentage |
|---|---|---|---|
| one | False | 0 | 0 |
| another | False | 0 | 0 |
| my_table | False | 0 | 0 |
| second_table | False | 0 | 0 |


### Task status

Upon successful execution, a Task saves metadata along with the Product to keep track of its status in subsequent builds. Once a DAG is built (even if some tasks fail), another call to `dag.build()` will only trigger execution of outdated tasks. A task is run if any of the following conditions is true:

1. The Product doesn't exist (e.g. when a Task is run for the first time)
2. The Product has no metadata (e.g. when a Task crashes)
3. Any upstream source changed (e.g. an upstream SQL script changed)
4. The Task's own source changed

These rules enable the following use cases:

1. Fast incremental builds: Modify any Task source and the next build will only run outdated Tasks
2. Crash recovery: If a DAG crashes, the next run will start where it was interrupted
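The four conditions listed earlier in this section can be sketched as a single decision function. This is a simplified model for illustration, not ploomber's implementation: `ToyTaskState` and `should_run` are hypothetical names, and the metadata is reduced to a stored copy of the source.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyTaskState:
    """Hypothetical snapshot of what is known about a task before a build."""
    product_exists: bool
    stored_source: Optional[str]  # source saved in metadata, None if no metadata
    current_source: str
    upstream_changed: bool


def should_run(state: ToyTaskState) -> bool:
    """Return True if any of the four conditions holds."""
    if not state.product_exists:       # 1. the Product doesn't exist
        return True
    if state.stored_source is None:    # 2. no metadata (e.g. after a crash)
        return True
    if state.upstream_changed:         # 3. an upstream source changed
        return True
    # 4. the task's own source changed
    return state.stored_source != state.current_source


up_to_date = ToyTaskState(True, 'SELECT 1', 'SELECT 1', False)
crashed = ToyTaskState(True, None, 'SELECT 1', False)
edited = ToyTaskState(True, 'SELECT 1', 'SELECT 2', False)

print(should_run(up_to_date), should_run(crashed), should_run(edited))
# False True True
```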


### Task parameters

There is one last Task argument to explain: `params`. These are optional parameters whose effect varies depending on the kind of Task. `PythonCallable` passes them when calling the underlying function. Tasks that take SQL code as source pass them directly to the source, where they are available as placeholders. `NotebookRunner` (which runs Jupyter notebooks) passes them as parameters using [papermill](https://github.com/nteract/papermill).

Let's take a look at the `params` of our previous DAG:

In [14]:
print('one_task params:\n\t', one_task.params)
print('another_task params:\n\t', another_task.params)

one_task params:
	 {'product': File(one_file.csv)}
another_task params:
	 {'upstream': Upstream({'one': File(one_file.csv)}), 'product': File(another_file.csv)}


Even though we didn't pass any `params` to the tasks, `product` and `upstream` are automatically added after calling `DAG.render()`, which is why we see those parameters.
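As a rough mental model (an assumption for illustration, not ploomber's internal code), rendering can be thought of as merging user-supplied `params` with `product` and `upstream`, and a Python task as a function called with the merged dictionary as keyword arguments. The helper `toy_render_params`, the function `toy_task` and the `multiplier` parameter are hypothetical.

```python
def toy_render_params(user_params, product, upstream=None):
    """Return the params a task would be called with after rendering."""
    params = dict(user_params)  # copy, so the caller's dict is not modified
    if upstream:
        params['upstream'] = upstream
    params['product'] = product
    return params


def toy_task(upstream, product, multiplier):
    """Stand-in for a task function; it only reports what it received."""
    return '{} -> {} (x{})'.format(upstream['one'], product, multiplier)


params = toy_render_params({'multiplier': 2},
                           product='another_file.csv',
                           upstream={'one': 'one_file.csv'})

# The merged params are passed as keyword arguments
result = toy_task(**params)
print(result)  # one_file.csv -> another_file.csv (x2)
```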


As general advice, it is best to keep `params` short. Their primary use case is creating dynamic DAGs (whose number of Tasks is determined using control structures). Dynamic DAGs are covered in a more advanced tutorial.

In [15]:
import shutil
shutil.rmtree(tmp_dir)