
add frequency metric to determine some average per-second metrics #760

Merged: 20 commits merged into pytorch:master on Feb 3, 2020

Conversation

@erip (Contributor) commented Feb 1, 2020

Fixes # N/A

Description:

This code computes X-per-second performance metrics (e.g., words per second, images per second). It will most likely be used in conjunction with ignite.metrics.RunningAverage for maximum utility.
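
A rough usage sketch of what is envisioned here (the FrequencyMetric name and the "ntokens" output key are placeholders from this PR's discussion, not a final API; 'engine' is assumed to be an existing ignite Engine):

from ignite.metrics import RunningAverage

# FrequencyMetric is the (notional) class added in this PR
wps = RunningAverage(FrequencyMetric(output_transform=lambda out: out["ntokens"]))
wps.attach(engine, "wps")
# after each iteration, engine.state.metrics["wps"] holds a smoothed words-per-second value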

Check list:

  • New tests are added (if a new feature is added)
  • New doc strings: description and/or example code are in RST format
  • Documentation is updated (if required)

@vfdev-5 (Collaborator) commented Feb 1, 2020

@erip thanks for the PR! I see the idea; maybe we can iterate on the implementation...
Could you please provide a snippet of the usage you have in mind?

And to keep our CI happy, I can add some tests :)

@erip (Contributor, Author) commented Feb 1, 2020

@vfdev-5 absolutely! I mostly wanted to put pen to paper quickly - happy to add some tests and see where that leads the implementation.

@vfdev-5 (Collaborator) commented Feb 1, 2020

Thanks for the update. I just saw the docs on usage. How about doing it the way GpuInfo does? There, the metric is configured without RunningAverage and it simply fills in the metric value on each iteration.
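
For reference, a rough sketch of that GpuInfo pattern (illustrative only; GpuInfo lives in ignite.contrib.metrics and needs pynvml, and 'trainer' is assumed to be an existing ignite Engine):

from ignite.contrib.metrics import GpuInfo

# GpuInfo attaches itself directly and writes into engine.state.metrics
# on every iteration, without any RunningAverage wrapper
GpuInfo().attach(trainer, name="gpu")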

@erip (Contributor, Author) commented Feb 1, 2020

@vfdev-5 I have fixed some simple flake8 issues and added a docstring with an envisioned usage. I suspect there are some improvements to be made. Since I'd like to compute average throughput, I think it might be good to include a class that inherits from RunningAverage and just wraps this notional FrequencyMetric (as in the docstring). Do you have strong opinions one way or the other?

@erip (Contributor, Author) commented Feb 1, 2020

Ah, you beat me to the comment. :-)

@erip (Contributor, Author) commented Feb 2, 2020

@vfdev-5 Ok, hopefully the distributed is-init'd checks are consistent with the rest of ignite.

@vfdev-5 (Collaborator) commented Feb 2, 2020

@erip maybe we can add a single CPU distrib test to ensure the correct behavior.

For example, like here:

def test_distrib_cpu(distributed_context_single_node_gloo):

If you have any questions about how to do this, please do not hesitate to ask.

The distributed tests can be run on CPU with the following command:

py.test --cov ignite --cov-append --cov-report term-missing --dist=each --tx $WORLD_SIZE*popen//python=python$TRAVIS_PYTHON_VERSION tests -m distributed -vvv
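
A rough sketch of what such a test could look like, reusing the _test_frequency_with_engine helper discussed later in this thread and the gloo fixture named above (illustrative, not the merged test):

import torch.distributed as dist

def test_distrib_cpu(distributed_context_single_node_gloo):
    # the fixture provides a single-node gloo process group for the test
    device = "cpu"
    _test_frequency_with_engine(device, workers=dist.get_world_size())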

@erip (Contributor, Author) commented Feb 2, 2020

Brilliant. That's the next thing I wanted to add. 😄

@erip (Contributor, Author) commented Feb 2, 2020

Yay, it looks like it works. 😄

@vfdev-5 (Collaborator) commented Feb 2, 2020

Okay, let's wait until CI finishes its job and then go ahead with merging.

@erip (Contributor, Author) commented Feb 2, 2020

Awesome! For your awareness, I'm hoping to begin tackling facebookresearch/fairseq#1648 and this is the first step in that journey. You may see more of me as I run into features that ignite doesn't currently support that fairseq needs for parity.

@vfdev-5 (Collaborator) left a review comment

LGTM! Thanks @erip

@erip changed the title from "[WIP] add frequency metric to determine some average per-second metrics" to "add frequency metric to determine some average per-second metrics" on Feb 2, 2020
@vfdev-5 (Collaborator) commented Feb 2, 2020

That would be great! So, yes, feel free to send other PRs and we can work them out as well.

@vfdev-5 (Collaborator) commented Feb 2, 2020

Just one more point I would like to discuss before merging, given the context of where this could potentially be used:

Maybe we can put this metric directly into the core part, as it does not require any additional packages?
Maybe we can also rename it to just Frequency instead of FrequencyMetric?

cc @justusschock thoughts?

@erip (Contributor, Author) commented Feb 2, 2020

I'm happy to do that - I thought contrib was the landing ground for external contributions. I'll make the two changes separately and they can be cherry-picked as desired.

@erip (Contributor, Author) commented Feb 2, 2020

@vfdev-5 it looks like I've run into some flakiness in the tests, which is likely just a result of my misunderstanding torch.distributed -- the logic of my test is basically that the X-per-second rate of a process with intentional delays should fall somewhere between 90% of the "ideal" frequency given those delays and the ideal frequency itself. In a distributed environment, these "ideals" (and indeed the Xps) should scale up with the number of workers. I assumed this scaling factor would be torch.distributed.get_world_size(), but it seems this is not the case. My new hypothesis is that it scales as get_world_size() * num_nodes, where the world size is the number of workers on a given node.

Is this a correct understanding? If so, is num_nodes equal to the local rank?

@vfdev-5 (Collaborator) commented Feb 2, 2020

@erip well, I think we did not implement that correctly. This is my fault; I suggested to all_reduce the elapsed time. What happens now is the following: say we have 4 processes (world_size=4), and for a single iteration we get _n per process of [10, 11, 10, 10] and elapsed per process of [1.0, 1.0, 0.99, 1.0]. What we do in compute is "all reduce" _n and elapsed with the sum operation (i.e. compute the totals of processed _n and elapsed), so _n becomes sum([10, 11, 10, 10]) and elapsed becomes sum([1.0, 1.0, 0.99, 1.0]). So what we actually get is the mean number of processed objects (tokens) per process.
Possibly, we would like to compute (all_reduce(_n) / all_reduce(elapsed)) * world_size instead? A rough sketch follows.
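
A rough sketch of that suggestion in compute() (the _n / _elapsed attribute names follow this discussion and are not necessarily the merged code):

import torch
import torch.distributed as dist

def compute(self):
    n = torch.tensor(self._n, dtype=torch.float64)
    elapsed = torch.tensor(self._elapsed, dtype=torch.float64)
    if dist.is_available() and dist.is_initialized():
        dist.all_reduce(n)        # sum of counts across processes
        dist.all_reduce(elapsed)  # sum of elapsed times across processes
        # sum(n) / sum(elapsed) is the mean per-process rate, so scale by world size
        return (n / elapsed).item() * dist.get_world_size()
    return (n / elapsed).item()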

@vfdev-5 (Collaborator) commented Feb 2, 2020

@erip to help with your random search for a good test, you can put the Frequency class into a script together with the following:

import time

import torch.distributed as dist

from ignite.engine import Engine


class Frequency:
    # ... (the Frequency metric implementation from this PR goes here)
    pass


def _test_frequency_with_engine(device, workers):
    artificial_time = 2  # seconds of sleep per iteration
    batch_size = 4
    n_tokens = 10000
    total_tokens = n_tokens * batch_size
    time_per_epoch = total_tokens / artificial_time
    average_upper_bound = time_per_epoch * workers
    average_lower_bound = average_upper_bound * 0.9

    def update_fn(engine, batch):
        time.sleep(artificial_time)
        return {"ntokens": len(batch)}

    engine = Engine(update_fn)
    wps_metric = Frequency(output_transform=lambda x: x["ntokens"], device=device)
    wps_metric.attach(engine, 'wps')
    data = [list(range(n_tokens))] * batch_size
    wps = engine.run(data, max_epochs=1).metrics['wps']
    print("{} | {} | wps: {} | {}".format(dist.get_rank(), average_lower_bound, wps, average_upper_bound))


def test_frequency_with_engine_nondistributed():
    device = "cpu"
    _test_frequency_with_engine(device, workers=1)


if __name__ == "__main__":

    dist.init_process_group("gloo", init_method="env://")

    device = "cpu"
    _test_frequency_with_engine(device, workers=dist.get_world_size())

and run it like this:

python3 -u -m torch.distributed.launch --nproc_per_node=1 frequency_metric_distrib.py

@erip (Contributor, Author) commented Feb 2, 2020

Thanks! I tried the local testing prescribed by the Travis script, but ran into a weird issue. 😓

@vfdev-5 (Collaborator) commented Feb 2, 2020

Actually, another thing I forgot to mention about distributed: the idea is to perform DDP, so we split the token data across processes. So, if the world size is 4, each process sees 1/4 of the total data. And I think this is not coded in the tests...

@erip (Contributor, Author) commented Feb 2, 2020

There's the missing factor. 😅

@vfdev-5 (Collaborator) commented Feb 2, 2020

Another thing about the notation that I don't think is right: batch_size=4, but data = [list(range(n_tokens))] * batch_size, so the number of iterations is 4 and each batch contains n_tokens elements.

In distributed code, the batch size is generally scaled by the number of processes (world_size) so that the effective batch size is the same regardless of the configuration; see the sketch below.
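
A minimal sketch of that convention (names are illustrative):

import torch.distributed as dist

global_batch_size = 128
# each process takes its share, so the effective batch size stays constant
# across single-process and multi-process runs
local_batch_size = global_batch_size // dist.get_world_size()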

@erip (Contributor, Author) commented Feb 2, 2020

Interestingly, I find that when the world size is 2, there seem to be two passes over the data. I added print(f"Batch size: {len(batch)}") to the update_fn and I see:

Batch size: 10000
Batch size: 10000
Batch size: 10000
Batch size: 10000
Batch size: 10000
Batch size: 10000
Batch size: 10000
Batch size: 10000

My algebra is failing me for some reason today...

@erip (Contributor, Author) commented Feb 2, 2020

Ok, I think I really found it this time... whew

@erip (Contributor, Author) commented Feb 2, 2020

I believe my own bastardization of "batch_size" was causing a lot of confusion.

@vfdev-5 (Collaborator) commented Feb 2, 2020

Interestingly I find that when the world size is 2, there seems to be two passes over the data?

Yes, there are two processes running the training. That's why we need DDP, to make it look like this:

world_size = 2

Total Data = 4 batches
[----|----|----|----]

Processes seeing data:
[----|----|----|----]
[1111|2222|1111|2222]

So we have an overall batch size of 8 elements, 4 per process. The epoch now takes half as long: 2 iterations vs 4 iterations (non-distributed).

So when you print, you see the stdout of all processes. Normally, we guard that with an if:

if dist.get_rank() == 0:
    print(...)

@erip (Contributor, Author) commented Feb 2, 2020

Ok, I think this is better now. Thanks for your patience and help!

@vfdev-5 (Collaborator) commented Feb 2, 2020

@erip I'm playing with the tests and the code, and this is probably not the end of it :)
The current test has only one iteration, so only a single update is done, and there is a bug with multiple updates. Here is my code:

import time

import torch.distributed as dist

from ignite.engine import Engine, Events

# Frequency is the metric class from this PR, assumed to be defined in the same script


def _test_frequency_with_engine(device, workers):
    artificial_time = 0.1  # seconds of sleep per iteration
    total_tokens = 2000
    batch_size = 128 // workers

    def update_fn(engine, batch):
        time.sleep(artificial_time)
        return {"ntokens": len(batch)}

    engine = Engine(update_fn)
    wps_metric = Frequency(output_transform=lambda x: x["ntokens"], device=device)
    wps_metric.attach(engine, 'wps')

    @engine.on(Events.ITERATION_COMPLETED)
    def assert_wps(e):
        wps = e.state.metrics['wps']
        if dist.get_rank() == 0:
            print("{}: wps={}".format(e.state.iteration, wps))

    data = [[i] * batch_size for i in range(0, total_tokens, batch_size)]
    engine.run(data, max_epochs=1)


if __name__ == "__main__":

    dist.init_process_group("gloo", init_method="env://")
    device = "cpu"
    _test_frequency_with_engine(device, workers=dist.get_world_size())

If executed as

python3 -u -m torch.distributed.launch --nproc_per_node=1 frequency_metric_distrib.py

the output is

1: wps=1258
2: wps=1263
3: wps=1247
4: wps=1243
....
13: wps=1241
14: wps=1240
15: wps=1239

It is OK, as it is about 128 samples per 0.1 seconds.

If executed as

python3 -u -m torch.distributed.launch --nproc_per_node=2 frequency_metric_distrib.py

we have

1: wps=1185
2: wps=1805
3: wps=2820
4: wps=4544
5: wps=7540
...
30: wps=43612306556
31: wps=84412682684
32: wps=163572946560

Something still needs to be fixed in the distributed configuration.

PS: I'm curious what they do in fairseq for this?

@erip (Contributor, Author) commented Feb 2, 2020

Fairseq uses what they call a TimeMeter, which is a less robust version of ignite's Metric. The avg property is logged after each batch. The meters are reset at each batch, I think. I don't think this is a great way to approach it because it makes the wps very sensitive to a lot of things, including IO.

@vfdev-5 (Collaborator) commented Feb 2, 2020

@erip I found the problem; let me commit the fix and the updated test directly.

@vfdev-5 (Collaborator) commented Feb 2, 2020

@erip I made the following changes vs your code:

  • measure the time in update instead of compute.
  • updated the tests to measure wps on each iteration and check the ranges.

it makes the wps very sensitive to a lot of things including IO.

In this code it will also be sensitive to IO, as the timer measures the time between iterations: read data -> batch prep -> update model.
If we would like to exclude read data -> batch prep, we need to stop the timer on Events.GET_BATCH_STARTED and resume it on Events.GET_BATCH_COMPLETED; see the sketch below.

self._fire_event(Events.GET_BATCH_STARTED)
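
A minimal sketch of how an ignite Timer could be wired to exclude the data-loading part (the event choices here are an assumption for illustration, not what was merged; 'engine' is an existing ignite Engine):

from ignite.engine import Events
from ignite.handlers import Timer

timer = Timer(average=False)
# start at epoch start, pause while the next batch is being fetched,
# resume once the batch is available, so only processing time accumulates
timer.attach(engine,
             start=Events.EPOCH_STARTED,
             pause=Events.GET_BATCH_STARTED,
             resume=Events.GET_BATCH_COMPLETED)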

@erip (Contributor, Author) commented Feb 2, 2020

I suspect there's no way around it. :-)

@vfdev-5 (Collaborator) commented Feb 2, 2020

I suspect there's no way around it. :-)

Well, we need to set up the timer as is done here:

def attach(self, engine: Engine, start: str = Events.STARTED,

on the correct events...

@vfdev-5 (Collaborator) commented Feb 3, 2020

@erip if you are OK with this implementation, we can merge it and, if needed, update the code later to exclude data-processing time.

@erip (Contributor, Author) commented Feb 3, 2020

I'm OK with the implementation as-is for now. I think there may be complications surrounding wiring the Frequency._timer for the right events because Frequency._timer will be None in the call to Frequency.attach.

@vfdev-5 (Collaborator) commented Feb 3, 2020

Thanks for pointing that out. Actually, reset() is called in Frequency.__init__ via super().__init__, so the timer starts counting even before trainer.run. Normally, metrics attach reset to Events.EPOCH_STARTED, so that every epoch (= a run, for validation) computes the metric from scratch; see the sketch below. Here, we are missing that.

So, in this way, Frequency._timer is already created and can be set up more finely to avoid data IO, if that is really what we want.
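
For reference, a simplified sketch of how the base Metric.attach wires these events in ignite (the handler-existence checks of the real implementation are omitted):

from ignite.engine import Events

def attach(self, engine, name):
    # reset the metric's internal state at the start of every epoch
    engine.add_event_handler(Events.EPOCH_STARTED, self.started)
    # accumulate on every iteration
    engine.add_event_handler(Events.ITERATION_COMPLETED, self.iteration_completed)
    # write the computed value into engine.state.metrics[name] at the end of the epoch
    engine.add_event_handler(Events.EPOCH_COMPLETED, self.completed, name)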

@erip (Contributor, Author) commented Feb 3, 2020

I will defer to you about whether to merge now or to wait for a more complete solution. For fairseq this is good enough. 👍

@vfdev-5 vfdev-5 merged commit 0375a6e into pytorch:master Feb 3, 2020
@erip erip deleted the feature/add-frequency-metric branch February 3, 2020 13:22