Skip to content

Commit

Permalink
Merge branch 'release/0.13.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
isnotinvain committed Feb 4, 2015
2 parents 4decf8b + 1261a10 commit 83dba38
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 10 deletions.
7 changes: 7 additions & 0 deletions CHANGES.md
@@ -1,5 +1,12 @@
# Scalding #

### Version 0.13.1 ###
* Back out 4 changes to be binary compatible: https://github.com/twitter/scalding/pull/1187
* Use java.util.Random instead of scala.util.Random: https://github.com/twitter/scalding/pull/1186
* Add Execution.failed: https://github.com/twitter/scalding/pull/1185
* Using a ConcurrentHashMap instead of a WeakHashMap to make the Stats behave in a correct manner: https://github.com/twitter/scalding/pull/1184
* Add applicative for Execution: https://github.com/twitter/scalding/pull/1181

### Version 0.13.0 ###
* Covert LzoTextDelimited to Cascading scheme.: https://github.com/twitter/scalding/pull/1179
* Make TraceUtil support versions of cascading older than 2.6: https://github.com/twitter/scalding/pull/1180
Expand Down
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -4,7 +4,7 @@ Scalding is a Scala library that makes it easy to specify Hadoop MapReduce jobs.

![Scalding Logo](https://raw.github.com/twitter/scalding/develop/logo/scalding.png)

Current version: `0.13.0`
Current version: `0.13.1`

## Word Count

Expand Down
3 changes: 2 additions & 1 deletion scalding-core/src/main/scala/com/twitter/package.scala
Expand Up @@ -30,10 +30,11 @@ package object scalding {
type KeyedList[K, +V] = com.twitter.scalding.typed.KeyedList[K, V]
type ValuePipe[+T] = com.twitter.scalding.typed.ValuePipe[T]
type Grouped[K, +V] = com.twitter.scalding.typed.Grouped[K, V]

/**
* Make sure this is in sync with version.sbt
*/
val scaldingVersion: String = "0.13.0"
val scaldingVersion: String = "0.13.1"

object RichPathFilter {
implicit def toRichPathFilter(f: PathFilter) = new RichPathFilter(f)
Expand Down
Expand Up @@ -12,7 +12,7 @@ distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
*/
package com.twitter.scalding

import com.twitter.algebird.monad.Reader
Expand Down Expand Up @@ -193,6 +193,7 @@ object Execution {
override def apply[T](t: T): Execution[T] = Execution.from(t)
override def map[T, U](e: Execution[T])(fn: T => U): Execution[U] = e.map(fn)
override def flatMap[T, U](e: Execution[T])(fn: T => Execution[U]): Execution[U] = e.flatMap(fn)
override def join[T, U](t: Execution[T], u: Execution[U]): Execution[(T, U)] = t.zip(u)
}

trait EvalCache { self =>
Expand Down Expand Up @@ -399,6 +400,13 @@ object Execution {
case Success(s) => Future.successful(s)
case Failure(err) => Future.failed(err)
}

/**
* This creates a definitely failed Execution.
*/
def failed(t: Throwable): Execution[Nothing] =
fromFuture(_ => Future.failed(t))

/**
* This makes a constant execution that runs no job.
* Note this is a lazy parameter that is evaluated every
Expand Down
Expand Up @@ -27,7 +27,7 @@ import cascading.operation.filter._
import cascading.tuple._
import cascading.cascade._

import scala.util.Random
import java.util.Random // this one is serializable, scala.util.Random is not
import scala.collection.JavaConverters._

object JoinAlgorithms extends java.io.Serializable {
Expand Down
Expand Up @@ -466,7 +466,7 @@ package com.twitter.scalding {
}
}

class SampleWithReplacement(frac: Double, val seed: Int = new scala.util.Random().nextInt) extends BaseOperation[Poisson]()
class SampleWithReplacement(frac: Double, val seed: Int = new java.util.Random().nextInt) extends BaseOperation[Poisson]()
with Function[Poisson] with ScaldingPrepare[Poisson] {
override def prepare(flowProcess: FlowProcess[_], operationCall: OperationCall[Poisson]) {
super.prepare(flowProcess, operationCall)
Expand Down
4 changes: 2 additions & 2 deletions scalding-core/src/main/scala/com/twitter/scalding/Stats.scala
Expand Up @@ -2,7 +2,7 @@ package com.twitter.scalding

import cascading.flow.{ FlowDef, FlowProcess }
import cascading.stats.CascadingStats
import java.util.{ Collections, WeakHashMap }
import java.util.concurrent.ConcurrentHashMap
import org.slf4j.{ Logger, LoggerFactory }
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -109,7 +109,7 @@ object RuntimeStats extends java.io.Serializable {
@transient private lazy val logger: Logger = LoggerFactory.getLogger(this.getClass)

private val flowMappingStore: mutable.Map[String, WeakReference[FlowProcess[_]]] =
Collections.synchronizedMap(new WeakHashMap[String, WeakReference[FlowProcess[_]]])
new ConcurrentHashMap[String, WeakReference[FlowProcess[_]]]

def getFlowProcessForUniqueId(uniqueId: UniqueID): FlowProcess[_] = {
(for {
Expand Down
Expand Up @@ -28,7 +28,7 @@ import cascading.flow.FlowDef
import cascading.pipe.{ Each, Pipe }
import cascading.tap.Tap
import cascading.tuple.{ Fields, Tuple => CTuple, TupleEntry }
import util.Random
import java.util.Random // prefer to scala.util.Random as this is serializable

import scala.concurrent.Future

Expand Down Expand Up @@ -400,7 +400,7 @@ trait TypedPipe[+T] extends Serializable {
*/
def sample(percent: Double, seed: Long): TypedPipe[T] = {
// Make sure to fix the seed, otherwise restarts cause subtle errors
val rand = new Random(seed)
lazy val rand = new Random(seed)
filter(_ => rand.nextDouble < percent)
}

Expand Down
2 changes: 1 addition & 1 deletion version.sbt
@@ -1 +1 @@
version in ThisBuild := "0.13.0"
version in ThisBuild := "0.13.1"

0 comments on commit 83dba38

Please sign in to comment.