
[Question] How to hashlib a list of csv files with aiofile? #29

Open

scheung38 opened this issue Jan 17, 2020 · 13 comments

@scheung38

thanks

@Natim

Natim commented Jan 17, 2020

with asyncio.gather?

@scheung38

scheung38 commented Jan 17, 2020

Thanks @Natim. Would asyncio.gather process, say, 100 independent CSV files in parallel, or at least in a non-blocking manner? For example, hashing 100,000 CSV files with hashlib took 30 minutes, so it would be interesting to see how long this approach takes.

@Natim

Natim commented Jan 17, 2020

You might want to watch my talk about IO Bound and CPU Bound mixes: https://www.youtube.com/watch?v=eJBbM3RpEUI
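The short version of the mix: hashing is CPU-bound while reading the files is IO-bound. One common pattern for that combination (a minimal sketch of my own, not taken from the talk) is to keep asyncio for orchestration but offload the hashing to a process pool via loop.run_in_executor:

import asyncio
import hashlib
from concurrent.futures import ProcessPoolExecutor


def hash_file(filename):
    # CPU-bound part: runs in a separate worker process
    hasher = hashlib.sha256()
    with open(filename, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            hasher.update(chunk)
    return hasher.hexdigest(), filename


async def main(files):
    loop = asyncio.get_event_loop()
    with ProcessPoolExecutor() as pool:
        # Each file is hashed in a worker process, in parallel across cores
        results = await asyncio.gather(
            *(loop.run_in_executor(pool, hash_file, f) for f in files)
        )
    for filehash, filename in results:
        print(f"{filehash}\t{filename}")


loop = asyncio.get_event_loop()
loop.run_until_complete(main(["worker.py", "README.md"]))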

@Natim

Natim commented Jan 17, 2020

To elaborate a little bit.

Single processing unit

If you don't mind having a single-process CSV hasher, you can use aiofile to read your files line by line, and the files will be processed concurrently.

import asyncio
import hashlib
from aiofile import AIOFile, LineReader


async def hashlib_file(filename):
    # Open file
    async with AIOFile(filename, 'rb') as afd:
        # Create hasher
        hasher = hashlib.sha256()
        async for line in LineReader(afd):
            # For each line update hasher
            hasher.update(line)
            
    # return hexdigest
    return (hasher.hexdigest(), filename)


async def main():
    FILES = (
        "worker.py",
        "README.md",
    )
    actions = [hashlib_file(f) for f in FILES]
    results = await asyncio.gather(*actions)
    for filehash, filename in results:
        print(f"{filehash}\t{filename}")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
$ sha256sum *
f3c8145d5c50b1b1536fe15561ad1fb6129f5c6e06f97054bdfb8f374ed4682f  worker.py
b4b392521258362b79cf1ed7b42ade0308a41a1275a6da21ede4ed03089bfae8  README.md

$ python worker.py 
f3c8145d5c50b1b1536fe15561ad1fb6129f5c6e06f97054bdfb8f374ed4682f	worker.py
b4b392521258362b79cf1ed7b42ade0308a41a1275a6da21ede4ed03089bfae8	README.md

Multi processing unit

If you want something really fast, you should use asyncio.create_subprocess_exec and a Unix command such as sha256sum.

import asyncio


async def hashlib_file(filename):
    proc = await asyncio.create_subprocess_exec(
        "sha256sum", filename,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()

    value, _ = stdout.decode().split()

    return value, filename


async def main():
    FILES = (
        "worker.py",
        "README.md",
    )
    actions = [hashlib_file(f) for f in FILES]
    results = await asyncio.gather(*actions)
    for filehash, filename in results:
        print(f"{filehash}\t{filename}")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
$ python worker_subprocess.py 
f3c8145d5c50b1b1536fe15561ad1fb6129f5c6e06f97054bdfb8f374ed4682f	worker.py
b4b392521258362b79cf1ed7b42ade0308a41a1275a6da21ede4ed03089bfae8	README.md
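One caveat at the scale mentioned above: with 100,000 files, creating one subprocess per file all at once can exhaust process and file-descriptor limits. A minimal sketch (my addition; the cap of 32 is a guess to tune per machine) that bounds concurrency with asyncio.Semaphore:

import asyncio


async def hashlib_file(filename, sem):
    # The semaphore caps how many sha256sum processes run at once
    async with sem:
        proc = await asyncio.create_subprocess_exec(
            "sha256sum", filename,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
    value, _ = stdout.decode().split()
    return value, filename


async def main(files):
    sem = asyncio.Semaphore(32)  # hypothetical cap; tune for your machine
    actions = [hashlib_file(f, sem) for f in files]
    results = await asyncio.gather(*actions)
    for filehash, filename in results:
        print(f"{filehash}\t{filename}")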

@Natim

Natim commented Jan 17, 2020

@scheung38 I added the code in my previous message.

@scheung38

What is the performance of the single processing vs. the multiprocessing option?

@Natim

Natim commented Jan 17, 2020

I am going to let you try it on your huge CSV files and tell us; it might be interesting.

For small files you won't see the difference. For huge ones, I am interested.

@scheung38

scheung38 commented Jan 17, 2020

I'm not dealing with a huge CSV file, but maybe 100,000 small CSV files?

Appreciate your feedback though...

@scheung38

scheung38 commented Jan 17, 2020

Using standard non-async:

import hashlib
import time

BLOCK_SIZE = 65536

def hash_csv():
    digests = []
    for i in range(1000):  # to simulate 1000 files
        for filename in ['Y:\\sample.csv']:  # around 75 KB in size, 38 columns
            hasher = hashlib.blake2s()
            with open(filename, 'rb') as f:
                # Note: a single read, so only the first BLOCK_SIZE
                # bytes of each file are hashed
                buf = f.read(BLOCK_SIZE)
                hasher.update(buf)
            a = hasher.hexdigest()
            digests.append(a)
            print(a)
    return digests


if __name__ == '__main__':
    start = time.time()
    hash_csv()
    end = time.time()
    total = end - start
    print(total)  

12.5 sec

Tried your single processing version:

import asyncio
import hashlib
import time
from aiofile import AIOFile, LineReader


async def hashlib_file(filename):
    # Open file
    async with AIOFile(filename, 'rb') as afd:
        # Create hasher
        hasher = hashlib.blake2s()
        async for line in LineReader(afd):
            # For each line update hasher
            hasher.update(line)
            
    # return hexdigest
    return (hasher.hexdigest(), filename)


async def main():
    FILES = (
        "Y: \\sample.csv", 
    )

    for i in range(1000):
        actions = [hashlib_file(f) for f in FILES]
        results = await asyncio.gather(*actions)
        for filehash, filename in results:
            print(f"{filehash}\t{filename}")


start = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
end = time.time()
total = end - start
print(total)  

71 sec

Tried your multiprocessing version:

import asyncio
import time


async def hashlib_file(filename):
    proc = await asyncio.create_subprocess_exec(
        "sha256sum", filename,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()

    value, _ = stdout.decode().split()

    return value, filename


async def main():
    FILES = (
        "Y: sample.csv",
    )
    actions = [hashlib_file(f) for f in FILES]
    results = await asyncio.gather(*actions)
    for filehash, filename in results:
        print(f"{filehash}\t{filename}")

start = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
end = time.time()
total = end - start
print(total)  

Error: NotImplementedError?

So I'm not sure why the async version is slower than the sync one?

@Natim

Natim commented Jan 19, 2020

It seems you are using Windows, so you cannot exec a Unix process from there.
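For context on the NotImplementedError: before Python 3.8 the default event loop on Windows is the SelectorEventLoop, which does not implement subprocess support. A minimal sketch of a workaround, assuming Python < 3.8 on Windows:

import asyncio
import sys

# The ProactorEventLoop implements subprocess support on Windows;
# the default SelectorEventLoop (before Python 3.8) does not.
if sys.platform == "win32":
    asyncio.set_event_loop(asyncio.ProactorEventLoop())

Even then, sha256sum does not exist on Windows; the built-in certutil -hashfile Y:\sample.csv SHA256 could stand in, though its output format differs, so the stdout parsing would need adjusting.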

@Natim

Natim commented Jan 19, 2020

Your code is wrong for the async test 😂 You gather a single task per iteration of a sequential loop, so the 1000 hashes never actually run concurrently. It should be:

    FILES = (
        "Y:\\sample.csv",
    )
    actions = [hashlib_file(FILES[0]) for _ in range(1000)]
    results = await asyncio.gather(*actions)
    for filehash, filename in results:
        print(f"{filehash}\t{filename}")

@scheung38

Is the single processing or the multiprocessing one wrong?
