Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #59 -- Custom serializers for defop params #63

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## Unreleased

* Bug fix: make vars containing predmacros work properly

## 1.8.6

* Fixed the "Call to unbound-fn" error. To use functions within Cascalog queries, Hadoop needs to call "require" on the containing namespace. If you define functions at the REPL, the namespace file might not exist, and this call will fail; previously, Cascalog would squash these exceptions.
Expand Down
3 changes: 2 additions & 1 deletion src/clj/cascalog/conf.clj
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@

(def ^:dynamic *JOB-CONF* {})

(defn project-conf []
(defn project-conf
  ;; Assembles the job configuration by handing four sources to
  ;; `project-merge`, in this order: the project settings, the optional
  ;; caller-supplied `conf`, the dynamic *JOB-CONF* binding, and a fixed
  ;; "io.serializations" entry naming ClojureKryoSerialization.
  ;; `conf` may be omitted or nil; it is poured into an empty map first.
  ;; NOTE(review): which source wins on a key clash depends on
  ;; `project-merge`, which is not visible here -- confirm.
  [& [conf]]
  (let [settings      (project-settings)
        caller-conf   (into {} conf)
        serialization {"io.serializations"
                       "cascalog.hadoop.ClojureKryoSerialization"}]
    (project-merge settings caller-conf *JOB-CONF* serialization)))
7 changes: 4 additions & 3 deletions src/clj/cascalog/rules.clj
Original file line number Diff line number Diff line change
Expand Up @@ -622,9 +622,10 @@

;; Walks `raw-predicates` and replaces every predicate macro with the
;; predicates it expands to, recursing so that macros produced by an
;; expansion are expanded as well. Predicates that are not macros are
;; passed through unchanged (wrapped in a one-element vector for mapcat).
;; NOTE(review): this span is a rendered diff, so the previous three-line
;; `if` and its replacement wrapped in `let` both appear below.
(defn- expand-predicate-macros [raw-predicates]
(mapcat (fn [[p _ vars :as raw-predicate]]
(if (p/predicate-macro? p)
(expand-predicate-macros (expand-predicate-macro p vars))
[raw-predicate]))
;; When p is a var, look at the value it holds so that a predicate macro
;; referenced through a var is detected by predicate-macro? as well.
(let [p (if (var? p) (var-get p) p)]
(if (p/predicate-macro? p)
(expand-predicate-macros (expand-predicate-macro p vars))
[raw-predicate])))
raw-predicates))

(defn normalize-raw-predicates
Expand Down
2 changes: 1 addition & 1 deletion src/jvm/cascalog/ClojureCascadingBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public ClojureCascadingBase(Fields fields, Object[] fn_spec, boolean stateful) {

@Override
public void prepare(FlowProcess flow_process, OperationCall op_call) {
this.fn_spec = (Object[]) KryoService.deserialize(serialized_spec);
this.fn_spec = KryoService.deserialize(flow_process, serialized_spec);
this.fn = Util.bootFn(fn_spec);
if (stateful) {
try {
Expand Down
89 changes: 80 additions & 9 deletions src/jvm/cascalog/KryoService.java
Original file line number Diff line number Diff line change
@@ -1,32 +1,103 @@
package cascalog;

import cascading.CascadingException;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.flow.FlowProcess;
import cascading.kryo.Kryo;
import cascading.kryo.KryoFactory;
import cascalog.hadoop.ClojureKryoSerialization;
import com.esotericsoftware.kryo.ObjectBuffer;
import org.apache.log4j.Logger;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.conf.Configuration;

import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;
import java.lang.NullPointerException;
import java.lang.Object;
import java.lang.String;
import java.lang.System;
import java.lang.reflect.Array;
import java.util.Arrays;

import clojure.lang.RT;
import clojure.lang.Var;

/** User: sritchie Date: 12/16/11 Time: 8:34 PM */
public class KryoService {
public static final Logger LOG = Logger.getLogger(KryoService.class);
static ObjectBuffer kryoBuf;
static Var require = RT.var("clojure.core", "require");
static Var symbol = RT.var("clojure.core", "symbol");
static Var projectConf;
static Var hadoopJobConf;

// Builds the shared Kryo buffer, loads the two Clojure namespaces this class
// depends on, and resolves the vars used to assemble a job configuration.
static {
    ClojureKryoSerialization serialization = new ClojureKryoSerialization();
    Kryo k = serialization.populatedKryo();
    k.setRegistrationOptional(true);

    kryoBuf = KryoFactory.newBuffer(k);
    try {
        require.invoke(symbol.invoke("cascalog.conf"));
        require.invoke(symbol.invoke("hadoop-util.core"));
    } catch (Exception e) {
        // Class initialization stays best-effort (no rethrow), but the failure
        // is reported through the class logger with context instead of a bare
        // stack trace on stderr.
        LOG.error("Unable to require cascalog.conf and hadoop-util.core; "
                  + "KryoService serialize/deserialize may fail", e);
    }
    projectConf = RT.var("cascalog.conf", "project-conf");
    hadoopJobConf = RT.var("hadoop-util.core", "job-conf");
}

public static byte[] serialize(Object obj) {
LOG.debug("Serializing " + obj);
return kryoBuf.writeClassAndObject(obj);
/**
 * Serializes {@code objs} to a byte array using the Hadoop serializer chosen
 * by the project configuration.
 *
 * The configuration is obtained by invoking {@code cascalog.conf/project-conf}
 * with no arguments and passing the result to {@code hadoop-util.core/job-conf}.
 *
 * @param objs the values to serialize
 * @return the serialized bytes
 * @throws CascadingException if the serializer lookup fails with a
 *         NullPointerException
 * @throws RuntimeException wrapping any exception raised while opening,
 *         writing to, or closing the serializer
 */
public static byte[] serialize(Object[] objs) {
    if (LOG.isDebugEnabled()) {
        // Concatenating the array itself would only print its identity;
        // Arrays.toString prints the elements.
        LOG.debug("Serializing " + Arrays.toString(objs));
    }
    Configuration conf = (Configuration) hadoopJobConf.invoke(projectConf.invoke());
    SerializationFactory factory = new SerializationFactory(conf);

    Serializer<Object> serializer;
    try {
        // NOTE(review): the lookup uses Object.class here while deserialize
        // looks up Object[].class -- confirm this asymmetry is intended.
        serializer = factory.getSerializer(Object.class);
    } catch (NullPointerException e) {
        // for compatibility with the expected behavior documented by the
        // java.util.GregorianCalendar test in cascalog.conf-test
        // TODO or... should we change the test?
        throw new CascadingException("No serializer found", e);
    }

    ByteArrayOutputStream stream = new ByteArrayOutputStream();
    try {
        serializer.open(stream);
        try {
            serializer.serialize(objs);
        } finally {
            // Release the serializer even when serialize() throws.
            serializer.close();
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    return stream.toByteArray();
}

public static Object deserialize(byte[] serialized) {
Object o = kryoBuf.readClassAndObject(serialized);
LOG.debug("Deserialized " + o);
return o;
/**
 * Deserializes an {@code Object[]} from {@code serialized} using the Hadoop
 * deserializer chosen by the project configuration.
 *
 * When {@code flow_process} is a {@link HadoopFlowProcess}, its job conf is
 * passed to {@code cascalog.conf/project-conf}; otherwise {@code null} is
 * passed. The result is then handed to {@code hadoop-util.core/job-conf}.
 *
 * @param flow_process the current flow process; may be {@code null}
 * @param serialized the bytes to read
 * @return the deserialized values
 * @throws RuntimeException wrapping any exception raised while opening,
 *         reading from, or closing the deserializer
 */
public static Object[] deserialize(FlowProcess flow_process, byte[] serialized) {
    Configuration flowConf = null;
    if (flow_process instanceof HadoopFlowProcess) {
        // instanceof is false for null, so one check covers both cases:
        // flow_process is null when called from KryoInsert
        // flow_process is not an instance of HadoopFlowProcess when called via bridge-test
        flowConf = ((HadoopFlowProcess) flow_process).getJobConf();
    }
    Configuration conf = (Configuration) hadoopJobConf.invoke(projectConf.invoke(flowConf));

    SerializationFactory factory = new SerializationFactory(conf);
    Deserializer<Object[]> deserializer = factory.getDeserializer(Object[].class);

    ByteArrayInputStream stream = new ByteArrayInputStream(serialized);
    Object[] objs;
    try {
        deserializer.open(stream);
        try {
            objs = deserializer.deserialize(null);
        } finally {
            // Release the deserializer even when deserialize() throws.
            deserializer.close();
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }

    if (LOG.isDebugEnabled()) {
        // Print the elements rather than the array's identity.
        LOG.debug("Deserialized " + Arrays.toString(objs));
    }
    return objs;
}
}
5 changes: 4 additions & 1 deletion src/jvm/cascalog/ops/KryoInsert.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ public KryoInsert(Fields fieldDeclaration, Object... values) {

/**
 * Returns the tuple of inserted values, deserializing {@code this.serialized}
 * through KryoService the first time it is needed and caching the result in
 * {@code this.values}.
 *
 * NOTE(review): this span is a rendered diff, so the previous one-argument
 * deserialize call and its two-argument replacement both appear below.
 */
public Tuple getTuple() {
if (this.values == null) {
Object[] values = (Object[]) KryoService.deserialize(this.serialized);
// No FlowProcess is available in this method, so null is passed.
Object[] values = (Object[]) KryoService.deserialize(null, this.serialized);
this.values = new Tuple(values);
}
return this.values;
}

/**
 * Adds a new Tuple built from {@link #getTuple()} to the output collector of
 * {@code functionCall}. {@code flowProcess} is not used.
 */
@Override public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
// TODO we could pass flowProcess through getTuple to KryoService#deserialize here,
// but since the rest of this class doesn't have a FlowProcess to use in
// ser/deser, we could run into trouble if we're not consistent.
// A fresh Tuple is constructed on every call rather than emitting the cached one
// directly -- presumably so downstream changes cannot alter the cache; confirm.
functionCall.getOutputCollector().add( new Tuple( getTuple() ) );
}

Expand Down
41 changes: 41 additions & 0 deletions src/jvm/cascalog/test/StringBufferKryoSerializer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright 2010 Nathan Marz

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package cascalog.test;

import java.lang.Class;
import java.lang.Object;
import java.lang.Override;
import java.lang.StringBuffer;
import java.nio.ByteBuffer;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.serialize.StringSerializer;

/**
 * Test-only Kryo serializer for {@link StringBuffer}.
 *
 * Writing ignores the buffer's contents and always emits the string "foo",
 * which makes it observable whether a value passed through this serializer.
 */
public class StringBufferKryoSerializer extends Serializer {

    /** Writes the fixed string "foo" to {@code byteBuffer}; {@code o} is not inspected. */
    @Override
    public void writeObjectData(ByteBuffer byteBuffer, Object o) {
        final String marker = "foo";
        StringSerializer.put(byteBuffer, marker);
    }

    /** Reads one string from {@code byteBuffer} and wraps it in a new {@link StringBuffer}. */
    @Override
    public StringBuffer readObjectData(ByteBuffer byteBuffer, Class type) {
        return new StringBuffer(StringSerializer.get(byteBuffer));
    }

}
18 changes: 18 additions & 0 deletions test/cascalog/defops_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@
([state x] (+ x y state))
([state] nil))

;; Stateful map operation parameterized by `y`; exercised by
;; defops-custom-serializer-for-param-test.
(defmapop [string-stateful [y]]
"append \"-\" y and \"-string\""
{:stateful true}
;; Zero-argument arity returns "-string"; it appears to supply the state
;; passed to the other arities (the test expects a "-string" suffix).
([] "-string")
;; Builds the output by concatenating x, "-", y and the state.
([state x] (str x "-" y state))
;; One-argument arity returns nil -- presumably the cleanup hook; confirm
;; against the defmapop documentation.
([state] nil))

(deftest defops-arg-parsing-test
(let [src [[1] [2]]
mk-query (fn [afn]
Expand All @@ -49,6 +56,17 @@
ident-meta
ident-both)))

;; Checks that a parameter handed to a defop is serialized with the serializer
;; registered in the job conf.
(deftest defops-custom-serializer-for-param-test
; Normally, the result would be "one-bar-string", but we have a custom
; serializer for StringBuffer. Using this serializer, all StringBuffers are
; serialized with contents "foo".
; The registration string below pairs java.lang.StringBuffer with
; cascalog.test.StringBufferKryoSerializer.
(with-job-conf
{"cascading.kryo.registrations" "java.lang.StringBuffer,cascalog.test.StringBufferKryoSerializer"}
(fact?<- [["one-foo-string"]]
[?y]
([["one"]] ?x)
; The StringBuffer holding "bar" is the op parameter under test.
(string-stateful [(StringBuffer. "bar")] ?x :> ?y))))

(facts "Metadata testing."
"Both function and var should contain custom metadata."
(meta ident-stateful) => (contains {:great-meta "yes!"})
Expand Down
8 changes: 5 additions & 3 deletions test/cascalog/pred_macro_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
(:require [cascalog.ops :as c]
[cascalog.io :as io]))

;; Predicate macro bound to a top-level var so that test-predicate-macro can
;; refer to it as #'mac2, i.e. pass the var itself rather than the macro value.
;; The macro takes ?a as input and constrains ?a to equal (* ?a ?a).
(def mac2
(<- [:< ?a]
(* ?a ?a :> ?a)))

(deftest test-predicate-macro
(let [mac1 (<- [?a :> ?b ?c]
(+ ?a 1 :> ?t)
(* ?t 2 :> ?b)
(+ ?a ?t :> ?c))
mac2 (<- [:< ?a]
(* ?a ?a :> ?a))
mac3 (<- [?a :> ?b]
(+ ?a ?a :> ?b))
num1 [[0] [1] [2] [3]]]
Expand All @@ -26,7 +28,7 @@
(test?<- [[0] [1]]
[?n]
(num1 ?n)
(mac2 ?n))
(#'mac2 ?n))

;; test that it allows same var used as input and output
(test?<- [[0]]
Expand Down