### Running the Example

Below is an example application that uses the above code block. It does not otherwise differ from our earlier examples, except that it creates input.dat on the fly.

The output of this example is straightforward, as expected, and confirms that the file staging happened as planned. You will likely notice, though, that the code runs significantly longer than the earlier examples because of the file staging overhead.

We start by importing the radical.pilot module and initializing the reporter facility used for printing well-formatted runtime and progress information.

In [None]:
import os
import sys

verbose  = os.environ.get('RADICAL_PILOT_VERBOSE', 'REPORT')
os.environ['RADICAL_PILOT_VERBOSE'] = verbose

import radical.pilot as rp
import radical.utils as ru

report = ru.Reporter(name='radical.pilot')
report.title('Getting Started (RP version %s)' % rp.version)

We now import the dotenv module to load our environment variables. To create a new Session, you need to provide the URL of a MongoDB server, which we fetch from our .env file.

We will set the resource value to 'local.localhost'. Using a resource key other than local.localhost implicitly tells RADICAL-Pilot that it is targeting a remote resource.

In [None]:
from dotenv import load_dotenv
load_dotenv()

resource = 'local.localhost'
session = rp.Session()

All other pilot code is now wrapped in a try/except block. If an exception is caught, we can rely on the session object to exist and be valid, and we can thus tear down the whole RP stack via a <i>'session.close()'</i> call in the <i>'finally'</i> clause.

In [None]:
def get_pilot_description(resource):
    report.info('read config')
    config = ru.read_json('../config.json')
    report.ok('>>ok\n')
    report.header('submit pilots')
    pdescs = list()
    pd_init = {
               'resource'      : resource,
               'runtime'       : 15,  # pilot runtime (min)
               'exit_on_error' : True,
               'project'       : config[resource].get('project', None),
               'queue'         : config[resource].get('queue', None),
               'access_schema' : config[resource].get('schema', None),
               'cores'         : config[resource].get('cores', 1),
               'gpus'          : config[resource].get('gpus', 0),
              }
    pdesc = rp.PilotDescription(pd_init)
    return pdesc

In this function, we work with a TaskManager object in which the pilot is registered. We create a workload that counts the characters of a simple file: we create the file right here and then use it as input data for each task.

After this, we set the number of tasks (n=10) and create the task descriptions.
We submit these task descriptions to the TaskManager. This triggers the selected scheduler to start assigning tasks to the pilots.

## Staging Task Input Data

The vast majority of applications operate on data, and many of those read input data from files. Since RP provides an abstraction above the resource layer, it can run a task on any pilot the application created (see Selecting a Task Scheduler). To ensure that the task finds the data it needs on the resource where it runs, RP provides a mechanism to stage input data automatically.

For each task, the application can specify
 - source: what data files need to be staged;
 - target: what should the path be in the context of the task execution;
 - action: how should data be staged.

If <i>source</i> and <i>target</i> file names are the same, and if action is the default rp.TRANSFER, then you can simply specify task input data by giving a list of file names (we’ll discuss more complex staging directives in a later example):



```python
cud = rp.TaskDescription()
cud.executable     = '/usr/bin/wc'
cud.arguments      = ['-c', 'input.dat']
cud.input_staging  = ['input.dat']
```
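As a rough mental model, each entry of the shorthand list can be read as one full directive with a source, a target, and an action. The sketch below is not RP's implementation: `expand_directive` is a hypothetical helper, and the string `'Transfer'` is a placeholder for `rp.TRANSFER` so that the snippet runs without RP installed.

```python
import os

# Placeholder for rp.TRANSFER (assumption: lets the sketch run without RP).
TRANSFER = 'Transfer'


def expand_directive(entry):
    """Return a full staging directive for a bare file name or a partial dict.

    Hypothetical helper, for illustration only.
    """
    if isinstance(entry, str):
        entry = {'source': entry}
    source = entry['source']
    return {
        'source': source,
        # Default target: the same file name, without any directory part.
        'target': entry.get('target', os.path.basename(source)),
        # Default action: transfer the file.
        'action': entry.get('action', TRANSFER),
    }


directives = [expand_directive(name) for name in ['input.dat']]
print(directives)
```

Writing the directive out in full only becomes necessary when the target name or the action differs from these defaults.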

In [None]:
def submit_input_staging_tasks(tmgr):
    report.header('submit tasks')

    os.system('hostname >  input.dat')
    os.system('date     >> input.dat')

    n = 10
    report.info('create %d task description(s)\n\t' % n)

    tds = list()
    for i in range(0, n):
        td = rp.TaskDescription()
        td.executable     = '/usr/bin/wc'
        td.arguments      = ['-c', 'input.dat']
        td.input_staging  = {'source': 'client:///input.dat',
                             'target': 'task:///input.dat',
                             'action': rp.TRANSFER}
        tds.append(td)
        report.progress()
    report.ok('>>ok\n')
    tasks = tmgr.submit_tasks(tds)

    report.header('gather results')
    tmgr.wait_tasks()
    os.system('rm input.dat')
    return tasks


## Staging Task Output Data

Upon completion, tasks often create some amount of data. We have seen in Obtaining Task Details how we can inspect the task’s stdout string, but that will not be useful beyond the most trivial workloads. This section shows how to stage the output data of tasks back to the RP application, and/or to arbitrary storage locations and devices.

In principle, output staging is specified in the same way as the input staging discussed in the previous section:

 - source: what files need to be staged from the context of the task that terminated execution;
 - target: where should the files be staged to;
 - action: how should files be staged.

Note that in this example we specify the output file name to be changed to a unique name during staging:

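```python
for i in range(0, n):
    # create a fresh description for each task
    cud = rp.TaskDescription()
    cud.executable     = '/bin/cp'
    cud.arguments      = ['-v', 'input.dat', 'output.dat']
    cud.input_staging  = ['input.dat']
    # rename the output during staging so that tasks do not overwrite each other
    cud.output_staging = {'source': 'output.dat',
                          'target': 'output_%03d.dat' % i,
                          'action': rp.TRANSFER}
```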
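The renaming relies only on Python string formatting. The following sketch, which uses an arbitrary small task count chosen for illustration, shows the target names that the `'output_%03d.dat' % i` pattern produces and checks that they are unique.

```python
n = 5  # small task count, chosen only for this illustration

# One output target per task index, zero-padded to three digits.
targets = ['output_%03d.dat' % i for i in range(n)]

# Every task gets a distinct target file name.
all_unique = len(set(targets)) == n

print(targets)
print(all_unique)
```

Note that three digits keep the names aligned and lexically sorted only for up to 1000 tasks; a wider field would be needed beyond that.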

In the <i>submit_tasks</i> function, we work with a TaskManager object in which the pilot is registered. We create a workload that copies a simple file: we create the file right here and then use it as input data for each task.

After this, we set the number of tasks (n=128) and create the task descriptions. We submit these task descriptions to the TaskManager. This triggers the selected scheduler to start assigning tasks to the pilots.

We use ```tmgr.wait_tasks()``` to wait for all tasks to reach a final state (DONE, CANCELED or FAILED).
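The idea of a final state can be sketched without RP. In the snippet below, plain strings stand in for the state names listed above, `'RUNNING'` is a made-up example of a non-final state, and `all_final` is a hypothetical helper rather than an RP function.

```python
# State names as listed in the text; plain strings stand in for RP constants.
FINAL_STATES = {'DONE', 'CANCELED', 'FAILED'}


def all_final(states):
    """Return True if every state in the iterable is a final state."""
    return all(state in FINAL_STATES for state in states)


still_running = ['DONE', 'RUNNING', 'FAILED']   # 'RUNNING' is a made-up non-final state
finished      = ['DONE', 'DONE', 'CANCELED']

print(all_final(still_running))
print(all_final(finished))
```

A wait call conceptually blocks until a check of this kind succeeds for all submitted tasks.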

In [None]:
def submit_tasks(tmgr):
    report.header('submit tasks')

    os.system('hostname >  /tmp/input.dat')
    os.system('date     >> /tmp/input.dat')

    n = 128
    report.info('create %d task description(s)\n\t' % n)

    tds = list()
    for i in range(0, n):
        td = rp.TaskDescription()
        td.executable     = '/bin/cp'
        td.arguments      = ['-v', 'input.dat', 'output.dat']
        td.input_staging  = ['/tmp/input.dat']
        td.output_staging = {'source': 'task:///output.dat',
                              'target': 'client:///output_%03d.dat' % i,
                              'action': rp.TRANSFER}

        tds.append(td)
        report.progress()
    report.ok('>>ok\n')

    tasks = tmgr.submit_tasks(tds)

    report.header('gather results')
    tmgr.wait_tasks()
    return tasks

We create the <i>report_staging_in_task_progress</i> function to report the status of each task.

In [None]:
def report_staging_in_task_progress(tasks):
    report.info('\n')
    for task in tasks:
        report.plain('  * %s: %s, exit: %3s, out: %s\n'
                % (task.uid, task.state[:4],
                    task.exit_code, task.stdout.strip()[:35]))




We create the <i>report_staging_out_task_progress</i> function to report the status of each task and to count how often each output string occurs.

In [None]:
def report_staging_out_task_progress(tasks):
    counts = dict()
    for task in tasks:
        out_str = task.stdout.strip()[:35]
        report.plain('  * %s: %s, exit: %3s, out: %s\n'
                % (task.uid, task.state[:4],
                    task.exit_code, out_str))
        if out_str not in counts:
            counts[out_str] = 0
        counts[out_str] += 1

    report.info("\n")
    for out_str in counts:
        report.info("  * %-20s: %3d\n" % (out_str, counts[out_str]))
    report.info("  * %-20s: %3d\n" % ('total', sum(counts.values())))
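The tallying step in the function above can also be written with `collections.Counter` from the standard library. This is a sketch of that alternative, not the notebook's method; the stdout strings are made up and stand in for `task.stdout` values.

```python
from collections import Counter

# Made-up stdout strings standing in for task.stdout values.
stdouts = ["'input.dat' -> 'output.dat'\n"] * 3 + ['cp: cannot stat input.dat\n']

# Count identical (stripped and truncated) output strings.
counts = Counter(out.strip()[:35] for out in stdouts)

for out_str, count in counts.items():
    print('  * %-20s: %3d' % (out_str, count))
print('  * %-20s: %3d' % ('total', sum(counts.values())))
```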


## Sharing Task Input Data

RP supports the concurrent execution of many tasks and, often, these tasks share some or all their input data, i.e., files. We have seen earlier that input staging can incur a significant runtime overhead. Such an overhead can be significantly reduced by avoiding redundant file staging operations.

Each RP pilot manages a shared data space in which to store tasks’ input files. First, RP can stage input files into the shared data space of a pilot. Second, that pilot can create symbolic links (symlinks) in the work directory of each task to any file in the shared data space. In this way, a set of tasks can access the same file, avoiding costly staging and replication operations.

Stage shared data from `pwd` to the pilot's shared data space:

```python
pilot.stage_in({'source': 'file://%s/input.dat' % os.getcwd(),
                'target': 'staging:///input.dat',
                'action': rp.TRANSFER})
```

Create a symlink in the work directory of each task to the file <i>input.dat</i>:

```python
for i in range(0, n):
    cud = rp.TaskDescription()

    cud.executable     = '/usr/bin/wc'
    cud.arguments      = ['-c', 'input.dat']
    cud.input_staging  = {'source': 'staging:///input.dat',
                          'target': 'input.dat',
                          'action': rp.LINK}
```

The rp.LINK staging action creates a symlink, avoiding the copy operation used by the rp.TRANSFER action.
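The difference between linking and copying can be illustrated on a local file system with the standard library alone. The sketch below does not use RP: the directory names are made up, with `shared` standing in for a pilot's shared data space and `task_a` and `task_b` for task work directories.

```python
import os
import shutil
import tempfile

with tempfile.TemporaryDirectory() as root:
    shared = os.path.join(root, 'shared')   # stands in for the shared data space
    task_a = os.path.join(root, 'task_a')   # work directory that links the file
    task_b = os.path.join(root, 'task_b')   # work directory that copies the file
    for path in (shared, task_a, task_b):
        os.mkdir(path)

    src = os.path.join(shared, 'input.dat')
    with open(src, 'w') as fout:
        fout.write('hello\n')

    # Link: no data is duplicated, the task sees the shared file itself.
    link = os.path.join(task_a, 'input.dat')
    os.symlink(src, link)

    # Copy: the file content is duplicated into the work directory.
    copy = os.path.join(task_b, 'input.dat')
    shutil.copy(src, copy)

    link_is_symlink = os.path.islink(link)
    copy_is_symlink = os.path.islink(copy)
    link_same_file  = os.path.samefile(src, link)
    copy_same_file  = os.path.samefile(src, copy)
    with open(link) as fin:
        link_content = fin.read()

print(link_is_symlink, copy_is_symlink, link_same_file, copy_same_file)
```

Because the link refers to the shared file itself, a task that modifies it would affect every other task that links to the same file.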

<b>Note:</b> Unlike other methods in RP, the pilot.stage_in method is synchronous, i.e., it only returns once the transfer is completed. This may change in a future version of RP.




Create a workload that prints the contents of a simple file. We first create the file right here and then synchronously stage it to the pilot's shared data space.


In [None]:
def create_and_stage_input_data(pilot):
    os.system('hostname >  input.dat')
    os.system('date     >> input.dat')

    report.info('stage in shared data')
    pilot.stage_in({'source': 'client:///input.dat',
                    'target': 'pilot:///input.dat',
                    'action': rp.TRANSFER})
    report.ok('>>ok\n')


In the <i>submit_tasks</i> function, we work with a TaskManager object in which the pilot is registered.

We set the number of tasks (n=10) and create the task descriptions. Each task links input.dat from the pilot's shared data space into its work directory and copies its STDOUT file back to the pilot. We submit these task descriptions to the TaskManager. This triggers the selected scheduler to start assigning tasks to the pilots.

We use ```tmgr.wait_tasks()``` to wait for all tasks to reach a final state (DONE, CANCELED or FAILED).

In [None]:
def submit_tasks(tmgr):
    report.header('submit tasks')

    n = 10
    report.info('create %d task description(s)\n\t' % n)

    tds  = list()
    outs = list()
    for i in range(0, n):
        td = rp.TaskDescription()
        td.executable     = '/bin/cat'
        td.arguments      = ['input.dat']
        td.stdout         = 'STDOUT'
        td.input_staging  = [{'source': 'pilot:///input.dat',
                              'target': 'task:///input.dat',
                              'action': rp.LINK}]
        td.output_staging = [{'source': 'task:///STDOUT',
                              'target': 'pilot:///STDOUT.%06d' % i,
                              'action': rp.COPY}]
        outs.append('STDOUT.%06d' % i)
        tds.append(td)
        report.progress()
    report.ok('>>ok\n')

    tasks = tmgr.submit_tasks(tds)
    report.header('gather results')
    tmgr.wait_tasks()
    return tasks, outs


We create the <i>report_task_progress</i> function to report the status of each task and to stage the output files from the pilot back to the client.

In [None]:
def report_task_progress(tasks, pilot, outs):
    report.info('\n')
    for task in tasks:
        report.plain('  * %s: %s, exit: %3s, out: %s\n'
                % (task.uid, task.state[:4],
                    task.exit_code, task.stdout.strip()[:35]))

    os.system('rm input.dat')

    report.info('stage out shared data')
    pilot.stage_out([{'source': 'pilot:///%s'  % fname,
                      'target': 'client:///%s' % fname,
                      'action': rp.TRANSFER} for fname in outs])
    report.ok('>>ok\n')


We put all function calls inside a try/except block. In the finally clause, we always close the session, whether or not we caught an exception. This kills all remaining pilots.

In [None]:
try:
    pmgr  = rp.PilotManager(session=session)
    pilot = pmgr.submit_pilots(get_pilot_description(resource))

    tmgr = rp.TaskManager(session=session)
    tmgr.add_pilots(pilot)

    tasks = submit_input_staging_tasks(tmgr)
    report_staging_in_task_progress(tasks)
except Exception as e:
    report.error('caught Exception: %s\n' % e)
    raise

except (KeyboardInterrupt, SystemExit):
    report.warn('exit requested\n')

finally:
    report.header('finalize')
    session.close(cleanup=True)

report.header()
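The cleanup pattern used above does not depend on RP. As a minimal sketch, the snippet below uses a hypothetical `DummySession` class in place of a real session to show that the `finally` clause runs both when the workload succeeds and when it raises.

```python
class DummySession:
    """Stand-in for a session object; not part of RP."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def run(session, fail):
    """Run a fake workload and always close the session afterwards."""
    try:
        if fail:
            raise RuntimeError('workload failed')
        return 'ok'
    except RuntimeError as e:
        return 'caught: %s' % e
    finally:
        # Executed on success and on failure alike.
        session.close()


good, bad = DummySession(), DummySession()
result_good = run(good, fail=False)
result_bad  = run(bad,  fail=True)
print(result_good, good.closed)
print(result_bad,  bad.closed)
```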