mapWith, flatMapWith and filterWith #510

Merged
merged 12 commits into mesos:master from markhamstra/WithThing on Mar 23, 2013

Conversation

3 participants
@markhamstra
Contributor

markhamstra commented Mar 5, 2013

A general means to make a per-partition Thing available for use in map, flatMap and filter transformations.

Motivation came from the SparkPi example, where the Thing needed in the map is a way to get a pair of random numbers between -1 and 1. The example cheats by using scala.math.random, which serializes all the calls within a JVM through the global PRNG. Creating a PRNG val inside the map function avoids that but is inefficient, since a new PRNG is constructed for each RDD element, when what is really wanted is a single PRNG per partition that serves all of that partition's elements. It's possible to meet that goal by using mapPartitionsWithIndex, but that requires rethinking map, flatMap and filter in terms of mapping an iterator to an iterator. This pull request attempts to provide a useful generalization of the per-partition Thing that preserves map, flatMap and filter semantics.

The basic idea is that a Thing-factory function will be available within a map, flatMap or filter function. That factory function will produce a Thing given an RDD element. In the motivating case of SparkPi, the Thing to be produced is a pair of random values, and the Thing-factory would simply be a function Int => (Double, Double), where the Doubles are random values between -1 and 1. In fact, it would only be _ => (Double, Double), since the RDD element is not needed to produce the pair of random values. In order to set up the Thing-factory for each partition, a factoryBuilder must be provided, which constructs the Thing-factory from the index of the partition and a seed value.

Basic examples using random number generators are in RDDSuite, but PRNGs are by no means the only Thing-makers that could be instantiated with this technique. I'll post some more examples to the Spark Developers list and see whether that generates some more feedback as to whether this approach is generally useful enough to warrant merging.
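
To make the idea concrete, here is a minimal sketch in plain Scala over local collections (a model of the semantics, not the actual patch): one Thing is built per partition from its index and then reused for every element in that partition.

```scala
import scala.util.Random

// Plain-Scala model of the per-partition Thing: each partition builds one
// Thing (here a seeded PRNG) from its index and reuses it for all elements.
def mapWithModel[T, A, U](partitions: Seq[Seq[T]])
                         (constructA: Int => A)
                         (f: (T, A) => U): Seq[Seq[U]] =
  partitions.zipWithIndex.map { case (part, index) =>
    val thing = constructA(index)        // one Thing per partition
    part.map(elem => f(elem, thing))     // reused across the partition
  }

// SparkPi-style use: one PRNG per partition yielding (x, y) pairs in [-1, 1)
val parts  = Seq(Seq(1, 2, 3), Seq(4, 5, 6))
val points = mapWithModel(parts)(index => new Random(index)) { (_, rand) =>
  (rand.nextDouble() * 2 - 1, rand.nextDouble() * 2 - 1)
}
```

Unlike a PRNG val created inside the map body, `constructA` runs once per partition, not once per element.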

Owner

mateiz commented Mar 5, 2013

Hey Mark,

This definitely seems useful, but for a simpler API, what about something like this?

nums.mapWith((num: Int, rand: Random) => rand.nextInt(), (partitionId: Int) => new Random(partitionId))

That is, ask the user to keep state within the Thing they return, instead of providing a new function to deal with it. It's less functional, but it seems nicer to use, especially for the RNG case.

Also, I would change the order to give the Thing-factory function first, and maybe use two separate argument lists, like this:

nums.mapWith(partitionId => new Random(partitionId))((num, rand) => rand.nextInt())

This also allows writing the second function as a block:

nums.mapWith(partitionId => new Random(partitionId)) { (num, rand) =>
  rand.nextInt()
}
Contributor

markhamstra commented Mar 5, 2013

The two argument lists is a good idea, not just for block expressions, but it also opens up a nice currying option.

I'll think and work a bit more on the stateful-Thing thing.

Is this also an opportunity to address SPARK-615, since mapWith, flatMapWith and filterWith are also using two-parameter Functions?
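
As a local-collection sketch of that currying option (plain Scala, not the patch itself): with the constructor in its own parameter list, a caller can fix the data and constructor once and reuse the result with different element functions.

```scala
import scala.util.Random

// mapWith modeled over local partitions, constructor in its own list
def mapWith[T, A, U](partitions: Seq[Seq[T]])
                    (constructA: Int => A)
                    (f: (T, A) => U): Seq[Seq[U]] =
  partitions.zipWithIndex.map { case (part, index) =>
    val a = constructA(index)
    part.map(f(_, a))
  }

val nums = Seq(Seq(1, 2), Seq(3, 4))

// Fix the data and the seeded-PRNG constructor; vary only the body
def withSeededRng(f: (Int, Random) => Int): Seq[Seq[Int]] =
  mapWith(nums)((i: Int) => new Random(i))(f)

val shifted = withSeededRng((n, rand) => n + rand.nextInt(10))
val scaled  = withSeededRng((n, rand) => n * (rand.nextInt(3) + 1))
```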

Contributor

MLnick commented Mar 9, 2013

@markhamstra does it make sense to also have forEachWith? I can see a use case for updating (internal or external) state in parallel, e.g. writing results to HBase or Cassandra using a per-partition client instance.
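
A local-collection sketch of that use case (plain Scala; MockWriter is a hypothetical stand-in, not a real HBase or Cassandra client): one client is constructed per partition and shared by all of that partition's writes.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a per-partition database client
class MockWriter(val partition: Int) {
  val written = mutable.Buffer.empty[String]
  def write(row: String): Unit = written += row
}

// forEachWith modeled over local partitions: build one client per partition,
// then run the side-effecting body for each element with that client.
def foreachWithModel[T, A](partitions: Seq[Seq[T]])
                          (constructA: Int => A)
                          (f: (T, A) => Unit): Unit =
  partitions.zipWithIndex.foreach { case (part, index) =>
    val client = constructA(index)       // one client per partition
    part.foreach(elem => f(elem, client))
  }

val writers = mutable.Map.empty[Int, MockWriter]
foreachWithModel(Seq(Seq("a", "b"), Seq("c"))) { i =>
  writers.getOrElseUpdate(i, new MockWriter(i))
} { (row, writer) => writer.write(row) }
```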

Owner

mateiz commented Mar 10, 2013

@MLnick -- something else we should do is probably a foreachPartition. But that can be a separate pull request.

Mark, in terms of this one, one other thing that's missing is a Java API version of these methods. Do you mind adding one?

Contributor

markhamstra commented Mar 10, 2013

In principle, no, I don't mind working on adding the Java API versions; but as I indicated previously, this smells similar enough to SPARK-615's need for 2-arity Functions that a more general solution may be what's needed.

Owner

mateiz commented Mar 10, 2013

Yeah, good point. I think we'll have to add those functions. Up to you on whether you want to do them -- otherwise we can just merge this for now.

Contributor

markhamstra commented Mar 10, 2013

Later today, I'll take a look at adding foreachWith and refactoring the API a little more to get you something worth merging, at least for Scala, by tomorrow.

Contributor

markhamstra commented Mar 11, 2013

Okay, I refactored the API to be a little simpler to comprehend and work with. Also added foreachWith, which is a little hacky in the absence of a good foreachPartitionWithIndex. On the other hand, foreachPartition is straightforward, so I also added that.
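
A rough local-collection sketch of the two shapes described (plain Scala, not the actual RDD code): foreachPartition simply hands each partition's iterator to a side-effecting function, while the foreachWith "hack" pushes the side effect through a map-style pass and then forces the lazy iterator so the effects actually run.

```scala
import scala.collection.mutable

// foreachPartition: pass the whole partition iterator to the user function
def foreachPartitionModel[T](partitions: Seq[Seq[T]])(f: Iterator[T] => Unit): Unit =
  partitions.foreach(part => f(part.iterator))

// foreachWith via a map-style partition pass: run the side effect inside a
// map, then force the lazy iterator so the effects actually happen
def foreachWithModel[T, A](partitions: Seq[Seq[T]])
                          (constructA: Int => A)
                          (f: (T, A) => Unit): Unit =
  partitions.zipWithIndex.foreach { case (part, index) =>
    val a = constructA(index)
    part.iterator.map { t => f(t, a); t }.foreach(_ => ())
  }

val sums = mutable.Buffer.empty[Int]
foreachPartitionModel(Seq(Seq(1, 2), Seq(3)))(it => sums += it.sum)

val tagged = mutable.Buffer.empty[String]
foreachWithModel(Seq(Seq("a"), Seq("b")))(index => s"p$index") {
  (elem, tag) => tagged += s"$tag:$elem"
}
```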

core/src/main/scala/spark/RDD.scala
+ new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
+ }
+
+ /**

@mateiz

mateiz Mar 16, 2013

Owner

Small indentation glitch here (maybe a tab?)

core/src/main/scala/spark/RDD.scala
+ * additional parameter is produced by constructorOfA, which is called in each
+ * partition with the index of that partition.
+ */
+ def flatMapWith[A: ClassManifest, U: ClassManifest](constructorOfA: Int => A, preservesPartitioning: Boolean = false)

@mateiz

mateiz Mar 16, 2013

Owner

I'd rename the constructorOfA argument to constructA to match other names we've had for functions

core/src/main/scala/spark/RDD.scala
+ * partition with the index of that partition.
+ */
+ def flatMapWith[A: ClassManifest, U: ClassManifest](constructorOfA: Int => A, preservesPartitioning: Boolean = false)
+ (f:(A, T) => Seq[U]): RDD[U] = {

@mateiz

mateiz Mar 16, 2013

Owner

One other small thing: I think the functions should take arguments in the order (T, A) instead of (A, T). It doesn't matter too much, but when I see "with", I assume that will be a second argument that will be passed along, so I think that order would make more sense.

Contributor

markhamstra commented Mar 16, 2013

Suggested changes made.

mateiz added a commit that referenced this pull request Mar 23, 2013

Merge pull request #510 from markhamstra/WithThing
mapWith, flatMapWith and filterWith

@mateiz mateiz merged commit fd53f2f into mesos:master Mar 23, 2013

Owner

mateiz commented Mar 23, 2013

Looks good, thanks.
