
Batched groups for TypedPipe #1318

Open
rubanm opened this issue Jun 10, 2015 · 6 comments

Comments

@rubanm
Contributor

rubanm commented Jun 10, 2015

With a TypedPipe, suppose we want to generate groups such that each group contains at most k values. This can currently be done with something like:

// send everything to one reducer, number the values, then bucket by index
pipe.groupAll.mapValueStream(_.zipWithIndex).values
  .map { case (v, i) => (i / k, v) }.group

However, this can be too slow for large datasets, since everything funnels through a single reducer. It should be possible to compute the total size of the dataset with groupAll + size instead and use that to assign keys. The extra MR step should still be faster than the groupAll above. Will something like the following work?

import scala.util.Random

// as a method on TypedPipe[T]
def batchedGroup(batchSize: Long): Grouped[Long, T] = {
  val random = new Random(123)
  val intermediate = forceToDisk // persist so both passes see the same data
  val size = intermediate.groupAll.size.values
  intermediate.cross(size)
    .map { case (data, size) =>
      // size here is the total count, crossed in from the side pipe
      val k = (random.nextLong % size) / batchSize
      (k, data)
    }
    .group
}
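
For illustration, a hypothetical call site, assuming batchedGroup were made available as a method on TypedPipe (it is not part of scalding today):

// hypothetical usage: key each line by a batch id derived from the total size
val batches = TypedPipe.from(TextLine("input.txt")).batchedGroup(1000L)
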
@johnynek
Collaborator

I think that's not quite right. You want % batchSize to make sure that (0 until batchSize).toSet.contains(k), right? Another approach: (random.nextDouble() * batchSize).toLong

In fact, with that approach, wouldn't this work:

this.map { v =>
  val k = (random.nextDouble * batchSize).toLong
  (k, v)
}
.group

@ianoc
Collaborator

ianoc commented Jun 10, 2015

These won't be able to give you an at-most bound, right? They're approximate bucketing?

@johnynek
Collaborator

Yeah, I totally lost track of what was supposed to be happening here. Disregard. :)

Still, what is this doing:

(random.nextLong % size) / batchSize

so, nextLong % size should be a random bucketing into size buckets, but if there is a collision, then you could have more than batchSize in each bucket.

If you knew the taskId and the number of tasks there were (something we have thought about adding in the past; I even added a patch to do this once, but we tossed it), we could do this without the randomness, and then we could make it exact. Rather than counting all of the items, you count how many items each task has (create a hashTable of taskId => count). Then we can use that to produce an exact contiguous ordering of the items into the space 1 to sum(hashTable.values), and it should be exact, right?
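
A minimal sketch of that idea, assuming a hypothetical taskId tagging (nothing like it exists in scalding today, per the dropped patch above), so the input arrives as (taskId, value) pairs. A prefix sum over the per-task counts turns them into starting offsets, every element gets a distinct contiguous index, and index / batchSize then yields groups of exactly batchSize (except the last):

import com.twitter.scalding.typed.{ Grouped, TypedPipe }

// Hypothetical: `tagged` carries (taskId, value) pairs, and a forceToDisk
// upstream ensures both passes read identical task splits.
def exactBatchedGroup[T](tagged: TypedPipe[(Int, T)], batchSize: Long): Grouped[Long, T] = {
  // Pass 1: the hashTable of taskId => count described above.
  val counts: TypedPipe[Map[Int, Long]] =
    tagged.map { case (tid, _) => Map(tid -> 1L) }.groupAll.sum.values

  // Prefix-sum the counts in taskId order into starting offsets, so task i
  // owns the contiguous index range [start(i), start(i) + count(i)).
  val offsets: TypedPipe[Map[Int, Long]] =
    counts.map { m =>
      val ordered = m.toList.sortBy { case (tid, _) => tid }
      val starts = ordered.map { case (_, n) => n }.scanLeft(0L)(_ + _)
      ordered.map { case (tid, _) => tid }.zip(starts).toMap
    }

  // Pass 2: assign each element a distinct global index, then bucket.
  // The var is a per-task counter; this leans on each task deserializing
  // its own copy of the closure and walking its items serially.
  tagged.cross(offsets).map {
    var local = -1L
    (in: ((Int, T), Map[Int, Long])) => {
      val ((tid, v), starts) = in
      local += 1
      ((starts(tid) + local) / batchSize, v)
    }
  }.group
}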

@ianoc
Collaborator

ianoc commented Jun 10, 2015

Yep, that could be exact, since we can revisit the same persisted data on disk from the first pass. (So I guess you'd need to ensure you've done a forceToDisk before the operation to make it deterministic.) But that'd work, I think.

@aalmahmud

I am currently trying to solve this problem. I need to partition a pipe into multiple groups, where each group can contain at most 50000 entries (say, batchSize).
So in this case the number of buckets will be size / batchSize (say, buckets).
Using groupRandomly(buckets) can solve this, but the problem is that some groups may contain more than batchSize entries. To avoid that, I am planning to use groupRandomly(buckets * 2) or groupRandomly(buckets * 3). This will reduce the probability of a bucket having more than batchSize entries.
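
A rough sketch of this headroom workaround, assuming pipe is a TypedPipe[T] and that the total size is already known ahead of time (e.g. from an earlier groupAll + size pass) rather than computed here:

val batchSize = 50000L
val totalSize = 10000000L // assumed known ahead of time
val buckets = math.max(1, (totalSize / batchSize).toInt)
// 2x headroom lowers, but does not eliminate, the chance that
// any bucket gets more than batchSize entries
val grouped = pipe.groupRandomly(buckets * 2)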

@rubanm
Contributor Author

rubanm commented Jun 10, 2015

@aalmahmud that can be a workaround, as long as the total number of buckets doesn't mean spawning too many reducers in one step. I will work on a PR based on this thread so we have a proper solution.
