New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed AvroEncoder So That it Will Now Work With the Python Bindings #1952

Merged
merged 3 commits into from Jan 11, 2017
Jump to file or symbol
Failed to load files and symbols.
+19 −6
Diff settings

Always

Just for now

@@ -25,7 +25,7 @@ import org.apache.commons.io.IOUtils
import org.apache.commons.io.output.ByteArrayOutputStream
object AvroEncoder {
val deflater =new Deflater(Deflater.BEST_SPEED)
val deflater = new Deflater(Deflater.BEST_SPEED)
def compress(bytes: Array[Byte]): Array[Byte] = {
val deflater = new java.util.zip.Deflater
@@ -38,14 +38,14 @@ object AvroEncoder {
baos.toByteArray
}
def decompress(bytes: Array[Byte]): Array[Byte] = {

This comment has been minimized.

@jamesmcclain

jamesmcclain Jan 5, 2017

Member

Sorry to be pedantic, but there is an extra space here.

This comment has been minimized.

@jbouffard

jbouffard Jan 5, 2017

Contributor

It's fine. I just push up the change that should fix it.

This comment has been minimized.

@jamesmcclain

jamesmcclain Jan 6, 2017

Member

Actually, there was no extra space ... it was being removed there, not added. Sorry about that.

def decompress(bytes: Array[Byte]): Array[Byte] = {
val deflater = new java.util.zip.Inflater()
val bytesIn = new ByteArrayInputStream(bytes)
val in = new InflaterInputStream(bytesIn, deflater)
IOUtils.toByteArray(in)
}
def toBinary[T: AvroRecordCodec](thing: T): Array[Byte] = {
def toBinary[T: AvroRecordCodec](thing: T, deflate: Boolean = true): Array[Byte] = {
val format = implicitly[AvroRecordCodec[T]]
val schema: Schema = format.schema
@@ -54,20 +54,33 @@ object AvroEncoder {
val encoder = EncoderFactory.get().binaryEncoder(jos, null)
writer.write(format.encode(thing), encoder)
encoder.flush()
compress(jos.toByteArray)
if (deflate)
compress(jos.toByteArray)
else
jos.toByteArray
}
def fromBinary[T: AvroRecordCodec](bytes: Array[Byte]): T = {
val format = implicitly[AvroRecordCodec[T]]
fromBinary[T](format.schema, bytes)
}
def fromBinary[T: AvroRecordCodec](writerSchema: Schema, bytes: Array[Byte]): T = {
def fromBinary[T: AvroRecordCodec](bytes: Array[Byte], uncompress: Boolean): T = {
val format = implicitly[AvroRecordCodec[T]]
fromBinary[T](format.schema, bytes, uncompress)
}
def fromBinary[T: AvroRecordCodec](writerSchema: Schema, bytes: Array[Byte],
uncompress: Boolean = true): T = {
val format = implicitly[AvroRecordCodec[T]]
val schema = format.schema
val reader = new GenericDatumReader[GenericRecord](writerSchema, schema)
val decoder = DecoderFactory.get().binaryDecoder(decompress(bytes), null)
val decoder =
if (uncompress)
DecoderFactory.get().binaryDecoder(decompress(bytes), null)
else
DecoderFactory.get().binaryDecoder(bytes, null)
try {
val rec = reader.read(null.asInstanceOf[GenericRecord], decoder)
format.decode(rec)
ProTip! Use n and p to navigate between commits in a pull request.