This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Summingbird Storm/Online refactor #544

Merged
merged 8 commits into from Oct 1, 2014

Conversation

ianoc
Collaborator

@ianoc ianoc commented Aug 29, 2014

Based on top of #539, so once that is merged I can rebase/merge to make the diff a little cleaner. This one is much bigger, though, so I didn't want to get stuck with merge conflicts.

This does not affect the behavior of jobs at all (though it is source-incompatible at this time). It's mostly moving things out of storm into online where possible.

The attempt to put some nicer types around our Store Provider and Service Provider needs in online is probably the most useful cleanup in it. It also adds a bunch of comments where needed.

…orm. Attempt to put some better types down on the stores/services we use in storm that aren't storm specific.
@@ -18,12 +18,12 @@ package com.twitter.summingbird.store

import com.twitter.summingbird.online.Externalizer
import com.twitter.storehaus.ReadableStore
import com.twitter.storehaus.algebra.MergeableStore
import com.twitter.storehaus.algebra.Mergeable
import com.twitter.summingbird.batch.BatchID
Contributor

why did you make it just a Mergeable? Aren't we dealing with Stores still?

Collaborator Author

I had already done it in storm, so this brings it in line. We don't actually use more of the contract in our code than it being a Mergeable, i.e. we only ever call multiMerge, never get or put. So you can supply an implementation without providing those functions.

Contributor

got it
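
The point in this thread, that code which only ever merges can ask for the narrower type, can be sketched as follows. This is a minimal illustration, not the storehaus API: `SimpleMergeable`, `SimpleStore`, `sumInto` and `CountingMergeable` are hypothetical names, and the synchronous return type (the value held before the merge) is an assumption made to keep the example self-contained.

```scala
object MergeableSketch {
  // Hypothetical, simplified stand-in for a merge-only interface.
  // In this sketch multiMerge returns the value each key held before the merge.
  trait SimpleMergeable[K, V] {
    def multiMerge(kvs: Map[K, V]): Map[K, Option[V]]
  }

  // Hypothetical full store: everything a mergeable has, plus get and put.
  trait SimpleStore[K, V] extends SimpleMergeable[K, V] {
    def get(k: K): Option[V]
    def put(kv: (K, Option[V])): Unit
  }

  // A caller that only merges can require the narrower type,
  // so any SimpleStore still works, but so does a merge-only implementation.
  def sumInto[K, V](target: SimpleMergeable[K, V], deltas: Map[K, V]): Map[K, Option[V]] =
    target.multiMerge(deltas)

  // A merge-only implementation: no get or put is ever exposed through the trait.
  final class CountingMergeable extends SimpleMergeable[String, Long] {
    private val state = scala.collection.mutable.Map.empty[String, Long]

    def multiMerge(kvs: Map[String, Long]): Map[String, Option[Long]] =
      kvs.map { case (k, delta) =>
        val before = state.get(k)
        state.update(k, before.getOrElse(0L) + delta)
        k -> before
      }

    // Only for inspecting the sketch; not part of the SimpleMergeable contract.
    def current(k: String): Option[Long] = state.get(k)
  }
}
```

Requiring the narrower type keeps the signature honest about what the caller actually uses, which is the rationale given in the reply above.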

@egonina
Contributor

egonina commented Aug 29, 2014

Looks good to me, added a few suggestions for clarifications.

@@ -62,4 +62,17 @@ object Timestamp {
def prev(old: Timestamp) = if (old.milliSinceEpoch != Long.MinValue) Some(old.prev) else None
def ordering: Ordering[Timestamp] = Timestamp.orderingOnTimestamp
}

// This is a right semigroup, that given any two Timestamps just take the one on the right.
val rightSemigroup = new Semigroup[Timestamp] {
Contributor

Interesting, I'm surprised we don't have this in algebird.

Is there a reason we prefer this to the Max semigroup?

Collaborator Author

It's just a kind of known quantity. I don't believe the TS really makes that much sense after this point in any case, other than usually being within the bounds of some range. This is just a move here from elsewhere. The whole usage could probably do with more thought. (Maybe Max, but then with aggregations you could also go down in a stream, so Right at least indicates it's pretty arbitrary.)

Collaborator

This is strange to me. Where would we use this over Max, which is the default monoid for Timestamp?

Collaborator Author

This was your choice Oscar ;)

Collaborator

Right. I remember the situation now. We should probably comment it better. The reason we did this is because we don't want to give a stronger contract to the semigroup than the store actually respects.
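
To make the difference between the two choices discussed in this thread concrete, here is a small sketch. It does not use algebird or the real `Timestamp` class: the `Semigroup` trait, the `Timestamp` case class, `maxSemigroup` and `sumAll` below are hypothetical stand-ins defined only for this example. Both operations are associative, but only max is independent of arrival order; the right semigroup promises nothing beyond "the last value combined wins".

```scala
object TimestampSemigroupSketch {
  // Hypothetical minimal semigroup interface for this sketch.
  trait Semigroup[T] { def plus(l: T, r: T): T }

  // Hypothetical stand-in for the real Timestamp type.
  final case class Timestamp(milliSinceEpoch: Long)

  // Always keeps the right-hand value, whatever its magnitude.
  val rightSemigroup: Semigroup[Timestamp] = new Semigroup[Timestamp] {
    def plus(l: Timestamp, r: Timestamp): Timestamp = r
  }

  // Keeps the larger value; the result does not depend on argument order.
  val maxSemigroup: Semigroup[Timestamp] = new Semigroup[Timestamp] {
    def plus(l: Timestamp, r: Timestamp): Timestamp =
      if (l.milliSinceEpoch >= r.milliSinceEpoch) l else r
  }

  // Combine a sequence left to right; None for an empty sequence.
  def sumAll(sg: Semigroup[Timestamp], ts: Seq[Timestamp]): Option[Timestamp] =
    ts.reduceOption((l, r) => sg.plus(l, r))
}
```

With timestamps 10 then 5, `sumAll` under the right semigroup yields 5 while max yields 10. One reading of the comment above is that advertising max would imply an ordering guarantee that the store does not actually enforce, whereas right makes no such claim.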

Then looks back up the timestamp handed from the stream and outputs with that.
*/
def wrapOnlineFactory[K, V](supplier: MergeableStoreFactory[K, V]): MergeableStoreFactory[K, (Timestamp, V)] =
{
Contributor

curly on previous line?

@@ -32,6 +32,14 @@ trait FlatMapOperation[-T, +U] extends Serializable with Closeable {
*/
def maybeFlush: Future[TraversableOnce[U]] = Future.value(Seq.empty[U])

// Helper to add a simple U => S operation at the end of a FlatMapOperation
def map[S](fn: U => S): FlatMapOperation[T, S] =
andThen(FlatMapOperation(fn.andThen(r => Seq(r))))
Collaborator

Can we do r => Iterator(r) here? I am afraid of needlessly creating an in-memory representation by forcing this to a Seq.

Collaborator Author

Sure, but then as a map its output should be just 1 element I believe?
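
For reference, the suggested variant of the `map` helper might look like the sketch below. This is a simplified, synchronous model and not the real `FlatMapOperation`: `SimpleFlatMapOp` and `fromFn` are hypothetical names, and the real trait returns a `Future` and is `Closeable`. The helper wraps each mapped result in a one-element `Iterator` rather than a `Seq`; as noted in the reply, either wrapper holds exactly one element per input.

```scala
object FlatMapSketch {
  // Hypothetical synchronous simplification of a flat-map stage.
  trait SimpleFlatMapOp[-T, +U] { self =>
    def apply(t: T): Iterator[U]

    // Feed every output of this stage into the next one, lazily.
    def andThen[V](next: SimpleFlatMapOp[U, V]): SimpleFlatMapOp[T, V] =
      new SimpleFlatMapOp[T, V] {
        def apply(t: T): Iterator[V] = self.apply(t).flatMap(u => next.apply(u))
      }

    // Helper to add a simple U => S step at the end,
    // wrapping each result in a single-element Iterator.
    def map[S](fn: U => S): SimpleFlatMapOp[T, S] =
      andThen[S](SimpleFlatMapOp.fromFn[U, S](fn.andThen(r => Iterator(r))))
  }

  object SimpleFlatMapOp {
    // Lift a plain function into a stage.
    def fromFn[T, U](fn: T => Iterator[U]): SimpleFlatMapOp[T, U] =
      new SimpleFlatMapOp[T, U] {
        def apply(t: T): Iterator[U] = fn(t)
      }
  }
}
```

A usage example under the same assumptions: `SimpleFlatMapOp.fromFn[String, String](s => s.split(" ").iterator).map(_.length)` turns each input line into the lengths of its words without building an intermediate collection of results.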

johnynek added a commit that referenced this pull request Oct 1, 2014
Summingbird Storm/Online refactor
@johnynek johnynek merged commit 7e847d8 into develop Oct 1, 2014
@johnynek johnynek deleted the stormOnlineRefactor branch October 1, 2014 23:26
snoble pushed a commit to snoble/summingbird that referenced this pull request Sep 8, 2017
* add Semigroup.maybePlus

* Add new test

* remove -adapted-args
5 participants