Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a cumulative sum to KeyedList #1085

Merged
merged 1 commit into from
Nov 9, 2014
Merged

Conversation

snoble
Copy link
Contributor

@snoble snoble commented Oct 20, 2014

Not sure where the best place to put this is (or where to add tests) but this is a method for taking cumulative sums that doesn't require all the entries to go through a single scanLeft. It accomplishes this by

  • taking an ordered partition of the entries
  • summing up the entries in each partition
  • scanning through the sums (which does require going through a single scanLeft so there can't be too many partitions)
  • then scanning through each partition starting from the cumulative sum of the prior partition (so each partition can't be too big either)

This has been compiled against master.

def cumulativeSum[U, V](implicit ev: T <:< (U, V), sg: Semigroup[V], ordU: Ordering[U], ordK: Ordering[K]): SortedGrouped[K, (U, V)] = {
toTypedPipe.group
.sortBy { case (u: U, _) => u }
.scanLeft(Nil: List[(U, V)]) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be Option[(U, V)]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I seem to recall flattenValues was giving me grief when I was using an Option... I'll confirm though

@johnynek
Copy link
Collaborator

These look like useful functions, but the placement does not seem quite right. The first thing you do is call .toTypedPipe on these things. This is because KeyedListLike does not have any notion of sorting because (sadly) Cascading does not support sorting during a join.

So, I think this might be a case for enrichment:

package com.twitter.scalding.typed

/**
 * Extension for TypedPipe to add a cumulativeSum method
 */
object CumulativeSum {
  implicit class Extension[K, V](val pipe: TypedPipe[(K, V)]) extends AnyVal {
    def cumulativeSum[......
  }
}

Then a user accesses it with: import CumulativeSum.Extension.

It's tough to draw the line between what the core methods are and what should be on the edge. This feels like the latter to me.

@snoble
Copy link
Contributor Author

snoble commented Oct 29, 2014

This rings true to me that this doesn't feel like a core method. I'll move this around.

@snoble snoble force-pushed the develop branch 4 times, most recently from 0737c66 to f788c5c Compare October 29, 2014 19:53
@snoble
Copy link
Contributor Author

snoble commented Oct 29, 2014

Moved the files around. Note this breaks building the 2.9 target.

Also, flattenValues really doesn't want to work with Options

@snoble
Copy link
Contributor Author

snoble commented Nov 3, 2014

bump

@ianoc
Copy link
Collaborator

ianoc commented Nov 3, 2014

This is failing the tests, we still have a 2.9 build that isn't working here.

@snoble
Copy link
Contributor Author

snoble commented Nov 3, 2014

@ianoc that's from using an implicit class. Is there a 2.9 way to add methods like this?

@ianoc
Copy link
Collaborator

ianoc commented Nov 3, 2014

implicit def toMyType(a: BaseType) = new MyType(a)

@ianoc
Copy link
Collaborator

ianoc commented Nov 3, 2014

implicit def toMyType(a: BaseType) = new MyType(a)

On Mon, Nov 3, 2014 at 2:50 PM, snoble notifications@github.com wrote:

@ianoc https://github.com/ianoc that's from using an implicit class. Is
there a 2.9 way to add methods like this?


Reply to this email directly or view it on GitHub
#1085 (comment).

@snoble
Copy link
Contributor Author

snoble commented Nov 5, 2014

tests are passing now

@snoble
Copy link
Contributor Author

snoble commented Nov 7, 2014

Added a new test for this method

import com.twitter.scalding.typed.CumulativeSum._

class AddRankingWithCumulativeSum(args: Args) extends Job(args) {
TypedTsv[(String, Double)]("input1", ('gender, 'height))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't need field names when using the typed api

@ianoc
Copy link
Collaborator

ianoc commented Nov 7, 2014

you have added scalding-hadoop-test/NOTICE also, looks by mistake...

}
}
.flattenValues
.toTypedPipe
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we remove this, you could return an SortedGrouped, I think, which can still do reduces on, if people want that. I think returning the TypedPipe is only a loss of power. Consider if you want to follow up with a fold or another scan (or a filter followed by a fold or scan).

@johnynek
Copy link
Collaborator

johnynek commented Nov 8, 2014

Make the simple case return type SortedGrouped: https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/typed/Grouped.scala#L55 and I'll click merge.

@snoble
Copy link
Contributor Author

snoble commented Nov 8, 2014

Done. (as an aside I'm still confused why Options don't seem to be working with flattenValues... /shrug)

@johnynek
Copy link
Collaborator

johnynek commented Nov 8, 2014

@snoble because Option is not TraversableOnce, there is an implicit to that. You could have done: .mapValues(_.iterator).flattenValues

@snoble
Copy link
Contributor Author

snoble commented Nov 8, 2014

oops, fixed

johnynek added a commit that referenced this pull request Nov 9, 2014
Add a cumulative sum to KeyedList
@johnynek johnynek merged commit 72e689b into twitter:develop Nov 9, 2014
@johnynek
Copy link
Collaborator

johnynek commented Nov 9, 2014

Thank you, sir!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants