
No good tools for long streams of tasks #493

Open

chrisseaton opened this issue Jan 9, 2016 · 15 comments
Labels

enhancement: Adding features, adding tests, improving documentation.
looking-for-contributor: We are looking for a contributor to help with this issue.
medium-priority: Should be done soon.

Comments

@chrisseaton
Member

chrisseaton commented Jan 9, 2016

A couple of times now I've been speaking to someone and they've had a problem like this:

require 'concurrent'

pool = Concurrent::FixedThreadPool.new(Concurrent.processor_count)

1.upto 2000000 do |i|
  pool.post do
    # work here
  end
end

or

File.open('foo').each do |line|
  pool.post do
    # work here
  end
end

In both cases we're creating a huge number of tasks, and in the latter case we may not know in advance how many tasks there will be.

The problem in both cases is that people create millions of tasks, each carrying a proc and its closure, which together can take up a lot of memory.

I feel like we're missing two abstractions here.

The first is a basic parallel #each on an Enumerable with a length. We don't have that, do we? It would need chunking (run n items in each task), and perhaps automatic chunking based on profiling (run one task, see how long it takes, think about how many tasks there are and set n based on that).
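Something like this minimal sketch, perhaps, where parallel_each, the chunk_size default, and the latch-based wait are all assumptions rather than existing concurrent-ruby API:

require 'concurrent'

module Enumerable
  # Post one task per chunk of items rather than one task per item,
  # then block until every chunk has finished.
  def parallel_each(pool, chunk_size: 1_000, &block)
    chunks = count.fdiv(chunk_size).ceil  # needs a length; count traverses the collection
    latch  = Concurrent::CountDownLatch.new(chunks)
    each_slice(chunk_size) do |chunk|
      pool.post do
        begin
          chunk.each(&block)
        ensure
          latch.count_down
        end
      end
    end
    latch.wait
    self
  end
end

Chunked like this, the first example above posts 2,000 tasks instead of 2,000,000.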

The second is something similar that works on an Enumerator, which doesn't have a length. Here chunking is harder, as we don't know in advance how many tasks there will be. We may need some kind of work stealing here.
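For the unknown-length case, a rough sketch is to let a bounded queue provide back-pressure; this one uses plain threads and the stdlib SizedQueue rather than a pool, and the method name and 10x buffer factor are assumptions:

require 'concurrent'

def parallel_each_stream(enum, workers: Concurrent.processor_count, &block)
  queue = SizedQueue.new(workers * 10)  # the producer blocks when the buffer is full
  done  = Object.new                    # unique end-of-stream sentinel

  threads = Array.new(workers) do
    Thread.new do
      while (item = queue.pop) != done
        block.call(item)
      end
    end
  end

  enum.each { |item| queue.push(item) }
  workers.times { queue.push(done) }    # one sentinel per worker
  threads.each(&:join)
end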

I'd like to write the two examples above as:

1.upto(2000000).parallel_each(pool) do
  # work here
end

or

File.open('foo').each.parallel_each(pool) do |line|
  # work here
end

In both cases we'd only create as many tasks at a time as was reasonable (if the pool has n threads it may be kn tasks for some small constant k).

In the meantime, a workaround that simply stops so many tasks being created and blowing up memory may be to do this (ping @digininja, this is relevant to you):

pool = Concurrent::FixedThreadPool.new(
  Concurrent.processor_count,
  max_queue: 10 * Concurrent.processor_count,
  fallback_policy: :caller_runs)

1.upto 2000000 do |i|
  pool.post do
    # work here
  end
end

This will queue at most 10 times as many tasks as you have cores; any further task is run immediately on the calling thread instead of being added to the pool. So if the queue already holds, say, 40 tasks, the next task is run by the main thread rather than queued, and by the time it finishes the pool may be ready to accept new tasks, or may not be, in which case the main thread runs the next task as well.

@chrisseaton chrisseaton added the enhancement label Jan 9, 2016
@chrisseaton
Member Author

Oh, when you are doing this you may want to set the thread pool to size Concurrent.processor_count - 1, as the main thread will also be running tasks; otherwise you would have Concurrent.processor_count + 1 tasks running and being switched on and off cores, ruining cache.
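That is, combining both suggestions:

pool = Concurrent::FixedThreadPool.new(
  Concurrent.processor_count - 1,
  max_queue: 10 * Concurrent.processor_count,
  fallback_policy: :caller_runs)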

@digininja

Sounds like a good option, I'll give it a try tonight.

@eregon
Collaborator

eregon commented Jan 9, 2016

This exists in many gems as peach/pmap, but I think it would make total sense in c-r.
It sounds like Scala's .par (http://docs.scala-lang.org/overviews/parallel-collections/overview.html) to me; that might be an interesting direction to explore.
It could indeed be nice to have parallel enumerators in addition to eager and lazy ones :)
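For reference, a rough pmap in that spirit can be built on Future; this is purely illustrative (the actual peach/pmap gems work differently), and one future per item runs into the memory problem above on very long streams:

require 'concurrent'

def pmap(enum, pool, &block)
  futures = enum.map do |item|
    Concurrent::Future.execute(executor: pool) { block.call(item) }
  end
  futures.map(&:value!)  # blocks until done; raises if any task failed
end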

@digininja

Have you got any good links describing the basics of how all this works, i.e. what are the different types of enumerators?

@jdantonio
Member

👍 to parallel enumerable.

I'm heading out the door and only have time for a quick note. This topic has been discussed at length before and some spike code was written. Please see #222, #229, and #231.

// @SebastianEdwards

@chrisseaton
Member Author

Ah, I didn't know about those. We need a parallel Enumerator as well as Enumerable though, for streams of unknown length. I didn't see that discussed there yet.

@digininja

For my real code, and the sample that @chrisseaton reproduced above, the change to the pool initialisation worked perfectly. Memory doesn't go above 2.1% on my 16G machine, so I've pushed the queue size up to 90 times the processor count and it still peaks at only 2.3%.

@pitr-ch
Member

pitr-ch commented Jan 11, 2016

Here is an example with actors, which also has configurable parallelism and does not require managing the pool directly.

require 'concurrent-edge'

def work(i)
  sleep rand(0.01)
  puts "(#{i})"
  i * 2
end

parallelism = 10
all_inputs  = Array.new(100) { |i| i }

class Worker < Concurrent::Actor::Context
  def on_message(i)
    work i
  end

  def default_executor
    Concurrent.global_io_executor
  end
end

POOL = Concurrent::Actor::Utils::Pool.spawn('pool', parallelism) do |index|
  Worker.spawn(name: "worker-#{index}")
end

# zip all futures into one
all_done = Concurrent.zip(*all_inputs.map do |i|
                            # ask the pool, returns future
                            POOL.ask i
                          end)

# block the main thread here until all is done
p all_done.value! # => [0, 2, 4, 6, 8, ... , 198]

@chrisseaton
Member Author

@pitr-ch the problem here is that creating a closure (whether that's for a pool or a future) for every single work item just takes up too much memory.

@digininja

I would assume that lumping together all the responses to the futures could also take up a large chunk of memory.


@digininja

If it helps anyone, this is an approximation to the code I'm working on:

https://gist.github.com/digininja/d68f3c272778ec9a3299

I expect the word list to be around a million words and the individual tasks to be fairly quick and lightweight.

@pitr-ch
Member

pitr-ch commented Jan 14, 2016

Ah, then Channels would work (cc @jdantonio) since they have back-pressure: the thread reading the input would block if it sent jobs too fast to a job-handling channel (which should have a limited size). Actors would also work, but a bounded message box would have to be implemented first. This use case definitely points at some holes we have in our APIs. Thanks @digininja.
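A rough sketch of that shape, assuming the edge Channel API (capacity:, put, each, close, take) and a hypothetical handle_word for the per-word work:

require 'concurrent-edge'

jobs = Concurrent::Channel.new(capacity: 10 * Concurrent.processor_count)
done = Concurrent::Channel.new(capacity: Concurrent.processor_count)

# a fixed set of consumers, each draining the channel until it is closed
Concurrent.processor_count.times do
  Concurrent::Channel.go do
    jobs.each { |word| handle_word(word) }  # handle_word is hypothetical
    done.put(true)
  end
end

# the producer blocks whenever the buffer is full: that's the back-pressure
File.open('words').each { |word| jobs.put(word) }
jobs.close
Concurrent.processor_count.times { done.take }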

@jdantonio
Member

Excellent feedback and suggestions. Thank you everyone!

@pitr-ch pitr-ch added this to the 1.1.0 milestone Mar 12, 2016
@brauliobo

brauliobo commented Dec 22, 2016

Very good idea IMO; I'd be pleased to have a peach-like method in concurrent-ruby.

The peach gem is unmaintained, btw. As I would like to stick with concurrent-ruby and not add another dependency, I wrote a simple peach implementation based on a fixed thread pool:

require 'concurrent'

module Enumerable

  def peach(threads = nil)
    pool = Concurrent::FixedThreadPool.new(threads || (ENV['THREADS'] || '1').to_i)

    # Note: the pool's queue is unbounded by default, so this still
    # creates one task per item up front.
    each do |item|
      pool.post do
        begin
          yield item
        rescue => e
          puts "EXCEPTION: #{e.inspect}"
          puts e.backtrace
        end
      end
    end

    pool.shutdown
    pool.wait_for_termination
  end

end
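For example, with a hypothetical try_password doing the per-word work:

File.open('words').peach(8) { |word| try_password(word.chomp) }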

@pitr-ch pitr-ch modified the milestones: 1.2.0, 1.1.0 Apr 2, 2017
@pitr-ch pitr-ch self-assigned this Jul 6, 2018
@pitr-ch pitr-ch removed this from the 1.2.0 milestone Jul 6, 2018
@pitr-ch
Member

pitr-ch commented Jul 6, 2018

Having a ThreadPoolExecutor block/wait fallback policy would also help with this issue.
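Until such a policy exists, the same effect can be approximated by throttling the producer with a semaphore (a sketch; the 10x factor is arbitrary):

require 'concurrent'

pool      = Concurrent::FixedThreadPool.new(Concurrent.processor_count)
semaphore = Concurrent::Semaphore.new(10 * Concurrent.processor_count)

1.upto 2000000 do |i|
  semaphore.acquire       # block the producer until a slot frees up
  pool.post do
    begin
      # work here
    ensure
      semaphore.release   # free the slot when the task finishes
    end
  end
end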

@pitr-ch pitr-ch added the medium-priority label Jul 6, 2018
@pitr-ch pitr-ch removed their assignment Jul 6, 2018
@pitr-ch pitr-ch added the looking-for-contributor label Jul 6, 2018
@pitr-ch pitr-ch added this to Interesting features in Hackathon Aug 24, 2019