Skip to content

Commit

Permalink
fixed openForRead on memory source tap.
Browse files Browse the repository at this point in the history
  • Loading branch information
sritchie committed Feb 22, 2012
1 parent a883834 commit e30391b
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 16 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
@@ -0,0 +1,2 @@
language: clojure
script: "lein compile, midje"
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -7,5 +7,5 @@ Assorted serializers for Kryo.
Add the following to project.clj:

```clojure
[meat-locker "0.1.2"]
[meat-locker "0.1.5"]
```
4 changes: 2 additions & 2 deletions project.clj
@@ -1,4 +1,4 @@
(defproject com.twitter/meat-locker "0.1.5-SNAPSHOT"
(defproject com.twitter/meat-locker "0.1.5"
:source-path "src/clj"
:java-source-path "src/jvm"
:description "Grab bag of utilities for Cascading DSLs."
Expand All @@ -8,6 +8,6 @@
[org.apache.thrift/libthrift "0.6.1"]
[com.google.protobuf/protobuf-java "2.4.0a"]
[org.apache.hadoop/hadoop-core "0.20.2-dev"]
[cascading/cascading-hadoop "2.0.0-wip-215"
[cascading/cascading-hadoop "2.0.0-wip-226"
:exclusions [org.codehaus.janino/janino
org.apache.hadoop/hadoop-core]]])
9 changes: 8 additions & 1 deletion src/jvm/com/twitter/meatlocker/tap/MemorySourceTap.java
Expand Up @@ -6,6 +6,7 @@
import cascading.scheme.SourceCall;
import cascading.tap.SourceTap;
import cascading.tap.Tap;
import cascading.tap.hadoop.MultiRecordReaderIterator;
import cascading.tap.hadoop.RecordReaderIterator;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
Expand Down Expand Up @@ -117,7 +118,13 @@ public boolean equals(Object object) {

/**
 * Opens this tap for reading and returns an iterator over its tuple entries.
 *
 * <p>If {@code input} is non-null it is wrapped in a {@code RecordReaderIterator}.
 * If {@code input} is {@code null}, a {@code MultiRecordReaderIterator} is built from
 * this tap and the {@code JobConf} obtained from {@code flowProcess}, and the string
 * {@code "MemoryTap: " + getIdentifier()} is passed along to the resulting
 * {@code TupleEntrySchemeIterator}.
 *
 * @param flowProcess the flow process handed to the scheme iterator; also the source
 *                    of the {@code JobConf} when {@code input} is {@code null}
 * @param input       an existing record reader to read from, or {@code null} to have
 *                    the readers created from the job configuration
 * @return a {@code TupleEntrySchemeIterator} built with this tap's scheme
 * @throws IOException declared by the overridden method; presumably raised while the
 *                     underlying readers are opened (not visible from this method)
 */
@Override
public TupleEntryIterator openForRead( HadoopFlowProcess flowProcess, RecordReader input ) throws IOException {
    // A reader supplied by the caller is used directly.
    if (input != null) {
        return new TupleEntrySchemeIterator( flowProcess, getScheme(), new RecordReaderIterator( input ) );
    }

    // No reader was supplied: build the readers from the job configuration instead of
    // wrapping a null reader.
    JobConf conf = flowProcess.getJobConf();

    return new TupleEntrySchemeIterator(flowProcess, getScheme(),
        new MultiRecordReaderIterator(flowProcess, this, conf), "MemoryTap: " + getIdentifier());
}

@Override
Expand Down
@@ -1,9 +1,6 @@
(ns com.twitter.meatlocker.tap.memory-test
(:use cascalog.api
[cascalog.workflow :only (fields)])
(:require [clojure.string :as s])
(:import [cascalog Util]
[java.util ArrayList]
(:import [java.util ArrayList]
[com.twitter.meatlocker.tap MemorySourceTap]
[cascading.tuple Fields]
[cascading.flow.hadoop HadoopFlowProcess]
Expand All @@ -14,7 +11,6 @@
(def defaults
{"io.serializations"
(s/join "," ["org.apache.hadoop.io.serializer.WritableSerialization"
"cascading.tuple.hadoop.BytesSerialization"
"cascading.tuple.hadoop.TupleSerialization"])})

(def mk-props
Expand All @@ -39,10 +35,12 @@
(doall (for [wrapper (iterator-seq it)]
(into [] (.getTuple wrapper))))))

(defn memory-tap
([tuples] (memory-tap Fields/ALL tuples))
([fields-in tuple-seq]
(let [tuples (->> tuple-seq
(clojure.core/map #(Util/coerceToTuple %))
(ArrayList.))]
(MemorySourceTap. tuples (fields fields-in)))))
(comment
"TODO: Implement coerceToTuple and fields."
(defn memory-tap
([tuples] (memory-tap Fields/ALL tuples))
([fields-in tuple-seq]
(let [tuples (->> tuple-seq
(map #(Util/coerceToTuple %))
(ArrayList.))]
(MemorySourceTap. tuples (fields fields-in))))))

0 comments on commit e30391b

Please sign in to comment.