Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'release/0.13.1'

  • Loading branch information...
commit 83dba385958df2dd7c21c76d51131d5517ef0fcb 2 parents 4decf8b + 1261a10
@isnotinvain isnotinvain authored
View
7 CHANGES.md
@@ -1,5 +1,12 @@
# Scalding #
+### Version 0.13.1 ###
+* Back out 4 changes to be binary compatible: https://github.com/twitter/scalding/pull/1187
+* Use java.util.Random instead of scala.util.Random: https://github.com/twitter/scalding/pull/1186
+* Add Execution.failed: https://github.com/twitter/scalding/pull/1185
+* Using a ConcurrentHashMap instead of a WeakHashMap to make the Stats behave in a correct manner: https://github.com/twitter/scalding/pull/1184
+* Add applicative for Execution: https://github.com/twitter/scalding/pull/1181
+
### Version 0.13.0 ###
* Covert LzoTextDelimited to Cascading scheme.: https://github.com/twitter/scalding/pull/1179
* Make TraceUtil support versions of cascading older than 2.6: https://github.com/twitter/scalding/pull/1180
View
2  README.md
@@ -4,7 +4,7 @@ Scalding is a Scala library that makes it easy to specify Hadoop MapReduce jobs.
![Scalding Logo](https://raw.github.com/twitter/scalding/develop/logo/scalding.png)
-Current version: `0.13.0`
+Current version: `0.13.1`
## Word Count
View
3  scalding-core/src/main/scala/com/twitter/package.scala
@@ -30,10 +30,11 @@ package object scalding {
type KeyedList[K, +V] = com.twitter.scalding.typed.KeyedList[K, V]
type ValuePipe[+T] = com.twitter.scalding.typed.ValuePipe[T]
type Grouped[K, +V] = com.twitter.scalding.typed.Grouped[K, V]
+
/**
* Make sure this is in sync with version.sbt
*/
- val scaldingVersion: String = "0.13.0"
+ val scaldingVersion: String = "0.13.1"
object RichPathFilter {
implicit def toRichPathFilter(f: PathFilter) = new RichPathFilter(f)
View
10 scalding-core/src/main/scala/com/twitter/scalding/Execution.scala
@@ -12,7 +12,7 @@ distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-*/
+ */
package com.twitter.scalding
import com.twitter.algebird.monad.Reader
@@ -193,6 +193,7 @@ object Execution {
override def apply[T](t: T): Execution[T] = Execution.from(t)
override def map[T, U](e: Execution[T])(fn: T => U): Execution[U] = e.map(fn)
override def flatMap[T, U](e: Execution[T])(fn: T => Execution[U]): Execution[U] = e.flatMap(fn)
+ override def join[T, U](t: Execution[T], u: Execution[U]): Execution[(T, U)] = t.zip(u)
}
trait EvalCache { self =>
@@ -399,6 +400,13 @@ object Execution {
case Success(s) => Future.successful(s)
case Failure(err) => Future.failed(err)
}
+
+ /**
+ * This creates a definitely failed Execution.
+ */
+ def failed(t: Throwable): Execution[Nothing] =
+ fromFuture(_ => Future.failed(t))
+
/**
* This makes a constant execution that runs no job.
* Note this is a lazy parameter that is evaluated every
View
2  scalding-core/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala
@@ -27,7 +27,7 @@ import cascading.operation.filter._
import cascading.tuple._
import cascading.cascade._
-import scala.util.Random
+import java.util.Random // this one is serializable, scala.util.Random is not
import scala.collection.JavaConverters._
object JoinAlgorithms extends java.io.Serializable {
View
2  scalding-core/src/main/scala/com/twitter/scalding/Operations.scala
@@ -466,7 +466,7 @@ package com.twitter.scalding {
}
}
- class SampleWithReplacement(frac: Double, val seed: Int = new scala.util.Random().nextInt) extends BaseOperation[Poisson]()
+ class SampleWithReplacement(frac: Double, val seed: Int = new java.util.Random().nextInt) extends BaseOperation[Poisson]()
with Function[Poisson] with ScaldingPrepare[Poisson] {
override def prepare(flowProcess: FlowProcess[_], operationCall: OperationCall[Poisson]) {
super.prepare(flowProcess, operationCall)
View
4 scalding-core/src/main/scala/com/twitter/scalding/Stats.scala
@@ -2,7 +2,7 @@ package com.twitter.scalding
import cascading.flow.{ FlowDef, FlowProcess }
import cascading.stats.CascadingStats
-import java.util.{ Collections, WeakHashMap }
+import java.util.concurrent.ConcurrentHashMap
import org.slf4j.{ Logger, LoggerFactory }
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -109,7 +109,7 @@ object RuntimeStats extends java.io.Serializable {
@transient private lazy val logger: Logger = LoggerFactory.getLogger(this.getClass)
private val flowMappingStore: mutable.Map[String, WeakReference[FlowProcess[_]]] =
- Collections.synchronizedMap(new WeakHashMap[String, WeakReference[FlowProcess[_]]])
+ new ConcurrentHashMap[String, WeakReference[FlowProcess[_]]]
def getFlowProcessForUniqueId(uniqueId: UniqueID): FlowProcess[_] = {
(for {
View
4 scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala
@@ -28,7 +28,7 @@ import cascading.flow.FlowDef
import cascading.pipe.{ Each, Pipe }
import cascading.tap.Tap
import cascading.tuple.{ Fields, Tuple => CTuple, TupleEntry }
-import util.Random
+import java.util.Random // prefer to scala.util.Random as this is serializable
import scala.concurrent.Future
@@ -400,7 +400,7 @@ trait TypedPipe[+T] extends Serializable {
*/
def sample(percent: Double, seed: Long): TypedPipe[T] = {
// Make sure to fix the seed, otherwise restarts cause subtle errors
- val rand = new Random(seed)
+ lazy val rand = new Random(seed)
filter(_ => rand.nextDouble < percent)
}
View
2  version.sbt
@@ -1 +1 @@
-version in ThisBuild := "0.13.0"
+version in ThisBuild := "0.13.1"
Please sign in to comment.
Something went wrong with that request. Please try again.