# MapNode

If you want to iterate over a list of inputs, but need to feed all iterated outputs afterward as one input (an array) to the next node, you need to use a **``MapNode``**. A ``MapNode`` is quite similar to a normal ``Node``, but it can take a list of inputs and operate over each input separately, ultimately returning a list of outputs.

Imagine that you have a list of items (let's say files) and you want to execute the same node on them (for example some smoothing or masking). Some nodes accept multiple files and do exactly the same thing on them, but some don't (they expect only one file). `MapNode` can solve this problem. Imagine you have the following workflow:

<img src="../static/images/mapnode.png"  width="325">

Node `A` outputs a list of files, but node `B` accepts only one file. Additionally, `C` expects a list of files. What you would like is to run `B` for every file in the output of `A` and collect the results as a list and feed it to `C`. Something like this:

```python
# Schematic only: A, B, and C are placeholders for real interfaces
from nipype import Node, MapNode, Workflow
a = Node(interface=A(), name="a")
b = MapNode(interface=B(), name="b", iterfield=['in_file'])
c = Node(interface=C(), name="c")

my_workflow = Workflow(name="my_workflow")
my_workflow.connect([(a,b,[('out_files','in_file')]),
                     (b,c,[('out_file','in_files')])
                     ])
```

Let's demonstrate this with a simple function interface:

In [88]:
from nipype import Function
def square_func(x):
    return x ** 2
square = Function(["x"], ["f_x"], square_func)

We see that this function just takes a numeric input and returns its squared value.

In [89]:
square.run(x=2).outputs.f_x

4

What if we wanted to square a list of numbers? We could set an iterable and split the workflow into multiple sub-workflows. But say we were making a simple workflow that squared a list of numbers and then summed them. The sum node would expect a list, but using an iterable would create a separate sum node for each number, and each would receive only one number from the list. The solution here is to use a `MapNode`.
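The difference can be pictured in plain Python, without nipype. This is only an analogy, and the helper names `square` and `total` are made up for illustration: branching per value hands the summing step one number at a time, while mapping first and collecting the outputs hands it the whole list.

```python
def square(x):
    """Square a single number."""
    return x ** 2


def total(values):
    """Sum a list of numbers."""
    return sum(values)


numbers = [0, 1, 2, 3]

# Iterable-style: every value gets its own branch, so the summing step
# only ever sees a one-element list per branch.
per_branch = [total([square(n)]) for n in numbers]

# MapNode-style: square every element, collect the outputs into one list,
# then run the summing step once on that list.
collected = [square(n) for n in numbers]
combined = total(collected)

print(per_branch)  # [0, 1, 4, 9]
print(combined)    # 14
```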

## `iterfield`

The `MapNode` constructor takes an argument called `iterfield`, which names the inputs that will receive a list and be iterated over element by element.

In [90]:
from nipype import MapNode
square_node = MapNode(square, name="square", iterfield=["x"])

In [91]:
square_node.inputs.x = [0, 1, 2, 3]
res = square_node.run()

211103-17:05:56,187 nipype.workflow INFO:
	 [Node] Setting-up "square" in "/tmp/tmpq6s8od3w/square".
211103-17:05:56,190 nipype.workflow INFO:
	 [Node] Setting-up "_square0" in "/tmp/tmpq6s8od3w/square/mapflow/_square0".
211103-17:05:56,192 nipype.workflow INFO:
	 [Node] Running "_square0" ("nipype.interfaces.utility.wrappers.Function")
211103-17:05:56,195 nipype.workflow INFO:
	 [Node] Finished "_square0".
211103-17:05:56,196 nipype.workflow INFO:
	 [Node] Setting-up "_square1" in "/tmp/tmpq6s8od3w/square/mapflow/_square1".
211103-17:05:56,198 nipype.workflow INFO:
	 [Node] Running "_square1" ("nipype.interfaces.utility.wrappers.Function")
211103-17:05:56,200 nipype.workflow INFO:
	 [Node] Finished "_square1".
211103-17:05:56,202 nipype.workflow INFO:
	 [Node] Setting-up "_square2" in "/tmp/tmpq6s8od3w/square/mapflow/_square2".
211103-17:05:56,204 nipype.workflow INFO:
	 [Node] Running "_square2" ("nipype.interfaces.utility.wrappers.Function")
211103-17:05:56,206 nipype.workflow INFO:

In [92]:
res.outputs.f_x

[0, 1, 4, 9]

Because `iterfield` can take a list of names, you can operate over multiple sets of data, as long as they're the same length. The values in each list will be paired; it does not compute a combinatoric product of the lists.

In [93]:
def power_func(x, y):
    return x ** y

In [94]:
power = Function(["x", "y"], ["f_xy"], power_func)
power_node = MapNode(power, name="power", iterfield=["x", "y"])
power_node.inputs.x = [0, 1, 2, 3]
power_node.inputs.y = [0, 1, 2, 3]
res = power_node.run()

211103-17:08:56,984 nipype.workflow INFO:
	 [Node] Setting-up "power" in "/tmp/tmp_6b6gik8/power".
211103-17:08:56,988 nipype.workflow INFO:
	 [Node] Setting-up "_power0" in "/tmp/tmp_6b6gik8/power/mapflow/_power0".
211103-17:08:56,990 nipype.workflow INFO:
	 [Node] Running "_power0" ("nipype.interfaces.utility.wrappers.Function")
211103-17:08:56,992 nipype.workflow INFO:
	 [Node] Finished "_power0".
211103-17:08:56,994 nipype.workflow INFO:
	 [Node] Setting-up "_power1" in "/tmp/tmp_6b6gik8/power/mapflow/_power1".
211103-17:08:56,995 nipype.workflow INFO:
	 [Node] Running "_power1" ("nipype.interfaces.utility.wrappers.Function")
211103-17:08:56,998 nipype.workflow INFO:
	 [Node] Finished "_power1".
211103-17:08:56,999 nipype.workflow INFO:
	 [Node] Setting-up "_power2" in "/tmp/tmp_6b6gik8/power/mapflow/_power2".
211103-17:08:57,1 nipype.workflow INFO:
	 [Node] Running "_power2" ("nipype.interfaces.utility.wrappers.Function")
211103-17:08:57,3 nipype.workflow INFO:
	 [Node] Finished "

In [95]:
print(res.outputs.f_xy)

[1, 1, 4, 27]
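The pairing behavior can be mirrored in plain Python. The sketch below is an analogy rather than nipype code: `zip` reproduces the element-by-element pairing described above, while `itertools.product` shows the combinatoric product that does not happen.

```python
from itertools import product

xs = [0, 1, 2, 3]
ys = [0, 1, 2, 3]

# Paired: element i of xs is combined with element i of ys.
paired = [x ** y for x, y in zip(xs, ys)]

# Combinatoric product (NOT what happens here): every x with every y.
crossed = [x ** y for x, y in product(xs, ys)]

print(paired)        # [1, 1, 4, 27]
print(len(crossed))  # 16
```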


But not every input needs to be an iterfield.

In [96]:
power_node = MapNode(power, name="power", iterfield=["x"])
power_node.inputs.x = [0, 1, 2, 3]
power_node.inputs.y = 3
res = power_node.run()

211103-17:09:24,966 nipype.workflow INFO:
	 [Node] Setting-up "power" in "/tmp/tmp5i1xfcz3/power".
211103-17:09:24,968 nipype.workflow INFO:
	 [Node] Setting-up "_power0" in "/tmp/tmp5i1xfcz3/power/mapflow/_power0".
211103-17:09:24,971 nipype.workflow INFO:
	 [Node] Running "_power0" ("nipype.interfaces.utility.wrappers.Function")
211103-17:09:24,974 nipype.workflow INFO:
	 [Node] Finished "_power0".
211103-17:09:24,976 nipype.workflow INFO:
	 [Node] Setting-up "_power1" in "/tmp/tmp5i1xfcz3/power/mapflow/_power1".
211103-17:09:24,977 nipype.workflow INFO:
	 [Node] Running "_power1" ("nipype.interfaces.utility.wrappers.Function")
211103-17:09:24,980 nipype.workflow INFO:
	 [Node] Finished "_power1".
211103-17:09:24,982 nipype.workflow INFO:
	 [Node] Setting-up "_power2" in "/tmp/tmp5i1xfcz3/power/mapflow/_power2".
211103-17:09:24,985 nipype.workflow INFO:
	 [Node] Running "_power2" ("nipype.interfaces.utility.wrappers.Function")
211103-17:09:24,987 nipype.workflow INFO:
	 [Node] Finish

In [97]:
print(res.outputs.f_xy)

[0, 1, 8, 27]
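One way to picture the split between iterated and fixed inputs is a small plain-Python helper. `run_mapped` is a hypothetical function written for this illustration, not part of nipype: inputs named in `iterfield` are consumed one element per call, and every other input is passed unchanged to each call.

```python
def power_func(x, y):
    return x ** y


def run_mapped(func, iterfield, **inputs):
    """Hypothetical helper: call func once per element of the iterfield inputs."""
    lengths = {len(inputs[name]) for name in iterfield}
    if len(lengths) != 1:
        raise ValueError("all iterfield inputs must have the same length")
    n_calls = lengths.pop()

    # Inputs not listed in iterfield are reused as-is for every call.
    fixed = {key: value for key, value in inputs.items() if key not in iterfield}

    results = []
    for i in range(n_calls):
        per_call = {name: inputs[name][i] for name in iterfield}
        results.append(func(**per_call, **fixed))
    return results


only_x = run_mapped(power_func, ["x"], x=[0, 1, 2, 3], y=3)
both = run_mapped(power_func, ["x", "y"], x=[0, 1, 2, 3], y=[0, 1, 2, 3])

print(only_x)  # [0, 1, 8, 27]
print(both)    # [1, 1, 4, 27]
```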


As in the case of `iterables`, each underlying `MapNode` execution can happen in **parallel**. Hopefully, you see how these tools allow you to write flexible, reusable workflows that will help you process large amounts of data efficiently and reproducibly.

In more advanced applications it is useful to iterate over the items of nested lists (for example ``[[1,2],[3,4]]``). `MapNode` supports this through the `nested=True` parameter, and the outputs preserve the same nested structure as the inputs.
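The shape-preserving behavior can be sketched in plain Python. This is an analogy under the assumption that every leaf of the nested list is processed independently; `map_nested` is a hypothetical helper and does not show how nipype implements the feature internally.

```python
def map_nested(func, values):
    """Apply func to every leaf of a nested list, keeping the nesting intact."""
    if isinstance(values, list):
        return [map_nested(func, item) for item in values]
    return func(values)


nested_inputs = [[1, 2], [3, 4]]
nested_outputs = map_nested(lambda x: x ** 2, nested_inputs)

print(nested_outputs)  # [[1, 4], [9, 16]]
```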

# Why is this important?

Suppose we have multiple functional images (A) and each of them should be motion corrected (B1, B2, B3, ...). Afterward, we want to put them all together into a GLM, i.e. the input for the GLM should be an array of [B1, B2, B3, ...]. [Iterables](basic_iteration.ipynb) can't do that, because they would split up the pipeline. Therefore, we need **MapNodes**.

<img src="../static/images/mapnode.png"  width="300">

Let's look at a simple example, where we want to motion correct two functional images. For this we need two nodes:
 - Gunzip, to unzip the files (plural)
 - Realign, to do the motion correction

In [98]:
from nipype.algorithms.misc import Gunzip
from nipype.interfaces.spm import Realign
from nipype import Node, MapNode, Workflow

# Here we specify a list of files (for this tutorial, we just add the same file twice)
files = ['/home/neuro/Data/ds000114/sub-01/ses-test/func/sub-01_ses-test_task-fingerfootlips_bold.nii.gz',
         '/home/neuro/Data/ds000114/sub-01/ses-test/func/sub-01_ses-test_task-fingerfootlips_bold.nii.gz']

realign = Node(Realign(register_to_mean=True),
               name='motion_correction')



If we try to specify the input for the **Gunzip** node with a simple **Node**, we get the following error:

In [99]:
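gunzip = Node(Gunzip(), name='gunzip')
try:
    gunzip.inputs.in_file = files
except Exception as err:
    # Report the expected TraitError; re-raise anything else
    if "TraitError" in str(err.__class__):
        print("TraitError:", err)
    else:
        raise
else:
    # A plain Node should reject a list, so reaching this branch is unexpected
    raise RuntimeError("expected a TraitError when assigning a list to in_file")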

TraitError: The 'in_file' trait of a GunzipInputSpec instance must be a pathlike object or string representing an existing file, but a value of "['/home/neuro/Data/ds000114/sub-01/ses-test/func/sub-01_ses-test_task-fingerfootlips_bold.nii.gz', '/home/neuro/Data/ds000114/sub-01/ses-test/func/sub-01_ses-test_task-fingerfootlips_bold.nii.gz']" <class 'str'> was specified.


```bash
TraitError: The 'in_file' trait of a GunzipInputSpec instance must be an existing file name, but a value of ['/data/ds000114/sub-01/ses-test/func/sub-01_ses-test_task-fingerfootlips_bold.nii.gz', '/data/ds000114/sub-01/ses-test/func/sub-01_ses-test_task-fingerfootlips_bold.nii.gz'] <class 'list'> was specified.
```

But if we do it with a **MapNode**, it works:

In [100]:
gunzip = MapNode(Gunzip(), name='gunzip',
                 iterfield=['in_file'])
gunzip.inputs.in_file = files

Now, we just have to create a workflow, connect the nodes and we can run it:

In [101]:
mcflow = Workflow(name='realign_with_spm')
mcflow.connect(gunzip, 'out_file', realign, 'in_files')
mcflow.base_dir = '/home/neuro/Result/Nipype_tutorial/working_dir'
mcflow.run('MultiProc', plugin_args={'n_procs': 4})

211103-17:16:59,838 nipype.workflow INFO:
	 Workflow realign_with_spm settings: ['check', 'execution', 'logging', 'monitoring']
211103-17:16:59,862 nipype.workflow INFO:
	 Running in parallel.
211103-17:16:59,864 nipype.workflow INFO:
	 [MultiProc] Running 0 tasks, and 1 jobs ready. Free memory (GB): 7.43/7.43, Free processors: 4/4.
211103-17:17:01,866 nipype.workflow INFO:
	 [MultiProc] Running 0 tasks, and 2 jobs ready. Free memory (GB): 7.43/7.43, Free processors: 4/4.
211103-17:17:01,956 nipype.workflow INFO:
	 [Node] Setting-up "_gunzip0" in "/home/neuro/Result/Nipype_tutorial/working_dir/realign_with_spm/gunzip/mapflow/_gunzip0".
211103-17:17:01,958 nipype.workflow INFO:
	 [Node] Setting-up "_gunzip1" in "/home/neuro/Result/Nipype_tutorial/working_dir/realign_with_spm/gunzip/mapflow/_gunzip1".
211103-17:17:01,992 nipype.workflow INFO:
	 [Node] Running "_gunzip0" ("nipype.algorithms.misc.Gunzip")
211103-17:17:01,994 nipype.workflow INFO:
	 [Node] Running "_gunzip1" ("nipype.algori



211103-17:18:25,939 nipype.workflow INFO:
	 [Job 1] Completed (realign_with_spm.motion_correction).
211103-17:18:25,946 nipype.workflow INFO:
	 [MultiProc] Running 0 tasks, and 0 jobs ready. Free memory (GB): 7.43/7.43, Free processors: 4/4.


<networkx.classes.digraph.DiGraph at 0x7fd89e4e86a0>

### Exercise 1

Create a workflow to calculate the sum of the factorials of the integers from $n_{min}$ to $n_{max}$ (inclusive), i.e.:

$$\sum _{k=n_{min}}^{n_{max}} k! = n_{min}! + (n_{min}+1)! + \cdots + n_{max}!$$

For example, if $n_{min}=0$ and $n_{max}=3$:

$$\sum _{k=0}^{3} k! = 0! + 1! + 2! + 3! = 1 + 1 + 2 + 6 = 10$$

Use ``Node`` for a function that creates a list of integers and a function that sums everything at the end. Use ``MapNode`` to calculate factorials.
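A plain-Python reference value is handy for checking whatever the workflow returns. The sketch below is not a workflow solution; `sum_of_factorials` is a hypothetical helper that evaluates the formula directly, assuming both bounds are inclusive.

```python
import math


def sum_of_factorials(n_min, n_max):
    """Evaluate the sum of k! for k from n_min to n_max, both inclusive."""
    return sum(math.factorial(k) for k in range(n_min, n_max + 1))


expected = sum_of_factorials(0, 3)
print(expected)  # 10
```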

In [16]:
#write your solution here
def factorial(n):
    multiply_result = 1
    if n == 0:
        return multiply_result
    else:
        while n>0:
            multiply_result *= n
            n -= 1
        return multiply_result

def sum_func(fac_list):
    return sum(fac_list)


In [18]:
from nipype import Node, Workflow, Function, MapNode

exer_node = MapNode(Function(['n'], ['multiply_result'], factorial), iterfield=['n'], name='exercise1')
exer_node.inputs.n = [0,1,2,3]

sum_node = Node(Function(['fac_list'], ['sum_result'], sum_func), name='sum')

ex_wf = Workflow(base_dir='/home/neuro/Result/Nipype_tutorial/working_dir', name='exer_wf')

ex_wf.connect([(exer_node, sum_node, [('multiply_result', 'fac_list')])])

ex_wf.run()

211104-13:40:03,49 nipype.workflow INFO:
	 Workflow exer_wf settings: ['check', 'execution', 'logging', 'monitoring']
211104-13:40:03,77 nipype.workflow INFO:
	 Running serially.
211104-13:40:03,78 nipype.workflow INFO:
	 [Node] Setting-up "exer_wf.exercise1" in "/home/neuro/Result/Nipype_tutorial/working_dir/exer_wf/exercise1".
211104-13:40:03,111 nipype.workflow INFO:
	 [Node] Setting-up "_exercise10" in "/home/neuro/Result/Nipype_tutorial/working_dir/exer_wf/exercise1/mapflow/_exercise10".
211104-13:40:03,139 nipype.workflow INFO:
	 [Node] Running "_exercise10" ("nipype.interfaces.utility.wrappers.Function")
211104-13:40:03,163 nipype.workflow INFO:
	 [Node] Finished "_exercise10".
211104-13:40:03,166 nipype.workflow INFO:
	 [Node] Setting-up "_exercise11" in "/home/neuro/Result/Nipype_tutorial/working_dir/exer_wf/exercise1/mapflow/_exercise11".
211104-13:40:03,191 nipype.workflow INFO:
	 [Node] Running "_exercise11" ("nipype.interfaces.utility.wrappers.Function")
211104-13:40:03,21

<networkx.classes.digraph.DiGraph at 0x7f26b9288d90>

In [23]:
list(ex_wf.run().nodes())[1].result.outputs

211104-13:40:56,363 nipype.workflow INFO:
	 Workflow exer_wf settings: ['check', 'execution', 'logging', 'monitoring']
211104-13:40:56,379 nipype.workflow INFO:
	 Running serially.
211104-13:40:56,380 nipype.workflow INFO:
	 [Node] Setting-up "exer_wf.exercise1" in "/home/neuro/Result/Nipype_tutorial/working_dir/exer_wf/exercise1".
211104-13:40:56,384 nipype.workflow INFO:
	 [Node] "exer_wf.exercise1" found cached.
211104-13:40:56,385 nipype.workflow INFO:
	 [Node] Setting-up "exer_wf.sum" in "/home/neuro/Result/Nipype_tutorial/working_dir/exer_wf/sum".
211104-13:40:56,388 nipype.workflow INFO:
	 [Node] Cached "exer_wf.sum" - collecting precomputed outputs
211104-13:40:56,389 nipype.workflow INFO:
	 [Node] "exer_wf.sum" found cached.



sum_result = 10

In [None]:
from nipype import Workflow, Node, MapNode, Function
import os

def range_fun(n_min, n_max):
    return list(range(n_min, n_max+1))

def factorial(n):
    # print("FACTORIAL, {}".format(n))
    import math
    return math.factorial(n)

def summing(terms):
    return sum(terms)

wf_ex1 = Workflow('ex1')
wf_ex1.base_dir = os.getcwd()

range_nd = Node(Function(input_names=['n_min', 'n_max'],
                         output_names=['range_list'],
                         function=range_fun), 
                name='range_list')

factorial_nd = MapNode(Function(input_names=['n'],
                                output_names=['fact_out'],
                                function=factorial), 
                       iterfield=['n'],
                       name='factorial')

summing_nd = Node(Function(input_names=['terms'],
                           output_names=['sum_out'],
                           function=summing), 
                  name='summing')


range_nd.inputs.n_min = 0
range_nd.inputs.n_max = 3

wf_ex1.add_nodes([range_nd])
wf_ex1.connect(range_nd, 'range_list', factorial_nd, 'n')
wf_ex1.connect(factorial_nd, 'fact_out', summing_nd, "terms")


eg = wf_ex1.run()

Let's print all nodes:

In [None]:
eg.nodes()

The final result should be 10:

In [None]:
list(eg.nodes())[2].result.outputs

We can also check the results of the two other nodes:

In [None]:
print(list(eg.nodes())[0].result.outputs)
print(list(eg.nodes())[1].result.outputs)