Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Browse files

Addresses Argyris's comments

  • Loading branch information...
commit 7cd6277b31e6b8b79c7becfed0d3c3a1330073d1 1 parent 77ea258
@johnynek johnynek authored
View
7 src/main/scala/com/twitter/scalding/TupleConversions.scala
@@ -43,6 +43,13 @@ trait TupleConversions extends GeneratedConversions {
}
}
+ def tupleAt(idx: Int)(tup: CTuple): CTuple = {
+ val obj = tup.getObject(idx)
+ val res = CTuple.size(1)
+ res.set(0, obj)
+ res
+ }
+
implicit object TupleEntryConverter extends TupleConverter[TupleEntry] {
override def apply(tup : TupleEntry) = tup
override def arity = -1
View
53 src/main/scala/com/twitter/scalding/typed/CoGrouped2.scala
@@ -52,52 +52,6 @@ class CoGrouped2[K,V,W,R](left: Grouped[K,V],
}
}
-object Joiner extends java.io.Serializable {
- def getOne[K](k1 : Option[K], k2 : Option[K]) = k1.getOrElse(k2.get)
-
- def tupleAt(idx: Int)(tup: CTuple): CTuple = {
- val obj = tup.getObject(idx)
- val res = CTuple.size(1)
- res.set(0, obj)
- res
- }
-
- // Returns the key from the FIRST tuple. Suitable for a single JoinerClosure
- def getKeyValue[K](tupit: java.util.Iterator[CTuple]): (Option[K], Iterator[CTuple]) = {
- val stupit = tupit.asScala
- if (stupit.isEmpty) {
- (None, stupit)
- }
- else {
- val first = stupit.next
- val key = Some(first.getObject(0).asInstanceOf[K])
- val value = Iterator(tupleAt(1)(first))
- (key, value ++ stupit.map { tupleAt(1) })
- }
- }
-
- def inner2[K,V,U] = { (key: K, itv: Iterator[V], itu: () => Iterator[U]) =>
- itv.flatMap { v => itu().map { u => (v,u) } }
- }
- def asOuter[U](it : Iterator[U]) : Iterator[Option[U]] = {
- if(it.isEmpty) {
- Iterator(None)
- }
- else {
- it.map { Some(_) }
- }
- }
- def outer2[K,V,U] = { (key: K, itv: Iterator[V], itu: () => Iterator[U]) =>
- asOuter(itv).flatMap { v => asOuter(itu()).map { u => (v,u) } }
- }
- def left2[K,V,U] = { (key: K, itv: Iterator[V], itu: () => Iterator[U]) =>
- itv.flatMap { v => asOuter(itu()).map { u => (v,u) } }
- }
- def right2[K,V,U] = { (key: K, itv: Iterator[V], itu: () => Iterator[U]) =>
- asOuter(itv).flatMap { v => itu().map { u => (v,u) } }
- }
-}
-
class Joiner2[K,V,W,R](leftGetter : Iterator[CTuple] => Iterator[V],
rightGetter: Iterator[CTuple] => Iterator[W],
joiner: (K, Iterator[V], () => Iterator[W]) => Iterator[R]) extends CJoiner {
@@ -110,12 +64,13 @@ class Joiner2[K,V,W,R](leftGetter : Iterator[CTuple] => Iterator[V],
// It's safe to call getIterator more than once on index > 0
val (rkopt, _) = getKeyValue[K](jc.getIterator(1))
// Try to get from the right-hand-side
- val goodKey = getOne(lkopt, rkopt)
+ val goodKey = lkopt.orElse(rkopt).get
- val rightG = { () => rightGetter(jc.getIterator(1).asScala.map { tupleAt(1) }) }
+ val rightG = { () => rightGetter(jc.getIterator(1).asScala.map { Dsl.tupleAt(1) }) }
joiner(goodKey, leftGetter(left), rightG).map { rval =>
- // There always has to be four resuling fields:
+ // There always has to be four resulting fields
+ // or otherwise the flow planner will throw
val res = CTuple.size(4)
res.set(0, goodKey)
res.set(1, rval)
View
60 src/main/scala/com/twitter/scalding/typed/Joiner.scala
@@ -0,0 +1,60 @@
+/*
+Copyright 2012 Twitter, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package com.twitter.scalding.typed
+
+import cascading.tuple.{Tuple => CTuple}
+
+import com.twitter.scalding._
+
+import scala.collection.JavaConverters._
+
+object Joiner extends java.io.Serializable {
+ // Returns the key from the FIRST tuple. Suitable for a single JoinerClosure
+ def getKeyValue[K](tupit: java.util.Iterator[CTuple]): (Option[K], Iterator[CTuple]) = {
+ val stupit = tupit.asScala
+ if (stupit.isEmpty) {
+ (None, stupit)
+ }
+ else {
+ val first = stupit.next
+ val key = Some(first.getObject(0).asInstanceOf[K])
+ val value = Iterator(Dsl.tupleAt(1)(first))
+ (key, value ++ stupit.map { Dsl.tupleAt(1) })
+ }
+ }
+
+ def inner2[K,V,U] = { (key: K, itv: Iterator[V], itu: () => Iterator[U]) =>
+ itv.flatMap { v => itu().map { u => (v,u) } }
+ }
+ def asOuter[U](it : Iterator[U]) : Iterator[Option[U]] = {
+ if(it.isEmpty) {
+ Iterator(None)
+ }
+ else {
+ it.map { Some(_) }
+ }
+ }
+ def outer2[K,V,U] = { (key: K, itv: Iterator[V], itu: () => Iterator[U]) =>
+ asOuter(itv).flatMap { v => asOuter(itu()).map { u => (v,u) } }
+ }
+ def left2[K,V,U] = { (key: K, itv: Iterator[V], itu: () => Iterator[U]) =>
+ itv.flatMap { v => asOuter(itu()).map { u => (v,u) } }
+ }
+ def right2[K,V,U] = { (key: K, itv: Iterator[V], itu: () => Iterator[U]) =>
+ asOuter(itv).flatMap { v => itu().map { u => (v,u) } }
+ }
+ // TODO: implement CoGroup3, and inner3, outer3 (probably best to leave the other modes as custom)
+}
Please sign in to comment.
Something went wrong with that request. Please try again.