# Parallel dataflows

##### Parallel dataflows can be developed by passing data between Apps. 
###### This example is taken from the Parsl documentation (section "Parallel Dataflow").
###### In this example we create a set of files, each containing a random number.
###### We then concatenate these files into a single file and compute the sum of all the numbers in that file.

###### The files will be created with the sandbox_app.
###### Files are transferred between apps by using the workflow:// schema.

In [1]:
import parsl
from parsl.app.app import sandbox_app, python_app, bash_app
from parsl.data_provider.files import File
import os

# Show which Parsl version this notebook is running against.
print(parsl.version.VERSION)

1.1.0a1


In [2]:
# Start Parsl. No configuration object is passed, so whatever Parsl uses as its
# default applies; the call returns the DataFlowKernel shown in the cell output.
parsl.load()

<parsl.dataflow.dflow.DataFlowKernel at 0x7f4d2871cdc0>

In [3]:
# Name this workflow on the active DataFlowKernel. The name shows up in the
# workflow:// URLs printed later in this notebook (workflow://helloSandbox/...).
parsl.dfk().workflow_name="helloSandbox"

In [4]:
# App that generates a semi-random integer (bash $RANDOM yields 0..32767)

@sandbox_app()
def generate(args, workflow_app_name=""):
    """Build the shell command that sleeps, then writes a random number to out.txt.

    args: value handed to ``sleep`` after conversion with ``str``.
    workflow_app_name: not referenced in the command itself; presumably
        consumed by ``sandbox_app`` to name the task -- TODO confirm.

    Returns the command string; ``&>`` sends the output of ``echo`` to out.txt.
    """
    sleep_argument = str(args)
    command_template = "sleep {}; echo $(( RANDOM )) &> out.txt"
    return command_template.format(sleep_argument)


# App that concatenates input files into a single output file
@sandbox_app()
def concat(inputs=[], workflow_app_name=""):
    """Build the ``cat`` command that joins every input into out.txt.

    inputs: iterable of strings naming the files to concatenate; they are
        placed on the command line separated by single spaces, unquoted.
    workflow_app_name: not referenced in the command itself; presumably
        consumed by ``sandbox_app`` to name the task -- TODO confirm.

    Returns the command string.
    """
    sources = " ".join(inputs)
    return "cat {0} > out.txt".format(sources)

In [5]:
# Launch 5 generate tasks in parallel; each one sleeps 10..14 and is named G-0..G-4
output_files = [
    generate(str(index + 10), workflow_app_name="G-{}".format(str(index)))
    for index in range(5)
]

In [6]:
# Show the workflow:// URL attached to each generate task
for generated in output_files:
    print(generated.workflow_schema)

workflow://helloSandbox/G-0
workflow://helloSandbox/G-1
workflow://helloSandbox/G-2
workflow://helloSandbox/G-3
workflow://helloSandbox/G-4


In [7]:
# Concatenate the out.txt of every generate task into a single file
generated_outputs = [task.workflow_schema + "/out.txt" for task in output_files]
cc = concat(inputs=generated_outputs)

In [8]:
# App that sums the integers found in the first input file
@python_app
def total(inputs=[]):
    """Return the sum of the integers, one per line, in ``inputs[0]``.

    inputs: list whose first element is a workflow URL shaped like
        ``<SCHEMA><workflow>/<task name>/<file>`` (for example what
        ``cc.workflow_schema + "/out.txt"`` produces). Only that first
        element is read; any further elements are ignored.

    Raises IndexError when ``inputs`` is empty (or the URL has no task
    segment) and ValueError when a line cannot be parsed by ``int``.
    """
    def workflow_schema_resolver(inFile):
        """Turn a workflow URL into a path that uses the task's working directory."""
        # Drop the schema prefix, leaving "<workflow>/<task name>/<file...>".
        segments = inFile.replace(parsl.dfk().SCHEMA, "").split("/")
        # Look the producing task up by name and wait for its future; the
        # result is indexed by 'working_directory'.
        task_record = parsl.dfk()._find_task_by_name(segments[1])
        # Swap only the task-name segment. A plain str.replace on the whole
        # path would also rewrite the workflow name or the file name whenever
        # they happen to contain the task name.
        segments[1] = task_record['app_fu'].result()['working_directory']
        return "/".join(segments)

    running_sum = 0
    with open(workflow_schema_resolver(inputs[0]), 'r') as f:
        for line in f:
            running_sum += int(line)
    return running_sum

# Sum the concatenated file and block until the result is available
concatenated_file = cc.workflow_schema + "/out.txt"
sum_future = total(inputs=[concatenated_file])
print(sum_future.result())

78199
