Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize reads #1607

Merged
merged 8 commits into from
Aug 17, 2016
Merged

Parallelize reads #1607

merged 8 commits into from
Aug 17, 2016

Conversation

pomadchin
Copy link
Member

@pomadchin pomadchin commented Aug 2, 2016

  • s3
  • cassandra
  • file

@pomadchin pomadchin changed the title [WIP] Parallelize reads Parallelize reads Aug 2, 2016
@fosskers
Copy link
Contributor

fosskers commented Aug 2, 2016

@lossyrob Grisha has opted here not to explicitly use Tasks with a custom thread pool. Apparently nondeterminism handles the specifics of that.

He was seeing similar issues to what we had before, where Tasks weren't appearing to complete and using pool.shutdown() was causing problems. We might want to consider back-porting his solution here to our other work.

@echeipesh
Copy link
Contributor

echeipesh commented Aug 4, 2016

Without dedicated pool it will end up using scala.concurrent.ExecutionContext.Implicits.global which is comes with this info:

The implicit global ExecutionContext. Import global when you want to provide the global ExecutionContext implicitly.

The default ExecutionContext implementation is backed by a work-stealing thread pool. By default, the thread pool uses a target number of worker threads equal to the number of available processors.

I think that works out alright for spark tasks


tileSeq.flatten
nondeterminism.njoin(maxOpen = 8, maxQueued = 8) { range map read }.runFoldMap(identity).unsafePerformSync
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Notice 8 is magic number here but 32 is for Cassandra and File. Did it benchmark to be best that way?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be best if we could configure this somehow, and not have magic numbers impossibly un-magicked

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you feel about using typesafe config for this? This is probably a system wide property rather than per-action property.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just experimented how tests would run faster on a local machine; though they are still magical. probably it makes sense to make them configurable, as speed of them depends on certain machine(s) configuration.

@pomadchin
Copy link
Member Author

pomadchin commented Aug 7, 2016

@echeipesh in terms of scalaz, it would use DefaultStrategy, which is FixedThread pool with threads = available processors amount. Thx for a pointer, though I am still curious (as an example of what i am talking about S3RDDWriter), creating a batch of tasks using our custom thread pool, how tasks would be scheduled with nondeterminism.njoin as it uses DefaultStrategy and DefaultExecutor?

def njoin[A](maxOpen: Int, maxQueued: Int)(source: Process[Task, Process[Task, A]])(implicit S: Strategy): Process[Task, A]

@fosskers
Copy link
Contributor

fosskers commented Aug 9, 2016

This continues to be an informative PR.

@echeipesh echeipesh merged commit 4772fb1 into locationtech:master Aug 17, 2016
@lossyrob lossyrob added this to the 1.0 milestone Oct 18, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants