Skip to content

Commit

Permalink
addressing more PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jortiz16 committed Jul 13, 2016
1 parent 44ded9a commit 10b1792
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -623,9 +623,7 @@ public void createView(final String viewName, final String viewDefinition) throw
createViewPostgres(viewName, viewDefinition);
} else {
throw new UnsupportedOperationException(
"create index if not exists is not supported in "
+ jdbcInfo.getDbms()
+ ", implement me");
"create view is not supported in " + jdbcInfo.getDbms() + ", implement me");
}
}

Expand Down
11 changes: 7 additions & 4 deletions src/edu/washington/escience/myria/api/DatasetResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -451,17 +451,20 @@ public Response createView(final CreateViewEncoding encoding) throws DbException
@Path("/createFunction/")
@Consumes(MediaType.APPLICATION_JSON)
public Response createFunction(final CreateFunctionEncoding encoding) throws DbException {
long queryId;
String functionCreationResponse;
try {
queryId =
functionCreationResponse =
server.createFunction(
encoding.functionName, encoding.functionDefinition, encoding.workers);
encoding.functionName,
encoding.functionDefinition,
encoding.functionOutputSchema.toString(),
encoding.workers);
} catch (Exception e) {
throw new DbException();
}
/* Build the response to return the queryId */
ResponseBuilder response = Response.ok();
return response.entity(queryId).build();
return response.entity(functionCreationResponse).build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@

import java.util.Set;

import edu.washington.escience.myria.Schema;

/**
*
*/
public class CreateFunctionEncoding extends MyriaApiEncoding {
@Required public String functionName;
@Required public String functionDefinition;
@Required public Schema functionOutputSchema;
public Set<Integer> workers;
}
36 changes: 20 additions & 16 deletions src/edu/washington/escience/myria/coordinator/MasterCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public final class MasterCatalog {
+ " col_name TEXT,\n"
+ " col_type TEXT NOT NULL,\n"
+ " is_indexed INTEGER NOT NULL, \n"
+ " ascending_order BOOLEAN,\n"
+ " ascending_order INTEGER, \n"
+ " FOREIGN KEY (user_name,program_name,relation_name) REFERENCES relations ON DELETE CASCADE);";
/** Create an index on the relation_schema table. */
private static final String CREATE_RELATION_SCHEMA_INDEX =
Expand Down Expand Up @@ -392,8 +392,8 @@ private void addRelationMetadata(
/* Second, populate the Schema table. */
statement =
sqliteConnection.prepare(
"INSERT INTO relation_schema(user_name,program_name,relation_name,col_index,col_name,col_type,is_indexed) "
+ "VALUES (?,?,?,?,?,?,?);");
"INSERT INTO relation_schema(user_name,program_name,relation_name,col_index,col_name,col_type,is_indexed, ascending_order) "
+ "VALUES (?,?,?,?,?,?,?,?);");
statement.bind(1, relation.getUserName());
statement.bind(2, relation.getProgramName());
statement.bind(3, relation.getRelationName());
Expand All @@ -402,6 +402,7 @@ private void addRelationMetadata(
statement.bind(5, schema.getColumnName(i));
statement.bind(6, schema.getColumnType(i).toString());
statement.bind(7, 0);
statement.bindNull(8);
statement.step();
statement.reset(false);
}
Expand Down Expand Up @@ -1684,11 +1685,12 @@ protected Void job(final SQLiteConnection sqliteConnection)
try {
SQLiteStatement statement =
sqliteConnection.prepare(
"UPDATE relation_schema SET is_indexed=1 WHERE user_name=? AND program_name=? AND relation_name=? AND col_index=?;");
statement.bind(1, relation.getUserName());
statement.bind(2, relation.getProgramName());
statement.bind(3, relation.getRelationName());
statement.bind(4, indexes.get(indexID).getColumn());
"UPDATE relation_schema SET is_indexed=1, ascending_order=? WHERE user_name=? AND program_name=? AND relation_name=? AND col_index=?;");
statement.bind(1, indexes.get(indexID).isAscending() ? 1 : 0);
statement.bind(2, relation.getUserName());
statement.bind(3, relation.getProgramName());
statement.bind(4, relation.getRelationName());
statement.bind(5, indexes.get(indexID).getColumn());
statement.stepThrough();
statement.dispose();
statement = null;
Expand All @@ -1711,10 +1713,13 @@ protected Void job(final SQLiteConnection sqliteConnection)
*/

public void registerFunction(
@Nonnull final String functionName, @Nonnull final String functionDefinition)
@Nonnull final String functionName,
@Nonnull final String functionDefinition,
@Nonnull final String functionOutputSchema)
throws CatalogException {
Objects.requireNonNull(functionName, "function name");
Objects.requireNonNull(functionDefinition, "function definition");
Objects.requireNonNull(functionOutputSchema, "function output schema");
if (isClosed) {
throw new CatalogException("Catalog is closed.");
}
Expand All @@ -1728,16 +1733,17 @@ public void registerFunction(
protected Void job(final SQLiteConnection sqliteConnection)
throws CatalogException, SQLiteException {
try {
deleteFunctionIfExists(sqliteConnection, functionName, false);
deleteFunctionIfExists(sqliteConnection, functionName);

SQLiteStatement statement =
sqliteConnection.prepare(
"INSERT INTO registered_functions (function_name, function_definition) VALUES (?,?);");
"INSERT INTO registered_functions (function_name, function_definition, function_outputSchema) VALUES (?,?,?);");
statement.bind(1, functionName);
statement.bind(2, functionDefinition);
statement.bind(3, functionOutputSchema);
statement.stepThrough();
statement.dispose();

statement = null;
} catch (final SQLiteException e) {
throw new CatalogException(e);
}
Expand All @@ -1758,12 +1764,10 @@ protected Void job(final SQLiteConnection sqliteConnection)
* @throws CatalogException if there is an error
*/
private void deleteFunctionIfExists(
@Nonnull final SQLiteConnection sqliteConnection,
@Nonnull final String functionName,
final boolean isOverwrite)
@Nonnull final SQLiteConnection sqliteConnection, @Nonnull final String functionName)
throws CatalogException {
try {
String sql = String.format("DELETE FROM registered_functions WHERE function_name=? ");
String sql = String.format("DELETE FROM registered_functions WHERE function_name=?;");
SQLiteStatement statement = sqliteConnection.prepare(sql);
statement.bind(1, functionName);
statement.stepThrough();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
*/
package edu.washington.escience.myria.operator;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.collect.ImmutableMap;

import edu.washington.escience.myria.DbException;
Expand All @@ -17,7 +14,7 @@
/**
*
*/
public class DbCreateFunction extends RootOperator {
public class DbExecute extends RootOperator {
/** Required for Java serialization. */
private static final long serialVersionUID = 1L;

Expand All @@ -26,23 +23,19 @@ public class DbCreateFunction extends RootOperator {
/** The information for the database connection. */
private ConnectionInfo connectionInfo;

private final String functionName;
private final String functionDefinition;
private final String sqlToExecute;

/**
* @param child the source of tuples to be inserted.
* @param relationKey the key of the table the tuples should be inserted into.
* @param sqlToExecute the sql command to execute
* @param connectionInfo the parameters of the database connection.
*/
public DbCreateFunction(
final Operator child,
final String functionName,
final String functionDefinition,
final ConnectionInfo connectionInfo) {
public DbExecute(
final Operator child, final String sqlToExecute, final ConnectionInfo connectionInfo) {
super(child);
this.connectionInfo = connectionInfo;
this.functionName = functionName;
this.functionDefinition = functionDefinition;
this.sqlToExecute = sqlToExecute;
}

@Override
Expand All @@ -55,14 +48,8 @@ protected void init(final ImmutableMap<String, Object> execEnvVars) throws DbExc
/* Open the database connection */
accessMethod = AccessMethod.of(connectionInfo.getDbms(), connectionInfo, false);

/* Validate command */
Pattern pattern = Pattern.compile("(CREATE FUNCTION)([\\s\\S]*)(LANGUAGE SQL;)");
Matcher matcher = pattern.matcher(functionDefinition);

if (matcher.matches()) {
/* Run command */
accessMethod.runCommand(functionDefinition);
}
/* Run command */
accessMethod.runCommand(sqlToExecute);
}

@Override
Expand Down
87 changes: 50 additions & 37 deletions src/edu/washington/escience/myria/parallel/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -86,10 +88,10 @@
import edu.washington.escience.myria.io.UriSink;
import edu.washington.escience.myria.operator.Apply;
import edu.washington.escience.myria.operator.DataOutput;
import edu.washington.escience.myria.operator.DbCreateFunction;
import edu.washington.escience.myria.operator.DbCreateIndex;
import edu.washington.escience.myria.operator.DbCreateView;
import edu.washington.escience.myria.operator.DbDelete;
import edu.washington.escience.myria.operator.DbExecute;
import edu.washington.escience.myria.operator.DbInsert;
import edu.washington.escience.myria.operator.DbQueryScan;
import edu.washington.escience.myria.operator.DuplicateTBGenerator;
Expand Down Expand Up @@ -1099,52 +1101,63 @@ public long createView(
/**
* Create a function and register it in the catalog
*/
public long createFunction(
final String functionName, final String functionDefinition, final Set<Integer> workers)
public String createFunction(
final String functionName,
final String functionDefinition,
final String functionOutputSchema,
final Set<Integer> workers)
throws DbException, InterruptedException {
long queryID;
String response = "Created Function";
Set<Integer> actualWorkers = workers;
if (workers == null) {
actualWorkers = getWorkers().keySet();
}

/* Create the function */
try {
Map<Integer, SubQueryPlan> workerPlans = new HashMap<>();
for (Integer workerId : actualWorkers) {
workerPlans.put(
workerId,
new SubQueryPlan(
new DbCreateFunction(
EmptyRelation.of(Schema.EMPTY_SCHEMA),
functionName,
functionDefinition,
null)));
}
ListenableFuture<Query> qf =
queryManager.submitQuery(
"create function",
"create function",
"create function",
new SubQueryPlan(new SinkRoot(new EOSSource())),
workerPlans);
/* Validate the command */
Pattern pattern = Pattern.compile("(CREATE FUNCTION)([\\s\\S]*)(LANGUAGE SQL;)");
Matcher matcher = pattern.matcher(functionDefinition);

if (matcher.matches()) {
/* Add a replace statement */
String modifiedReplaceFunction =
functionDefinition.replace("CREATE FUNCTION", "CREATE OR REPLACE FUNCTION");

/* Create the function */
try {
queryID = qf.get().getQueryId();
} catch (ExecutionException e) {
throw new DbException("Error executing query", e.getCause());
Map<Integer, SubQueryPlan> workerPlans = new HashMap<>();
for (Integer workerId : actualWorkers) {
workerPlans.put(
workerId,
new SubQueryPlan(
new DbExecute(
EmptyRelation.of(Schema.EMPTY_SCHEMA), modifiedReplaceFunction, null)));
}
ListenableFuture<Query> qf =
queryManager.submitQuery(
"create function",
"create function",
"create function",
new SubQueryPlan(new SinkRoot(new EOSSource())),
workerPlans);
try {
qf.get().getQueryId();
} catch (ExecutionException e) {
throw new DbException("Error executing query", e.getCause());
}
} catch (CatalogException e) {
throw new DbException(e);
}
} catch (CatalogException e) {
throw new DbException(e);
}

/* Register the function to the catalog */
try {
catalog.registerFunction(functionName, functionDefinition);
} catch (CatalogException e) {
throw new DbException(e);
/* Register the function to the catalog */
try {
catalog.registerFunction(functionName, functionDefinition, functionOutputSchema);
} catch (CatalogException e) {
throw new DbException(e);
}
} else {
response = "Function is not valid";
}

return queryID;
return response;
}

/**
Expand Down

0 comments on commit 10b1792

Please sign in to comment.