Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgraded to Avro 1.7.5. #147

Merged
merged 3 commits into from
Oct 24, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,20 @@ package com.twitter.bijection.avro
import com.twitter.bijection.Injection
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter, SpecificRecordBase}
import org.apache.avro.file.{DataFileStream, DataFileWriter}
import java.io.{UnsupportedEncodingException, ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import com.twitter.bijection.Inversion.attempt
import com.twitter.bijection.Attempt
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.Schema
import org.apache.avro.io.{DecoderFactory, DatumReader, EncoderFactory, DatumWriter}
import Injection.utf8
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might want to add the comment that you are ensuring you have the String, Array[Byte] via utf8 (which is the default, BTW, but this does make it safer in case someone accidentally changes the default (why!?!?!) in Injection).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add the comment


/**
* Factory providing various avro injections.
* @author Muhammad Ashraf
* @since 7/4/13
*/
object AvroCodecs {
object SpecificAvroCodecs {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this breaks binary and source compatibility. Are you sure want to inconvenience for that? You could probably make this work just adding methods, which would keep others compatible, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johnynek I tried to keep the compatibility but couldnt find a good way to do it. The problem is that prior to Avro 1.7.5, you had SpecificRecord and GenericRecords and you needed SpecificDatumReader and GenericDatumReader to serialize/deserialize them, now in 1.7.5 SpecificRecord is extending GenericRecord and AvroCodecs in develop branch doesnt even compile with 1.7.5 due to implicit resolution issues.

/**
* Returns Injection capable of serializing and deserializing a compiled Avro record using SpecificDatumWriter and
* SpecificDatumReader
Expand All @@ -41,27 +42,41 @@ object AvroCodecs {
new SpecificAvroCodec[T](klass)
}


/**
* Returns Injection capable of serializing and deserializing a generic record using GenericDatumReader and
* GenericDatumReader
* @tparam T generic record
* Returns Injection capable of serializing and deserializing a compiled avro record using org.apache.avro.io.BinaryEncoder
* @tparam T compiled Avro record
* @return Injection
*/
def apply[T <: GenericRecord](schema: Schema): Injection[T, Array[Byte]] = {
new GenericAvroCodec[T](schema)
def toBinary[T <: SpecificRecordBase : Manifest]: Injection[T, Array[Byte]] = {
val klass = manifest[T].erasure.asInstanceOf[Class[T]]
val writer = new SpecificDatumWriter[T](klass)
val reader = new SpecificDatumReader[T](klass)
new BinaryAvroCodec[T](writer, reader)
}

/**
* Returns Injection capable of serializing and deserializing a compiled avro record using org.apache.avro.io.BinaryEncoder
* Returns Injection capable of serializing and deserializing a generic avro record using org.apache.avro.io.JsonEncoder to a
* UTF-8 String
* @tparam T compiled Avro record
* @return Injection
*/
def toBinary[T <: SpecificRecordBase : Manifest]: Injection[T, Array[Byte]] = {
def toJson[T <: SpecificRecordBase : Manifest](schema: Schema): Injection[T, String] = {
val klass = manifest[T].erasure.asInstanceOf[Class[T]]
val writer = new SpecificDatumWriter[T](klass)
val reader = new SpecificDatumReader[T](klass)
new BinaryAvroCodec[T](writer, reader)
new JsonAvroCodec[T](schema, writer, reader)
}
}

object GenericAvroCodecs {
/**
* Returns Injection capable of serializing and deserializing a generic record using GenericDatumReader and
* GenericDatumReader
* @tparam T generic record
* @return Injection
*/
def apply[T <: GenericRecord](schema: Schema): Injection[T, Array[Byte]] = {
new GenericAvroCodec[T](schema)
}

/**
Expand All @@ -76,7 +91,8 @@ object AvroCodecs {
}

/**
* Returns Injection capable of serializing and deserializing a compiled avro record using org.apache.avro.io.JsonEncoder
* Returns Injection capable of serializing and deserializing a generic avro record using org.apache.avro.io.JsonEncoder to a
* UTF-8 String
* @tparam T compiled Avro record
* @return Injection
*/
Expand All @@ -85,18 +101,6 @@ object AvroCodecs {
val reader = new GenericDatumReader[T](schema)
new JsonAvroCodec[T](schema, writer, reader)
}

/**
* Returns Injection capable of serializing and deserializing a generic avro record using org.apache.avro.io.JsonEncoder
* @tparam T compiled Avro record
* @return Injection
*/
def toJson[T <: SpecificRecordBase : Manifest](schema: Schema): Injection[T, String] = {
val klass = manifest[T].erasure.asInstanceOf[Class[T]]
val writer = new SpecificDatumWriter[T](klass)
val reader = new SpecificDatumReader[T](klass)
new JsonAvroCodec[T](schema, writer, reader)
}
}

/**
Expand Down Expand Up @@ -175,45 +179,25 @@ class BinaryAvroCodec[T](writer: DatumWriter[T], reader: DatumReader[T]) extends

/**
* Provides methods to serializing and deserializing a generic and compiled avro record using org.apache.avro.io.JsonEncoder
* to a UTF-8 String
* @param writer Datum writer
* @param reader Datum reader
* @tparam T avro record
* @throws RuntimeException if Avro Records cannot be converted to a UTF-8 String
*/
class JsonAvroCodec[T](schema: Schema, writer: DatumWriter[T], reader: DatumReader[T]) extends Injection[T, String] {
def apply(a: T): String = {
val stream = new ByteArrayOutputStream()
val encoder = EncoderFactory.get().jsonEncoder(schema, stream)
writer.write(a, encoder)
encoder.flush()
newUtf8(stream.toByteArray)
Injection.invert[String, Array[Byte]](stream.toByteArray).get
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

want to add a comment that this is always a UTF8 string (so the get failing is a Bug, not an exception case)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add the comment

}

def invert(str: String): Attempt[T] = attempt(str) {
str =>
val decoder = DecoderFactory.get().jsonDecoder(schema, new ByteArrayInputStream(getBytesUtf8(str)))
val decoder = DecoderFactory.get().jsonDecoder(schema, new ByteArrayInputStream(Injection[String, Array[Byte]](str)))
reader.read(null.asInstanceOf[T], decoder)
}

private def newUtf8(bytes: Array[Byte]): String = {
try {
if (bytes == null) null else new String(bytes, "UTF-8")
}
catch {
case uee: UnsupportedEncodingException => {
throw new RuntimeException("UTF-8 Not supported on this platform")
}
}
}

private def getBytesUtf8(string: String): Array[Byte] = {
try {
if (string == null) null else string.getBytes("UTF-8")
}
catch {
case uee: UnsupportedEncodingException => {
throw new RuntimeException("UTF-8 Not supported on this platform")
}
}
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ import com.twitter.bijection.{BaseProperties, Injection}
import org.scalacheck.Properties
import org.apache.avro.generic.{GenericRecord, GenericData}
import org.apache.avro.Schema
import avro.FiscalRecord

/**
* @author Muhammad Ashraf
* @since 7/5/13
*/
object AvroCodecLaws extends Properties("AvroCodecs") with BaseProperties {
object GenericAvroCodecLaws extends Properties("GenericAvroCodecs") with BaseProperties {
val testSchema = new Schema.Parser().parse( """{
"type":"record",
"name":"FiscalRecord",
Expand All @@ -51,15 +50,6 @@ object AvroCodecLaws extends Properties("AvroCodecs") with BaseProperties {
]
}""")


def buildSpecificAvroRecord(i: (String, Int, Int)): FiscalRecord = {
FiscalRecord.newBuilder()
.setCalendarDate(i._1)
.setFiscalWeek(i._2)
.setFiscalYear(i._3)
.build()
}

def buildGenericAvroRecord(i: (String, Int, Int)): GenericRecord = {

val fiscalRecord = new GenericData.Record(testSchema)
Expand All @@ -69,48 +59,26 @@ object AvroCodecLaws extends Properties("AvroCodecs") with BaseProperties {
fiscalRecord
}

implicit val testSpecificRecord = arbitraryViaFn {
is: (String, Int, Int) => buildSpecificAvroRecord(is)
}

implicit val testGenericRecord = arbitraryViaFn {
is: (String, Int, Int) => buildGenericAvroRecord(is)
}

def roundTripsSpecificRecord(implicit injection: Injection[FiscalRecord, Array[Byte]]) = {
isLooseInjection[FiscalRecord, Array[Byte]]
}

def roundTripsGenericRecord(implicit injection: Injection[GenericRecord, Array[Byte]]) = {
isLooseInjection[GenericRecord, Array[Byte]]
}

def roundTripsSpecificRecordToJson(implicit injection: Injection[FiscalRecord, String]) = {
isLooseInjection[FiscalRecord, String]
}

def roundTripsGenericRecordToJson(implicit injection: Injection[GenericRecord, String]) = {
isLooseInjection[GenericRecord, String]
}

property("round trips Specific Record -> Array[Byte]") =
roundTripsSpecificRecord(AvroCodecs[FiscalRecord])

property("round trips Generic Record -> Array[Byte]") =
roundTripsGenericRecord(AvroCodecs[GenericRecord](testSchema))

property("round trips Specific Record -> Array[Byte] using Binary Encoder/Decoder") =
roundTripsSpecificRecord(AvroCodecs.toBinary[FiscalRecord])
roundTripsGenericRecord(GenericAvroCodecs[GenericRecord](testSchema))

property("round trips Generic Record -> Array[Byte] using Binary Encoder/Decoder") =
roundTripsGenericRecord(AvroCodecs.toBinary[GenericRecord](testSchema))
roundTripsGenericRecord(GenericAvroCodecs.toBinary[GenericRecord](testSchema))

property("round trips Generic Record -> String using Json Encoder/Decoder") =
roundTripsGenericRecordToJson(AvroCodecs.toJson[GenericRecord](testSchema))

property("round trips Specific Record -> String using Json Encoder/Decoder") =
roundTripsSpecificRecordToJson(AvroCodecs.toJson[FiscalRecord](testSchema))

roundTripsGenericRecordToJson(GenericAvroCodecs.toJson[GenericRecord](testSchema))
}


Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.avro.generic.{GenericData, GenericRecord}
* @author Muhammad Ashraf
* @since 7/6/13
*/
object AvroCodecsSpecification extends Specification with BaseProperties {
object GenericAvroCodecsSpecification extends Specification with BaseProperties {
val testSchema = new Schema.Parser().parse( """{
"type":"record",
"name":"FiscalRecord",
Expand Down Expand Up @@ -52,65 +52,33 @@ object AvroCodecsSpecification extends Specification with BaseProperties {
}""")


"Avro codec" should {

"Round trip specific record using Specific Injection" in {
implicit val specificInjection = AvroCodecs[FiscalRecord]
val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12))
val bytes = Injection[FiscalRecord, Array[Byte]](testRecord)
val attempt = Injection.invert[FiscalRecord, Array[Byte]](bytes)
attempt.get must_== testRecord
}

"Round trip specific record using Binary Injection" in {
implicit val specificBinaryInjection = AvroCodecs.toBinary[FiscalRecord]
val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12))
val bytes = Injection[FiscalRecord, Array[Byte]](testRecord)
val attempt = Injection.invert[FiscalRecord, Array[Byte]](bytes)
attempt.get must_== testRecord
}

"Round trip specific record using Json Injection" in {
implicit val specificJsonInjection = AvroCodecs.toJson[FiscalRecord](testSchema)
val testRecord = buildSpecificAvroRecord(("2012-01-01", 1, 12))
val jsonString = Injection[FiscalRecord, String](testRecord)
val attempt = Injection.invert[FiscalRecord, String](jsonString)
attempt.get must_== testRecord
}
"Generic Avro codec" should {

"Round trip generic record using Generic Injection" in {
implicit val genericInjection = AvroCodecs[GenericRecord](testSchema)
implicit val genericInjection = GenericAvroCodecs[GenericRecord](testSchema)
val testRecord = buildGenericAvroRecord(("2012-01-01", 1, 12))
val bytes = Injection[GenericRecord, Array[Byte]](testRecord)
val attempt = Injection.invert[GenericRecord, Array[Byte]](bytes)
attempt.get must_== testRecord
}

"Round trip specific record using Binary Injection" in {
implicit val genericBinaryInjection = AvroCodecs.toBinary[GenericRecord](testSchema)
"Round trip generic record using Binary Injection" in {
implicit val genericBinaryInjection = GenericAvroCodecs.toBinary[GenericRecord](testSchema)
val testRecord = buildGenericAvroRecord(("2012-01-01", 1, 12))
val bytes = Injection[GenericRecord, Array[Byte]](testRecord)
val attempt = Injection.invert[GenericRecord, Array[Byte]](bytes)
attempt.get must_== testRecord
}

"Round trip specific record using Json Injection" in {
implicit val genericJsonInjection = AvroCodecs.toJson[GenericRecord](testSchema)
"Round trip generic record using Json Injection" in {
implicit val genericJsonInjection = GenericAvroCodecs.toJson[GenericRecord](testSchema)
val testRecord = buildGenericAvroRecord(("2012-01-01", 1, 12))
val jsonString = Injection[GenericRecord, String](testRecord)
val attempt = Injection.invert[GenericRecord, String](jsonString)
attempt.get must_== testRecord
}
}

def buildSpecificAvroRecord(i: (String, Int, Int)): FiscalRecord = {
FiscalRecord.newBuilder()
.setCalendarDate(i._1)
.setFiscalWeek(i._2)
.setFiscalYear(i._3)
.build()
}

def buildGenericAvroRecord(i: (String, Int, Int)): GenericRecord = {

val fiscalRecord = new GenericData.Record(testSchema)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.twitter.bijection.avro

import org.scalacheck.Properties
import com.twitter.bijection.{Injection, BaseProperties}
import org.apache.avro.Schema
import avro.FiscalRecord

/**
* @author Muhammad Ashraf
* @since 10/5/13
*/
object SpecificAvroCodecLaws extends Properties("SpecificAvroCodecs") with BaseProperties {
val testSchema = new Schema.Parser().parse( """{
"type":"record",
"name":"FiscalRecord",
"namespace":"avro",
"fields":[
{
"name":"calendarDate",
"type":"string"
},
{
"name":"fiscalWeek",
"type":[
"int",
"null"
]
},
{
"name":"fiscalYear",
"type":[
"int",
"null"
]
}
]
}""")


def buildSpecificAvroRecord(i: (String, Int, Int)): FiscalRecord = {
FiscalRecord.newBuilder()
.setCalendarDate(i._1)
.setFiscalWeek(i._2)
.setFiscalYear(i._3)
.build()
}

implicit val testSpecificRecord = arbitraryViaFn {
is: (String, Int, Int) => buildSpecificAvroRecord(is)
}

def roundTripsSpecificRecord(implicit injection: Injection[FiscalRecord, Array[Byte]]) = {
isLooseInjection[FiscalRecord, Array[Byte]]
}

def roundTripsSpecificRecordToJson(implicit injection: Injection[FiscalRecord, String]) = {
isLooseInjection[FiscalRecord, String]
}

property("round trips Specific Record -> Array[Byte]") =
roundTripsSpecificRecord(SpecificAvroCodecs[FiscalRecord])

property("round trips Specific Record -> Array[Byte] using Binary Encoder/Decoder") =
roundTripsSpecificRecord(SpecificAvroCodecs.toBinary[FiscalRecord])

property("round trips Specific Record -> String using Json Encoder/Decoder") =
roundTripsSpecificRecordToJson(SpecificAvroCodecs.toJson[FiscalRecord](testSchema))

}


Loading