Add skew join #229
Conversation
val sampledRight = otherPipe.filter() { u : Unit => scala.math.random < sampleRate }
  .groupBy(fs._2) { _.size(rightSampledCountField) }

val sampledCounts = sampledLeft.joinWithSmaller(fs._1 -> fs._2, sampledRight, joiner = new OuterJoin)
Shouldn't this be a joinWithTiny? You're doing a joinWithTiny later on with the full sampledCounts, so presumably sampledRight is also small enough...
There is no outer join with joinWithTiny, but there is a left join.
It's not clear to me that we need an outer join here, though.
I haven't fully thought through this, but I think using inner joins here (and below, line 398) might actually be to our advantage. My reasoning is this: since the full skew join is only allowed to have inner join semantics, any (for example) left key that does not have a corresponding right key will not appear in the final output, and so should be replicated 0 times. This is kinda like doing a bloomfilter join, except that we're using the key counts hashtable to do the filtering instead of compressing it into a bloom filter.
But due to sampling, we can't trust that a zero from the inner join is a true zero, vs. a sampled zero where there is actually a heavy key on the other side, right?
Alternatively, Edwin and I were discussing using CountMinSketch, which (we thought) could conclusively distinguish a true zero from a rare item. If we did the count-min sketch instead of sampling (and Edwin already implemented that in Algebird), we could get the advantage of a bloom filter at the same time.
Perhaps we should punt on that for a v2 and just get this version out.
In truth, the use case here is to replicate Bieber, Obama, Gaga, etc... and not replicate everyone else. So the savings from getting a few keys to replication 0 are probably not even measurable since those keys are by assumption very low mass to begin with.
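To make the sampled-zero concern concrete, here is a small standalone sketch (plain Scala, not part of this patch) of the probability that Bernoulli sampling at rate p misses a key entirely:

```scala
// Standalone sketch (not part of the patch): under Bernoulli sampling at
// rate p, a key with true count c is missed entirely with probability
// (1 - p)^c, so a zero in the sampled counts is weak evidence.
object SampledZeroOdds extends App {
  def missProbability(trueCount: Long, sampleRate: Double): Double =
    math.pow(1.0 - sampleRate, trueCount.toDouble)

  // At the default sampleRate = 0.001, a key appearing 1,000 times is
  // absent from the sample ~37% of the time; at 10,000 occurrences the
  // miss probability is already negligible.
  println(missProbability(1000L, 0.001))   // ~0.368
  println(missProbability(10000L, 0.001))  // ~4.5e-5
}
```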
Sorry, I think my previous comments got lost...
Yeah, a CountMinSketch works like a Bloom filter, so if the CMS says the frequency is zero, it's a true zero. (but not vice-versa)
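For anyone following along, the one-sided error is easy to see from a toy implementation; this is only an illustration of the idea, not Algebird's CMS:

```scala
import scala.util.hashing.MurmurHash3

// Minimal count-min sketch, just to illustrate the one-sided error:
// hash collisions can only inflate counts, so frequency() never
// undercounts, and an estimate of 0 is therefore a true 0.
class SimpleCMS(depth: Int, width: Int) {
  private val table = Array.ofDim[Long](depth, width)

  private def bucket(item: Long, row: Int): Int = {
    val h = MurmurHash3.productHash((item, row))
    ((h % width) + width) % width
  }

  def add(item: Long): Unit =
    (0 until depth).foreach { row => table(row)(bucket(item, row)) += 1 }

  // The minimum over rows is an upper bound on the true count,
  // and it is exact whenever the true count is zero.
  def frequency(item: Long): Long =
    (0 until depth).map(row => table(row)(bucket(item, row))).min
}
```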
Also, re: Avi's first comment, I'm actually thinking of changing the joinWithTiny against the full sampledCounts to a joinWithSmaller, since I'm not so sure sampledCounts is that tiny.
I'd bias to the tiny if I were you. For instance, if we default to a 1/10,000 sampling probability, then a key has to show up 10k times to be expected to survive.
If there are 10^10 rows initially, then you have 10^6 after sampling, and presumably, some of those are repeated. In any case, that is only about 1 million rows, so I think it is safe. We can tune later anyway.
Finally, I think ultimately we should move to CMS anyway, so I wouldn't sweat it now. In that case, we would do a groupAll to create the CMS object and just make sure that object's size is not too big.
About the replication factor: presumably we want to minimize replication, right? There's certainly no point in a replication factor greater than the number of reducers. Also, I suspect that below a certain threshold of % of total keys, there's no need to replicate at all (i.e. a replication factor of 1). A naive but probably effective approach would be to define a threshold, say (total_items / n_reducers / 3), and for any key with count > threshold, replicate n_reducers times, and for any key with count < threshold, replicate 1 time.
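A quick sketch of that naive rule (the helper name and signature are made up for illustration, not part of the patch):

```scala
// Hypothetical helper, just to pin down the arithmetic of the rule above:
// keys past the threshold are spread across all reducers; everything else
// is left alone (replication factor 1).
def naiveReplication(keyCount: Long, totalItems: Long, nReducers: Int): Int = {
  val threshold = totalItems / nReducers / 3
  if (keyCount > threshold) nReducers else 1
}
```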
A special case of this that's worth considering is a star join, where there is one very large fact table (e.g., an Impressions table) that needs to be joined to N smaller dimension tables (e.g., a Clicks table, User table, Ad table, Country table, etc.). Here we can assume that the dimension tables have exactly one instance of each key, and we only need to sample and distribute the key frequencies for the large fact table. We can use this to determine the replication factor (possibly including 0) for each key in each dimension table. I suspect that in practice, 95% of our skew joins will fit this pattern. However, Cascading doesn't actually support star joins, and I haven't yet figured out how to trick it into doing so :)
About star-join:

I was talking to Argyris about this earlier. I don't think that's what most people mean when they say star join, because every table is joining on the same key (user_id). In a star join, you would, for example, have one table with (user_id, ad_id, impression_id) and you would be joining it simultaneously to a user table on user_id, an ad table on ad_id, a clicks table on impression_id, etc. Re your point about size: yes, in this sense the one central table would be large (potentially have many values/key) and the other tables would all be as small as possible (one value/key).
About the replication factor: let's limit ourselves to an inner join of two pipes for the moment. That's the most common case, and the useful case for matrix multiplication. Also, block join doesn't work with outer join (in general, replicated joins are difficult in that scenario). Now, for each key k_i, there are v_i values on the left and w_i values on the right. So the joined size is v_i * w_i, and therefore the time for k_i is at least O(v_i * w_i). We want that to be O(1), so we need to replicate v_i and w_i enough to make that O(1). The cost to replicate k_i is C = v_i * L_i + w_i * R_i, where L_i is the replication factor on the left for k_i and R_i is the replication factor on the right for k_i. With this replication, we cut k_i's work by a factor of L_i * R_i, so we need L_i * R_i = Z for some constant Z. Combining this equation with the cost and minimizing, you get L_i = w_i * sqrt(Z) and R_i = v_i * sqrt(Z). If we want to set a maximum replication of something like 100, we could do that too.

So, what we need to do is: sample, groupBy { _.size }, then leftJoinWithTiny to the opposite side to set the replication factor before the joinWithSmaller. Obviously, if we don't have the key, we set replication factor 1. Does this make sense? Did I do this correctly?
I don't think we care about strictly minimizing the O(v_i w_i) join size, because that work has to be done regardless - there are v_i w_i outputs, it's just a question of how many reducers they are spread across. We can define J as the maximum output of any individual join, which is (v_i w_i) / (L_i R_i). Setting L_i = w_i and R_i = v_i minimizes J to 1, but at the cost of a very high C of 2 v_i w_i.

I think there are actually 2 thresholds that we care about. One is that we're going to have to hold v_i / R_i keys in memory while doing the join, so that has to be < M, where M is maybe 1e6. The other is that we don't want any 1 reducer to have a crazy amount of output, so J ought to be < N, where N is maybe 1e7? Finally, it's pointless having L_i be greater than the number of reducers. Apart from that, we want to try to minimize C. Given that, I would suggest: R_i = max(1, v_i / M). (BTW, my left and right here assume we're doing a joinWithLarger... if we're doing a joinWithSmaller, reverse them.)
In my view, these are both replication strategies, and we can abstract them and try different approaches. So we basically need a Function1[(Long, Long), (Int, Int)], which, given the counts on the left and right, returns the replication factors for the left and the right. I agree with your point that making the new join O(1) is the wrong constraint, because what we really want is to make the variance of the running time <= the mean running time, or something like this, which clearly depends on the number of reducers: if there is only 1 reducer, the optimal approach is replication factor = 1. For now, I don't care what we pick; we should just start playing with it. It might be nice to be able to plug in the strategy at run time so we can experiment more easily with some good defaults.
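As a sketch of what such a pluggable strategy could look like (names and signatures are illustrative, not from this patch), both the proportional rule and the threshold rule fit the same Function1 shape:

```scala
object ReplicationStrategies {
  // A replication strategy maps the sampled (leftCount, rightCount) for a
  // key to (leftReplication, rightReplication) -- the Function1 above.
  type ReplicationStrategy = ((Long, Long)) => (Int, Int)

  // Replicate each side in proportion to the other side's sampled count,
  // capped (e.g. at the number of reducers) so replication never runs away.
  def proportional(maxReplication: Int): ReplicationStrategy = {
    case (leftCount, rightCount) =>
      (math.min(maxReplication.toLong, math.max(1L, rightCount)).toInt,
       math.min(maxReplication.toLong, math.max(1L, leftCount)).toInt)
  }

  // Threshold rule from the earlier comment: only replicate a side when the
  // opposite side's per-key count would exceed what one reducer can hold.
  def threshold(maxValuesPerReducer: Long): ReplicationStrategy = {
    case (leftCount, rightCount) =>
      (math.max(1L, rightCount / maxValuesPerReducer).toInt,
       math.max(1L, leftCount / maxValuesPerReducer).toInt)
  }
}
```

Either one could then be passed to the skew join as a parameter, which would make it easy to compare strategies on real datasets.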
sampleRate : Double = 0.001, replicationFactor : Int = 1, reducers : Int = -1) : Pipe = {

assert(sampleRate > 0 && sampleRate < 1, "Sampling rate for skew joins must lie strictly between 0 and 1")
assert(replicationFactor >= 1, "Replication factor must be >= 1")
Don't you want this to be strictly greater than 1?
Actually never mind, you don't need this.
This looks great! Looking forward to trying it out on a big dataset.
Two main changes:
shipit. But since we have so many commenters, I'll wait for Avi or Argyris (or maybe both) to agree and click merge. Very excited about this work, Edwin!
LGTM. Merging.
I haven't tested this out on a non-trivial dataset yet, and I'll probably want to clean some things up. This is mostly an FYI (and to get feedback if I'm doing anything wrong).
Note: to set the replication factor...
Suppose that key K appears in the sampled left pipe 3 times and in the sampled right pipe 5 times.
Then I simply replicate the rows K appears in 5 times each on the left, and 3 times each on the right. (Or, more generally, I have a replicationFactor parameter, and I replicate 5 * replicationFactor on the left, and 3 * replicationFactor on the right.)
This seems natural to me, but maybe I'm missing something?
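For concreteness, here is a standalone sketch of one way that cross-replication can be wired so every left/right pair of rows for K meets exactly once; the patch's actual bucket assignment may differ:

```scala
import scala.util.Random

// Standalone illustration of the replication described above (not the
// patch itself). Key K was sampled 3 times on the left and 5 times on the
// right, so each left row is replicated 5 times and each right row 3 times.
// Joining on (key, leftBucket, rightBucket) pairs every left row with every
// right row exactly once, spread over 3 * 5 = 15 join buckets.
object CrossReplicationSketch extends App {
  val leftBuckets = 3   // left sampled count (x replicationFactor)
  val rightBuckets = 5  // right sampled count (x replicationFactor)
  val rng = new Random(42)

  // A left row picks one random left bucket and is copied to every right bucket.
  def replicateLeftRow(): Seq[(Int, Int)] = {
    val i = rng.nextInt(leftBuckets)
    (0 until rightBuckets).map(j => (i, j))
  }

  // A right row picks one random right bucket and is copied to every left bucket.
  def replicateRightRow(): Seq[(Int, Int)] = {
    val j = rng.nextInt(rightBuckets)
    (0 until leftBuckets).map(i => (i, j))
  }

  val left = replicateLeftRow()
  val right = replicateRightRow()
  // The two rows share exactly one (leftBucket, rightBucket) cell.
  println(left.intersect(right))
}
```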