# Crossflow - subprocess: run interactively

In [1]:
from crossflow.tasks import SubprocessTask

In [2]:
# Create the sample input: 100 numbered lines, "line 0" through "line 99"
with open('test.txt', 'w') as handle:
    handle.writelines(f'line {idx}\n' for idx in range(100))

In [3]:
# Set up the subprocess task: a shell command template plus its file placeholders.
# 'lines.txt' and 'resersed.txt' are names used inside the command only; the real
# input file is supplied when the task is called.
# NOTE(review): 'resersed.txt' looks like a typo for 'reversed.txt'; it is used
# consistently in the command and in set_outputs, so the task still works.
reverse = SubprocessTask('tac lines.txt > resersed.txt')
reverse.set_inputs(['lines.txt'])
reverse.set_outputs(['resersed.txt'])

In [4]:
# Run the task immediately on 'test.txt'; the printed type shows that the result
# is a crossflow FileHandle rather than a plain file name.
# NOTE(review): the name `reversed` shadows Python's built-in reversed(); later
# cells refer to this variable, so it is left unchanged here.
reversed = reverse('test.txt')
print(type(reversed))
print(reversed)

<class 'crossflow.filehandling.FileHandle'>
/tmp/219a78a3-5521-4022-90e1-594e1b6f608f.txt


In [5]:
# Show the contents of the task output on the screen, read through the FileHandle
print(reversed.read_text())

line 99
line 98
line 97
line 96
line 95
line 94
line 93
line 92
line 91
line 90
line 89
line 88
line 87
line 86
line 85
line 84
line 83
line 82
line 81
line 80
line 79
line 78
line 77
line 76
line 75
line 74
line 73
line 72
line 71
line 70
line 69
line 68
line 67
line 66
line 65
line 64
line 63
line 62
line 61
line 60
line 59
line 58
line 57
line 56
line 55
line 54
line 53
line 52
line 51
line 50
line 49
line 48
line 47
line 46
line 45
line 44
line 43
line 42
line 41
line 40
line 39
line 38
line 37
line 36
line 35
line 34
line 33
line 32
line 31
line 30
line 29
line 28
line 27
line 26
line 25
line 24
line 23
line 22
line 21
line 20
line 19
line 18
line 17
line 16
line 15
line 14
line 13
line 12
line 11
line 10
line 9
line 8
line 7
line 6
line 5
line 4
line 3
line 2
line 1
line 0



In [6]:
# Save the task output under a regular file name in the working directory
reversed.save('reversed.txt')

'reversed.txt'

# Crossflow - cluster & client: run in background

In [7]:
from distributed import LocalCluster
from crossflow.clients import Client

# Set up a local cluster with a single worker, and a crossflow client connected to it
cluster = LocalCluster(n_workers=1)
client = Client(cluster)

## submit jobs to the local cluster from client

In [8]:
# Submit the task to the cluster: submit() hands back a Future, not the result itself.
# This rebinds `reversed` from the earlier FileHandle to that Future.
reversed = client.submit(reverse, 'test.txt')
print(reversed)

<Future: pending, key: run-a752acd2-52b2-4f81-9365-811cb4d00ca3>


In [9]:
# Check on the job; the status is reported without waiting for it to finish
print(reversed.status)

pending


In [10]:
# result() waits for the job and returns the task's output
real_reversed = reversed.result()
print(real_reversed.read_text()) # only displays the contents; nothing is saved to a named file

line 99
line 98
line 97
line 96
line 95
line 94
line 93
line 92
line 91
line 90
line 89
line 88
line 87
line 86
line 85
line 84
line 83
line 82
line 81
line 80
line 79
line 78
line 77
line 76
line 75
line 74
line 73
line 72
line 71
line 70
line 69
line 68
line 67
line 66
line 65
line 64
line 63
line 62
line 61
line 60
line 59
line 58
line 57
line 56
line 55
line 54
line 53
line 52
line 51
line 50
line 49
line 48
line 47
line 46
line 45
line 44
line 43
line 42
line 41
line 40
line 39
line 38
line 37
line 36
line 35
line 34
line 33
line 32
line 31
line 30
line 29
line 28
line 27
line 26
line 25
line 24
line 23
line 22
line 21
line 20
line 19
line 18
line 17
line 16
line 15
line 14
line 13
line 12
line 11
line 10
line 9
line 8
line 7
line 6
line 5
line 4
line 3
line 2
line 1
line 0



## add a second step to the workflow: chunk and generate multiple files

In [11]:
# Second task: split a file into pieces of `chunk_size` lines each.
# The {chunk_size} placeholder takes the value passed for the 'chunk_size' input.
# No output prefix is given to `split`, so the pieces get its default names
# (xaa, xab, ...); five of them are declared as outputs.
split5 = SubprocessTask('split -l {chunk_size} infile')
split5.set_inputs(['infile', 'chunk_size'])
split5.set_outputs(['xaa', 'xab', 'xac', 'xad', 'xae'])

In [12]:
# Count the lines of the reversed file (result() waits for the job to finish),
# then ask for chunks of one fifth of that so the five declared outputs are filled.
# `reversed` is passed as a Future; `chunks` comes back as one Future per output.
n_lines = reversed.result().read_text().count('\n')
chunks = client.submit(split5, reversed, n_lines//5)
print(chunks)

(<Future: pending, key: lambda-5dcfdba4ae1799f775a83d9b512b30d5>, <Future: pending, key: lambda-0f7988ee2cfd6de5eea8184dbc15fe0f>, <Future: pending, key: lambda-7ea999776a1e18d3bac269071f371e95>, <Future: pending, key: lambda-d8501fc27dbcc6cb7868ef6e5b844bf6>, <Future: pending, key: lambda-2f1cbb3791d9cab5ad434903077ab8d3>)


In [13]:
# Peek at the tail of the reversed file; the final '' comes from the trailing newline
reversed.result().read_text().split('\n')[-5:]

['line 3', 'line 2', 'line 1', 'line 0', '']

In [14]:
# Reverse every chunk: one explicit submit() per chunk Future
revchunks = [client.submit(reverse, piece) for piece in chunks]

In [15]:
# Same fan-out written with client.map(): `reverse` is applied to each chunk
revchunks = client.map(reverse, chunks)

In [16]:
# Each element is a Future; the result of the first one prints as a temporary file path
print(revchunks[0].result())

/tmp/2ea804f0-6761-4d3a-b819-b70f116bbdc5.txt


## put the file back together

In [17]:
# Putting it back together: concatenate all the chunks into a single file.
# The 'chunk*' input is a wildcard name; it is given a list of files when the task is run.
cat = SubprocessTask('cat chunk* > whole')
cat.set_inputs(['chunk*'])
cat.set_outputs(['whole'])

In [18]:
# Join the reversed chunks and display the result.
# Every chunk is back in ascending order, but the chunks themselves are still
# in the order of the reversed file (80-99 first, 0-19 last).
whole = client.submit(cat, revchunks)
print(whole.result().read_text())

line 80
line 81
line 82
line 83
line 84
line 85
line 86
line 87
line 88
line 89
line 90
line 91
line 92
line 93
line 94
line 95
line 96
line 97
line 98
line 99
line 60
line 61
line 62
line 63
line 64
line 65
line 66
line 67
line 68
line 69
line 70
line 71
line 72
line 73
line 74
line 75
line 76
line 77
line 78
line 79
line 40
line 41
line 42
line 43
line 44
line 45
line 46
line 47
line 48
line 49
line 50
line 51
line 52
line 53
line 54
line 55
line 56
line 57
line 58
line 59
line 20
line 21
line 22
line 23
line 24
line 25
line 26
line 27
line 28
line 29
line 30
line 31
line 32
line 33
line 34
line 35
line 36
line 37
line 38
line 39
line 0
line 1
line 2
line 3
line 4
line 5
line 6
line 7
line 8
line 9
line 10
line 11
line 12
line 13
line 14
line 15
line 16
line 17
line 18
line 19



## define a function that puts it all together

In [19]:
def my_workflow(input_filename, output_filename):
    '''
    Reverse, split, re-reverse and re-join a text file on the cluster.

    Relies on the notebook-level ``client`` and on the ``reverse``, ``split5``
    and ``cat`` tasks defined in earlier cells; they are not rebuilt here.

    input_filename: path of the text file to process.
    output_filename: path the final joined file is saved to.
    '''
    # Step 1: reverse the line order of the whole file.
    flipped = client.submit(reverse, input_filename)

    # Step 2: split into five pieces; this waits for step 1 in order to
    # count the lines and derive the chunk size.
    line_count = flipped.result().read_text().count('\n')
    pieces = client.submit(split5, flipped, line_count // 5)

    # Step 3: reverse each piece, then concatenate the pieces.
    flipped_pieces = client.map(reverse, pieces)
    joined = client.submit(cat, flipped_pieces)

    # Step 4: wait for the final result and save it.
    joined.result().save(output_filename)

In [20]:
# Run the whole workflow on the sample file written at the top of the notebook.
# ('lines.txt' is only the placeholder name inside the `reverse` task; the notebook
# never creates a file with that name.)
my_workflow('test.txt', 'crossflow_output.txt')