In [1]:
# Pin the Parsl release this notebook was executed with (the version cell below
# prints 2023.07.17) so that a fresh run installs the same library version.
%pip install parsl==2023.7.17


Collecting parsl
  Obtaining dependency information for parsl from https://files.pythonhosted.org/packages/1c/e0/9455faca67f5f8f6afa3414526452f442c120f675677d7f2654b3a212d63/parsl-2023.7.17-py3-none-any.whl.metadata
  Using cached parsl-2023.7.17-py3-none-any.whl.metadata (3.6 kB)
Collecting typeguard<3,>=2.10 (from parsl)
  Using cached typeguard-2.13.3-py3-none-any.whl (17 kB)
Collecting types-paramiko (from parsl)
  Obtaining dependency information for types-paramiko from https://files.pythonhosted.org/packages/8e/92/17f6d1bf82b039f6b42d9ef77b9dd96422691d8c3cc1dc2e6f4c1a3726b1/types_paramiko-3.2.0.1-py3-none-any.whl.metadata
  Using cached types_paramiko-3.2.0.1-py3-none-any.whl.metadata (1.6 kB)
Collecting types-requests (from parsl)
  Obtaining dependency information for types-requests from https://files.pythonhosted.org/packages/06/9b/04bb62f11a6824df5d4568439cf0715118c265d0ffbebeb7cf4b8c9caa15/types_requests-2.31.0.2-py3-none-any.whl.metadata
  Using cached types_requests-2.31.0

In [2]:
import parsl
import os
from parsl.app.app import python_app, bash_app
from parsl.configs.local_threads import config

# Optional debugging aid, disabled by default: uncomment to stream Parsl's log to stdout.
#parsl.set_stream_logger() # <-- log everything to stdout

# Show which Parsl release is installed in this kernel.
print(parsl.__version__)

2023.07.17


In [3]:
# Load the local-threads configuration imported above; the call returns the
# DataFlowKernel object that is displayed as this cell's output.
parsl.load(config)

<parsl.dataflow.dflow.DataFlowKernel at 0x11d65d480>

In [4]:
@python_app
def hello():
    """Return the fixed greeting string 'Hello World!'."""
    greeting = 'Hello World!'
    return greeting

# Invoke the app, then call result() on the returned future to obtain the greeting.
greeting_future = hello()
print(greeting_future.result())

Hello World!


In [5]:
@python_app
def multiply(a, b):
    """Return the product ``a * b`` of the two arguments."""
    product = a * b
    return product

# Submit 5 * 9 and print the value once the returned future yields it.
product_future = multiply(5, 9)
print(product_future.result())

45


In [6]:
@python_app
def slow_hello():
    """Sleep for five seconds, then return the greeting 'Hello World!'."""
    # Imported inside the body, as in the rest of this notebook's apps.
    from time import sleep

    sleep(5)
    greeting = 'Hello World!'
    return greeting

# result() waits for the five-second app to finish before the greeting is printed.
slow_future = slow_hello()
print(slow_future.result())

Hello World!


In [7]:
@bash_app
def echo_hello(stdout='echo-hello.stdout', stderr='echo-hello.stderr'):
    """Return the shell command that echoes 'Hello World!'.

    Args:
        stdout: not read by this body; presumably consumed by the bash_app
            decorator to name the file capturing standard output (the notebook
            reads 'echo-hello.stdout' afterwards) - confirm against Parsl docs.
        stderr: not read by this body; presumably the matching file for
            standard error - confirm against Parsl docs.

    Returns:
        The command line as a string.
    """
    command = 'echo "Hello World!"'
    return command

# Run the app and wait on its future before reading the captured output file.
echo_hello().result()

with open('echo-hello.stdout', 'r') as captured:
    print(captured.read())

Hello World!



In [8]:
# Create hello-0.txt, hello-1.txt and hello-2.txt in the working directory,
# each holding a single "hello <n>" line.
for index in range(3):
    target_path = os.path.join(os.getcwd(), 'hello-{}.txt'.format(index))
    with open(target_path, 'w') as handle:
        handle.write('hello {}\n'.format(index))

In [9]:
from parsl.data_provider.files import File

@bash_app
def cat(inputs=[], outputs=[]):
    """Build a shell command that concatenates the input files into one output.

    Args:
        inputs: objects exposing a ``filepath`` attribute; their paths are
            listed, in order, as arguments to ``cat``.
        outputs: sequence whose first element is the destination; it is
            converted with ``str()`` and used as the redirection target.

    Returns:
        The command string ``cat <inputs...> > <output>``.

    Raises:
        IndexError: if ``outputs`` is empty.
    """
    import shlex

    # Quote every path so directories containing spaces or shell
    # metacharacters do not split or alter the command.
    sources = " ".join(shlex.quote(str(item.filepath)) for item in inputs)
    destination = shlex.quote(str(outputs[0]))
    return 'cat {} > {}'.format(sources, destination)

# Build the three input paths and the output path in the working directory.
hello_paths = [os.path.join(os.getcwd(), name)
               for name in ('hello-0.txt', 'hello-1.txt', 'hello-2.txt')]
merged_path = os.path.join(os.getcwd(), 'all_hellos.txt')

concat = cat(inputs=[File(path) for path in hello_paths],
             outputs=[File(merged_path)])

# Open the concatenated file once its output future has a result
with open(concat.outputs[0].result(), 'r') as merged:
    print(merged.read())

hello 0
hello 1
hello 2



In [21]:
# Ad-hoc inspection: the recorded output shows the imported `config` is a parsl.config.Config.
# NOTE(review): this cell's execution count (21) is out of sequence with its neighbours -
# confirm the cell is still wanted and that the notebook survives Restart & Run All.
type(config)

parsl.config.Config

In [10]:
@python_app
def hello():
    """Wait five seconds, then return 'Hello World!'.

    The delay gives the caller time to observe the future in its
    not-yet-done state.
    """
    import time as clock

    clock.sleep(5)
    message = 'Hello World!'
    return message

app_future = hello()

# Right after submission the app is still sleeping, so this is expected to print False
done_before = app_future.done()
print('Done: {}'.format(done_before))

# result() blocks until the future resolves; afterwards done() reports True
result_value = app_future.result()
print('Result: {}'.format(result_value))
print('Done: {}'.format(app_future.done()))

Done: False
Result: Hello World!
Done: True


In [11]:
# App that echos an input message to an output file
@bash_app
def slowecho(message, outputs=[]):
    """Build a shell command that sleeps 5 seconds, then echoes a message to a file.

    Args:
        message: text to echo; converted with ``str()``.
        outputs: sequence whose first element is the destination file; it is
            converted with ``str()`` and receives both stdout and stderr
            through the ``&>`` redirection.

    Returns:
        The command string ``sleep 5; echo <message> &> <output>``.

    Raises:
        IndexError: if ``outputs`` is empty.
    """
    import shlex

    # Quote both values: an unquoted message containing ';', '>' or '$' would be
    # interpreted by the shell, and a path with spaces would be split.
    quoted_message = shlex.quote(str(message))
    quoted_target = shlex.quote(str(outputs[0]))
    return 'sleep 5; echo %s &> %s' % (quoted_message, quoted_target)

# Call slowecho specifying the output file
output_path = os.path.join(os.getcwd(), 'hello-world.txt')
hello = slowecho('Hello World!', outputs=[File(output_path)])

# The AppFuture's outputs attribute is a list of DataFutures
print(hello.outputs)

# Also check the AppFuture
print('Done: {}'.format(hello.done()))

# Print the contents of the output DataFuture when complete
first_output = hello.outputs[0]
with open(first_output.result(), 'r') as echoed:
    print(echoed.read())

# Now that this is complete, check the DataFutures again, and the AppFuture
print(hello.outputs)
print('Done: {}'.format(hello.done()))

[<parsl.app.futures.DataFuture object at 0x11d65ebf0 representing <File at 0x118227eb0 url=/Users/lauvav/Github/vairus-blog/posts/2023-07-20-parsl-test/hello-world.txt scheme=file netloc= path=/Users/lauvav/Github/vairus-blog/posts/2023-07-20-parsl-test/hello-world.txt filename=hello-world.txt> not done>]
Done: False
Hello World!

[<parsl.app.futures.DataFuture object at 0x11d65ebf0 representing <File at 0x118227eb0 url=/Users/lauvav/Github/vairus-blog/posts/2023-07-20-parsl-test/hello-world.txt scheme=file netloc= path=/Users/lauvav/Github/vairus-blog/posts/2023-07-20-parsl-test/hello-world.txt filename=hello-world.txt> done>]
Done: True


In [12]:
from parsl.data_provider.files import File

# App that copies the contents of a file to another file
@bash_app
def copy(inputs=[], outputs=[]):
    """Build a shell command that copies the first input file to the first output.

    Args:
        inputs: sequence whose first element is the source; converted with ``str()``.
        outputs: sequence whose first element is the destination; converted
            with ``str()`` and used as the ``&>`` redirection target.

    Returns:
        The command string ``cat <input> &> <output>``.

    Raises:
        IndexError: if ``inputs`` or ``outputs`` is empty.
    """
    import shlex

    # Quote both paths so spaces or shell metacharacters cannot split the command.
    source = shlex.quote(str(inputs[0]))
    destination = shlex.quote(str(outputs[0]))
    return 'cat %s &> %s' % (source, destination)

# Create a test file. The context manager closes (and therefore flushes) the
# handle deterministically before any app tries to read the file.
with open(os.path.join(os.getcwd(), 'cat-in.txt'), 'w') as test_file:
    test_file.write('Hello World!\n')

# Wrap the input and output paths in Parsl File objects
working_dir = os.getcwd()
parsl_infile = File(os.path.join(working_dir, 'cat-in.txt'))
parsl_outfile = File(os.path.join(working_dir, 'cat-out.txt'))

# Submit the copy app with the Parsl files
copy_future = copy(inputs=[parsl_infile], outputs=[parsl_outfile])

# Take the result of the output future, then show what was redirected to the file
copied_file = copy_future.outputs[0].result()
with open(copied_file, 'r') as copied:
    print(copied.read())

Hello World!



In [13]:
from parsl.data_provider.files import File

@python_app
def sort_numbers(inputs=[]):
    """Read the first input file and return its stripped lines in sorted order.

    The lines are compared as strings, so the ordering is lexicographic
    ('10' sorts before '2'), not numeric.

    Args:
        inputs: sequence whose first element exposes a ``filepath`` attribute
            naming the file to read.

    Returns:
        A list of the file's lines, whitespace-stripped and sorted.
    """
    source_path = inputs[0].filepath
    with open(source_path, 'r') as handle:
        return sorted(line.strip() for line in handle)

# The input is referenced by URL rather than by a local path
source_url = 'https://raw.githubusercontent.com/Parsl/parsl-tutorial/master/input/unsorted.txt'
unsorted_file = File(source_url)

f = sort_numbers(inputs=[unsorted_file])
sorted_values = f.result()
print(sorted_values)

['0', '1', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '2', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29', '3', '30', '31', '32', '33', '34', '35', '36', '37', '38', '39', '4', '40', '41', '42', '43', '44', '45', '46', '47', '48', '49', '5', '50', '51', '52', '53', '54', '55', '56', '57', '58', '59', '6', '60', '61', '62', '63', '64', '65', '66', '67', '68', '69', '7', '70', '71', '72', '73', '74', '75', '76', '77', '78', '79', '8', '80', '81', '82', '83', '84', '85', '86', '87', '88', '89', '9', '90', '91', '92', '93', '94', '95', '96', '97', '98', '99']


In [14]:
# App that generates a random number
@python_app
def generate(limit):
    """Return a random integer N with 1 <= N <= limit."""
    import random as rng

    return rng.randint(1, limit)

# App that writes a variable to a file
@bash_app
def save(variable, outputs=[]):
    """Build a shell command that writes a value to a file via ``echo``.

    Args:
        variable: value to write; converted with ``str()``.
        outputs: sequence whose first element is the destination file; it is
            converted with ``str()`` and used as the ``&>`` redirection target.

    Returns:
        The command string ``echo <variable> &> <output>``.

    Raises:
        IndexError: if ``outputs`` is empty.
    """
    import shlex

    # Quote both pieces so whitespace or shell metacharacters in the value or
    # in the path cannot change what the shell executes.
    value = shlex.quote(str(variable))
    target = shlex.quote(str(outputs[0]))
    return 'echo %s &> %s' % (value, target)

# Generate a random number between 1 and 10
random = generate(10)
random_value = random.result()
print('Random number: %s' % random_value)

# Save the random number to a file (the future itself is handed to the next app)
sequential_path = os.path.join(os.getcwd(), 'sequential-output.txt')
saved = save(random, outputs=[File(sequential_path)])

# Print the output file
with open(saved.outputs[0].result(), 'r') as saved_file:
    print('File contents: %s' % saved_file.read())

Random number: 8
File contents: 8



In [15]:
# App that generates a random number after a delay
@python_app
def generate(limit, delay):
    """Sleep for ``delay`` seconds, then return a random integer in [1, limit]."""
    import random as rng
    import time as clock

    clock.sleep(delay)
    return rng.randint(1, limit)

# Launch 5 generators for numbers between 1 and 10, with delays of 0..4 seconds
rand_nums = [generate(10, delay) for delay in range(5)]

# Wait for all apps to finish and collect the results
outputs = [future.result() for future in rand_nums]

# Print results
print(outputs)

[6, 2, 2, 7, 2]


In [16]:
# App that generates a semi-random number between 0 and 32,767
@bash_app
def generate(outputs=[]):
    """Build a shell command that writes the shell's ``$RANDOM`` value to a file.

    Args:
        outputs: sequence whose first element is the destination file; it is
            converted with ``str()`` and used as the ``&>`` redirection target.

    Returns:
        The command string ``echo $(( RANDOM )) &> <output>``.

    Raises:
        IndexError: if ``outputs`` is empty.
    """
    import shlex

    # Quote the path so a working directory containing spaces still works.
    target = shlex.quote(str(outputs[0]))
    return "echo $(( RANDOM )) &> {}".format(target)

# App that concatenates input files into a single output file
@bash_app
def concat(inputs=[], outputs=[]):
    """Build a shell command that concatenates the input files into one output.

    Args:
        inputs: objects exposing a ``filepath`` attribute; their paths are
            listed, in order, as arguments to ``cat``.
        outputs: sequence whose first element is the destination; it is
            converted with ``str()`` and used as the redirection target.

    Returns:
        The command string ``cat <inputs...> > <output>``.

    Raises:
        IndexError: if ``outputs`` is empty.
    """
    import shlex

    # Quote every path so spaces or shell metacharacters cannot split the command.
    sources = " ".join(shlex.quote(str(item.filepath)) for item in inputs)
    destination = shlex.quote(str(outputs[0]))
    return "cat {0} > {1}".format(sources, destination)

# App that calculates the sum of values in a list of input files
@python_app
def total(inputs=[]):
    """Sum the integers stored one per line in the first input file.

    Args:
        inputs: sequence whose first element is passed directly to ``open()``.

    Returns:
        The integer sum of all lines; 0 for an empty file.

    Raises:
        ValueError: if a line cannot be parsed by ``int()``.
    """
    with open(inputs[0], 'r') as handle:
        return sum(int(line) for line in handle)

# Create 5 files with semi-random numbers in parallel
output_files = []
for i in range(5):
    output_files.append(generate(outputs=[File(os.path.join(os.getcwd(), 'random-{}.txt'.format(i)))]))

# Concatenate the files into a single file
cc = concat(inputs=[i.outputs[0] for i in output_files],
            outputs=[File(os.path.join(os.getcwd(), 'all.txt'))])

# Calculate the sum of the random numbers. The future gets its own name so the
# `total` app is not overwritten by its own result and stays callable afterwards.
total_future = total(inputs=[cc.outputs[0]])
print(total_future.result())

85706


In [17]:
from parsl.app.app import join_app, python_app

@python_app
def add(*args):
    """Return the sum of all positional arguments.

    With no arguments the result is 0, the neutral element of addition.
    """
    # sum() starts from 0 and adds left to right, as an explicit accumulator loop would.
    return sum(args)


@join_app
def fibonacci(n):
    """Compute the n-th Fibonacci number as a composition of ``add`` calls.

    Args:
        n: index into the sequence (fibonacci(0) is 0, fibonacci(1) is 1).

    Returns:
        Whatever ``add`` returns for the base cases, or ``add`` applied to the
        two recursive ``fibonacci`` calls otherwise.

    Raises:
        ValueError: if ``n`` is negative.
    """
    if n < 0:
        # Without this guard the n - 1 / n - 2 recursion never reaches a base
        # case and keeps spawning calls (a fractional n ends up here as well).
        raise ValueError('fibonacci() requires a non-negative n, got {!r}'.format(n))
    if n == 0:
        return add()
    elif n == 1:
        return add(1)
    else:
        return add(fibonacci(n - 1), fibonacci(n - 2))

# Ask for the 10th Fibonacci number and print it once the future yields a value.
fib_future = fibonacci(10)
print(fib_future.result())

55


In [18]:
# App that estimates pi by placing points in a box
@python_app
def pi(num_points):
    """Estimate pi by Monte Carlo sampling of the unit square.

    Draws ``num_points`` random (x, y) points in the unit square and counts
    those with x**2 + y**2 < 1, i.e. inside the quarter circle.

    Args:
        num_points: number of points to draw.

    Returns:
        ``4 * hits / num_points`` as a float.

    Raises:
        ZeroDivisionError: if ``num_points`` is 0.
    """
    from random import random

    # Operands are evaluated left to right: the first draw is x, the second is y.
    hits = sum(
        1
        for _ in range(num_points)
        if random() ** 2 + random() ** 2 < 1
    )
    return 4 * hits / num_points

# App that computes the mean of three values
@python_app
def mean(a, b, c):
    """Return the arithmetic mean of the three arguments."""
    combined = a + b + c
    return combined / 3

# Estimate three values for pi, one million points each
a, b, c = (pi(10**6) for _ in range(3))

# Compute the mean of the three estimates (the futures are passed in directly)
mean_pi = mean(a, b, c)

# Print the results
estimates = [future.result() for future in (a, b, c)]
print("a: {:.5f} b: {:.5f} c: {:.5f}".format(*estimates))
print("Average: {:.5f}".format(mean_pi.result()))

a: 3.14020 b: 3.14044 c: 3.14084
Average: 3.14049


In [19]:
from parsl.config import Config
from parsl.executors.threads import ThreadPoolExecutor

# Configuration with a single thread-pool executor labelled 'local_threads'
# and max_threads set to 8.
thread_executor = ThreadPoolExecutor(label='local_threads', max_threads=8)
local_threads = Config(executors=[thread_executor])
