adding blob data type and tests #862
Starting to push Python UDF and Blob data type changes to master.
This commit is to add python UDF registration support.
…yria-python support
For f48bb38, I have several general comments before going into details:
- I think the new function for getting the size of a tuple batch should be wrapped within a TupleBatch/TupleBatchBuffer/TupleBuffer/Mutable... object, instead of calling TupleUtils with an explicit schema. The schema is determined when the TupleBatch/TupleBatchBuffer/... object is constructed, so the size should just be an internal property. With this change, some of the tuple batch building functions need to be cleaned up to use TupleBatchBuffer, instead of an explicit list of ColumnBuilders, in order to build a TupleBatch. ColumnBuilders should just be internal data structures of a TupleBatchBuffer most of the time, and their sizes are determined by the TupleBatchBuffer.
- This new data type goes by multiple names: I have seen Bytes, Blob, and ByteBuffer all being used. It would be clearer to stick with one. I think either Bytes or Blob is good (I personally prefer Blob for its clear definition as "binary large object", while Bytes is about physical representation), but ByteBuffer is how this new data type is implemented internally, so it should be avoided when possible.
- Have you tried any test case with more than one row in a BytesColumn? This is just in case some implementation detail happens to rely on a column holding a single row.
It might be clearer if you could just squash new changes into this commit once they are ready, since I believe they are independent of the other Python function registration commits.
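To illustrate the first point in the list above, here is a minimal Java sketch of a batch whose size is fixed from its schema at construction time, so callers never pass a schema to a utility method. The `SketchBatch` class, the `ColType` enum, and both size constants are hypothetical stand-ins, not Myria's actual classes or values; in particular, the rule that a blob column shrinks the batch is only an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a TupleBatch-like container; not Myria's real class. */
final class SketchBatch {
  /** Hypothetical column types. */
  enum ColType { INT, STRING, BLOB }

  /** Assumed sizes, chosen only for illustration. */
  static final int DEFAULT_BATCH_SIZE = 10_000;
  static final int BLOB_BATCH_SIZE = 100;

  private final List<ColType> schema;
  private final int batchSize;

  SketchBatch(final List<ColType> schema) {
    // Defensive copy: the schema is fixed once the batch exists.
    this.schema = Collections.unmodifiableList(new ArrayList<>(schema));
    // Assumed rule: batches that carry blobs hold fewer rows.
    this.batchSize =
        this.schema.contains(ColType.BLOB) ? BLOB_BATCH_SIZE : DEFAULT_BATCH_SIZE;
  }

  List<ColType> getSchema() {
    return schema;
  }

  /** The size is an internal property; no schema argument is needed. */
  int getBatchSize() {
    return batchSize;
  }
}
```

With this shape, code that previously computed a size from a schema would instead ask the batch (or buffer) it already holds.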
I have changed ByteBuffer and Bytes to Blob. I also made batchSize a member of TupleBatch, TupleBuffer, etc. I have not cleaned up the code to use TupleBatchBuffer, which is a bit involved (issue #865, assigned to me).
There are some small issues, such as missing Javadoc (there might be more), typos, using Java-style function names (getBatchSize()), unremoved comments, etc. But more importantly, do you plan to do #865 in this PR? It looks like only a few places need to be changed, e.g. JdbcAccessMethod and maybe a few more, and changing them would make this PR self-contained.
…and using camelcase for function name.
Addressing code review comments.
Type.BYTES_TYPE,
"output_type",
Type.STRING_TYPE);

Add comment similar to profiling?
@@ -470,6 +472,16 @@ public Response createFunction(final CreateFunctionEncoding encoding) throws DbE
ResponseBuilder response = Response.ok();
return response.entity(functionCreationResponse).build();
}
/**
* @param queryId an optional query ID specifying which datasets to get.
* @return a list of datasets.

fix comments

fixed.
/**
* Python function registrar.
*/
protected PythonFunctionRegistrar pyFuncReg;

`pyFuncReg` is a per-worker property and should not be kept here. We can make the method `getPythonFunctionRegistrar()` below a wrapper that calls `Worker`'s `getPythonFunctionRegistrar()`. (My understanding is that the return value is non-null, or can we make it non-null? If that's the case, annotate the return type with `@Nonnull` and get rid of all the checks like `if (pyFuncReg != null)`.)
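A minimal sketch of the delegation suggested above, assuming the worker always owns a registrar. All classes here are simplified, hypothetical stand-ins nested in one file so the example is self-contained; they are not Myria's real `Worker` or `PythonFunctionRegistrar`. The `@Nonnull` annotation is mentioned only in a comment because it comes from an external annotation library rather than the JDK.

```java
import java.util.Objects;

final class RegistrarSketch {
  /** Hypothetical stand-in for the registrar type. */
  static final class PythonFunctionRegistrar {}

  /** Hypothetical stand-in for the worker, which owns the registrar. */
  static final class Worker {
    private final PythonFunctionRegistrar registrar = new PythonFunctionRegistrar();

    /** Would be annotated @Nonnull in a code base that uses such annotations. */
    PythonFunctionRegistrar getPythonFunctionRegistrar() {
      return registrar;
    }
  }

  /** Hypothetical stand-in for the class that used to hold pyFuncReg as a field. */
  static final class ExecutionEnv {
    private final Worker worker;

    ExecutionEnv(final Worker worker) {
      this.worker = Objects.requireNonNull(worker, "worker");
    }

    /** Thin wrapper: no local copy of the registrar, so no null checks at call sites. */
    PythonFunctionRegistrar getPythonFunctionRegistrar() {
      return Objects.requireNonNull(
          worker.getPythonFunctionRegistrar(), "worker returned a null registrar");
    }
  }

  private RegistrarSketch() {}
}
```

Failing fast inside the wrapper keeps the non-null guarantee in one place instead of repeating `!= null` checks wherever the registrar is used.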
public PythonFunctionRegistrar getPythonFunctionRegistrar() {
Preconditions.checkNotNull(pyFuncReg);
if (execEnvVars.get(MyriaConstants.EXEC_ENV_VAR_TEST_MODE) != null
&& execEnvVars.get(MyriaConstants.EXEC_ENV_VAR_TEST_MODE).equals("false")) {

Could you say a little more about this? Why does the test mode have to be false to get a Python function registrar? (It also seems it is never set to false anywhere in the code base.)
@@ -345,4 +411,12 @@ private void writeToStream(
throw new DbException(e);
}
}

@Override
public void sendEos() throws DbException {

I might be wrong, but you probably don't need to write anything, as `PythonWorker.sendEos()` closes the connection anyway, unless the Python process needs this information to do something before closing.
if (input != null && input.hasArray()) {
// LOGGER.info("input array buffer length" + input.array().length);

This is nitpicking, but please get rid of unnecessary empty lines. (There are some more.)
@@ -352,7 +352,8 @@ private MyriaConstants() {}
public static final int PYTHON_EXCEPTION = -3;
/** python function return is null.*/
public static final int NULL_LENGTH = -5;

/** Send EOS tp python strea, is null.*/

"to" and "stream". And if my comment below makes sense we can get rid of this declaration.
throws DbException {

BitSet bs;
switch (gColumnType) {

These types share many lines; merge them.
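One common way to merge `switch` branches that share a body is to stack the `case` labels so they fall through to a single block. The sketch below only shows that pattern; the `ColumnType` enum and the `fixedWidthBytes` method are hypothetical and do not reproduce the code under review.

```java
final class SwitchMergeSketch {
  /** Hypothetical column types, not Myria's real Type enum. */
  enum ColumnType { INT, FLOAT, LONG, DOUBLE, STRING, BLOB }

  /** Returns the fixed width in bytes, or -1 for variable-width types. */
  static int fixedWidthBytes(final ColumnType type) {
    switch (type) {
      case INT:
      case FLOAT:
        // Both 32-bit types share one body instead of two copies.
        return 4;
      case LONG:
      case DOUBLE:
        return 8;
      case STRING:
      case BLOB:
        return -1;
      default:
        throw new IllegalArgumentException("Unknown type: " + type);
    }
  }

  private SwitchMergeSketch() {}
}
```

When the shared body is long, extracting it into a helper method that each case calls is an equally valid alternative.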
}
}

private void setBitset(final ReadableTable table, final int row, final Object[] groupAgg)

'setBitSet'

done.
if (aggregators[agg]
.getClass()
.getName()
.equals(StatefulUserDefinedAggregator.class.getName())) {

instanceof

done
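For context on the `instanceof` suggestion in this thread: comparing class names matches only the exact class, whereas `instanceof` also accepts subclasses and evaluates to `false` for `null` instead of throwing. The small Java sketch below shows the difference using hypothetical classes, not Myria's aggregators.

```java
final class InstanceofSketch {
  /** Hypothetical class hierarchy for illustration only. */
  static class Aggregator {}

  static class StatefulAggregator extends Aggregator {}

  static class TracingStatefulAggregator extends StatefulAggregator {}

  /** Name comparison: true only for the exact class; throws on null. */
  static boolean isStatefulByName(final Aggregator a) {
    return a.getClass().getName().equals(StatefulAggregator.class.getName());
  }

  /** instanceof: true for the class and its subclasses; false on null. */
  static boolean isStatefulByInstanceof(final Aggregator a) {
    return a instanceof StatefulAggregator;
  }

  private InstanceofSketch() {}
}
```

Whether subclasses should match is a design decision; if only the exact class should qualify, comparing `getClass()` against the class literal is still more direct than comparing name strings.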
@Override
public void add(final List<TupleBatch> from, final Object state) throws DbException {
// LOGGER.info("add tuple called");

Remove unnecessary comments; the same applies above and below.
 String script = compute.append(output).toString();
-LOGGER.debug("Compiling UDA {}", script);
+LOGGER.info("Compiling UDA {}", script);

I think `debug` is enough since it is not some crucial state transition.

done.
I made a pass. Some of my comments are showing as "outdated", but if you click on them, most of them still make sense. I feel that we may need another pass after these are resolved; in particular, I think some newly added interfaces could be improved, but I'd like to wait until we have a clearer version. Also, I think testing with more than one tuple in a blob column should help us discover many bugs. It would be best if you could add such a test. Even if that is hard, running some informal tests should be helpful.
*
* @param value the value of this constant.
*/
public ConstantExpression(final ByteBuffer value) {

First, this function is never used, and second, the return value would be a summary string of the ByteBuffer. It would cause a problem for `getJavaString` since it can't be parsed as some value, and also for the `ConstantExpression` constructor since we're expecting a direct value like in a JSON string. I think an `UnsupportedOperationException` makes more sense if you don't really want to output all the bytes as a string.

done.
* @return
* @throws DbException
*/
public abstract void sendEos() throws DbException;

Seems this function is never used.

done
@@ -0,0 +1,4 @@
/**

I think this file is unnecessary.

removed
@@ -14,6 +14,7 @@
import org.apache.commons.httpclient.URIException;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AnonymousAWSCredentials;

This import is never used.

done
countIdx.putInt(0, flatmapid);
flatmapid = 0;
}
if (getAddCounter()) {

This if {} can be merged with the above one.

done
@@ -453,9 +453,13 @@ private void leftAndRightEqual() throws Exception {
 // advance the one with the larger set of equal tuples because this produces fewer join tuples
 // not exact but good approximation
 final int leftSizeOfGroupOfEqualTuples =
-leftRowIndex + TupleBatch.BATCH_SIZE * (leftBatches.size() - 1) - leftBeginIndex;
+leftRowIndex
++ TupleUtils.getBatchSize(generateSchema()) * (leftBatches.size() - 1)

I think the schema should be the schema of the left batch, not the output of this join. They may have different schemas.
* @return PythonFunctionRegistrar for operator.
* @throws DbException in case of error.
*/
public PythonFunctionRegistrar getPythonFunctionRegistrar() throws DbException {

My understanding is that `pyFuncRegistrar` is only used by `PythonUDFEvaluator`, so you don't need to add this function to `Operator` or to the many aggregator-related methods, for example `AggUtils.allocateAggs()`. Just get `pyFuncRegistrar` from the worker in `PythonUDFEvaluator`.

Let's talk about this. It's not clear to me how AggUtils has access to the worker.
@@ -94,7 +99,7 @@ private void setUpdateExpressions(final List<Expression> updaterExpressions) {
 }

 @Override
-protected TupleBatch fetchNextReady() throws DbException, InvocationTargetException {
+protected TupleBatch fetchNextReady() throws DbException, InvocationTargetException, IOException {

`IOException` is not thrown from anywhere.

done.
 */
-void getResult(AppendableTable dest, int destColumn, Object state) throws DbException;
+void getResult(AppendableTable dest, int destColumn, Object state)
+throws DbException, IOException;

I think the `IOException` is never thrown.
@@ -53,7 +55,7 @@ public Aggregate(@Nullable final Operator child, final AggregatorFactory... aggr
 }

 @Override
-protected TupleBatch fetchNextReady() throws DbException {
+protected TupleBatch fetchNextReady() throws DbException, IOException {

The `IOException` is never thrown if we remove the `IOException` from `Aggregator.getResult()`. (See another comment in `Aggregator.java`.)
 */
 @Override
-protected TupleBatch fetchNextReady() throws DbException {
+protected TupleBatch fetchNextReady() throws DbException, IOException {

Also this `IOException`.
/** Holds the corresponding TB for each group key in {@link #groupKeys}. */
private transient List<List<TupleBatch>> tbgroupState;
/** Holds the bitset for each group key in {@link #groupKeys}. */
HashMap<Integer, BitSet> bs = new HashMap<Integer, BitSet>();

Just a heads-up that these keys will be replaced by rows in primitive columns in my refactor_aggregate PR.
Agree with @BrandonHaynes: using a stdlib API is not taking on a
dependency, and using temp files correctly is much less simple than one
would think at first glance.
…On Thu, Feb 9, 2017 at 12:16 PM, Brandon Haynes ***@***.***> wrote:
***@***.**** commented on this pull request.
------------------------------
In src/edu/washington/escience/myria/CsvTupleWriter.java
<#862>:
> for (int i = 0; i < tuples.numTuples(); ++i) {
for (int j = 0; j < tuples.numColumns(); ++j) {
- row[j] = tuples.getObject(j, i).toString();
+ Type type = tbsc.getColumnType(j);
+ if (type.equals(Type.BLOB_TYPE)) {
+ // write the file out
+ // add filename to the csv file
+ String filename = UUID.randomUUID().toString();
Strongly disagree; IMO we should rely on the core language API by default,
and only roll-our-own implementation if there is a strong reason to do
otherwise (this is uncommon). The code would also be strengthened by using
Java-flavored RAII and automatically cleaning up the temp files (
deleteOnExit).
remove extra comment
bfa4a48 to e6504da
wChannel.write(bb);
wChannel.close();
fos.close();
private static String createTempFile(final ByteBuffer bb) throws IOException {

`writeToTempFile` is clearer.
fos.close();
private static String createTempFile(final ByteBuffer bb) throws IOException {
Path path = Files.createTempFile("out", null);
File file = path.toFile();

This `file` is unnecessary; you can use `path.toAbsolutePath()` to get the absolute path.
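Pulling together the suggestions in this part of the review (the clearer name, `path.toAbsolutePath()`, a more descriptive suffix, and cleanup via `deleteOnExit`), a helper could look roughly like the sketch below. It is not the PR's final code: the `BlobTempFiles` class name and the `.blob` suffix are hypothetical, and `deleteOnExit` still needs a short-lived `File` view because that method is defined on `java.io.File`.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class BlobTempFiles {
  private BlobTempFiles() {}

  /** Writes the remaining bytes of {@code bb} to a new temp file and returns its absolute path. */
  static String writeToTempFile(final ByteBuffer bb) throws IOException {
    // The ".blob" suffix is an assumption; any descriptive suffix works.
    Path path = Files.createTempFile("out", ".blob");
    // Ask the JVM to remove the file on normal exit.
    path.toFile().deleteOnExit();
    // try-with-resources closes the channel even if a write fails.
    try (FileChannel channel = FileChannel.open(path, StandardOpenOption.WRITE)) {
      // Duplicate so the caller's position and limit are left untouched.
      ByteBuffer src = bb.duplicate();
      while (src.hasRemaining()) {
        channel.write(src);
      }
    }
    return path.toAbsolutePath().toString();
  }
}
```

Note that `deleteOnExit` only runs on normal JVM termination, so a long-running worker that produces many blobs may still want to delete files explicitly once they have been consumed.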
@@ -1,5 +1,8 @@
package edu.washington.escience.myria.expression;

import java.io.IOException;

These two imports are not used.
@@ -96,7 +96,9 @@ public void testApplywithCounter() throws DbException {
for (int batchIdx = 0; batchIdx < result.numTuples(); ++batchIdx, ++rowIdx) {
char[] ngramChars = new char[] {(char) rowIdx, (char) (rowIdx + 1), (char) (rowIdx + 2)};
String ngram = new String(ngramChars);
int fltmapid = (int) rowIdx;

typo, and nit-picking: it's fine to just declare `rowIdx` to be an `int`
wChannel.close();
fos.close();
private static String createTempFile(final ByteBuffer bb) throws IOException {
Path path = Files.createTempFile("out", null);

nit-picking: maybe a better suffix
stateEvaluator.evaluate(null, 0, state, null);

/* Set up the updaters. */

pyUpdateEvaluators = new ArrayList<>();

It is never used. (And you may need to change some constructors to remove it.)

Sorry, I take back my word; it is used.
Looks good to me, only a few new comments. Also, two issues need to be opened based on our discussion.