Skip to content

Commit

Permalink
Simplify in-memory collection usage in Scrunch with a Mem object
Browse files Browse the repository at this point in the history
  • Loading branch information
Josh Wills committed Feb 22, 2012
1 parent 7ecccbd commit 707bbc0
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 0 deletions.
Binary file not shown.
49 changes: 49 additions & 0 deletions scrunch/src/main/scala/com/cloudera/scrunch/Mem.scala
@@ -0,0 +1,49 @@
/**
* Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
*
* Cloudera, Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"). You may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for
* the specific language governing permissions and limitations under the
* License.
*/
package com.cloudera.scrunch

import com.cloudera.crunch.{Pair => P}
import com.cloudera.crunch.impl.mem.MemPipeline
import java.lang.{Iterable => JIterable}
import scala.collection.JavaConversions._
import Conversions._

/**
 * Factory methods that wrap in-memory Scala data in Scrunch PCollection and
 * PTable instances, delegating to the typed factories on MemPipeline.
 */
object Mem {
  // Source of the concrete PType instances: every PTypeH is resolved against it.
  private val typeFamily = Avros

  /** Returns the result of MemPipeline.getInstance(). */
  def pipeline = MemPipeline.getInstance()

  /**
   * Builds a PCollection from the elements given as varargs.
   *
   * @param ts the elements to place in the collection
   * @param pt implicit handle used to look up the PType for T
   * @return a PCollection wrapping the in-memory collection
   */
  def collectionOf[T](ts: T*)(implicit pt: PTypeH[T]): PCollection[T] = {
    collectionOf(ts.toList)
  }

  /**
   * Builds a PCollection from an existing Scala Iterable.
   *
   * @param collect the elements to place in the collection
   * @param pt      implicit handle used to look up the PType for T
   * @return a PCollection wrapping the in-memory collection
   */
  def collectionOf[T](collect: Iterable[T])(implicit pt: PTypeH[T]): PCollection[T] = {
    new PCollection[T](
      MemPipeline.typedCollectionOf(pt.get(typeFamily), asJavaIterable(collect)))
  }

  /**
   * Builds a PTable from key/value tuples given as varargs.
   *
   * @param pairs the key/value tuples to place in the table
   * @param pk    implicit handle used to look up the PType for K
   * @param pv    implicit handle used to look up the PType for V
   * @return a PTable wrapping the in-memory table
   */
  def tableOf[K, V](pairs: (K, V)*)(implicit pk: PTypeH[K], pv: PTypeH[V]): PTable[K, V] = {
    tableOf(pairs.toList)
  }

  /**
   * Builds a PTable from an existing Iterable of key/value tuples.
   *
   * @param pairs the key/value tuples to place in the table
   * @param pk    implicit handle used to look up the PType for K
   * @param pv    implicit handle used to look up the PType for V
   * @return a PTable wrapping the in-memory table
   */
  def tableOf[K, V](pairs: Iterable[(K, V)])(implicit pk: PTypeH[K], pv: PTypeH[V]): PTable[K, V] = {
    // Convert each Scala tuple into a Crunch Pair before handing it to the Java API.
    val crunchPairs = for (kv <- pairs) yield P.of(kv._1, kv._2)
    val tableType = typeFamily.tableOf(pk.get(typeFamily), pv.get(typeFamily))
    val native = MemPipeline.typedTableOf(tableType, asJavaIterable(crunchPairs))
    new PTable[K, V](native)
  }
}
6 changes: 6 additions & 0 deletions scrunch/src/test/scala/com/cloudera/scrunch/TopTest.scala
Expand Up @@ -21,6 +21,12 @@ import org.scalatest.junit.JUnitSuite
import _root_.org.junit.Test

class TopTest extends JUnitSuite {

@Test def topInMem {
  // Build a three-row in-memory table; no pipeline or input file is needed.
  val counts = Mem.tableOf(("foo", 17), ("bar", 29), ("baz", 1729))
  // The first materialized element of top(1, true) is expected to be the
  // entry carrying the largest value.
  val first = counts.top(1, true).materialize.head
  assert(first == ("baz", 1729))
}

@Test def top2 {
val pipeline = new Pipeline[TopTest]
val input = FileHelper.createTempCopyOf("shakes.txt")
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/com/cloudera/crunch/impl/mem/MemPipeline.java
Expand Up @@ -37,6 +37,7 @@
import com.cloudera.crunch.io.ReadableSourceTarget;
import com.cloudera.crunch.io.text.TextFileTarget;
import com.cloudera.crunch.type.PTableType;
import com.cloudera.crunch.type.PType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

Expand All @@ -58,6 +59,14 @@ public static <T> PCollection<T> collectionOf(Iterable<T> collect) {
return new MemCollection<T>(collect);
}

/**
 * Creates an in-memory collection with an explicit PType from the given elements.
 *
 * @param ptype the PType passed to the MemCollection
 * @param ts the elements; they are copied into an ImmutableList
 * @return a MemCollection over the copied elements
 */
public static <T> PCollection<T> typedCollectionOf(PType<T> ptype, T... ts) {
  ImmutableList<T> elements = ImmutableList.copyOf(ts);
  // The third constructor argument is left null, as in the other typed factories.
  return new MemCollection<T>(elements, ptype, null);
}

/**
 * Creates an in-memory collection with an explicit PType from an Iterable.
 *
 * @param ptype the PType passed to the MemCollection
 * @param collect the elements, handed to the MemCollection as-is
 * @return a MemCollection over {@code collect}
 */
public static <T> PCollection<T> typedCollectionOf(PType<T> ptype, Iterable<T> collect) {
  // The third constructor argument is left null, as in the other typed factories.
  final MemCollection<T> typed = new MemCollection<T>(collect, ptype, null);
  return typed;
}

public static <S, T> PTable<S, T> tableOf(S s, T t, Object... more) {
List<Pair<S, T>> pairs = Lists.newArrayList();
pairs.add(Pair.of(s, t));
Expand All @@ -80,6 +89,10 @@ public static <S, T> PTable<S, T> tableOf(Iterable<Pair<S, T>> pairs) {
return new MemTable<S, T>(pairs);
}

/**
 * Creates an in-memory table with an explicit PTableType from key/value pairs.
 *
 * @param ptype the PTableType passed to the MemTable
 * @param pairs the key/value pairs, handed to the MemTable as-is
 * @return a MemTable over {@code pairs}
 */
public static <S, T> PTable<S, T> typedTableOf(PTableType<S, T> ptype, Iterable<Pair<S, T>> pairs) {
  // The third constructor argument is left null, as in the typed collection factories.
  final MemTable<S, T> typed = new MemTable<S, T>(pairs, ptype, null);
  return typed;
}

private Configuration conf = new Configuration();

private MemPipeline() {
Expand Down

0 comments on commit 707bbc0

Please sign in to comment.