Skip to content

Commit

Permalink
debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
parmitam committed Jan 11, 2017
1 parent f0d53b1 commit 0b81f5e
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/edu/washington/escience/myria/MyriaConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ public int getVal() {
"function_isMultivalued",
Type.BOOLEAN_TYPE,
"function_binary",
Type.BLOB_TYPE);
Type.STRING_TYPE);

/** Number of bytes per worker partition for parallel ingest - 100MB. */
public static final long PARALLEL_INGEST_WORKER_MINIMUM_PARTITION_SIZE = 100 * MB;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
public final class JdbcAccessMethod extends AccessMethod {

/** The logger for this class. */
private static final Logger LOGGER = LoggerFactory.getLogger(JdbcAccessMethod.class);
private final Logger LOGGER = LoggerFactory.getLogger(JdbcAccessMethod.class);
/** The database connection information. */
private JdbcInfo jdbcInfo;
/** The database connection. */
Expand Down Expand Up @@ -700,6 +700,8 @@ private String quote(final String column) {
class JdbcTupleBatchIterator implements Iterator<TupleBatch> {
/** The results from a JDBC query that will be returned in TupleBatches by this Iterator. */
private final ResultSet resultSet;
/** The logger for this class. */
private final Logger LOGGER = LoggerFactory.getLogger(JdbcTupleBatchIterator.class);
/** The Schema of the TupleBatches returned by this Iterator. */
private final Schema schema;
/** Next TB. */
Expand Down Expand Up @@ -741,9 +743,12 @@ private TupleBatch getNextTB() throws SQLException {
return null;
}
final int numFields = schema.numColumns();
LOGGER.info("num columns " + numFields);

final List<ColumnBuilder<?>> columnBuilders = ColumnFactory.allocateColumns(schema);
int numTuples = 0;
int batch_size = TupleUtils.getBatchSize(schema);
LOGGER.info("batch size " + batch_size);
for (numTuples = 0; numTuples < batch_size; ++numTuples) {
if (!resultSet.next()) {
final Connection connection = resultSet.getStatement().getConnection();
Expand All @@ -754,6 +759,7 @@ private TupleBatch getNextTB() throws SQLException {
}
for (int colIdx = 0; colIdx < numFields; ++colIdx) {
/* Warning: JDBC is 1-indexed */
LOGGER.info("column index: " + colIdx);
columnBuilders.get(colIdx).appendFromJdbc(resultSet, colIdx + 1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ public PythonUDFEvaluator(
super(expression, parameters);
pyFuncRegistrar = pyFuncReg;

if (parameters.getStateSchema() != null) {}
PyUDFExpression op = (PyUDFExpression) expression.getRootExpressionOperator();
outputType = op.getOutput();
List<ExpressionOperator> childops = op.getChildren();
Expand All @@ -101,9 +100,12 @@ private void initEvaluator() throws DbException {
ExpressionOperator op = getExpression().getRootExpressionOperator();

String pyFunctionName = ((PyUDFExpression) op).getName();
LOGGER.info("trying to initialize evaluator");

try {
if (pyFuncRegistrar != null) {
LOGGER.info("py func registrar is not null");
LOGGER.info("py function name: " + pyFunctionName);
String pyCodeString = pyFuncRegistrar.getFunctionBinary(pyFunctionName);
FunctionStatus fs = pyFuncRegistrar.getFunctionStatus(pyFunctionName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class PythonFunctionRegistrar {

/** The logger for this class. */
private static final org.slf4j.Logger LOGGER =
org.slf4j.LoggerFactory.getLogger(ProfilingLogger.class);
org.slf4j.LoggerFactory.getLogger(PythonFunctionRegistrar.class);

/** The connection to the database database. */
private final JdbcAccessMethod accessMethod;
Expand All @@ -44,8 +44,6 @@ public PythonFunctionRegistrar(final ConnectionInfo connectionInfo) throws DbExc
connectionInfo.getDbms().equals(MyriaConstants.STORAGE_SYSTEM_POSTGRESQL),
"Profiling only supported with Postgres JDBC connection");

//LOGGER.info("trying to register python function");

/* open the database connection */
accessMethod =
(JdbcAccessMethod) AccessMethod.of(connectionInfo.getDbms(), connectionInfo, false);
Expand All @@ -70,7 +68,7 @@ public void addFunction(
final String description,
final String outputType,
final Boolean isMultivalued,
final ByteBuffer binary)
final String binary)
throws DbException {
String tableName =
MyriaConstants.PYUDF_RELATION.toString(MyriaConstants.STORAGE_SYSTEM_POSTGRESQL);
Expand All @@ -91,7 +89,7 @@ public void addFunction(
pyFunctions.putString(1, description);
pyFunctions.putString(2, outputType);
pyFunctions.putBoolean(3, isMultivalued);
pyFunctions.putBlob(4, binary);
pyFunctions.putString(4, binary);

accessMethod.tupleBatchInsert(MyriaConstants.PYUDF_RELATION, pyFunctions.popAny());

Expand All @@ -118,15 +116,16 @@ public FunctionStatus getFunctionStatus(final String pyFunctionName) throws DbEx
accessMethod.tupleBatchIteratorFromQuery(sb.toString(), MyriaConstants.PYUDF_SCHEMA);

if (tuples.hasNext()) {
LOGGER.info("found something!");
final TupleBatch tb = tuples.next();
if (tb.numTuples() > 0) {

FunctionStatus fs =
new FunctionStatus(
pyFunctionName,
tb.getString(0, 0),
tb.getString(1, 0),
tb.getString(2, 0),
tb.getBoolean(3, 0),
tb.getBoolean(2, 0),
FunctionLanguage.PYTHON);
return fs;
}
Expand All @@ -145,7 +144,7 @@ public boolean isValid() {
try {
return accessMethod.getConnection().isValid(1);
} catch (SQLException e) {
LOGGER.warn("Error checking connection validity", e);
LOGGER.info("Error checking connection validity", e);
return false;
}
}
Expand All @@ -156,27 +155,31 @@ public boolean isValid() {
* @throws DbException in case of error.
*/
public String getFunctionBinary(final String pyFunctionName) throws DbException {
LOGGER.info("Trying to get function" + pyFunctionName);

StringBuilder sb = new StringBuilder();
sb.append("Select function_binary from ");
sb.append("Select * from ");
sb.append(MyriaConstants.PYUDF_RELATION.toString(MyriaConstants.STORAGE_SYSTEM_POSTGRESQL));
sb.append(" where function_name='");
sb.append(pyFunctionName);
sb.append("'");
LOGGER.info(sb.toString());

try {

Iterator<TupleBatch> tuples =
accessMethod.tupleBatchIteratorFromQuery(sb.toString(), MyriaConstants.PYUDF_SCHEMA);

if (tuples.hasNext()) {
final TupleBatch tb = tuples.next();
LOGGER.info("Got {} tuples", tb.numTuples());
if (tb.numTuples() > 0) {
String codename = tb.getString(0, 0);
String codename = tb.getString(3, 0);
LOGGER.info("codename: " + codename);
return codename;
}
}

} catch (Exception e) {
LOGGER.info(e.getMessage());
throw new DbException(e);
Expand Down
8 changes: 8 additions & 0 deletions src/edu/washington/escience/myria/functions/PythonWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,17 @@ public void sendCodePickle(
try {
if (pyCodeString.length() > 0 && dOut != null) {
byte[] bytes = pyCodeString.getBytes(StandardCharsets.UTF_8);
LOGGER.info("code string length is: " + pyCodeString.length());
dOut.writeInt(bytes.length);
LOGGER.info("wrote length");

dOut.write(bytes);
LOGGER.info("wrote bytes");

dOut.writeInt(numColumns);
LOGGER.info("wrote num columns: " + numColumns);
writeOutputType(outputType);
LOGGER.info("wrote output type: " + outputType.toString());
if (isFlatMap) {
dOut.writeInt(1);
} else {
Expand Down Expand Up @@ -165,6 +171,7 @@ private void createServerSocket() throws UnknownHostException, IOException {
* @throws IOException
*/
private void startPythonWorker() throws IOException {
LOGGER.info("starting python worker");

String pythonWorker = MyriaConstants.PYTHONWORKER;
ProcessBuilder pb = new ProcessBuilder(MyriaConstants.PYTHONEXEC, "-m", pythonWorker);
Expand All @@ -184,6 +191,7 @@ private void startPythonWorker() throws IOException {
out.flush();
clientSock = serverSocket.accept();
setupStreams();
LOGGER.info("successfully started the python worker");
return;
}

Expand Down
1 change: 0 additions & 1 deletion src/edu/washington/escience/myria/operator/Apply.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ protected void init(final ImmutableMap<String, Object> execEnvVars) throws DbExc
if (expr.isConstant()) {
evaluator = new ConstantEvaluator(expr, parameters);
} else if (expr.isRegisteredUDF()) {

evaluator = new PythonUDFEvaluator(expr, parameters, getPythonFunctionRegistrar());
} else {
evaluator = new GenericEvaluator(expr, parameters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,13 @@ protected void init(final ImmutableMap<String, Object> execEnvVars)

break;
case PYTHON:
BASE64Decoder decoder = new BASE64Decoder();
//BASE64Decoder decoder = new BASE64Decoder();
if (binary != null) {
byte[] decodedBytes = decoder.decodeBuffer(binary);
ByteBuffer binaryFunction = ByteBuffer.wrap(decodedBytes);
//byte[] decodedBytes = decoder.decodeBuffer(binary);
//ByteBuffer binaryFunction = ByteBuffer.wrap(decodedBytes);

PythonFunctionRegistrar pyFunc = new PythonFunctionRegistrar(connectionInfo);
pyFunc.addFunction(
name, description, outputType.toString(), isMultivalued, binaryFunction);
pyFunc.addFunction(name, description, outputType.toString(), isMultivalued, binary);
} else {
throw new DbException("Cannot register python UDF without binary.");
}
Expand Down
9 changes: 5 additions & 4 deletions src/edu/washington/escience/myria/operator/Operator.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,14 @@ public ProfilingLogger getProfilingLogger() {
* @throws DbException in case of error.
*/
public PythonFunctionRegistrar getPythonFunctionRegistrar() throws DbException {
if (execEnvVars.get(MyriaConstants.EXEC_ENV_VAR_TEST_MODE) != null
&& !execEnvVars.get(MyriaConstants.EXEC_ENV_VAR_TEST_MODE).equals("true")) {
PythonFunctionRegistrar pyFuncRegistrar = null;
if (execEnvVars.get(MyriaConstants.EXEC_ENV_VAR_TEST_MODE) == null) {
if (getLocalSubQuery() instanceof WorkerSubQuery) {
return ((WorkerSubQuery) getLocalSubQuery()).getWorker().getPythonFunctionRegistrar();
pyFuncRegistrar =
((WorkerSubQuery) getLocalSubQuery()).getWorker().getPythonFunctionRegistrar();
}
}
return null;
return pyFuncRegistrar;
}

/**
Expand Down

0 comments on commit 0b81f5e

Please sign in to comment.