
Commit

Testing new serialization.
Gabor Szabo committed Mar 31, 2012
1 parent e762e40 commit 6aea046
Showing 16 changed files with 388 additions and 621 deletions.
29 changes: 18 additions & 11 deletions examples/wc.py
@@ -24,18 +24,23 @@
 from pycascading.helpers import *
 from pycascading.operators import *
 
-import com.twitter.pycascading.FlowHead
+def fun():
+    print '*** OK!!', module_paths, jobconf, flow_process
 
-class FlowHead(Operation):
-    def __init__(self, *args, **kwargs):
-        pass
+def m():
+    print 'glob func'
 
-    def _create_with_parent(self, parent):
-        return com.twitter.pycascading.FlowHead('noop-name', parent.get_assembly())
+class C:
+    def __init__(self):
+        self.x = 1
 
+    @classmethod
+    def f(cls):
+        print 'clsmethod'
 
-def fun():
-    print '*** OK!!', module_paths, jobconf
+    def m(self):
+        # m()
+        print 'ok m', self.x
 
 def main():
     flow = Flow()
@@ -44,8 +49,8 @@ def main():
 
     @map(produces=['word'])
     @yields
-    def split_words(tuple, field, fun):
-        print '&&&& fun', field, fun, fun()
+    def split_words(tuple, field, fun, obj):
+        print '&&&& fun', field, fun, fun(), obj.x
         for word in tuple.get(field).split():
             yield [word]
 
@@ -61,6 +66,8 @@ def count(group, tuple):
         c += 1
         yield [c]
 
-    input | Map('line', split_words(0, fun), 'word') | GroupBy('word') | count | output
+    c = C()
+    c.x = 2
+    input | Map('line', split_words(0, m, c), 'word') | GroupBy('word') | count | output
 
     flow.run(num_reducers=2)
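
The updated example wires a module-level function (m) and an instance of C into the pipeline via split_words(0, m, c), so both objects travel through the new serialization path. The sketch below only illustrates the calling convention that CascadingBaseOperationWrapper implements in setupArgs()/callFunction() (tuple arguments first, then the constant context arguments, then any keyword arguments); call_udf and its parameter names are assumptions for illustration, not pycascading API.

# Illustrative sketch only: mirrors how the Java wrapper assembles callArgs
# (tuple arguments, then contextArgs, then the contextKwArgs values) before
# invoking the UDF on the workers. 'call_udf' is not a pycascading function.
def call_udf(udf, tuple_args, context_args=(), context_kwargs=None):
    context_kwargs = context_kwargs or {}
    args = list(tuple_args) + list(context_args)
    return udf(*args, **context_kwargs)

# For the example above the workers effectively end up calling
#   split_words(input_tuple, 0, m, c)
# so inside the UDF field == 0, fun is m, and obj is the C instance.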
184 changes: 99 additions & 85 deletions java/src/com/twitter/pycascading/CascadingBaseOperationWrapper.java
@@ -36,8 +36,6 @@
import org.python.core.PyTuple;
import org.python.util.PythonInterpreter;

import com.twitter.pycascading.PythonFunctionWrapper.RunningMode;

import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.operation.BaseOperation;
@@ -51,7 +49,7 @@
*
* @author Gabor Szabo
*/
@SuppressWarnings("rawtypes")
@SuppressWarnings({ "rawtypes", "deprecation" })
public class CascadingBaseOperationWrapper extends BaseOperation implements Serializable {
private static final long serialVersionUID = -535185466322890691L;

@@ -63,15 +61,24 @@ public enum ConvertInputTuples {

private PyObject function;
private ConvertInputTuples convertInputTuples;
private PyFunction writeObjectCallBack;
private byte[] serializedFunction;

private PyTuple contextArgs = null;
protected PyDictionary contextKwArgs = null;

private PyFunction writeObjectCallBack;
private byte[] serializedFunction;

// These are some variables to optimize the frequent UDF calls
protected PyObject[] callArgs = null;
private String[] contextKwArgsNames = null;

/**
* Class to convert elements in an iterator to corresponding Jython objects.
*
* @author Gabor Szabo
*
* @param <I>
* the type of the items
*/
class ConvertIterable<I> implements Iterator<PyObject> {
private Iterator<I> iterator;

@@ -114,7 +121,7 @@ public CascadingBaseOperationWrapper(int numArgs, Fields fieldDeclaration) {
super(numArgs, fieldDeclaration);
}

private void setupInterpreter(JobConf jobConf) {
private PythonInterpreter setupInterpreter(JobConf jobConf, FlowProcess flowProcess) {
System.out.println("******* calling setupInt");
String pycascadingDir = null;
String sourceDir = null;
@@ -152,101 +159,91 @@ private void setupInterpreter(JobConf jobConf) {
// We set the Python variable "jobconf" to the MR jobconf
interpreter.set("jobconf", jobConf);

// PyObject module = (PyObject) interpreter
// .eval("load_source(module_name, file_name, module_paths)");
// We need to do this so that nested functions can also be used
interpreter.set("flow_process", flowProcess);

// We need to run the main file first so that imports etc. are defined,
// and nested functions can also be used
interpreter.execfile(sourceDir + (String) jobConf.get("pycascading.main_file"));
// interpreter.exec(funcSource);
// pythonFunction = module.__getattr__(funcName);
return interpreter;
}

// We need to delay the deserialization of the Python functions until this
// point, since the sources are in the distributed cache, whose location is in
// the jobconf, and this is the first point at which we have access to the
// jobconf.
@Override
public void prepare(FlowProcess flowProcess, OperationCall operationCall) {
System.out.println("******* baseopprepare");
// pythonEnvironment = new PythonEnvironment();
// function.prepare(((HadoopFlowProcess) flowProcess).getJobConf());
JobConf jobConf = ((HadoopFlowProcess) flowProcess).getJobConf();
System.out.println("$$$$$ jobconf: " + jobConf.get("pycascading.running_mode"));
setupInterpreter(jobConf);
PythonInterpreter interpreter = setupInterpreter(jobConf, flowProcess);

ByteArrayInputStream str = new ByteArrayInputStream(serializedFunction);
StringBuilder sources = new StringBuilder();
ByteArrayInputStream baos = new ByteArrayInputStream(serializedFunction);
try {
PythonObjectInputStream pythonStream = new PythonObjectInputStream(str, sources);
// // function = (PythonFunctionWrapper) pythonStream.readObject();
PythonObjectInputStream pythonStream = new PythonObjectInputStream(baos, interpreter);

function = (PyObject) pythonStream.readObject();
convertInputTuples = (ConvertInputTuples) pythonStream.readObject();
if ((Boolean) pythonStream.readObject())
contextArgs = (PyTuple) pythonStream.readObject();
if ((Boolean) pythonStream.readObject())
contextKwArgs = (PyDictionary) pythonStream.readObject();
str.close();
serializedFunction = null;
PythonInterpreter interpreter = Main.getInterpreter();
// function2 = (PyFunction) interpreter.get(Py.tojava(function.funcName,
// String.class));
System.out.println("we got: " + function);
if (!PyFunction.class.isInstance(function)) {
// function is assumed to be decorated, resulting in a
// DecoratedFunction.
// The function was decorated so we need to get the original back
// Only for performance reasons. It's just as good to comment this
// out, as a DecoratedFunction is callable anyway.
// If we were to decorate the functions with other decorators as
// well, we certainly cannot use this.
try {
function = (PyFunction) ((PyDictionary) (function.__getattr__(new PyString("decorators"))))
.get(new PyString("function"));
} catch (Exception e) {
throw new RuntimeException(
"Expected a Python function or a decorated function. This shouldn't happen.");
}
}
baos.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
// If there is any kind of exception (ClassNotFoundException or
// IOException), we don't want to continue.
throw new RuntimeException(e);
}
serializedFunction = null;
if (!PyFunction.class.isInstance(function)) {
// function is assumed to be decorated, resulting in a
// DecoratedFunction, so we can get the original function back.
//
// Only for performance reasons. It's just as good to comment this
// out, as a DecoratedFunction is callable anyway.
// If we were to decorate the functions with other decorators as
// well, we certainly cannot use this.
try {
function = (PyFunction) ((PyDictionary) (function.__getattr__(new PyString("decorators"))))
.get(new PyString("function"));
} catch (Exception e) {
throw new RuntimeException(
"Expected a Python function or a decorated function. This shouldn't happen.");
}
}
System.out.println("####### contextArgs " + contextArgs);
setupArgs();
}

private void writeObject(ObjectOutputStream stream) throws IOException {
System.out.println("*** CascadingBaseOperationWrapper writeObject");
ByteArrayOutputStream str = new ByteArrayOutputStream();
PythonObjectOutputStream pythonStream = new PythonObjectOutputStream(str, writeObjectCallBack);
// pythonStream.writeObject(function);
System.out.println("1");
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PythonObjectOutputStream pythonStream = new PythonObjectOutputStream(baos, writeObjectCallBack);
pythonStream.writeObject(function);
System.out.println("2");
System.out.println("1");
pythonStream.writeObject(convertInputTuples);
System.out.println("1");
pythonStream.writeObject(new Boolean(contextArgs != null));
if (contextArgs != null)
System.out.println("1");
if (contextArgs != null) {
System.out.println("******* contextArgs: " + contextArgs);
pythonStream.writeObject(contextArgs);
}
System.out.println("1");
pythonStream.writeObject(new Boolean(contextKwArgs != null));
if (contextKwArgs != null)
pythonStream.writeObject(contextKwArgs);
pythonStream.close();
stream.writeObject(str.toByteArray());
System.out.println("1");

stream.writeObject(baos.toByteArray());
}

private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException,
URISyntaxException {
// TODO: we need to start up the interpreter and run all the imports, as
// the parameters may use other imports, like datetime. Is there a better
// way to do this?
System.out.println("^^^^^ cbow reado");
serializedFunction = (byte[]) stream.readObject();
// ByteArrayInputStream str = new ByteArrayInputStream(arr);
// PythonObjectInputStream pythonStream = new PythonObjectInputStream(str,
// sources);
// // function = (PythonFunctionWrapper) pythonStream.readObject();
// function2 = (PyFunction) pythonStream.readObject();
// convertInputTuples = (ConvertInputTuples) pythonStream.readObject();
// if ((Boolean) pythonStream.readObject())
// contextArgs = (PyTuple) pythonStream.readObject();
// if ((Boolean) pythonStream.readObject())
// contextKwArgs = (PyDictionary) pythonStream.readObject();
// str.close();
}

/**
@@ -261,43 +258,25 @@ public int getNumParameters() {
}

/**
* We may pass in Python functions as arguments to UDFs. In this case we
* have to wrap these the same way we wrapped the UDFs, and need to unwrap
* them at deserialization.
*
* @return the original argument to the UDF before serialization
*/
private PyObject getOriginalArg(PyObject arg) {
return arg;
// Object result = arg.__tojava__(PythonFunctionWrapper.class);
// if (result == Py.NoConversion)
// return arg;
// else
// return ((PythonFunctionWrapper) result).getPythonFunction();
}

/**
* Sets up the local variables that were not serialized (as an optimization) and
* unwraps function arguments wrapped with PythonFunctionWrapper.
* Sets up the local variables that were not serialized (as an optimization).
*/
protected void setupArgs() {
int numArgs = getNumParameters();
callArgs = new PyObject[numArgs + (contextArgs == null ? 0 : contextArgs.size())
+ (contextKwArgs == null ? 0 : contextKwArgs.size())];
System.out.println("&&&&&&& numargs " + numArgs + "/" + callArgs.length + "/" + contextArgs);
int i = numArgs;
if (contextArgs != null) {
PyObject[] args = contextArgs.getArray();
for (PyObject arg : args) {
callArgs[i] = getOriginalArg(arg);
callArgs[i] = arg;
i++;
}
}
if (contextKwArgs != null) {
PyIterator values = (PyIterator) contextKwArgs.itervalues();
PyObject value = values.__iternext__();
while (value != null) {
callArgs[i] = getOriginalArg(value);
callArgs[i] = value;
value = values.__iternext__();
i++;
}
@@ -359,27 +338,62 @@ public PyObject callFunction() {
return function.__call__(callArgs);
else
return function.__call__(callArgs, contextKwArgsNames);
// return function.callFunction(callArgs, contextKwArgsNames);
}

/**
* Setter for the Python function object.
*
* @param function
* the Python function
*/
public void setFunction(PyFunction function) {
this.function = function;
}

/**
* Setter for the input tuple conversion type.
*
* @param convertInputTuples
* whether to do any conversion on input tuples, and the type of the
* converted tuple (none/list/dict)
*/
public void setConvertInputTuples(ConvertInputTuples convertInputTuples) {
this.convertInputTuples = convertInputTuples;
}

/**
* Setter for the constant unnamed arguments that are passed to the UDF in
* addition to the tuples.
*
* @param args
* the additional unnamed arguments
*/
public void setContextArgs(PyTuple args) {
contextArgs = args;
setupArgs();
}

/**
* Setter for the constant named arguments that are passed to the UDF in
* addition to the tuples.
*
* @param kwargs
* the additional named arguments
*/
public void setContextKwArgs(PyDictionary kwargs) {
contextKwArgs = kwargs;
setupArgs();
}

/**
* The Python callback function to call to get the source of a PyFunction. It
* is better to do this in Python using the inspect module than to hack it
* together in Java.
*
* @param callBack
* the PyFunction that is called to get the source of a Python
* function
*/
public void setWriteObjectCallBack(PyFunction callBack) {
this.writeObjectCallBack = callBack;
}
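
setWriteObjectCallBack() receives the Python helper that writeObject() uses to obtain the source of a PyFunction. A minimal sketch of such a callback, assuming it simply returns the function's source text via the inspect module as the Javadoc above suggests; the name function_source is hypothetical, not the actual pycascading helper.

import inspect

# Hypothetical callback shape for setWriteObjectCallBack(); the real
# pycascading helper may return more than the bare source text.
def function_source(function):
    # inspect.getsource reads the defining file, so this works for functions
    # defined in modules but not for code typed into an interactive session.
    return inspect.getsource(function)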
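
prepare() unwraps decorated UDFs by reading decorators['function'] off the deserialized object, and the comments note that this is only an optimization because the wrapper is itself callable. The sketch below reconstructs the object shape that access pattern implies; it is an assumption based on the Java code above, not the actual pycascading decorators module.

# Assumed shape of what @map/@yields-style decorators return, reconstructed
# from function.__getattr__('decorators').get('function') in prepare().
class DecoratedFunction(object):
    def __init__(self, function, **metadata):
        # The Java wrapper looks up the original callable under 'function'.
        self.decorators = {'function': function}
        self.decorators.update(metadata)

    def __call__(self, *args, **kwargs):
        # Still callable, which is why the Java-side unwrapping is only a
        # performance optimization.
        return self.decorators['function'](*args, **kwargs)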
4 changes: 0 additions & 4 deletions java/src/com/twitter/pycascading/CascadingBufferWrapper.java
@@ -80,12 +80,8 @@ public void operate(FlowProcess flowProcess, BufferCall bufferCall) {
callArgs[1] = Py.java2py(arguments);
if (outputMethod == OutputMethod.COLLECTS) {
callArgs[2] = Py.java2py(outputCollector);
if (flowProcessPassIn == FlowProcessPassIn.YES)
callArgs[3] = Py.java2py(flowProcess);
callFunction();
} else {
if (flowProcessPassIn == FlowProcessPassIn.YES)
callArgs[2] = Py.java2py(flowProcess);
Object ret = callFunction();
collectOutput(outputCollector, ret);
}
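
CascadingBufferWrapper no longer appends flowProcess to the call arguments because setupInterpreter() now publishes it as the interpreter global flow_process (alongside jobconf and module_paths), as the new print in wc.py demonstrates. A hedged sketch of a UDF reading those globals; the function name and the 'mapred.task.id' key are chosen only for illustration, and the snippet assumes the same pycascading.helpers imports as wc.py.

@map(produces=['word'])
@yields
def split_words_with_context(tuple, field):
    # 'jobconf', 'flow_process' and 'module_paths' are globals injected by
    # the Java wrapper in setupInterpreter() before the main file is run.
    print 'task', jobconf.get('mapred.task.id'), flow_process
    for word in tuple.get(field).split():
        yield [word]

# Used in the pipeline the same way as split_words above:
#   input | Map('line', split_words_with_context(0), 'word') | ...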
