Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Issue #59 -- Custom serializers for defop params #63

Closed
wants to merge 3 commits into from

4 participants

@mlimotte

Addresses #59

Probably not quite ready for prime-time. It seems to work, but needs some tweaking. See comments in the commit.

src/jvm/cascalog/KryoService.java
((14 lines not shown))
/** User: sritchie Date: 12/16/11 Time: 8:34 PM */
public class KryoService {
public static final Logger LOG = Logger.getLogger(KryoService.class);
static ObjectBuffer kryoBuf;
+ static Var require = RT.var("clojure.core", "require");

I'm using this code to get the current project JobConf (line 47 and line 63). Is there a better way?

@sritchie Collaborator

Hey Marc,

Thanks for the pull req! The problem with doing it this way is that we're not going to pick up cluster-wide configuration settings, or settings that are supplied dynamically using with-job-conf.

The right way to do this is to modify KryoService to accept a JobConf, and use this to build a SerializationFactory in the same way that the MemorySourceTap does it:

https://github.com/nathanmarz/cascalog/blob/master/src/jvm/cascalog/TupleMemoryInputFormat.java#L133

This is almost exactly what we need to do. I think you can downcast the FlowProcess that comes in through a BaseOperation's prepare method to a HadoopFlowProcess, retrieve its JobConf, and pass this through. What do you think?

I considered this approach the first time, but the issue I saw is that while ClojureCascadingBase#prepare has a FlowProcess instance, which I can pass to KryoService#deserialize, the ClojureCascadingBase#initialize method has no FlowProcess for passing to KryoService#serialize.

Also, I didn't look too closely, because of the issue above, but can we be confident that the FlowProcess being passed in is actually an instance of cascading.flow.hadoop.HadoopFlowProcess? That way I can cast to that, and use .getJobConf().

@sritchie Collaborator

Yup, we can definitely be confident -- Cascalog's going to need a bit of work before it supports local mode.

JobConf jc = ((HadoopFlowProcess) flowProcess).getJobConf();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@Quantisan
Collaborator

what does this do please?

@sritchie
Collaborator

Closing as a duplicate of #65

@sritchie sritchie closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Mar 20, 2012
  1. update changelog

    authored
Commits on Apr 3, 2012
  1. @mlimotte
This page is out of date. Refresh to see the latest.
View
4 CHANGELOG.md
@@ -1,3 +1,7 @@
+## Unreleased
+
+* Bug fix: make vars containing predmacros work properly
+
## 1.8.6
* "Call to unbound-fn" Error solved. To use functions within Cascalog queries, hadoop needed to call "require" on the containing namespace. If you define functions at the repl, the namespace file might not exist, and this call will fail; previously, Cascalog would squash these exceptions.
View
3  src/clj/cascalog/conf.clj
@@ -20,8 +20,9 @@
(def ^:dynamic *JOB-CONF* {})
-(defn project-conf []
+(defn project-conf [& [conf]]
(project-merge (project-settings)
+ (into {} conf)
*JOB-CONF*
{"io.serializations"
"cascalog.hadoop.ClojureKryoSerialization"}))
View
7 src/clj/cascalog/rules.clj
@@ -622,9 +622,10 @@
(defn- expand-predicate-macros [raw-predicates]
(mapcat (fn [[p _ vars :as raw-predicate]]
- (if (p/predicate-macro? p)
- (expand-predicate-macros (expand-predicate-macro p vars))
- [raw-predicate]))
+ (let [p (if (var? p) (var-get p) p)]
+ (if (p/predicate-macro? p)
+ (expand-predicate-macros (expand-predicate-macro p vars))
+ [raw-predicate])))
raw-predicates))
(defn normalize-raw-predicates
View
2  src/jvm/cascalog/ClojureCascadingBase.java
@@ -48,7 +48,7 @@ public ClojureCascadingBase(Fields fields, Object[] fn_spec, boolean stateful) {
@Override
public void prepare(FlowProcess flow_process, OperationCall op_call) {
- this.fn_spec = (Object[]) KryoService.deserialize(serialized_spec);
+ this.fn_spec = KryoService.deserialize(flow_process, serialized_spec);
this.fn = Util.bootFn(fn_spec);
if (stateful) {
try {
View
89 src/jvm/cascalog/KryoService.java
@@ -1,32 +1,103 @@
package cascalog;
+import cascading.CascadingException;
+import cascading.flow.hadoop.HadoopFlowProcess;
+import cascading.flow.FlowProcess;
import cascading.kryo.Kryo;
import cascading.kryo.KryoFactory;
import cascalog.hadoop.ClojureKryoSerialization;
import com.esotericsoftware.kryo.ObjectBuffer;
import org.apache.log4j.Logger;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
+import java.lang.NullPointerException;
+import java.lang.Object;
+import java.lang.String;
+import java.lang.System;
+import java.lang.reflect.Array;
+import java.util.Arrays;
+
+import clojure.lang.RT;
+import clojure.lang.Var;
/** User: sritchie Date: 12/16/11 Time: 8:34 PM */
public class KryoService {
public static final Logger LOG = Logger.getLogger(KryoService.class);
- static ObjectBuffer kryoBuf;
+ static Var require = RT.var("clojure.core", "require");
+ static Var symbol = RT.var("clojure.core", "symbol");
+ static Var projectConf;
+ static Var hadoopJobConf;
static {
ClojureKryoSerialization serialization = new ClojureKryoSerialization();
Kryo k = serialization.populatedKryo();
k.setRegistrationOptional(true);
- kryoBuf = KryoFactory.newBuffer(k);
+ try {
+ require.invoke(symbol.invoke("cascalog.conf"));
+ require.invoke(symbol.invoke("hadoop-util.core"));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ projectConf = RT.var("cascalog.conf", "project-conf");
+ hadoopJobConf = RT.var("hadoop-util.core", "job-conf");
}
- public static byte[] serialize(Object obj) {
- LOG.debug("Serializing " + obj);
- return kryoBuf.writeClassAndObject(obj);
+ public static byte[] serialize(Object[] objs) {
+ LOG.debug("Serializing " + objs);
+ Configuration conf = (Configuration) hadoopJobConf.invoke(projectConf.invoke());
+ SerializationFactory factory = new SerializationFactory(conf);
+
+ Serializer<Object> serializer;
+ try {
+ serializer = factory.getSerializer(Object.class);
+ } catch (NullPointerException e) {
+ // for compatability with the expected behavior documented by the
+ // java.util.GregorianCalendar test in cascalog.conf-test
+ // TODO or... should we change the test?
+ throw new CascadingException("No serializer found", e);
+ }
+
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ try {
+ serializer.open(stream);
+ serializer.serialize(objs);
+ serializer.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return stream.toByteArray();
}
- public static Object deserialize(byte[] serialized) {
- Object o = kryoBuf.readClassAndObject(serialized);
- LOG.debug("Deserialized " + o);
- return o;
+ public static Object[] deserialize(FlowProcess flow_process, byte[] serialized) {
+ Object[] objs = null;
+
+ Configuration flowConf = null;
+ if (flow_process != null && flow_process instanceof HadoopFlowProcess) {
+ // flow_process is null when called from KryoInsert
+ // flow_process is not an instance of HadoopFlowProcess when called via bridge-test
+ flowConf = ((HadoopFlowProcess) flow_process).getJobConf();
+ }
+ Configuration conf = (Configuration) hadoopJobConf.invoke(projectConf.invoke(flowConf));
+
+ SerializationFactory factory = new SerializationFactory(conf);
+ Deserializer<Object[]> deserializer = factory.getDeserializer(Object[].class);
+
+ ByteArrayInputStream stream = new ByteArrayInputStream(serialized);
+ try {
+ deserializer.open(stream);
+ objs = deserializer.deserialize(null);
+ deserializer.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ LOG.debug("Deserialized " + objs);
+ return objs;
}
}
View
5 src/jvm/cascalog/ops/KryoInsert.java
@@ -24,13 +24,16 @@ public KryoInsert(Fields fieldDeclaration, Object... values) {
public Tuple getTuple() {
if (this.values == null) {
- Object[] values = (Object[]) KryoService.deserialize(this.serialized);
+ Object[] values = (Object[]) KryoService.deserialize(null, this.serialized);
this.values = new Tuple(values);
}
return this.values;
}
@Override public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
+ // TODO we could pass flowProcess through getTuple to KryoService#deserialize here,
+ // but since the rest of this class doesn't have a FlowProcess to use in
+ // ser/deser, we could run into trouble if we're not consistent.
functionCall.getOutputCollector().add( new Tuple( getTuple() ) );
}
View
41 src/jvm/cascalog/test/StringBufferKryoSerializer.java
@@ -0,0 +1,41 @@
+/*
+ Copyright 2010 Nathan Marz
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package cascalog.test;
+
+import java.lang.Class;
+import java.lang.Object;
+import java.lang.Override;
+import java.lang.StringBuffer;
+import java.nio.ByteBuffer;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.serialize.StringSerializer;
+
+public class StringBufferKryoSerializer extends Serializer {
+
+ @Override
+ public void writeObjectData(ByteBuffer byteBuffer, Object o) {
+ StringSerializer.put(byteBuffer,"foo");
+ }
+
+ @Override
+ public StringBuffer readObjectData(ByteBuffer byteBuffer, Class type) {
+ String s = StringSerializer.get(byteBuffer);
+ return new StringBuffer(s);
+ }
+
+}
View
18 test/cascalog/defops_test.clj
@@ -26,6 +26,13 @@
([state x] (+ x y state))
([state] nil))
+(defmapop [string-stateful [y]]
+ "append \"-\" y and \"-string\""
+ {:stateful true}
+ ([] "-string")
+ ([state x] (str x "-" y state))
+ ([state] nil))
+
(deftest defops-arg-parsing-test
(let [src [[1] [2]]
mk-query (fn [afn]
@@ -49,6 +56,17 @@
ident-meta
ident-both)))
+(deftest defops-custom-serializer-for-param-test
+ ; Normally, the result would be "one-bar-string", but we have a custom
+ ; serializer for StringBuffer. Using this serializer, all StringBuffers are
+ ; serialized with contents "foo".
+ (with-job-conf
+ {"cascading.kryo.registrations" "java.lang.StringBuffer,cascalog.test.StringBufferKryoSerializer"}
+ (fact?<- [["one-foo-string"]]
+ [?y]
+ ([["one"]] ?x)
+ (string-stateful [(StringBuffer. "bar")] ?x :> ?y))))
+
(facts "Metadata testing."
"Both function and var should contain custom metadata."
(meta ident-stateful) => (contains {:great-meta "yes!"})
View
8 test/cascalog/pred_macro_test.clj
@@ -7,13 +7,15 @@
(:require [cascalog.ops :as c]
[cascalog.io :as io]))
+(def mac2
+ (<- [:< ?a]
+ (* ?a ?a :> ?a)))
+
(deftest test-predicate-macro
(let [mac1 (<- [?a :> ?b ?c]
(+ ?a 1 :> ?t)
(* ?t 2 :> ?b)
(+ ?a ?t :> ?c))
- mac2 (<- [:< ?a]
- (* ?a ?a :> ?a))
mac3 (<- [?a :> ?b]
(+ ?a ?a :> ?b))
num1 [[0] [1] [2] [3]]]
@@ -26,7 +28,7 @@
(test?<- [[0] [1]]
[?n]
(num1 ?n)
- (mac2 ?n))
+ (#'mac2 ?n))
;; test that it allows same var used as input and output
(test?<- [[0]]
Something went wrong with that request. Please try again.