diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..bac1323 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,2 @@ +language: clojure +script: "lein compile, midje" diff --git a/README.md b/README.md index 7a7b254..26a5ff8 100644 --- a/README.md +++ b/README.md @@ -7,5 +7,5 @@ Assorted serializers for Kryo. Add the following to project.clj: ```clojure -[meat-locker "0.1.2"] +[meat-locker "0.1.5"] ``` diff --git a/project.clj b/project.clj index cfdbf19..ceaea32 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject com.twitter/meat-locker "0.1.5-SNAPSHOT" +(defproject com.twitter/meat-locker "0.1.5" :source-path "src/clj" :java-source-path "src/jvm" :description "Grab bag of utilities for Cascading DSLs." @@ -8,6 +8,6 @@ [org.apache.thrift/libthrift "0.6.1"] [com.google.protobuf/protobuf-java "2.4.0a"] [org.apache.hadoop/hadoop-core "0.20.2-dev"] - [cascading/cascading-hadoop "2.0.0-wip-215" + [cascading/cascading-hadoop "2.0.0-wip-226" :exclusions [org.codehaus.janino/janino org.apache.hadoop/hadoop-core]]]) diff --git a/src/jvm/com/twitter/meatlocker/tap/MemorySourceTap.java b/src/jvm/com/twitter/meatlocker/tap/MemorySourceTap.java index 1a832fc..4d3cbc0 100644 --- a/src/jvm/com/twitter/meatlocker/tap/MemorySourceTap.java +++ b/src/jvm/com/twitter/meatlocker/tap/MemorySourceTap.java @@ -6,6 +6,7 @@ import cascading.scheme.SourceCall; import cascading.tap.SourceTap; import cascading.tap.Tap; +import cascading.tap.hadoop.MultiRecordReaderIterator; import cascading.tap.hadoop.RecordReaderIterator; import cascading.tuple.Fields; import cascading.tuple.Tuple; @@ -117,7 +118,13 @@ public boolean equals(Object object) { @Override public TupleEntryIterator openForRead( HadoopFlowProcess flowProcess, RecordReader input ) throws IOException { - return new TupleEntrySchemeIterator( flowProcess, getScheme(), new RecordReaderIterator( input ) ); + if (input != null) + return new TupleEntrySchemeIterator( flowProcess, getScheme(), new RecordReaderIterator( input ) ); + + JobConf conf = flowProcess.getJobConf(); + + return new TupleEntrySchemeIterator(flowProcess, getScheme(), + new MultiRecordReaderIterator(flowProcess, this, conf), "MemoryTap: " + getIdentifier()); } @Override diff --git a/test/com/twitter/meatlocker/tap/memory-test.clj b/test/com/twitter/meatlocker/tap/memory_test.clj similarity index 69% rename from test/com/twitter/meatlocker/tap/memory-test.clj rename to test/com/twitter/meatlocker/tap/memory_test.clj index 78d6f47..8442732 100644 --- a/test/com/twitter/meatlocker/tap/memory-test.clj +++ b/test/com/twitter/meatlocker/tap/memory_test.clj @@ -1,9 +1,6 @@ (ns com.twitter.meatlocker.tap.memory-test - (:use cascalog.api - [cascalog.workflow :only (fields)]) (:require [clojure.string :as s]) - (:import [cascalog Util] - [java.util ArrayList] + (:import [java.util ArrayList] [com.twitter.meatlocker.tap MemorySourceTap] [cascading.tuple Fields] [cascading.flow.hadoop HadoopFlowProcess] @@ -14,7 +11,6 @@ (def defaults {"io.serializations" (s/join "," ["org.apache.hadoop.io.serializer.WritableSerialization" - "cascading.tuple.hadoop.BytesSerialization" "cascading.tuple.hadoop.TupleSerialization"])}) (def mk-props @@ -39,10 +35,12 @@ (doall (for [wrapper (iterator-seq it)] (into [] (.getTuple wrapper)))))) -(defn memory-tap - ([tuples] (memory-tap Fields/ALL tuples)) - ([fields-in tuple-seq] - (let [tuples (->> tuple-seq - (clojure.core/map #(Util/coerceToTuple %)) - (ArrayList.))] - (MemorySourceTap. tuples (fields fields-in))))) +(comment + "TODO: Implement coerceToTuple and fields." + (defn memory-tap + ([tuples] (memory-tap Fields/ALL tuples)) + ([fields-in tuple-seq] + (let [tuples (->> tuple-seq + (map #(Util/coerceToTuple %)) + (ArrayList.))] + (MemorySourceTap. tuples (fields fields-in))))))