From 99ed2c4063d50c03e4c8b3edb7c946d9ea6b939e Mon Sep 17 00:00:00 2001 From: Parmita Date: Tue, 28 Jun 2016 15:07:39 -0700 Subject: [PATCH] cleaning up udf registration --- python/MyriaPythonWorker/worker.py | 1 - .../escience/myria/api/DatasetResource.java | 75 ---------- .../escience/myria/api/FunctionResource.java | 111 +++++++++++++++ .../myria/api/encoding/FunctionStatus.java | 90 ++++++++++++ .../myria/coordinator/MasterCatalog.java | 129 ++++++++++-------- .../escience/myria/operator/DbFunction.java | 4 - .../escience/myria/parallel/Server.java | 46 +++---- 7 files changed, 291 insertions(+), 165 deletions(-) create mode 100644 src/edu/washington/escience/myria/api/FunctionResource.java create mode 100644 src/edu/washington/escience/myria/api/encoding/FunctionStatus.java diff --git a/python/MyriaPythonWorker/worker.py b/python/MyriaPythonWorker/worker.py index b1d2dbd7d..eef85f177 100644 --- a/python/MyriaPythonWorker/worker.py +++ b/python/MyriaPythonWorker/worker.py @@ -35,7 +35,6 @@ def main(infile, outfile): print("python process done reading tuple, now writing ") retval = func(tup) write_with_length(retval, outfile, outputType) - #pickleSer.write_with_length(func(tup),outfile) outfile.flush() diff --git a/src/edu/washington/escience/myria/api/DatasetResource.java b/src/edu/washington/escience/myria/api/DatasetResource.java index 31d256fbc..c3d16465e 100644 --- a/src/edu/washington/escience/myria/api/DatasetResource.java +++ b/src/edu/washington/escience/myria/api/DatasetResource.java @@ -47,7 +47,6 @@ import edu.washington.escience.myria.api.encoding.CreateViewEncoding; import edu.washington.escience.myria.api.encoding.DatasetEncoding; import edu.washington.escience.myria.api.encoding.DatasetStatus; -import edu.washington.escience.myria.api.encoding.FunctionEncoding; import edu.washington.escience.myria.api.encoding.TipsyDatasetEncoding; import edu.washington.escience.myria.coordinator.CatalogException; import 
edu.washington.escience.myria.io.InputStreamSource; @@ -62,8 +61,6 @@ import edu.washington.escience.myria.parallel.Server; import edu.washington.escience.myria.storage.TupleBatch; -// remove once function encoding is complete - /** * This is the class that handles API calls to create or fetch datasets. * @@ -415,78 +412,6 @@ public Response createView(final CreateViewEncoding encoding) throws DbException return response.entity(queryId).build(); } - /** - * Creates an Function based on DbFunctionEncoding - * - * @POST - * @Path("/Function/") - * @Consumes(MediaType.APPLICATION_JSON) public Response createFunction(final CreateFunctionEncoding encoding) throws - * DbException { long queryId; try { queryId = - * server.createUDF(encoding.udfName, encoding.udfDefinition, encoding.workers); - * } catch (Exception e) { throw new DbException(); } // Build the response to - * return the queryId ResponseBuilder response = Response.ok(); return - * response.entity(queryId).build(); } - */ - - /** - * Creates an function based on DbFunctionEncoding - */ - @POST - @Path("/function/") - @Consumes(MediaType.APPLICATION_JSON) - public Response createFunction(final FunctionEncoding encoding) throws DbException { - long queryId; - try { - LOGGER.info(encoding.name + "\t " + encoding.text + "\t " + encoding.lang + "\t " + encoding.workers); - queryId = - server.createFunction(encoding.name, encoding.text, encoding.lang, encoding.workers, encoding.binary, - encoding.inputSchema, encoding.outputSchema); - - } catch (Exception e) { - throw new DbException(); - } - /* Build the response to return the queryId */ - ResponseBuilder response = Response.ok(); - return response.entity(queryId).build(); - } - - /** - * @param queryId an optional query ID specifying which datasets to get. - * @return a list of datasets. - * @throws DbException if there is an error accessing the Catalog. 
- */ - @GET - @Path("/function/") - public List getFunctions() throws DbException { - return server.getFunctions(); - } - - /** - * @param userName the user who owns the target relation. - * @param programName the program to which the target relation belongs. - * @param relationName the name of the target relation. - * @return metadata - * @throws DbException if there is an error in the database. - */ - @DELETE - @Path("/function/") - public Response deleteFunction(final String udf_name) throws DbException { - - Boolean status = server.functionExists(udf_name); - if (status == false) { - /* function not found, throw a 404 (Not Found) */ - throw new MyriaApiException(Status.NOT_FOUND, "That dataset was not found"); - } - - // delete command - try { - server.deleteFunction(udf_name); - } catch (Exception e) { - throw new DbException(); - } - return Response.noContent().build(); - } - /** * @param dataset the dataset to be ingested. * @return the created dataset resource. diff --git a/src/edu/washington/escience/myria/api/FunctionResource.java b/src/edu/washington/escience/myria/api/FunctionResource.java new file mode 100644 index 000000000..108e8bae5 --- /dev/null +++ b/src/edu/washington/escience/myria/api/FunctionResource.java @@ -0,0 +1,111 @@ +/** + * + */ +package edu.washington.escience.myria.api; + +import java.io.IOException; +import java.util.List; + +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.ResponseBuilder; +import javax.ws.rs.core.Response.Status; + +import org.apache.commons.httpclient.HttpStatus; +import org.slf4j.LoggerFactory; + +import com.wordnik.swagger.annotations.Api; +import com.wordnik.swagger.annotations.ApiOperation; +import com.wordnik.swagger.annotations.ApiResponse; +import 
com.wordnik.swagger.annotations.ApiResponses; + +import edu.washington.escience.myria.DbException; +import edu.washington.escience.myria.api.encoding.FunctionEncoding; +import edu.washington.escience.myria.api.encoding.FunctionStatus; +import edu.washington.escience.myria.parallel.Server; + +/** + * + */ +/** + * This is the class that handles API calls to create or fetch functions. + * + */ +@Consumes(MediaType.APPLICATION_JSON) +@Produces(MyriaApiConstants.JSON_UTF_8) +@Path("/function") +@Api(value = "/function", description = "Operations on functions") +public class FunctionResource { + /** The Myria server running on the master. */ + @Context + private Server server; + /** Information about the URL of the request. */ + + /** Logger. */ + protected static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(FunctionResource.class); + + /** + * Lists the functions known to the server. + * @return the list of functions returned by {@link Server#getFunctions()}. + * @throws DbException if there is an error accessing the Catalog. 
+ */ + @GET + public List getFunctions() throws DbException { + LOGGER.info("get functions!"); + return server.getFunctions(); + } + + /** + * Registers a function described by a {@link FunctionEncoding}. + * @param encoding the name, text, language, workers, binary and schemas of the function to register. + * @return an OK response whose entity is the query ID returned by {@link Server#createFunction}. + */ + @POST + @Path("/register") + @Consumes(MediaType.APPLICATION_JSON) + public Response createFunction(final FunctionEncoding encoding) throws DbException { + LOGGER.info("register function!"); + + long queryId; + try { + queryId = + server.createFunction(encoding.name, encoding.text, encoding.lang, encoding.workers, encoding.binary, + encoding.inputSchema, encoding.outputSchema); + + } catch (Exception e) { + throw new DbException(); + } + /* Build the response to return the queryId */ + ResponseBuilder response = Response.ok(); + return response.entity(queryId).build(); + } + + @GET + @ApiOperation(value = "get information about a function", response = FunctionStatus.class) + @ApiResponses(value = { @ApiResponse(code = HttpStatus.SC_NOT_FOUND, message = "Function not found", response = String.class) }) + @Produces({ MediaType.APPLICATION_OCTET_STREAM, MyriaApiConstants.JSON_UTF_8 }) + @Path("/{name}") + public Response getFunction(@PathParam("name") final String name) throws DbException, IOException { + + /* The name of the function to look up. 
*/ + String functionName = name; + LOGGER.info("get function!"); + + LOGGER.info("looking for function with name: " + functionName); + FunctionStatus status = server.getFunctionDetails(functionName); + if (status == null) { + /* Not found, throw a 404 (Not Found) */ + throw new MyriaApiException(Status.NOT_FOUND, "That function was not found"); + } + + return Response.ok(status).build(); + } + +} diff --git a/src/edu/washington/escience/myria/api/encoding/FunctionStatus.java b/src/edu/washington/escience/myria/api/encoding/FunctionStatus.java new file mode 100644 index 000000000..a529088dd --- /dev/null +++ b/src/edu/washington/escience/myria/api/encoding/FunctionStatus.java @@ -0,0 +1,90 @@ +/** + * + */ +package edu.washington.escience.myria.api.encoding; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import edu.washington.escience.myria.MyriaConstants; +import edu.washington.escience.myria.Schema; + +/** + * Describes a function: its name, input and output schemas, text, and language. + */ +public class FunctionStatus { + /** + * Instantiate a FunctionStatus with the provided values. + * + * @param name function name identifying the function. + * @param inputSchema The {@link Schema} of the input to the function. + * @param outputSchema The {@link Schema} of the output of the function. + * @param text Text associated with the function. + * @param lang language of the function. + */ + @JsonCreator + public FunctionStatus(@JsonProperty("name") final String name, @JsonProperty("inputSchema") final String inputSchema, + @JsonProperty("outputSchema") final String outputSchema, @JsonProperty("text") final String text, + @JsonProperty("lang") final MyriaConstants.FunctionLanguage lang) { + this.name = name; + this.outputSchema = outputSchema; + this.inputSchema = inputSchema; + this.text = text; + this.lang = lang; + + } + + /** The name identifying the function. */ + @JsonProperty + private final String name; + /** The {@link Schema} of the output tuples of the function. 
*/ + @JsonProperty + private final String outputSchema; + /** The {@link Schema} of the input tuples to the function. */ + @JsonProperty + private final String inputSchema; + /** The text of the function */ + @JsonProperty + private final String text; + /** The language of the function */ + @JsonProperty + private final MyriaConstants.FunctionLanguage lang; + + // @JsonProperty public URI uri; + + /** + * @return The name identifying the function. + */ + public String getName() { + return name; + } + + /** + * @return The {@link Schema} of the output tuples in the function. + */ + public String getOutputSchema() { + return outputSchema; + } + + /** + * @return The {@link Schema} of the input tuples in the function. + */ + public String getInputSchema() { + return inputSchema; + } + + /** + * @return the text associated with the function + */ + public String getText() { + return text; + } + + /** + * @return the language of the function + */ + public MyriaConstants.FunctionLanguage getLanguage() { + return lang; + } + +} diff --git a/src/edu/washington/escience/myria/coordinator/MasterCatalog.java b/src/edu/washington/escience/myria/coordinator/MasterCatalog.java index f38c39de2..411d32bbb 100644 --- a/src/edu/washington/escience/myria/coordinator/MasterCatalog.java +++ b/src/edu/washington/escience/myria/coordinator/MasterCatalog.java @@ -48,6 +48,7 @@ import edu.washington.escience.myria.accessmethod.SQLiteTupleBatchIterator; import edu.washington.escience.myria.api.MyriaJsonMapperProvider; import edu.washington.escience.myria.api.encoding.DatasetStatus; +import edu.washington.escience.myria.api.encoding.FunctionStatus; import edu.washington.escience.myria.api.encoding.QueryEncoding; import edu.washington.escience.myria.api.encoding.QueryStatusEncoding; import edu.washington.escience.myria.api.encoding.plan.SubPlanEncoding; @@ -151,19 +152,13 @@ public final class MasterCatalog { + "WHERE status = '" + QueryStatusEncoding.Status.ACCEPTED.toString() + "';"; private static 
final String CREATE_REGISTERED_UDFS = "CREATE TABLE registered_udfs (\n" - + " udf_id INTEGER NOT NULL, \n" + " udf_name INTEGER NOT NULL, \n" + " udf_definition TEXT NOT NULL,\n" - + " udf_language TEXT NOT NULL,\n" - + " udf_binary BLOB);"; - private static final String CREATE_UDF_SCHEMA = - "CREATE TABLE udf_schema (\n" - + " udf_name TEXT NOT NULL,\n" - + " udf_input INTEGER NOT NULL,\n" - + " udf_output INTEGER NOT NULL,\n" - + " col_index INTEGER NOT NULL,\n" - + " col_name TEXT,\n" - + " col_type TEXT NOT NULL);"; + + " udf_language INTEGER,\n" + + " udf_binary BLOB,\n" + + " udf_outputSchema TEXT NOT NULL,\n" + + " udf_inputSchema TEXT);"; + /** CREATE TABLE statements @formatter:on */ @@ -216,7 +211,6 @@ protected Object job(final SQLiteConnection sqliteConnection) throws SQLiteExcep sqliteConnection.exec(CREATE_SHARDS); sqliteConnection.exec(CREATE_SHARDS_INDEX); sqliteConnection.exec(CREATE_REGISTERED_UDFS); - sqliteConnection.exec(CREATE_UDF_SCHEMA); sqliteConnection.exec("END TRANSACTION"); } catch (final SQLiteException e) { sqliteConnection.exec("ROLLBACK TRANSACTION"); @@ -373,6 +367,55 @@ protected List job(final SQLiteConnection sqliteConnection) throws SQLit } } + /** + * Get the status of a registered function. + * + * @param name the name of the function to look up. + * @return the status of the named function, or null if no function with that name is registered. + * @throws CatalogException if there is an error in the catalog. 
+ */ + public FunctionStatus getFunctionStatus(final String name) throws CatalogException { + if (isClosed) { + throw new CatalogException("Catalog is closed."); + } + + LOGGER.info("ger function status for function with name: " + name); + try { + return queue.execute(new SQLiteJob() { + @Override + protected FunctionStatus job(final SQLiteConnection sqliteConnection) throws CatalogException, SQLiteException { + try { + SQLiteStatement statement = + sqliteConnection + .prepare("SELECT udf_name, udf_definition, udf_language, udf_outputSchema, udf_inputSchema FROM registered_udfs WHERE udf_name=?"); + + statement.bind(1, name); + if (!statement.step()) { + LOGGER.info("returning null"); + return null; + } + + String name = statement.columnString(0); + String text = statement.columnString(1); + int lang = statement.columnInt(2); + String outputSchema = statement.columnString(3); + String inputSchema = statement.columnString(4); + + statement.dispose(); + + return new FunctionStatus(name, inputSchema, outputSchema, text, + MyriaConstants.FunctionLanguage.values()[lang]); + + } catch (final SQLiteException e) { + throw new CatalogException(e); + } + } + }).get(); + } catch (InterruptedException | ExecutionException e) { + throw new CatalogException(e); + } + } + /** * Private helper to add the metadata for a relation into the Catalog. * @@ -1511,17 +1554,13 @@ protected Void job(final SQLiteConnection sqliteConnection) throws CatalogExcept private void deleteFunctionIfExists(@Nonnull final SQLiteConnection sqliteConnection, @Nonnull final String udf_name, final boolean isOverwrite) throws CatalogException { try { - String sql = - String - .format("DELETE FROM registered_udfs WHERE udf_name=? AND %s;", (isOverwrite ? "1=1" : "is_deleted=1")); + String sql = String.format("DELETE FROM registered_udfs WHERE udf_name=? 
"); SQLiteStatement statement = sqliteConnection.prepare(sql); statement.bind(1, udf_name); statement.stepThrough(); statement.dispose(); statement = null; - // TODO: need to delete the input and output schema from the udf_schema table - // TODO: need to delete the UDF from postgres! } catch (final SQLiteException e) { throw new CatalogException(e); } @@ -1657,13 +1696,13 @@ protected Void job(final SQLiteConnection sqliteConnection) throws CatalogExcept public void registerFunction(@Nonnull final String name, @Nonnull final String text, @Nonnull final Schema outputSchema, final Schema inputSchema, final MyriaConstants.FunctionLanguage lang, final String binary) throws CatalogException { - LOGGER.info("in register UDFs"); - LOGGER.info("name " + name + "\t" + text + "\t" + lang); - LOGGER.info("outputschema " + outputSchema.toString()); if (isClosed) { throw new CatalogException("Catalog is closed."); } + LOGGER.info("Name of function: " + name); + LOGGER.info("input schema " + inputSchema); + LOGGER.info("output schema " + outputSchema); try { queue.execute(new SQLiteJob() { @@ -1672,57 +1711,29 @@ public void registerFunction(@Nonnull final String name, @Nonnull final String t protected Void job(final SQLiteConnection sqliteConnection) throws CatalogException, SQLiteException, IOException { try { - long udf_id = sqliteConnection.getLastInsertId(); /* First register the UDF */ + // surround delete and insert in a transaction: + + deleteFunctionIfExists(sqliteConnection, name, false); + SQLiteStatement statement = sqliteConnection - .prepare("INSERT INTO registered_udfs (udf_id, udf_name, udf_definition, udf_language, udf_binary) VALUES (?,?,?,?,?);"); + .prepare("INSERT INTO registered_udfs ( udf_name, udf_definition, udf_language, udf_binary, udf_outputSchema, udf_inputSchema) VALUES (?,?,?,?,?,?);"); - statement.bind(1, udf_id); - statement.bind(2, name); - statement.bind(3, text); - statement.bind(4, lang.toString()); + statement.bind(1, name); + 
statement.bind(2, text); + statement.bind(3, lang.ordinal()); if (binary != null) { // send the base64 string as string - statement.bind(5, binary); + statement.bind(4, binary); } + statement.bind(5, outputSchema.toString()); + statement.bind(6, inputSchema.toString()); statement.stepThrough(); statement.dispose(); statement = null; - /* Second, populate the Schema */ - statement = - sqliteConnection - .prepare("INSERT INTO udf_schema(udf_name,udf_input, udf_output,col_index,col_name,col_type) " - + "VALUES (?,?,?,?,?,?);"); - statement.bind(1, name); - - // output schema - for (int i = 0; i < outputSchema.numColumns(); ++i) { - statement.bind(2, 0); - statement.bind(3, 1);// output schema - statement.bind(4, i); - statement.bind(5, outputSchema.getColumnName(i)); - statement.bind(6, outputSchema.getColumnType(i).toString()); - statement.step(); - statement.reset(false); - } - // input schema - if (inputSchema != null) { - for (int i = 0; i < inputSchema.numColumns(); ++i) { - statement.bind(2, 1);// input schema - statement.bind(3, 0); - statement.bind(4, i); - statement.bind(5, inputSchema.getColumnName(i)); - statement.bind(6, inputSchema.getColumnType(i).toString()); - statement.step(); - statement.reset(false); - } - } - statement.dispose(); - statement = null; - } catch (final SQLiteException e) { LOGGER.info("SQLiteException" + e.getMessage()); throw new CatalogException(e); diff --git a/src/edu/washington/escience/myria/operator/DbFunction.java b/src/edu/washington/escience/myria/operator/DbFunction.java index d230b0153..60976b6e2 100644 --- a/src/edu/washington/escience/myria/operator/DbFunction.java +++ b/src/edu/washington/escience/myria/operator/DbFunction.java @@ -61,7 +61,6 @@ protected void init(final ImmutableMap execEnvVars) throws DbExc } if (lang == MyriaConstants.FunctionLanguage.POSTGRES) { - /* Open the database connection */ accessMethod = AccessMethod.of(connectionInfo.getDbms(), connectionInfo, false); /* Add the POSTGRES UDF */ @@ -70,10 
+69,7 @@ protected void init(final ImmutableMap execEnvVars) throws DbExc if (lang == MyriaConstants.FunctionLanguage.PYTHON) { if (binary != null) { - // LOGGER.info("UDF code string length: " + binary.length()); - // LOGGER.info("Code String: " + binary); PythonFunctionRegistrar pyFunc = new PythonFunctionRegistrar(connectionInfo); - pyFunc.addUDF(name, binary); } else { throw new DbException("Cannot register python UDF without binary"); diff --git a/src/edu/washington/escience/myria/parallel/Server.java b/src/edu/washington/escience/myria/parallel/Server.java index 0caf845ec..bdbe6949b 100644 --- a/src/edu/washington/escience/myria/parallel/Server.java +++ b/src/edu/washington/escience/myria/parallel/Server.java @@ -59,6 +59,7 @@ import edu.washington.escience.myria.accessmethod.AccessMethod.IndexRef; import edu.washington.escience.myria.api.MyriaJsonMapperProvider; import edu.washington.escience.myria.api.encoding.DatasetStatus; +import edu.washington.escience.myria.api.encoding.FunctionStatus; import edu.washington.escience.myria.api.encoding.QueryEncoding; import edu.washington.escience.myria.coordinator.CatalogException; import edu.washington.escience.myria.coordinator.ConfigFileException; @@ -1181,14 +1182,9 @@ public long createFunction(final String name, final String text, final MyriaCons public long addFunctiontoDB(final Set workers, final String udfName, final String udfDefinition, final String binary, final MyriaConstants.FunctionLanguage lang) throws DbException { - if (binary != null) { - LOGGER.info("UDF code string length: " + binary.length()); - LOGGER.info("Code String: " + binary); - } - long queryID; try { - LOGGER.info("got inside first try loop"); + Map workerPlans = new HashMap<>(); for (Integer workerId : workers) { LOGGER.info("adding subplan for worker id " + workerId); @@ -1196,20 +1192,18 @@ public long addFunctiontoDB(final Set workers, final String udfName, fi udfDefinition, lang, binary, null))); } - LOGGER.info("Attempting to the 
submit query"); + ListenableFuture qf = queryManager.submitQuery("create UDF", "create UDF", "create UDF", new SubQueryPlan(new SinkRoot( new EOSSource())), workerPlans); - LOGGER.info("submit query succeeded"); + try { - LOGGER.info("getQueryID?"); queryID = qf.get().getQueryId(); } catch (ExecutionException | InterruptedException e) { throw new DbException("Error executing query", e.getCause()); } } catch (Exception e) { - LOGGER.info("Attempting to submit query caused and exception"); - + LOGGER.info("Attempting to submit query caused an exception"); LOGGER.info(e.getMessage()); e.printStackTrace(); @@ -1218,21 +1212,6 @@ public long addFunctiontoDB(final Set workers, final String udfName, fi return queryID; } - public boolean functionExists(final String udfName) { - // check in the catalog if the function exists - return true; - } - - public void deleteFunction(final String udfName) throws DbException { - // call catalog delete this function - try { - catalog.deleteFunctionFromCatalog(udfName); - } catch (CatalogException e) { - throw new DbException(e); - } - - } - public List getFunctions() throws DbException { try { return catalog.getFunctions(); @@ -2231,4 +2210,19 @@ public MyriaConfiguration getConfig() { public MasterCatalog getCatalog() { return catalog; } + + /** + * @param functionName the name of the function to look up in the catalog. + * @return the {@link FunctionStatus} the catalog returns for that name. + * @throws DbException if the catalog lookup fails with a CatalogException. + */ + public FunctionStatus getFunctionDetails(final String functionName) throws DbException { + + try { + LOGGER.info("get function status for function with name: " + functionName); + return catalog.getFunctionStatus(functionName); + } catch (CatalogException e) { + throw new DbException(e); + } + } }