Skip to content

Commit

Permalink
Merge 3cf932e into d4fdb71
Browse files Browse the repository at this point in the history
  • Loading branch information
senderista committed Jul 12, 2017
2 parents d4fdb71 + 3cf932e commit 125e870
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 10 deletions.
35 changes: 35 additions & 0 deletions integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import random
import json
import base64
import unittest
from tempfile import NamedTemporaryFile
from collections import Counter
Expand Down Expand Up @@ -386,5 +387,39 @@ def test(self):
count_result.to_dict(), uda_result.to_dict())


class BlobLiteralTest(MyriaTestBase):
    """Stores a relation built from a one-byte blob literal and reads it back."""

    def test(self):
        # The query text is kept exactly as written; only the surrounding
        # Python is indented.
        program = r"""
R = [b'\x01' as bytes];
store(R, bytes);
"""
        blob = b'\x01'
        # The stored blob is expected to come back as its Base64 encoding.
        expected_rows = [{'bytes': base64.standard_b64encode(blob)}]

        query = MyriaQuery.submit(program)
        self.assertEqual(query.status, 'SUCCESS')
        self.assertListOfDictsEqual(query.to_dict(), expected_rows)


class BitsetTest(MyriaTestBase):
    """Applies BITSET to a one-byte blob and checks the eight emitted bits."""

    def test(self):
        # The query text is kept exactly as written; only the surrounding
        # Python is indented.
        program = r"""
R = [b'\x01' as bytes];
S = [from R emit BITSET($0) as bit];
store(S, bits);
"""
        # Byte 0x01: the first emitted bit is set, the remaining seven are not.
        expected_rows = [{'bit': position == 0} for position in range(8)]

        query = MyriaQuery.submit(program)
        self.assertEqual(query.status, 'SUCCESS')
        self.assertListOfDictsEqual(query.to_dict(), expected_rows)


if __name__ == '__main__':
unittest.main()
3 changes: 2 additions & 1 deletion src/edu/washington/escience/myria/JsonTupleWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.Base64;
import java.util.List;
import java.util.Objects;

Expand Down Expand Up @@ -186,7 +187,7 @@ public void writeTuples(final ReadableTable tuples) throws IOException {
break;
case BLOB_TYPE:
print('"');
print("Byte buffer, cannot be written to json yet!");
print(Base64.getEncoder().encodeToString(tuples.getBlob(j, i).array()));
print('"');
break;
}
Expand Down
5 changes: 4 additions & 1 deletion src/edu/washington/escience/myria/MyriaConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,10 @@ public final class MyriaConstants {
* Default imports for janino. Modules imported here can be used in expressions.
*/
public static final String[] DEFAULT_JANINO_IMPORTS = {
"com.google.common.hash.Hashing", "java.nio.charset.Charset"
"com.google.common.hash.Hashing",
"java.nio.charset.Charset",
"java.nio.ByteBuffer",
"java.util.Base64"
};

/** Private constructor to disallow building utility class. */
Expand Down
74 changes: 74 additions & 0 deletions src/edu/washington/escience/myria/expression/BitsetExpression.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package edu.washington.escience.myria.expression;

import edu.washington.escience.myria.Type;
import edu.washington.escience.myria.expression.evaluate.ExpressionOperatorParameter;

/**
 * Returns all bits in an input of BLOB_TYPE as a sequence of BOOLEAN_TYPE
 * (enumerating the input bytes in little-endian order).
 *
 * <p>This operator only produces Java source text; the text is compiled and run elsewhere. Both
 * code-generating methods share the same preamble, which decodes the operand into a
 * {@code java.util.BitSet} and computes how many bits to emit.
 */
public class BitsetExpression extends UnaryExpression {
  /***/
  private static final long serialVersionUID = 1L;

  /**
   * This is not really unused, it's used automagically by Jackson deserialization.
   */
  @SuppressWarnings("unused")
  private BitsetExpression() {
    super();
  }

  /**
   * Takes an input of BLOB_TYPE.
   *
   * @param operand the input blob
   */
  public BitsetExpression(final ExpressionOperator operand) {
    super(operand);
  }

  /**
   * Checks that the operand is a blob and reports the element type of the emitted sequence.
   *
   * @param parameters the parameters used to resolve the operand's type
   * @return {@code Type.BOOLEAN_TYPE}, the type of each emitted bit
   */
  @Override
  public Type getOutputType(final ExpressionOperatorParameter parameters) {
    checkOperandType(Type.BLOB_TYPE, parameters);
    return Type.BOOLEAN_TYPE;
  }

  /**
   * Builds the statements shared by both generated code variants: bind the operand to {@code bb},
   * decode it into the bit set {@code bs}, and compute the number of bits {@code bits_len}.
   *
   * @param parameters the parameters passed through to the operand's code generation
   * @return the generated Java statements, each terminated by a newline
   */
  private String bitsetPreamble(final ExpressionOperatorParameter parameters) {
    return new StringBuilder()
        // The unqualified ByteBuffer name must be resolvable where the generated code is compiled.
        .append("ByteBuffer bb = (")
        .append(getOperand().getJavaString(parameters))
        .append(");\n")
        .append("java.util.BitSet bs = java.util.BitSet.valueOf(bb);\n")
        // BitSet.valueOf(ByteBuffer) reads only the bytes between the buffer's position and
        // limit, so the bit count comes from remaining() rather than capacity(). The two agree
        // for a freshly wrapped array, but capacity() would add spurious trailing false bits for
        // a sliced or partially consumed buffer.
        .append("int bits_len = bb.remaining() * 8;\n")
        .toString();
  }

  /**
   * Generates statements that collect every bit of the operand into a {@code boolean[]} and
   * return it.
   *
   * @param parameters the parameters passed through to the operand's code generation
   * @return the generated Java statements, ending in {@code return vals;}
   */
  @Override
  public String getJavaString(final ExpressionOperatorParameter parameters) {
    return new StringBuilder(bitsetPreamble(parameters))
        .append("boolean[] vals = new boolean[bits_len];\n")
        .append("for (int i = 0; i < bits_len; ++i) {\n")
        .append("vals[i] = bs.get(i);\n")
        .append("}\n")
        .append("return vals;\n")
        .toString();
  }

  /**
   * Generates statements that append the number of bits to the count column and then append each
   * bit to the result column.
   *
   * @param parameters the parameters passed through to the operand's code generation
   * @return the generated Java statements
   */
  @Override
  public String getJavaExpressionWithAppend(final ExpressionOperatorParameter parameters) {
    return new StringBuilder(bitsetPreamble(parameters))
        .append(Expression.COUNT)
        .append(".appendInt(bits_len);\n")
        .append("for (int i = 0; i < bits_len; ++i) {\n")
        .append(Expression.RESULT)
        .append(".appendBoolean(bs.get(i));\n")
        .append("}\n")
        .toString();
  }

  /**
   * @return {@code true}, since one input blob yields a sequence of booleans
   */
  @Override
  public boolean hasArrayOutputType() {
    return true;
  }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package edu.washington.escience.myria.expression;

import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Objects;

import org.apache.commons.lang.StringEscapeUtils;
Expand Down Expand Up @@ -102,7 +103,7 @@ public ConstantExpression(final String value) {
* @param value the value of this constant.
*/
public ConstantExpression(final ByteBuffer value) {
throw new UnsupportedOperationException();
this(Type.BLOB_TYPE, Base64.getEncoder().encodeToString(value.array()));
}

@Override
Expand All @@ -122,7 +123,7 @@ public String getJavaString(final ExpressionOperatorParameter parameters) {
case DATETIME_TYPE:
throw new UnsupportedOperationException("using constant value of type DateTime");
case BLOB_TYPE:
return value;
return "ByteBuffer.wrap(Base64.getDecoder().decode(\"" + value + "\"))";
case STRING_TYPE:
return '\"' + StringEscapeUtils.escapeJava(value) + '\"';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
@Type(name = "WORKERID", value = WorkerIdExpression.class),
/* Unary */
@Type(name = "ABS", value = AbsExpression.class),
@Type(name = "BITSET", value = BitsetExpression.class),
@Type(name = "CAST", value = CastExpression.class),
@Type(name = "CEIL", value = CeilExpression.class),
@Type(name = "COS", value = CosExpression.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ public ConstantEvaluator(
final Expression expression, final ExpressionOperatorParameter parameters)
throws DbException {
super(expression, parameters);
;
Preconditions.checkArgument(
!expression.hasOperator(VariableExpression.class)
&& !expression.hasOperator(StateExpression.class),
"Expression %s does not evaluate to a constant",
expression);
expression.isConstant(), "Expression %s does not evaluate to a constant", expression);
Preconditions.checkArgument(
!expression.isMultiValued(), "Expression %s is multivalued", expression);
type = expression.getOutputType(parameters);
String java;
try {
Expand Down Expand Up @@ -94,8 +94,9 @@ public void eval(
final int stateRow,
final WritableColumn result,
final WritableColumn count) {
Preconditions.checkArgument(
count == null, "Count column must be null for constant expressions");
result.appendObject(value);
count.appendInt(1);
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion src/edu/washington/escience/myria/operator/Apply.java
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ protected void init(final ImmutableMap<String, Object> execEnvVars) throws DbExc

for (Expression expr : emitExpressions) {
GenericEvaluator evaluator;
if (expr.isConstant()) {
if (expr.isConstant() && !expr.isMultiValued()) {
evaluator = new ConstantEvaluator(expr, parameters);
} else if (expr.isRegisteredPythonUDF()) {
evaluator = new PythonUDFEvaluator(expr, parameters);
Expand Down
16 changes: 16 additions & 0 deletions test/edu/washington/escience/myria/api/SerializationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

import java.nio.ByteBuffer;

import org.junit.BeforeClass;
import org.junit.Test;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;

import edu.washington.escience.myria.expression.ConstantExpression;
import edu.washington.escience.myria.operator.network.distribute.DistributeFunction;
import edu.washington.escience.myria.operator.network.distribute.HashDistributeFunction;
import edu.washington.escience.myria.operator.network.distribute.RoundRobinDistributeFunction;
Expand Down Expand Up @@ -55,4 +58,17 @@ public void testDistributeFunctionWithNullNumPartitions() throws Exception {
assertEquals(df.getClass(), deserialized.getClass());
assertEquals(3, ((HashDistributeFunction) deserialized).getIndexes()[0]);
}

/**
 * Checks that a blob constant exposes the Base64 text of its bytes as its value, and that the
 * value and class are unchanged after a JSON serialize/deserialize round trip.
 */
@Test
public void testBlobConstantExpression() throws Exception {
  final byte[] payload = {0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F};
  final String payloadBase64 = "CgsMDQ4P";

  /* Construct and check the in-memory value. */
  final ConstantExpression original = new ConstantExpression(ByteBuffer.wrap(payload));
  assertEquals(payloadBase64, original.getValue());

  /* Round trip through JSON. */
  final String json = mapper.writeValueAsString(original);
  final ObjectReader constantReader = mapper.reader(ConstantExpression.class);
  final ConstantExpression roundTripped = constantReader.readValue(json);

  assertEquals(original.getClass(), roundTripped.getClass());
  assertEquals(payloadBase64, roundTripped.getValue());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package edu.washington.escience.myria.operator.apply;

import static org.junit.Assert.assertEquals;

import java.nio.ByteBuffer;

import org.junit.Test;

import com.google.common.collect.ImmutableList;

import edu.washington.escience.myria.DbException;
import edu.washington.escience.myria.Schema;
import edu.washington.escience.myria.Type;
import edu.washington.escience.myria.expression.Expression;
import edu.washington.escience.myria.expression.ExpressionOperator;
import edu.washington.escience.myria.expression.BitsetExpression;
import edu.washington.escience.myria.expression.VariableExpression;
import edu.washington.escience.myria.operator.Apply;
import edu.washington.escience.myria.operator.BatchTupleSource;
import edu.washington.escience.myria.storage.TupleBatch;
import edu.washington.escience.myria.storage.TupleBatchBuffer;
import edu.washington.escience.myria.util.TestEnvVars;

public class ApplyBitsetTest {

  /**
   * Runs a single 8-byte blob through an Apply operator with a BITSET expression and checks the
   * 64 emitted booleans. Byte k of the input has only bit k set, so output row r is expected to
   * be true exactly when its byte index (r / 8) equals its bit index (r % 8).
   */
  @Test
  public void testApply() throws DbException {
    final Schema inputSchema = Schema.ofFields("bytes", Type.BLOB_TYPE);
    final Schema outputSchema = Schema.ofFields("bits", Type.BOOLEAN_TYPE);

    /* Input: one tuple holding the "diagonal" blob. */
    final byte[] diagonal = new byte[8];
    for (int k = 0; k < 8; ++k) {
      diagonal[k] = (byte) (1 << k);
    }
    final TupleBatchBuffer source = new TupleBatchBuffer(inputSchema);
    source.putBlob(0, ByteBuffer.wrap(diagonal));

    /* Emit BITSET($0) as "bits". */
    final ExpressionOperator bitset = new BitsetExpression(new VariableExpression(0));
    final ImmutableList.Builder<Expression> emit = ImmutableList.builder();
    emit.add(new Expression("bits", bitset));

    final Apply apply = new Apply(new BatchTupleSource(source), emit.build());
    apply.open(TestEnvVars.get());

    int rowsSeen = 0;
    while (!apply.eos()) {
      final TupleBatch batch = apply.nextReady();
      if (batch == null) {
        continue;
      }
      assertEquals(outputSchema, batch.getSchema());

      for (int pos = 0; pos < batch.numTuples(); ++pos) {
        final boolean onDiagonal = (rowsSeen / 8) == (rowsSeen % 8);
        assertEquals(onDiagonal, batch.getBoolean(0, pos));
        ++rowsSeen;
      }
    }
    assertEquals(8 * 8, rowsSeen);
    apply.close();
  }
}

0 comments on commit 125e870

Please sign in to comment.