# metaflow_magicdir

> Save Entire Directories Into Metaflow's Metadata Store

## Install

`pip install metaflow_magicdir`

## How to use

You can use `@magicdir` to pass local directories between Metaflow steps. This also works when steps run remotely.
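Here is one general technique for moving a directory between steps. It is an assumption, not necessarily how `metaflow_magicdir` implements it. The directory travels as a single bytes artifact: one step packs it into an in-memory gzipped tarball at its end, and the next step unpacks it at its start. `pack_dir` and `unpack_dir` are hypothetical helpers written only for this sketch.

```python
import io
import tarfile
import tempfile
from pathlib import Path


def pack_dir(path: str) -> bytes:
    """Serialize a directory tree into gzipped tarball bytes (hypothetical helper)."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        # arcname="." stores entries relative to the directory root
        tar.add(path, arcname=".")
    return buf.getvalue()


def unpack_dir(blob: bytes, dest: str) -> None:
    """Recreate a directory tree from bytes made by pack_dir (hypothetical helper)."""
    root = Path(dest).resolve()
    root.mkdir(parents=True, exist_ok=True)
    with tarfile.open(fileobj=io.BytesIO(blob), mode="r:gz") as tar:
        members = tar.getmembers()
        # Validate every entry first, so a malicious archive writes nothing
        for member in members:
            target = (root / member.name).resolve()
            if target != root and root not in target.parents:
                raise ValueError(f"unsafe path in archive: {member.name}")
        for member in members:
            target = (root / member.name).resolve()
            if member.isdir():
                target.mkdir(parents=True, exist_ok=True)
            elif member.isfile():
                # Symlinks and special files are skipped in this sketch
                target.parent.mkdir(parents=True, exist_ok=True)
                with tar.extractfile(member) as src:
                    target.write_bytes(src.read())


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as src, tempfile.TemporaryDirectory() as dst:
        Path(src, "output1").write_text("hello world")
        Path(src, "output2").write_text("hello world again")
        blob = pack_dir(src)  # what one step would store as an artifact
        unpack_dir(blob, dst)  # what the next step would do before its body runs
        print(sorted(p.name for p in Path(dst).iterdir()))  # ['output1', 'output2']
```

The snapshot is ordinary bytes, so Metaflow can persist it like any other artifact. That is one reason an approach like this would also work for steps that run remotely.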

```python
# examples/example_flow.py
from metaflow import FlowSpec, step
from metaflow_magicdir import magicdir


class MagicDirFlow(FlowSpec):

    @magicdir(dir='mydir')
    @step
    def start(self):
        with open('mydir/output1', 'w') as f:
            f.write('hello world')
        with open('mydir/output2', 'w') as f:
            f.write('hello world again')
        self.next(self.end)

    @magicdir(dir='mydir')
    @step
    def end(self):
        print('first', open('mydir/output1').read())
        print('second', open('mydir/output2').read())

if __name__ == "__main__":
    MagicDirFlow()
```

If you run the above flow, you will see the following output:

```bash
python examples/example_flow.py run
```

```
Metaflow 2.5.4 executing MagicDirFlow for user:hamel
Validating your flow...
    The graph looks good!
Running pylint...
    Pylint is happy!
2022-04-18 13:40:59.728 Workflow starting (run-id 1650314459725082):
2022-04-18 13:40:59.734 [1650314459725082/start/1 (pid 12408)] Task is starting.
2022-04-18 13:41:00.472 [1650314459725082/start/1 (pid 12408)] Task finished successfully.
2022-04-18 13:41:00.479 [1650314459725082/end/2 (pid 12412)] Task is starting.
2022-04-18 13:41:01.161 [1650314459725082/end/2 (pid 12412)] first hello world
2022-04-18 13:41:01.245 [1650314459725082/end/2 (pid 12412)] second hello world again
```


You can retrieve the directory saved by the flow above with the Metaflow Client API and `extract_magicdir`.

First, remove the local copy of the directory if it exists:

```bash
rm -rf mydir/  # remove the directory if it exists
```

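```python
from metaflow import Flow
from metaflow_magicdir import extract_magicdir

# Artifacts of the most recent successful MagicDirFlow run
run_data = Flow('MagicDirFlow').latest_successful_run.data
extract_magicdir(run_data)  # restores mydir/ in the local working directory
```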

The directory has been restored, so we can inspect its contents:

```bash
ls mydir/
```

```
output1 output2
```


### `magicdir` with `foreach`

Nothing special is required to use `magicdir` with `foreach`. Consider the following variation on the flow above:

```python
# examples/mapflow.py
from metaflow import FlowSpec, step
from metaflow_magicdir import magicdir


class MagicDirMapFlow(FlowSpec):
    """Show how magic directories work with foreach"""

    @step
    def start(self):
        self.step_num = range(5)
        self.next(self.write, foreach='step_num')

    @magicdir(dir='my_map_dir')
    @step
    def write(self):
        self.step_idx = self.input  # Metaflow sets self.input to this branch's element of `step_num`
        with open(f'my_map_dir/{self.step_idx}.txt', 'w') as f:
            f.write(f'this is step {self.step_idx}')
        self.next(self.read)

    @magicdir(dir='my_map_dir')
    @step
    def read(self):
        print('file contents:', open(f'my_map_dir/{self.step_idx}.txt').read())
        self.next(self.join)

    @step
    def join(self, inputs):
        print(f"step numbers were: {[i.step_idx for i in inputs]}")
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == "__main__":
    MagicDirMapFlow()
```


```bash
python examples/mapflow.py run
```

```
Metaflow 2.5.4 executing MagicDirMapFlow for user:hamel
Validating your flow...
    The graph looks good!
Running pylint...
    Pylint is happy!
2022-04-18 13:41:56.687 Workflow starting (run-id 1650314516684584):
2022-04-18 13:41:56.695 [1650314516684584/start/1 (pid 12420)] Task is starting.
2022-04-18 13:41:57.444 [1650314516684584/start/1 (pid 12420)] Foreach yields 5 child steps.
2022-04-18 13:41:57.445 [1650314516684584/start/1 (pid 12420)] Task finished successfully.
2022-04-18 13:41:57.452 [1650314516684584/write/2 (pid 12423)] Task is starting.
2022-04-18 13:41:57.459 [1650314516684584/write/3 (pid 12424)] Task is starting.
```
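Metaflow runs each `foreach` branch as its own task with its own artifacts. Assuming `magicdir` snapshots the directory per task, each `read` task therefore receives only the directory written by its matching `write` task. The local simulation below sketches that pairing under this assumption. Each fresh temporary directory stands in for an isolated remote worker, and `snapshot`, `restore`, and `simulate_foreach` are hypothetical helpers, not part of `metaflow_magicdir`.

```python
import tempfile
from pathlib import Path
from typing import Dict, List


def snapshot(root: Path) -> Dict[str, bytes]:
    """Capture every file under root as {relative posix path: contents}."""
    return {
        p.relative_to(root).as_posix(): p.read_bytes()
        for p in root.rglob("*")
        if p.is_file()
    }


def restore(snap: Dict[str, bytes], root: Path) -> None:
    """Write a snapshot back out under root."""
    root.mkdir(parents=True, exist_ok=True)
    for rel, data in snap.items():
        target = root / rel
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(data)


def simulate_foreach(n: int) -> Dict[int, List[str]]:
    """Map each branch index to the files its `read` task would see."""
    seen: Dict[int, List[str]] = {}
    for idx in range(n):
        # The `write` task runs in its own scratch workspace (assumed isolated)
        with tempfile.TemporaryDirectory() as write_ws:
            d = Path(write_ws, "my_map_dir")
            d.mkdir()
            (d / f"{idx}.txt").write_text(f"this is step {idx}")
            snap = snapshot(d)  # per-task artifact for this branch only
        # The matching `read` task starts from a fresh workspace
        with tempfile.TemporaryDirectory() as read_ws:
            d = Path(read_ws, "my_map_dir")
            restore(snap, d)
            seen[idx] = sorted(p.name for p in d.iterdir())
    return seen


if __name__ == "__main__":
    print(simulate_foreach(3))
```

Running it prints `{0: ['0.txt'], 1: ['1.txt'], 2: ['2.txt']}`. When the flow itself runs locally, tasks may share one working directory. In that case `my_map_dir` can end up holding files from every branch.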
