diff --git a/src/edu/washington/escience/myria/MyriaConstants.java b/src/edu/washington/escience/myria/MyriaConstants.java index b02d64974..fcd7c803b 100644 --- a/src/edu/washington/escience/myria/MyriaConstants.java +++ b/src/edu/washington/escience/myria/MyriaConstants.java @@ -399,7 +399,7 @@ public int getVal() { "function_isMultivalued", Type.BOOLEAN_TYPE, "function_binary", - Type.BLOB_TYPE); + Type.STRING_TYPE); /** Number of bytes per worker partition for parallel ingest - 100MB. */ public static final long PARALLEL_INGEST_WORKER_MINIMUM_PARTITION_SIZE = 100 * MB; diff --git a/src/edu/washington/escience/myria/accessmethod/JdbcAccessMethod.java b/src/edu/washington/escience/myria/accessmethod/JdbcAccessMethod.java index 2cef54068..957532cdc 100644 --- a/src/edu/washington/escience/myria/accessmethod/JdbcAccessMethod.java +++ b/src/edu/washington/escience/myria/accessmethod/JdbcAccessMethod.java @@ -48,7 +48,7 @@ public final class JdbcAccessMethod extends AccessMethod { /** The logger for this class. */ - private static final Logger LOGGER = LoggerFactory.getLogger(JdbcAccessMethod.class); + private final Logger LOGGER = LoggerFactory.getLogger(JdbcAccessMethod.class); /** The database connection information. */ private JdbcInfo jdbcInfo; /** The database connection. */ @@ -700,6 +700,8 @@ private String quote(final String column) { class JdbcTupleBatchIterator implements Iterator { /** The results from a JDBC query that will be returned in TupleBatches by this Iterator. */ private final ResultSet resultSet; + /** The logger for this class. */ + private final Logger LOGGER = LoggerFactory.getLogger(JdbcTupleBatchIterator.class); /** The Schema of the TupleBatches returned by this Iterator. */ private final Schema schema; /** Next TB. */ @@ -741,9 +743,12 @@ private TupleBatch getNextTB() throws SQLException { return null; } final int numFields = schema.numColumns(); + LOGGER.info("num columns " + numFields); + final List> columnBuilders = ColumnFactory.allocateColumns(schema); int numTuples = 0; int batch_size = TupleUtils.getBatchSize(schema); + LOGGER.info("batch size " + batch_size); for (numTuples = 0; numTuples < batch_size; ++numTuples) { if (!resultSet.next()) { final Connection connection = resultSet.getStatement().getConnection(); @@ -754,6 +759,7 @@ private TupleBatch getNextTB() throws SQLException { } for (int colIdx = 0; colIdx < numFields; ++colIdx) { /* Warning: JDBC is 1-indexed */ + LOGGER.info("column index: " + colIdx); columnBuilders.get(colIdx).appendFromJdbc(resultSet, colIdx + 1); } } diff --git a/src/edu/washington/escience/myria/expression/evaluate/PythonUDFEvaluator.java b/src/edu/washington/escience/myria/expression/evaluate/PythonUDFEvaluator.java index ad5aece24..99481dc68 100644 --- a/src/edu/washington/escience/myria/expression/evaluate/PythonUDFEvaluator.java +++ b/src/edu/washington/escience/myria/expression/evaluate/PythonUDFEvaluator.java @@ -81,7 +81,6 @@ public PythonUDFEvaluator( super(expression, parameters); pyFuncRegistrar = pyFuncReg; - if (parameters.getStateSchema() != null) {} PyUDFExpression op = (PyUDFExpression) expression.getRootExpressionOperator(); outputType = op.getOutput(); List childops = op.getChildren(); @@ -101,9 +100,12 @@ private void initEvaluator() throws DbException { ExpressionOperator op = getExpression().getRootExpressionOperator(); String pyFunctionName = ((PyUDFExpression) op).getName(); + LOGGER.info("trying to initialize evaluator"); try { if (pyFuncRegistrar != null) { + LOGGER.info("py func registrar is not null"); + LOGGER.info("py function name: " + pyFunctionName); String pyCodeString = pyFuncRegistrar.getFunctionBinary(pyFunctionName); FunctionStatus fs = pyFuncRegistrar.getFunctionStatus(pyFunctionName); diff --git a/src/edu/washington/escience/myria/functions/PythonFunctionRegistrar.java b/src/edu/washington/escience/myria/functions/PythonFunctionRegistrar.java index 8d6a8bd0d..c80a89509 100644 --- a/src/edu/washington/escience/myria/functions/PythonFunctionRegistrar.java +++ b/src/edu/washington/escience/myria/functions/PythonFunctionRegistrar.java @@ -25,7 +25,7 @@ public class PythonFunctionRegistrar { /** The logger for this class. */ private static final org.slf4j.Logger LOGGER = - org.slf4j.LoggerFactory.getLogger(ProfilingLogger.class); + org.slf4j.LoggerFactory.getLogger(PythonFunctionRegistrar.class); /** The connection to the database database. */ private final JdbcAccessMethod accessMethod; @@ -44,8 +44,6 @@ public PythonFunctionRegistrar(final ConnectionInfo connectionInfo) throws DbExc connectionInfo.getDbms().equals(MyriaConstants.STORAGE_SYSTEM_POSTGRESQL), "Profiling only supported with Postgres JDBC connection"); - //LOGGER.info("trying to register python function"); - /* open the database connection */ accessMethod = (JdbcAccessMethod) AccessMethod.of(connectionInfo.getDbms(), connectionInfo, false); @@ -70,7 +68,7 @@ public void addFunction( final String description, final String outputType, final Boolean isMultivalued, - final ByteBuffer binary) + final String binary) throws DbException { String tableName = MyriaConstants.PYUDF_RELATION.toString(MyriaConstants.STORAGE_SYSTEM_POSTGRESQL); @@ -91,7 +89,7 @@ public void addFunction( pyFunctions.putString(1, description); pyFunctions.putString(2, outputType); pyFunctions.putBoolean(3, isMultivalued); - pyFunctions.putBlob(4, binary); + pyFunctions.putString(4, binary); accessMethod.tupleBatchInsert(MyriaConstants.PYUDF_RELATION, pyFunctions.popAny()); @@ -118,15 +116,16 @@ public FunctionStatus getFunctionStatus(final String pyFunctionName) throws DbEx accessMethod.tupleBatchIteratorFromQuery(sb.toString(), MyriaConstants.PYUDF_SCHEMA); if (tuples.hasNext()) { + LOGGER.info("found something!"); final TupleBatch tb = tuples.next(); if (tb.numTuples() > 0) { FunctionStatus fs = new FunctionStatus( pyFunctionName, + tb.getString(0, 0), tb.getString(1, 0), - tb.getString(2, 0), - tb.getBoolean(3, 0), + tb.getBoolean(2, 0), FunctionLanguage.PYTHON); return fs; } @@ -145,7 +144,7 @@ public boolean isValid() { try { return accessMethod.getConnection().isValid(1); } catch (SQLException e) { - LOGGER.warn("Error checking connection validity", e); + LOGGER.info("Error checking connection validity", e); return false; } } @@ -156,15 +155,18 @@ public boolean isValid() { * @throws DbException in case of error. */ public String getFunctionBinary(final String pyFunctionName) throws DbException { + LOGGER.info("Trying to get function" + pyFunctionName); StringBuilder sb = new StringBuilder(); - sb.append("Select function_binary from "); + sb.append("Select * from "); sb.append(MyriaConstants.PYUDF_RELATION.toString(MyriaConstants.STORAGE_SYSTEM_POSTGRESQL)); sb.append(" where function_name='"); sb.append(pyFunctionName); sb.append("'"); + LOGGER.info(sb.toString()); try { + Iterator tuples = accessMethod.tupleBatchIteratorFromQuery(sb.toString(), MyriaConstants.PYUDF_SCHEMA); @@ -172,11 +174,12 @@ public String getFunctionBinary(final String pyFunctionName) throws DbException final TupleBatch tb = tuples.next(); LOGGER.info("Got {} tuples", tb.numTuples()); if (tb.numTuples() > 0) { - String codename = tb.getString(0, 0); + String codename = tb.getString(3, 0); LOGGER.info("codename: " + codename); return codename; } } + } catch (Exception e) { LOGGER.info(e.getMessage()); throw new DbException(e); diff --git a/src/edu/washington/escience/myria/functions/PythonWorker.java b/src/edu/washington/escience/myria/functions/PythonWorker.java index 53181ae2a..01a40e409 100644 --- a/src/edu/washington/escience/myria/functions/PythonWorker.java +++ b/src/edu/washington/escience/myria/functions/PythonWorker.java @@ -78,11 +78,17 @@ public void sendCodePickle( try { if (pyCodeString.length() > 0 && dOut != null) { byte[] bytes = pyCodeString.getBytes(StandardCharsets.UTF_8); + LOGGER.info("code string length is: " + pyCodeString.length()); dOut.writeInt(bytes.length); + LOGGER.info("wrote length"); + dOut.write(bytes); + LOGGER.info("wrote bytes"); dOut.writeInt(numColumns); + LOGGER.info("wrote num columns: " + numColumns); writeOutputType(outputType); + LOGGER.info("wrote output type: " + outputType.toString()); if (isFlatMap) { dOut.writeInt(1); } else { @@ -165,6 +171,7 @@ private void createServerSocket() throws UnknownHostException, IOException { * @throws IOException */ private void startPythonWorker() throws IOException { + LOGGER.info("starting python worker"); String pythonWorker = MyriaConstants.PYTHONWORKER; ProcessBuilder pb = new ProcessBuilder(MyriaConstants.PYTHONEXEC, "-m", pythonWorker); @@ -184,6 +191,7 @@ private void startPythonWorker() throws IOException { out.flush(); clientSock = serverSocket.accept(); setupStreams(); + LOGGER.info("successfully started the python worker"); return; } diff --git a/src/edu/washington/escience/myria/operator/Apply.java b/src/edu/washington/escience/myria/operator/Apply.java index 7f5983e3b..adb77c5ee 100644 --- a/src/edu/washington/escience/myria/operator/Apply.java +++ b/src/edu/washington/escience/myria/operator/Apply.java @@ -259,7 +259,6 @@ protected void init(final ImmutableMap execEnvVars) throws DbExc if (expr.isConstant()) { evaluator = new ConstantEvaluator(expr, parameters); } else if (expr.isRegisteredUDF()) { - evaluator = new PythonUDFEvaluator(expr, parameters, getPythonFunctionRegistrar()); } else { evaluator = new GenericEvaluator(expr, parameters); diff --git a/src/edu/washington/escience/myria/operator/DbCreateFunction.java b/src/edu/washington/escience/myria/operator/DbCreateFunction.java index e48dc0819..2a9db6b30 100644 --- a/src/edu/washington/escience/myria/operator/DbCreateFunction.java +++ b/src/edu/washington/escience/myria/operator/DbCreateFunction.java @@ -107,14 +107,13 @@ protected void init(final ImmutableMap execEnvVars) break; case PYTHON: - BASE64Decoder decoder = new BASE64Decoder(); + //BASE64Decoder decoder = new BASE64Decoder(); if (binary != null) { - byte[] decodedBytes = decoder.decodeBuffer(binary); - ByteBuffer binaryFunction = ByteBuffer.wrap(decodedBytes); + //byte[] decodedBytes = decoder.decodeBuffer(binary); + //ByteBuffer binaryFunction = ByteBuffer.wrap(decodedBytes); PythonFunctionRegistrar pyFunc = new PythonFunctionRegistrar(connectionInfo); - pyFunc.addFunction( - name, description, outputType.toString(), isMultivalued, binaryFunction); + pyFunc.addFunction(name, description, outputType.toString(), isMultivalued, binary); } else { throw new DbException("Cannot register python UDF without binary."); } diff --git a/src/edu/washington/escience/myria/operator/Operator.java b/src/edu/washington/escience/myria/operator/Operator.java index fb1b2e383..5d11fc3c1 100644 --- a/src/edu/washington/escience/myria/operator/Operator.java +++ b/src/edu/washington/escience/myria/operator/Operator.java @@ -113,13 +113,14 @@ public ProfilingLogger getProfilingLogger() { * @throws DbException in case of error. */ public PythonFunctionRegistrar getPythonFunctionRegistrar() throws DbException { - if (execEnvVars.get(MyriaConstants.EXEC_ENV_VAR_TEST_MODE) != null - && !execEnvVars.get(MyriaConstants.EXEC_ENV_VAR_TEST_MODE).equals("true")) { + PythonFunctionRegistrar pyFuncRegistrar = null; + if (execEnvVars.get(MyriaConstants.EXEC_ENV_VAR_TEST_MODE) == null) { if (getLocalSubQuery() instanceof WorkerSubQuery) { - return ((WorkerSubQuery) getLocalSubQuery()).getWorker().getPythonFunctionRegistrar(); + pyFuncRegistrar = + ((WorkerSubQuery) getLocalSubQuery()).getWorker().getPythonFunctionRegistrar(); } } - return null; + return pyFuncRegistrar; } /**