Parallelize computing md5 for large files #2073

Closed
Witiko opened this issue May 30, 2019 · 23 comments · Fixed by #2197
Labels
enhancement (Enhances DVC)

Comments


Witiko commented May 30, 2019

After executing dvc run, I have been staring at the "Computing md5 for a large file" message for the past five hours. However, only a single CPU and about 20% of the SSD's maximum read speed are being utilized. Therefore, a 5× speed-up could be achieved just by computing MD5 in parallel.

We may have to get rid of the per-file progress bars, but I think that a single overall progress bar would be more informative anyway, especially with hundreds of files being added to the cache. What do you think?

@efiop added the enhancement label May 30, 2019

efiop commented May 30, 2019

@Witiko Great suggestion! For large files, the parallelization is quite straightforward: we just need to modify file_md5 from dvc/utils to compute a file's checksum in parallel. Let us know if you would like to tackle this one as well 🙂


Witiko commented May 30, 2019

Thank you, @efiop. I will take a look. Is there any other parallelized code in dvc at the moment, so that I can keep things consistent?

Rather than parallelize the internals of utils.file_md5, I was planning to have every thread compute utils.file_md5 for one file. In theory, we could have every thread compute the MD5 for a part of a file, which would give better speed-up for a small number of files. However, there is no obvious way to combine the MD5s of the parts into the MD5 of the whole file. (Is there?)
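For illustration, something like this minimal sketch (assuming the existing utils.file_md5; the files_md5 helper and its jobs parameter are hypothetical) is roughly what I have in mind:

from multiprocessing.pool import ThreadPool

from dvc.utils import file_md5  # the existing per-file checksum helper


def files_md5(fnames, jobs=None):
    # Compute the md5 of every file in a separate worker; jobs=None lets the
    # pool use all available CPUs. A process pool could be swapped in instead.
    with ThreadPool(processes=jobs) as pool:
        return dict(zip(fnames, pool.map(file_md5, fnames)))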


efiop commented May 30, 2019

Thank you, @efiop. I will take a look. Is there any other parallelized code in dvc at the moment, so that I can keep things consistent?

Thanks 🙂 We actually hardly use any parallelization these days 🙁 The only example I can think of is push/pull/fetch/status from dvc/remote/RemoteLOCAL, which use ThreadPools. But it looks like processes would be more suitable in this particular case because of the GIL.

Rather than parallelize the internals of utils.file_md5, I was planning to have every thread compute utils.file_md5 for one file. In theory, we could have every thread compute the MD5 for a part of a file, which would give better speed-up for a small number of files.

So yes, the approach would depend on the particular issue that you are trying to solve. If we are talking about a small number of large standalone files, then parallelizing file_md5 to tackle parts of the same file should bring the most benefit. If we are talking about a large number of small files, then, indeed, we'll need to compute file_md5 for each file in parallel.

However, there is no obvious way to combine the MD5s of the parts into the MD5 of the whole file. (Is there?)

There is. We are actually doing it already in https://github.com/iterative/dvc/blob/master/dvc/utils/__init__.py#L67 by reading chunks of the file and then gradually updating the md5 until we get a final answer for the whole file.


Witiko commented May 30, 2019

There is. We are actually doing it already in https://github.com/iterative/dvc/blob/master/dvc/utils/__init__.py#L67 by reading chunks of the file and then gradually updating the md5 until we get a final answer for the whole file.

That's different, however, because you process the chunks linearly, i.e. there is an internal state that depends on all the previously read chunks. A thread that starts reading in the middle of a file has not seen the previous chunks, and therefore it starts with an incorrect state. In the end, each thread will produce its own MD5, and there is no obvious way to combine these into the MD5 of the entire file.

Another thought: since dvc is not expected to produce a large CPU load, the parallelization should perhaps be an opt-in feature. We could do this by adding an option similar to Git's submodule.fetchJobs that controls the number of threads and defaults to 1. A value ≤ 0 would use some sensible default, such as all available CPUs via Pool(None).


efiop commented May 30, 2019

But you can get the total size of a file, split it into chunks (not literally, but by passing each worker a proper offset to fseek() to), feed them to the workers, have each worker return the md5 of its chunk, and then combine those into a final md5 of the whole file. Unless I'm missing something.

As to parallelization, we can use a dvc config core.jobs option (we already have the same one in our remotes). By default it should use as many jobs as available, since it seems like a desirable behaviour -- to compute checksums as quickly as possible.


Witiko commented May 30, 2019

But you can get the total size of a file, split it into chunks (not literally, but by passing each worker a proper offset to fseek() to), feed them to the workers, have each worker return the md5 of its chunk, and then combine those into a final md5 of the whole file. Unless I'm missing something.

It's best to discuss this with a code example at hand. Currently, dvc does the following for the entire input file.

def file_md5(fname):
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as fobj:
        while True:
            chunk = fobj.read(LOCAL_CHUNK_SIZE)
            if not chunk:
                break
            hash_md5.update(chunk)
    return (hash_md5.hexdigest(), hash_md5.digest())

This produces a single md5 hash:

|                              hash                              |

You can fseek to an offset and compute the md5 hash for just a part of the input file:

def file_part_md5(fname, offset, size):
    hash_md5 = hashlib.md5()
    read = 0
    with open(fname, "rb") as fobj:
        fobj.seek(offset)
        while read < size:
            # do not read past the end of this part
            chunk = fobj.read(min(LOCAL_CHUNK_SIZE, size - read))
            if not chunk:
                break
            read += len(chunk)
            hash_md5.update(chunk)
    return (hash_md5.hexdigest(), hash_md5.digest())

This produces several MD5 hashes:

|    hash    |    hash    |    hash    |    hash    |    hash    |

How would you combine these md5 hashes to obtain one md5 hash for the entire file?

As to parallelization, we can use a dvc config core.jobs option (we already have the same one in our remotes).

There is also the --jobs option for the push, gc, pull, and status dvc operations, which we might want to add to dvc run.

By default it should use as many jobs as available, since it seems like a desirable behaviour -- to compute checksums as quickly as possible.

I am not sure if using all CPUs is the best default. It is true that we want to compute the checksums as fast as possible. However, users will not expect that dvc maxes out their CPUs. In my experience, using more than one CPU is almost always an opt-in for command-line tools, not a default.

Take this real-life example: I am using dvc on a shared server with 64 CPU cores. Dvc is a versioning tool and I don't expect it to do anything computationally intensive, so I do not use nice to run dvc with low priority. When dvc uses all CPUs to compute md5 hashes, I get kicked from the shared server for slowing down other users' interactive sessions.


efiop commented May 31, 2019

How would you combine these md5 hashes to obtain one md5 hash for the entire file?

You are absolutely right @Witiko, there is no way to do that, since md5 calculation is a linear process. Sorry, I had totally forgotten about that, and didn't even remember that S3 has the same problem with multipart uploads, where it uses a combined md5 of the parts as an ETag for the resulting object. Indeed, we need to parallelize by files.

There is also the --jobs option for the push, gc, pull, and status dvc operations, which we might want to add to dvc run.

Actually, thinking about it, we should probably set it in the checksum section (introduced by one of our WIP patches that adds support for additional hashes), like so: dvc config checksum.jobs 8. And if we do decide to utilize a cmdline flag as well, then we'll need to introduce a separate flag, something like --checksum-jobs, so that it does not overlap with --jobs, which has a different meaning for push/pull/etc.

I am not sure if using all CPUs is the best default. It is true that we want to compute the checksums as fast as possible. However, users will not expect that dvc maxes out their CPUs. In my experience, using more than one CPU is almost always an opt-in for command-line tools, not a default.

But if cmd tool offers an option for that, then you are most likely to always use it, no? 🙂 E.g. things like make that everyone has the muscle memory to run with -jN 🙂 It feels like we could set some reasonable default, same as we do with our remotes to provide the best experience right away. Maybe something like 2 by default would be acceptable? I agree that it is not the same as with remotes since push/pull/etc are IO bound and md5 computation is CPU bound, but from your investigation, it looks like we are not utilizing cpu and i/o enough, so I'm wondering if we should set the default value a bit higher.

Take this real-life example: I am using dvc on a shared server with 64 CPU cores. Dvc is a versioning tool and I don't expect it to do anything computationally intensive, so I do not use nice to run dvc with low priority. When dvc uses all CPUs to compute md5 hashes, I get kicked from the shared server for slowing down other users' interactive sessions.

That is a really great point! I'm just trying to think if your scenario is the one that should set checksum.jobs explicitly to some lower value or if it is close to the most common scenario in which people use dvc and we should indeed set that by default to 1. What do you think?


efiop commented May 31, 2019

btw as @shcheklein rightfully noted, we could possibly use that s3 approach too to be able to compute md5 of a large file in parallel by splitting it into chunks.


efiop commented May 31, 2019

btw @Witiko , which problem is more relevant for you? Small number of large files or big number of small files?


Witiko commented May 31, 2019

Actually, thinking about it, we should probably set it in the checksum section (introduced by one of our WIP patches that adds support for additional hashes), like so: dvc config checksum.jobs 8. And if we do decide to utilize a cmdline flag as well, then we'll need to introduce a separate flag, something like --checksum-jobs, so that it does not overlap with --jobs, which has a different meaning for push/pull/etc.

That is a good point, although this makes the naming a little unintuitive (why does --jobs control only the number of pull jobs?). Perhaps we should have options --checksum-jobs and --pull-jobs, and --jobs would control both of these options? This way, the CLI API is backwards-compatible, but the naming remains intuitive.


Witiko commented May 31, 2019

But if cmd tool offers an option for that, then you are most likely to always use it, no? 🙂 E.g. things like make that everyone has the muscle memory to run with -jN 🙂

That is true, but having to specify -j also makes the user aware that multiple CPU cores will be used. When I know that multiple CPU cores will be used, I will use nice or wait for my other tasks to finish. I would not expect make without -j to max out my CPUs (and I would be even more surprised if a data versioning tool did that). To me, this is a question of courtesy and the least surprise to the user.


Witiko commented May 31, 2019

It feels like we could set some reasonable default, same as we do with our remotes to provide the best experience right away. Maybe something like 2 by default would be acceptable?

With the number of cores in modern consumer CPUs, I think we can go as high as 4, which should almost max out the SSD I/O. Note that some of the data will be cached in the RAM, and the user may also be using a fast NVMe disk, so there is still some utility in explicitly asking for more than 4 threads.

I'm just trying to think if your scenario is the one that should set checksum.jobs explicitly to some lower value or if it is close to the most common scenario in which people use dvc and we should indeed set that by default to 1. What do you think?

I think that having some sane value of checksum.jobs, such as 4, would be a good compromise between speed and surprise. Alternatively, if the default were 1, we could print a message that says something to the effect of "The hash computation is taking a long time, consider using more than one thread via the --checksum-jobs option".


Witiko commented May 31, 2019

btw as @shcheklein rightfully noted, we could possibly use that s3 approach too to be able to compute md5 of a large file in parallel by splitting it into chunks.

That is an interesting suggestion. Where does @shcheklein discuss this?

btw @Witiko , which problem is more relevant for you? Small number of large files or big number of small files?

The most relevant problem for me is computing hashes for a large number of large files, because that's what takes the longest. Parallelizing over files speeds up the hash computation for a large number of large files, but it offers only a small speed-up for a large number of small files and a small number of large files.

This actually points to a bigger problem with the cache, local remotes, and HTTP remotes: you may not be able to pull a local or HTTP remote if it contains a file that is too large for your filesystem (such as the 4G limit of FAT32). We could solve that by chunking the files that we store in the cache, but that also means that we could no longer hardlink and symlink files to the cache. Moreover, it would also make the format of the cache, local remotes, and HTTP remotes backwards-incompatible, so this would be quite a significant change to the fundamental design of dvc!

If we started chunking the files that we store in the cache, and we also started packing small files together, then parallelizing over files would speed up the hash computation for any number of files (both large and small). However, as I said above, this would be a major change.

@Witiko closed this as completed May 31, 2019
@Witiko reopened this May 31, 2019

efiop commented May 31, 2019

That is a good point, although this makes the naming a little unintuitive (why does --jobs control only the number of pull jobs?). Perhaps we should have options --checksum-jobs and --pull-jobs, and --jobs would control both of these options? This way, the CLI API is backwards-compatible, but the naming remains intuitive.

but --pull-jobs doesn't apply to push, so we'll either have to have --push-jobs/--fetch-jobs too or just something like --cloud-jobs 🙂 But anyway, for now we could simply leave --jobs as is and introduce --checksum-jobs on top.

That is true, but having to specify -j also makes the user aware that multiple CPU cores will be used. When I know that multiple CPU cores will be used, I will use nice or wait for my other tasks to finish. I would not expect make without -j to max out my CPUs (and I would be even more surprised if a data versioning tool did that). To me, this is a question of courtesy and the least surprise to the user.

Touché 🙂

With the number of cores in modern consumer CPUs, I think we can go as high as 4, which should almost max out the SSD I/O. Note that some of the data will be cached in the RAM, and the user may also be using a fast NVMe disk, so there is still some utility in explicitly asking for more than 4 threads.

4 by default seems like a little too much in the general case. Maybe we could use the number of CPUs and decide based on that, so that (except in the 1-CPU case) we don't take more than 50% of the CPUs by default? This does seem like implicit magic, which I'm not a big fan of, but it also seems suitable if we want to provide the best experience by default.
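Roughly, something like this minimal sketch (the helper name and the "0 means auto" convention are just placeholders, not dvc code):

import os


def default_checksum_jobs(configured=0):
    # A positive `configured` value means the user set checksum.jobs
    # explicitly; 0 (a placeholder "auto" value) falls back to half of the
    # available CPUs, but never fewer than one worker.
    if configured > 0:
        return configured
    return max(1, (os.cpu_count() or 1) // 2)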

Alternatively, if the default were 1, we could print a message that says something to the effect of "The hash computation is taking a long time, consider using more than one thread via the --checksum-jobs option".

Or even dvc config checksum.jobs N, indeed. But for that message we would need to decide what "long time" means, which is not trivial in this case. It feels like using that dynamic default would provide a better experience for the majority of users. It is also in line with what we do with --jobs for clouds, where we use dynamic defaults too.

That is an interesting suggestion. Where does @shcheklein discuss this?

In private messages 🙂 Maybe he'll join us here soon. The suggested approach doesn't mean that we need to actually split that cache file into separate physical chunks, but rather that we would virtually do that when computing the checksum. As long as we keep the chunk size constant, we should be able to provide the same level of by-checksum deduplication that we have right now.
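To make the idea concrete, here is a rough sketch (not dvc code): hash fixed-size virtual chunks in parallel and combine the per-chunk digests into a composite tag, similar to how S3 builds multipart ETags. Note that the result is not the md5 of the whole file, but it is deterministic as long as the chunk size stays constant.

import hashlib
import os
from concurrent.futures import ProcessPoolExecutor

CHUNK_SIZE = 64 * 1024 * 1024  # hypothetical fixed (virtual) chunk size


def _chunk_md5(path, offset, size):
    # Hash one virtual chunk of the file.
    md5 = hashlib.md5()
    with open(path, "rb") as fobj:
        fobj.seek(offset)
        md5.update(fobj.read(size))
    return md5.digest()


def composite_md5(path, jobs=4):
    # Combine per-chunk md5 digests into an S3-ETag-style composite tag.
    size = os.path.getsize(path)
    offsets = list(range(0, size, CHUNK_SIZE)) or [0]
    with ProcessPoolExecutor(max_workers=jobs) as pool:
        digests = list(
            pool.map(
                _chunk_md5,
                [path] * len(offsets),
                offsets,
                [min(CHUNK_SIZE, size - off) for off in offsets],
            )
        )
    return "{}-{}".format(hashlib.md5(b"".join(digests)).hexdigest(), len(digests))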

The most relevant problem for me is computing hashes for a large number of large files, because that's what takes the longest. Parallelizing over files speeds up the hash computation for a large number of large files, but it offers only a small speed-up for a large number of small files and a small number of large files.

Let's start by solving your problem; that is the most straightforward thing for now. We could look into speeding up checksum computation for 1 big file later.

This actually points to a bigger problem with the cache, local remotes, and HTTP remotes: you may not be able to pull a local or HTTP remote if it contains a file that is too large for your filesystem (such as the 4G limit of FAT32). We could solve that by chunking the files that we store in the cache, but that also means that we could no longer hardlink and symlink files to the cache. Moreover, it would also make the format of the cache, local remotes, and HTTP remotes backwards-incompatible, so this would be quite a significant change to the fundamental design of dvc!

As @shcheklein noted, the FAT example might be stretching it a little, though maybe we are simply not aware of anyone actually using it with dvc these days. Unless you are dealing with truly giant files, you should be fine. And if you are dealing with truly giant files, you are probably splitting them yourself or using something like hdfs, which dvc is also able to work with through external dependencies/outputs.

And we wouldn't be able to use reflink either, which is even worse 😅 Using links is quite an important feature for us.

If we started chunking the files that we store in the cache, and we also started packing small files together, then parallelizing over files would speed up the hash computation for any number of files (both large and small). However, as I said above, this would be a major change.

We have been thinking about splitting files on a smaller scale as per #829 too. One of the problems with that is that you would be doubling your storage, since you'll need to reconstruct the files back in the cache and then link them to the workspace. The other way would be to just not use links and reconstruct the files directly into the workspace, but then again you won't be able to use reflink/hardlink/symlink and will be tied to copying, so that would again double the storage requirements. Splitting data into chunks is important for applications where you are dealing with a somewhat big flow of data constantly, and in our case things are much more static: you download (or create) your cache files once (e.g. giant datasets that you use), we compute checksums for them once, and we store them in our state db for quick access without having to recompute them again. So maybe splitting is not the best way, at least for now, when there are a lot of other optimizations we could do to speed everything up.

Btw, @Witiko , how about we schedule a video call someday? We would all really love to meet you and talk about your use cases. 🙂 How about 5 pm (Prague timezone) next Wednesday?


Witiko commented May 31, 2019

but --pull-jobs doesn't apply to push, so we'll either have to have --push-jobs/--fetch-jobs too or just something like --cloud-jobs 🙂 But anyway, for now we could simply leave --jobs as is and introduce --checksum-jobs on top.

That is true, and I like your --cloud-jobs suggestion. Perhaps in the future, you will be adding more remotes that do not necessarily correspond to cloud storages. Therefore, something more general such as --transfer-jobs may be a safer naming choice.


Witiko commented May 31, 2019

We have been thinking about splitting files on a smaller scale as per #829 too. One of the problems with that is that you would be doubling your storage, since you'll need to reconstruct the files back in the cache and then link them to the workspace.

This is only true for hardlinks and symlinks. You can reflink just a chunk of a file (see ioctl_ficlonerange), which allows you to move chunks of files between the cache and the workspace without copying. However, I am not sure that Darwin supports this (likely not).
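For reference, here is a rough Linux-only sketch of that per-range reflink from Python. The FICLONERANGE request number and struct layout below come from linux/fs.h and are my assumptions about the platform; it only works across files on the same reflink-capable filesystem (e.g. Btrfs or XFS), and offsets generally need to be block-aligned.

import fcntl
import struct

# FICLONERANGE = _IOW(0x94, 13, struct file_clone_range) on Linux
FICLONERANGE = 0x4020940D


def clone_range(src_path, dst_path, src_offset, length, dst_offset):
    # Reflink `length` bytes of src into dst without copying any data.
    with open(src_path, "rb") as src, open(dst_path, "r+b") as dst:
        # struct file_clone_range { __s64 src_fd;
        #                           __u64 src_offset, src_length, dest_offset; };
        arg = struct.pack("qQQQ", src.fileno(), src_offset, length, dst_offset)
        fcntl.ioctl(dst.fileno(), FICLONERANGE, arg)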


Witiko commented May 31, 2019

As @shcheklein noted, the FAT example might be stretching it a little.

I agree that this is perhaps not a concern for us.

Let's start by solving your problem; that is the most straightforward thing for now. We could look into speeding up checksum computation for 1 big file later.

I agree. I just thought that the idea of chunking the cache is worth discussing, but it is actually a separate issue that is independent of this one. Thanks to our detailed discussion, I think I have a good idea of how we'd like to approach fixing the current issue. 🙂


Witiko commented May 31, 2019

Btw, @Witiko , how about we schedule a video call someday? We would all really love to meet you and talk about your use cases. 🙂 How about 5 pm (Prague timezone) next Wednesday?

@efiop: Contributing to open-source projects is a way for me to unwind, and I consider video calls a major stress factor. 🙂 My use case can be summarized in one sentence: I am producing large language models and I need to store them and publish them together with the training and evaluation code. If you'd like to discuss further, I can join you and others on Discord.


efiop commented Jun 1, 2019

That is true, and I like your --cloud-jobs suggestion. Perhaps in the future, you will be adding more remotes that do not necessarily correspond to cloud storages. Therefore, something more general such as --transfer-jobs may be a safer naming choice.

Agreed :)

This is only true for hardlinks and symlinks. You can reflink just a chunk of a file (see ioctl_ficlonerange), which allows you to move chunks of files between the cache and the workspace without copying. However, I am not sure that Darwin supports this (likely not).

Oh wow, I didn't know about that! Yeah, a quick Google search doesn't seem to find any evidence that macOS supports it, but maybe it just needs more careful research. So it looks like if we were to go that way, it would probably be a Linux-only feature. Also, remotes will become incompatible without doubling your local or remote storage to keep the joined files alongside the chunks 🙁

I agree. I just thought that the idea of chunking the cache is worth discussing, but it is actually a separate issue that is independent of this one. Thanks to our detailed discussion, I think I have a good idea of how we'd like to approach fixing the current issue. 🙂

Didn't mean to discourage 🙂 The discussion turned out amazing!

Contributing to open-source projects is a way for me to unwind, and I consider video calls a major stress factor. 🙂 My use case can be summarized in one sentence: I am producing large language models and I need to store them and publish them together with the training and evaluation code. If you'd like to discuss further, I can join you and others on Discord.

Sure, no worries, chat is good too 🙂 Please feel free to join our community on Discord, we would really appreciate having you there 🙂


Witiko commented Jun 12, 2019

I am sorry about the delay in solving this issue; I have been quite busy for the past two weeks.


efiop commented Jun 12, 2019

@Witiko No worries 🙂


Witiko commented Jun 12, 2019

@efiop: It will take a couple of days to complete; there are a lot of callers (and callers of callers) of file_md5, which will all have to be patched together with their tests. The development takes place at witiko/dvc:parallelize-md5.


Witiko commented Jun 16, 2019

This was turning out to be a pretty extensive rewrite. Anything that called file_md5 (even indirectly) would need to be rewritten so that it would first accumulate a list of files and then pass these to file_md5. Since this would touch basically everything in dvc.remote, I went back to the drawing board and made file_md5 use concurrency behind the scenes via concurrent.futures. This keeps the changes contained to file_md5 and a few of its direct callers, which must handle the futures. I'm currently testing the changes, and I will open a PR when I am happy with them.
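Roughly, the shape I ended up with looks like this minimal sketch (not the actual branch code; the pool size and helper names are placeholders): hashing a single file stays sequential, but callers can schedule many files concurrently and resolve the futures when they need the checksums.

import hashlib
from concurrent.futures import ThreadPoolExecutor

LOCAL_CHUNK_SIZE = 1024 * 1024  # placeholder; dvc defines its own constant

_executor = ThreadPoolExecutor(max_workers=4)  # placeholder pool size


def _file_md5(fname):
    md5 = hashlib.md5()
    with open(fname, "rb") as fobj:
        for chunk in iter(lambda: fobj.read(LOCAL_CHUNK_SIZE), b""):
            md5.update(chunk)
    return md5.hexdigest(), md5.digest()


def file_md5_async(fname):
    # Schedule hashing of one file; callers call .result() when they need it.
    return _executor.submit(_file_md5, fname)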
