
Make process spawning configurable: multiprocessing, spark, or dask #20

Closed
rom1504 opened this issue Aug 20, 2021 · 40 comments

rom1504 commented Aug 20, 2021

May at least be useful to better control memory usage

rom1504 commented Aug 22, 2021

With the current architecture, it should be pretty natural to make it possible to choose between a multiprocessing pool and a spark or dask distributed environment.
That may be a good thing to add in order to get multi-node support; a sketch of the idea follows.
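A minimal sketch of what this could look like, assuming a single process_shard function that handles one input shard; the distributor names here are illustrative, not the actual img2dataset API:

from multiprocessing import Pool

# local backend: run process_shard over all shards with a process pool
def multiprocessing_distributor(process_shard, shards, processes_count=8):
    with Pool(processes_count) as pool:
        pool.map(process_shard, shards)

# distributed backend: one spark task per shard, same calling convention
def pyspark_distributor(spark, process_shard, shards):
    rdd = spark.sparkContext.parallelize(shards, numSlices=len(shards))
    rdd.foreach(process_shard)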

rom1504 changed the title from "Build a pyspark and/or dask version and compare perf" to "Consider making process spawning configurable: multiprocessing, spark, or dask" on Aug 22, 2021
rom1504 added this to the 1B milestone on Aug 23, 2021
rom1504 commented Nov 27, 2021

https://github.com/rom1504/img2dataset/blob/main/img2dataset/downloader.py#L337 — at least do it at the file level, so this can be a pure mapper; follow the same idea as rom1504/clip-retrieval#79 (comment)

rom1504 commented Dec 2, 2021

https://github.com/horovod/horovod/blob/386be429b1417a1f6cb5e715bbe36efd2e74f402/horovod/spark/runner.py#L244 is a good trick to let the user build their own spark context
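A hedged sketch of that trick: reuse the session the caller built when there is one. SparkSession.getActiveSession() is a real pyspark call; the fallback defaults below are assumptions:

from pyspark.sql import SparkSession

def get_or_create_spark_session():
    # if the user already built a session, reuse it as-is
    session = SparkSession.getActiveSession()
    if session is None:
        # otherwise fall back to a local default the user can override
        session = (
            SparkSession.builder
            .appName("img2dataset")
            .master("local[*]")
            .getOrCreate()
        )
    return session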

rom1504 mentioned this issue on Jan 2, 2022
rom1504 commented Jan 3, 2022

To move forward on this, moving the reader to the executor level could be a good step.

rom1504 pinned this issue on Jan 7, 2022
rom1504 changed the title from "Consider making process spawning configurable: multiprocessing, spark, or dask" to "Make process spawning configurable: multiprocessing, spark, or dask" on Jan 8, 2022
rom1504 commented Jan 9, 2022

Spark streaming can handle a streaming collection of files in a folder.
However, it may not be able to handle partial files.
Solutions:

  • write the partial files to a temporary dir and move them at the end (the usual spark solution)
  • just run many standard spark batch jobs instead
  • simply push file names into a TCP stream / queue and have spark streaming read that!

The third solution is the best (see the sketch below).
It should also allow this to work in distributed inference mode, and for any kind of inference.
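A minimal sketch of the third option, assuming some producer process pushes one shard file name per line to a TCP socket on port 9999; the host, port, and process_shard placeholder are all illustrative:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="img2dataset-streaming")
ssc = StreamingContext(sc, batchDuration=10)  # 10 s micro-batches

# each line received on the socket is one shard file name
file_names = ssc.socketTextStream("localhost", 9999)

def process_shard(name):
    # placeholder for downloading, resizing and writing one shard
    return name

file_names.map(process_shard).pprint()

ssc.start()
ssc.awaitTermination()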

rom1504 commented Jan 9, 2022

https://www.bogotobogo.com/Hadoop/BigData_hadoop_Apache_Spark_Streaming.php

Internally, a DStream is represented as a sequence of RDDs

rom1504 commented Jan 9, 2022

https://github.com/criteo/cluster-pack/tree/master/examples/spark-with-S3 may be helpful to create a pyspark session, but it should probably not be included by default; instead, put it behind an option or even an example script, or let the user create the session as they prefer.

rom1504 commented Jan 10, 2022

ok we now have pyspark support.

Next step here is to actually try running it on some pyspark clusters.
I intend to try (and document):

rom1504 commented Jan 10, 2022

standalone

https://spark.apache.org/downloads.html

https://spark.apache.org/docs/latest/spark-standalone.html

wget https://dlcdn.apache.org/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
tar xf spark-3.2.0-bin-hadoop3.2.tgz

on master:
bash ./sbin/start-master.sh

on nodes:
bash ./sbin/start-worker.sh "spark://master-ip:7077"

rom1504 commented Jan 10, 2022

Make sure all writers overwrite their output (just delete the file if it already exists), so this works well with spark's feature of retrying failed tasks.
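A minimal sketch of an idempotent writer opening, assuming fsspec for the output file system; open_for_overwrite is an illustrative helper, not the img2dataset API:

import fsspec

def open_for_overwrite(path):
    # removing any leftover file from a failed attempt makes retries safe
    fs, _, (p,) = fsspec.get_fs_token_paths(path)
    if fs.exists(p):
        fs.rm(p)
    return fs.open(p, "wb")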

rom1504 commented Jan 10, 2022

-> It is not obvious how to run standalone: how do you send the env to the other nodes, and where do you write? (how do you set up a distributed fs locally?)
Maybe just try aws emr next.

rom1504 commented Jan 10, 2022

maybe using sshfs could work

rom1504 commented Jan 10, 2022

An end-to-end Docker example for deploying standalone PySpark with SparkSession.builder and PEX can be found here - it uses cluster-pack, a library on top of PEX that automates the intermediate step of having to create and upload the PEX manually.

rom1504 commented Jan 10, 2022

Since this is just a mapper, it would also be possible to build a docker image and spawn it once per input file, like https://blog.iron.io/docker-iron-io-super-easy-batch-processing/
Might be interesting.

rom1504 commented Jan 10, 2022

Possibly reconsider a streaming-based approach, to eliminate the concept of a file from most of the pipeline.

rom1504 commented Jan 11, 2022

Consider yielding examples in the downloader and moving the writer-side aggregation to the distributor level (not in the driver, but in an abstraction on top of the downloader that runs in the workers); see the sketch below.
That would allow perfect balancing of the written files.
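A hedged sketch of the idea; download, fetch and write_shard are illustrative names, not actual img2dataset functions:

# the downloader becomes a generator: it yields examples instead of writing files
def download(urls):
    for url in urls:
        image = fetch(url)  # hypothetical http get + resize helper
        if image is not None:
            yield url, image

# the aggregation runs in the workers and cuts fixed-size shards,
# so every written file ends up with the same number of examples
def distribute_and_write(urls, shard_size=10000):
    buffer, shard_id = [], 0
    for example in download(urls):
        buffer.append(example)
        if len(buffer) == shard_size:
            write_shard(shard_id, buffer)  # hypothetical writer
            buffer, shard_id = [], shard_id + 1
    if buffer:
        write_shard(shard_id, buffer)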

rom1504 commented Jan 15, 2022

https://github.com/intel-analytics/analytics-zoo looks really good

rom1504 commented Jan 16, 2022

This could potentially be made easier by having a service handle the http/dns part, returning the original images and letting the img2dataset job do the resizing and packaging.

The pipeline is:

  • read urls
  • shard
  • download each url
  • resize
  • write

The download part may be complicated to scale beyond 1000 requests/s due to dns resolution, so maybe it's better to let a service handle that part.

rom1504 commented Jan 21, 2022

Consider making a two-way shared file system not required (this can be done by distributing the shards via pyspark/python serialization instead of arrow files saved to the file system); a sketch is below.
That would make it possible to use an rsync target as the output file system.
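A minimal sketch of the input side, assuming shards is a list of (shard_id, url_list) pairs built on the driver; shards_to_rdd is an illustrative name:

# each element is serialized by pyspark and shipped to the workers directly,
# so no shared file system is needed to distribute the input shards
def shards_to_rdd(spark, shards):
    return spark.sparkContext.parallelize(shards, numSlices=len(shards))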

rom1504 commented Jan 26, 2022

rom1504@rom1504-Fixe:~/spark/spark-3.2.0-bin-hadoop3.2$ cat go_cluster.sh 
bash go_master.sh
bash go_worker.sh

rom1504@rom1504-Fixe:~/spark/spark-3.2.0-bin-hadoop3.2$ cat go_master.sh 
./sbin/start-master.sh -h ip -p 7077

rom1504@rom1504-Fixe:~/spark/spark-3.2.0-bin-hadoop3.2$ cat go_worker.sh 
export SPARK_IDENT_STRING=worker1
./sbin/start-worker.sh -c 2 -m 1G -h ip -p 3456 spark://ip:7077
export SPARK_IDENT_STRING=worker2
./sbin/start-worker.sh -c 2 -m 1G -h ip -p 3456 spark://ip:7077

rom1504 commented Jan 30, 2022

This is almost done now.
The last thing to do is a guide on how to set up a spark cluster on a set of machines available through ssh.

rom1504 commented Jan 31, 2022

https://github.com/rom1504/img2dataset/blob/main/examples/distributed_img2dataset_tutorial.md here is the guide

it works, but it's a bit complex

I would also like to propose these alternatives:

  • using aws emr on eks
  • maybe offering the user another distribution mode without spark, using ssh directly

rom1504 commented Feb 4, 2022

aws emr on eks is actually rather painful to set up

I'm considering instead going the raw ec2 route
it would have the added benefit of working in a natural way with any other instance provider

the options are to document the spark setup for this case, or to add a no-spark option (which would require implementing robustness ourselves)

rom1504 commented Feb 5, 2022

writing to s3 (and hdfs) from any machine is working just fine now

I believe the only additional thing I will try here is a pure ssh-based strategy, to make it easier for people to run in distributed mode

rom1504 commented Feb 7, 2022

this is working. A little troublesome to set up, but overall working!

rom1504 closed this as completed on Feb 7, 2022
rom1504 unpinned this issue on Feb 7, 2022