From 707bbc03a349ed0a56e8a3720fd6617e83ccbb3a Mon Sep 17 00:00:00 2001 From: Josh Wills Date: Tue, 21 Feb 2012 22:37:02 -0800 Subject: [PATCH] Simplify in-memory collection usage in Scrunch with a Mem object --- .../scala/com/cloudera/scrunch/.Mem.scala.swp | Bin 0 -> 12288 bytes .../main/scala/com/cloudera/scrunch/Mem.scala | 49 ++++++++++++++++++ .../scala/com/cloudera/scrunch/TopTest.scala | 6 +++ .../cloudera/crunch/impl/mem/MemPipeline.java | 13 +++++ 4 files changed, 68 insertions(+) create mode 100644 scrunch/src/main/scala/com/cloudera/scrunch/.Mem.scala.swp create mode 100644 scrunch/src/main/scala/com/cloudera/scrunch/Mem.scala diff --git a/scrunch/src/main/scala/com/cloudera/scrunch/.Mem.scala.swp b/scrunch/src/main/scala/com/cloudera/scrunch/.Mem.scala.swp new file mode 100644 index 0000000000000000000000000000000000000000..75dbafcabff2592c44fa7812ef0971c9d18467f0 GIT binary patch literal 12288 zcmeI2O>ZMb5QclXu)r=L_yH8ziyfSHl0`y9S|qQNRh%z}*qdk-MNwzmwmY7g9%kD1 zMp5{XxbS)Bz#TXv!3l{A2Yvv812+yJA^rde@%DJec6QOiN?d~4(yN)Csj9Aex_YE2 zeKfyY+oYHHBE#=##y)s&xgN~F%vPUd>{usV`}m)JYp|Q!Q@)=hbDP4usdaB|JJ#IS zp75ufU$+6vEjcKAaX<9Bab3`QQJ4 zJjK}8&{xo1=tbyPoC){^`U3hKQqXhIcUKwv7WxMI6#4{u3wjkQK!09=AM`Wy6ZA3k z5%fM3L$5)%pcUx%Cm8z?`T_bLdJ}p9x&mE>9)&)8oUsp~ccFKnYtV1V|4Zl?YC~x~ z7jQjX%8q~|;0QPZj({WZ{~&P2h^&Z0Ii&5TIP&HGdWm)q3OLocts?2GP;zq;$yWNZ z-Y!ITR3eeAiGipYoP>l^!Ha~7lVY)$MjHs9QaieVYnvNuV|{zPBMm~dizSK%L1QJ&a-!Io1zC>B%@`N~s< zqTUipMG>p}>jyTC@o@K`XzwQ!kjE)QMI%Oe6&or+4K}}}!`TO?F`9k$znDE;p{n+M z=^3TN{pLX-6h;l?1m0+jr0-k2iC?;cWAOK{<7k3$Ro-~QdfQH!ji&%N&#^UQn0Su+ z$;9wJk8?V7A!TaL0iAwtr^Z`qoyXpSc@)$3;FTxH#&3oT&o3(s!mWRX{ zW}cs6=A&cI-0lsItoc;qJjniZ8{MDo$}`3y;q^pE(z(Fd0qPvqhVo>XNN&NIuB0TA zo@y(PY{q@--O;E%v=k#52P#QHq}Y5FD1%N180v>OdTk@vK-K!za;;gbZ#AfXiz-`p zXuY7!QnRZPN_5){MJx&TEo|{v9rcZD zkq(gt%~TpxYs}EiN~6{&(O#{&TEE?-y~@r`Wvf|RK@K|zWuw_&*O+LjTpkXGTmVp4 zavgWd_yAc9NqM7IUD;}^%q{Zy%n9`HeX^}MY9mW2r$%>lp8|10`pDogItj3jFv7QQ zVnxE($#+LRM2JiU`+6peGE?N^IH5MwPb>=5+E%^$s)Zh>JnJcQ zqv-@WjhBNeX_0xxrL3qb(u`H7YpCEAX>oo5AMbmDSA3s#tVu#UGLi8B%arbmzW~8$ Bcy#~( literal 0 HcmV?d00001 diff --git a/scrunch/src/main/scala/com/cloudera/scrunch/Mem.scala b/scrunch/src/main/scala/com/cloudera/scrunch/Mem.scala new file mode 100644 index 00000000..c1a03abd --- /dev/null +++ b/scrunch/src/main/scala/com/cloudera/scrunch/Mem.scala @@ -0,0 +1,49 @@ +/** + * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for + * the specific language governing permissions and limitations under the + * License. + */ +package com.cloudera.scrunch + +import com.cloudera.crunch.{Pair => P} +import com.cloudera.crunch.impl.mem.MemPipeline +import java.lang.{Iterable => JIterable} +import scala.collection.JavaConversions._ +import Conversions._ + +/** + * Object for working with in-memory PCollection and PTable instances. + */ +object Mem { + private val ptf = Avros + + def pipeline = MemPipeline.getInstance() + + def collectionOf[T](ts: T*)(implicit pt: PTypeH[T]): PCollection[T] = { + collectionOf(List(ts:_*)) + } + + def collectionOf[T](collect: Iterable[T])(implicit pt: PTypeH[T]): PCollection[T] = { + val native = MemPipeline.typedCollectionOf(pt.get(ptf), asJavaIterable(collect)) + new PCollection[T](native) + } + + def tableOf[K, V](pairs: (K, V)*)(implicit pk: PTypeH[K], pv: PTypeH[V]): PTable[K, V] = { + tableOf(List(pairs:_*)) + } + + def tableOf[K, V](pairs: Iterable[(K, V)])(implicit pk: PTypeH[K], pv: PTypeH[V]): PTable[K, V] = { + val cpairs = pairs.map(kv => P.of(kv._1, kv._2)) + val ptype = ptf.tableOf(pk.get(ptf), pv.get(ptf)) + new PTable[K, V](MemPipeline.typedTableOf(ptype, asJavaIterable(cpairs))) + } +} diff --git a/scrunch/src/test/scala/com/cloudera/scrunch/TopTest.scala b/scrunch/src/test/scala/com/cloudera/scrunch/TopTest.scala index 2ded239f..a6ff5eb1 100644 --- a/scrunch/src/test/scala/com/cloudera/scrunch/TopTest.scala +++ b/scrunch/src/test/scala/com/cloudera/scrunch/TopTest.scala @@ -21,6 +21,12 @@ import org.scalatest.junit.JUnitSuite import _root_.org.junit.Test class TopTest extends JUnitSuite { + + @Test def topInMem { + val ptable = Mem.tableOf(("foo", 17), ("bar", 29), ("baz", 1729)) + assert(ptable.top(1, true).materialize.head == ("baz", 1729)) + } + @Test def top2 { val pipeline = new Pipeline[TopTest] val input = FileHelper.createTempCopyOf("shakes.txt") diff --git a/src/main/java/com/cloudera/crunch/impl/mem/MemPipeline.java b/src/main/java/com/cloudera/crunch/impl/mem/MemPipeline.java index 25a779ed..669fc5b8 100644 --- a/src/main/java/com/cloudera/crunch/impl/mem/MemPipeline.java +++ b/src/main/java/com/cloudera/crunch/impl/mem/MemPipeline.java @@ -37,6 +37,7 @@ import com.cloudera.crunch.io.ReadableSourceTarget; import com.cloudera.crunch.io.text.TextFileTarget; import com.cloudera.crunch.type.PTableType; +import com.cloudera.crunch.type.PType; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -58,6 +59,14 @@ public static PCollection collectionOf(Iterable collect) { return new MemCollection(collect); } + public static PCollection typedCollectionOf(PType ptype, T... ts) { + return new MemCollection(ImmutableList.copyOf(ts), ptype, null); + } + + public static PCollection typedCollectionOf(PType ptype, Iterable collect) { + return new MemCollection(collect, ptype, null); + } + public static PTable tableOf(S s, T t, Object... more) { List> pairs = Lists.newArrayList(); pairs.add(Pair.of(s, t)); @@ -80,6 +89,10 @@ public static PTable tableOf(Iterable> pairs) { return new MemTable(pairs); } + public static PTable typedTableOf(PTableType ptype, Iterable> pairs) { + return new MemTable(pairs, ptype, null); + } + private Configuration conf = new Configuration(); private MemPipeline() {