
First code drop to open source.

0 parents commit 478b793efeabcf9a92f7c297fa64dca14e1be6cc committed by @jghoman
Showing with 3,722 additions and 0 deletions.
  1. +4 −0 CHANGES.txt
  2. +202 −0 LICENSE
  3. +153 −0 pom.xml
  4. +231 −0 readme.md
  5. +48 −0 src/main/java/com/linkedin/haivvreo/AvroContainerInputFormat.java
  6. +58 −0 src/main/java/com/linkedin/haivvreo/AvroContainerOutputFormat.java
  7. +225 −0 src/main/java/com/linkedin/haivvreo/AvroDeserializer.java
  8. +88 −0 src/main/java/com/linkedin/haivvreo/AvroGenericRecordReader.java
  9. +82 −0 src/main/java/com/linkedin/haivvreo/AvroGenericRecordWritable.java
  10. +48 −0 src/main/java/com/linkedin/haivvreo/AvroGenericRecordWriter.java
  11. +147 −0 src/main/java/com/linkedin/haivvreo/AvroObjectInspectorGenerator.java
  12. +86 −0 src/main/java/com/linkedin/haivvreo/AvroSerDe.java
  13. +253 −0 src/main/java/com/linkedin/haivvreo/AvroSerializer.java
  14. +36 −0 src/main/java/com/linkedin/haivvreo/HaivvreoException.java
  15. +98 −0 src/main/java/com/linkedin/haivvreo/HaivvreoUtils.java
  16. +181 −0 src/main/java/com/linkedin/haivvreo/SchemaToTypeInfo.java
  17. +98 −0 src/test/avro/kitchensink.avsc
  18. +23 −0 src/test/avro/test_serializer.avsc
  19. +406 −0 src/test/java/com/linkedin/haivvreo/TestAvroDeserializer.java
  20. +483 −0 src/test/java/com/linkedin/haivvreo/TestAvroObjectInspectorGenerator.java
  21. +214 −0 src/test/java/com/linkedin/haivvreo/TestAvroSerializer.java
  22. +74 −0 src/test/java/com/linkedin/haivvreo/TestGenericAvroRecordWritable.java
  23. +159 −0 src/test/java/com/linkedin/haivvreo/TestHaivvreoUtils.java
  24. +109 −0 src/test/java/com/linkedin/haivvreo/TestSchemaReEncoder.java
  25. +38 −0 src/test/java/com/linkedin/haivvreo/Utils.java
  26. +5 −0 src/test/resources/README
  27. +17 −0 src/test/resources/kitchensink.sql
  28. +3 −0 src/test/resources/test_serializer.csv
  29. +41 −0 src/test/resources/test_serializer.sql
  30. +112 −0 src/test/scala/com/linkedin/haivvreo/writeKitchenSink.scala
4 CHANGES.txt
@@ -0,0 +1,4 @@
+Haivvreo change log:
+
+1.0.2 Lazy init of serializer/deserializer was overly so.
+1.0.1 Add ability to read schema from HDFS.
202 LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright 2010 LinkedIn
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
153 pom.xml
@@ -0,0 +1,153 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>com.linkedin</groupId>
+ <artifactId>haivvreo</artifactId>
+ <packaging>jar</packaging>
+ <version>1.0.2-SNAPSHOT</version>
+ <name>haivvreo</name>
+ <url>https://github.com/jghoman/haivvreo</url>
+ <description>Library for processing Avro data in Hive.</description>
+ <inceptionYear>2011</inceptionYear>
+ <licenses>
+ <license>
+ <name>Apache 2</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ <comments>A business-friendly OSS license</comments>
+ </license>
+ </licenses>
+ <developers>
+ <developer>
+ <id>jghoman</id>
+ <name>Jakob Homan</name>
+ <organization>LinkedIn</organization>
+ <email>jhoman@linkedin.com</email>
+ </developer>
+ </developers>
+ <organization>
+ <name>LinkedIn</name>
+ <url>http://sna-projects.com/sna/</url>
+ </organization>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <repositories>
+ <repository>
+ <id>maven</id>
+ <name>Maven 2 repo</name>
+ <url>http://repo1.maven.org/maven2/</url>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ <repository>
+ <id>scala</id>
+ <name>Scala tools</name>
+ <url>http://scala-tools.org/repo-releases</url>
+ </repository>
+ <repository>
+ <id>repository.jboss.org</id>
+ <url>http://repository.jboss.org/nexus/content/groups/public/</url>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ <repository>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
+ <id>apache snapshots</id>
+ <name>www.apache.org</name>
+ <url>https://repository.apache.org/content/repositories/snapshots</url>
+ </repository>
+ <repository>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <id>apache releases</id>
+ <name>www.apache.org</name>
+ <url>https://repository.apache.org/content/repositories/releases</url>
+ </repository>
+ </repositories>
+ <dependencies>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.8.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-serde</artifactId>
+ <version>0.7.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.15</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>0.7.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.4.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>0.20.2</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-test</artifactId>
+ <version>0.20.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>2.8.1</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
231 readme.md
@@ -0,0 +1,231 @@
+Hive + Avro = Haivvreo: Putting Avro into Hive
+==============================================
+
+Overview
+--------
+Haivvreo (pronounced with as many diphthongs as possible. Diphthongs are cool, like bowties.) is a Hive SerDe that LinkedIn has developed to process Avro-encoded data in Hive. Haivvreo's main features:
+
+* Infers the schema of the Hive table from the Avro schema.
+* Reads all Avro files within a table against a specified schema, taking advantage of Avro's backwards compatibility abilities
+* Supports arbitrarily nested schemas.
+* Translates all Avro data types into equivalent Hive types. Most types map exactly, but some Avro types don't exist in Hive and are automatically converted by Haivvreo.
+* Understands compressed Avro files.
+* Transparently converts the Avro idiom of handling nullable types as Union[T, null] into just T and returns null when appropriate.
+* Writes any Hive table to Avro files.
+* Has worked reliably against our most convoluted Avro schemas in our ETL process.
+
+License
+-------
+Haivvreo is Apache 2 licensed.
+
+Requirements
+------------
+Haivvreo has been built and tested against Hive 0.7 and Avro 1.4.1. It is built with Maven 3.
+
+Building and deploying
+----------------------
+Once the jar has been built via Maven, it, along with Avro and Avro's transitive dependencies (avro-1.4.1.jar, jackson-core-asl-1.4.2.jar, jackson-mapper-asl-1.4.2.jar), should be made available to Hive, either by placing them in the cluster's lib folders or by pointing [HIVE_AUX_JARS_PATH](http://wiki.apache.org/hadoop/Hive/AdminManual/Configuration) at a location containing them.
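+
+For a quick test, the jars can also be added to a single Hive session rather than the cluster's lib folders (the paths below are illustrative; adjust them to wherever the jars actually live):
+<pre>ADD JAR /path/to/haivvreo-1.0.2-SNAPSHOT.jar;
+ADD JAR /path/to/avro-1.4.1.jar;
+ADD JAR /path/to/jackson-core-asl-1.4.2.jar;
+ADD JAR /path/to/jackson-mapper-asl-1.4.2.jar;</pre>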
+
+Avro to Hive type conversion
+----------------------------
+While most Avro types convert directly to equivalent Hive types, there are some which do not exist in Hive and are converted to reasonable equivalents. Also, Haivvreo special cases unions of null and another type, as described below:
+<table>
+ <tr>
+ <th>Avro type</th><th>Becomes Hive type</th><th>Note</th>
+ </tr>
+ <tr><td>null</td><td>void</td><td></td></tr>
+ <tr><td>boolean</td><td>boolean</td><td></td></tr>
+ <tr><td>int</td><td>int</td><td></td></tr>
+ <tr><td>long</td><td>bigint</td><td></td></tr>
+ <tr><td>float</td><td>float</td><td></td></tr>
+ <tr><td>double</td><td>double</td><td></td></tr>
+  <tr><td>bytes</td><td>array&lt;tinyint&gt;</td><td>Hive converts each byte to a signed tinyint.</td></tr>
+ <tr><td>string</td><td>string</td><td></td></tr>
+ <tr><td>record</td><td>struct</td><td></td></tr>
+ <tr><td>map</td><td>map</td><td></td></tr>
+  <tr><td>array</td><td>array</td><td></td></tr>
+  <tr><td>union</td><td>union</td><td>Unions of [T, null] transparently convert to nullable T; other unions translate directly to Hive's union of those types. However, unions were introduced in Hive 0.7 and cannot currently be used in where/group-by statements; they are essentially look-at-only. I'll file a bug with Hive on this. Because Haivvreo transparently converts [T, null] to nullable T, this limitation only applies to unions of multiple types, or unions that are not simply one type plus null.</td></tr>
+ <tr><td>enum</td><td>string</td><td>Hive has no concept of enums</td></tr>
+  <tr><td>fixed</td><td>array&lt;tinyint&gt;</td><td>Hive converts the bytes to signed tinyints.</td></tr>
+</table>
+
+Creating Avro-backed Hive tables
+--------------------------------
+To create a Haivvreo-backed table, specify the serde as com.linkedin.haivvreo.AvroSerDe, the inputformat as com.linkedin.haivvreo.AvroContainerInputFormat, and the outputformat as com.linkedin.haivvreo.AvroContainerOutputFormat. Also provide a location from which Haivvreo will pull the most current schema for the table. For example:
+<pre>CREATE TABLE kst
+ PARTITIONED BY (ds string)
+ ROW FORMAT SERDE
+ 'com.linkedin.haivvreo.AvroSerDe'
+ WITH SERDEPROPERTIES (
+ 'schema.url'='http://schema_provider/kst.avsc')
+ STORED AS INPUTFORMAT
+ 'com.linkedin.haivvreo.AvroContainerInputFormat'
+ OUTPUTFORMAT
+ 'com.linkedin.haivvreo.AvroContainerOutputFormat';
+</pre>
+In this example we're pulling the source-of-truth reader schema from a webserver. Other options for providing the schema are described below.
+Add the Avro files to the database (or create an external table) using [standard Hive operations](http://wiki.apache.org/hadoop/Hive/LanguageManual/DML).
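+
+For instance, to load existing Avro files into a partition of the table defined above (the path and partition value are illustrative):
+<pre>LOAD DATA INPATH '/user/me/avro/kst_2011-05-01.avro'
+  INTO TABLE kst PARTITION (ds='2011-05-01');</pre>
+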
+This table might result in a description as below:
+<pre>hive> describe kst;
+OK
+string1 string from deserializer
+string2 string from deserializer
+int1 int from deserializer
+boolean1 boolean from deserializer
+long1 bigint from deserializer
+float1 float from deserializer
+double1 double from deserializer
+inner_record1 struct&lt;int_in_inner_record1:int,string_in_inner_record1:string&gt; from deserializer
+enum1 string from deserializer
+array1 array&lt;string&gt; from deserializer
+map1 map&lt;string,string&gt; from deserializer
+union1 uniontype&lt;float,boolean,string&gt; from deserializer
+fixed1 array&lt;tinyint&gt; from deserializer
+null1 void from deserializer
+unionnullint int from deserializer
+bytes1 array&lt;tinyint&gt; from deserializer</pre>
+
+
+At this point, the Avro-backed table can be worked with in Hive like any other table. Haivvreo cannot yet show comments included in the Avro schema, though a [JIRA has been opened](https://issues.apache.org/jira/browse/HIVE-2171) for this feature.
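+
+For example, it can be queried like any native table (the partition value is illustrative):
+<pre>SELECT string1, count(*)
+  FROM kst
+  WHERE ds='2011-05-01'
+  GROUP BY string1;</pre>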
+
+Writing tables to Avro files
+----------------------------
+Haivvreo can serialize any Hive table to Avro files. This makes it effectively an any-Hive-type to Avro converter. In order to write a table to an Avro file, you must first create an appropriate Avro schema. CREATE TABLE AS SELECT statements are not currently supported. Types translate as detailed in the table above. For types that do not translate directly, there are a few items to keep in mind:
+
+* **Types that may be null must be defined as a union of that type and Null within Avro.** A null in a field that is not so defined will result in an exception during the save. No changes need be made to the Hive schema to support this, as all fields in Hive can be null.
+* The Avro bytes type should be defined in Hive as an array of tinyints. Haivvreo will convert these to bytes during the save.
+* The Avro fixed type should be defined in Hive as an array of tinyints. Haivvreo will convert these to fixed during the save.
+* The Avro enum type should be defined in Hive as a string, since Hive doesn't have a concept of enums. Ensure that only valid enum values are present in the table - trying to save an undefined enum value will result in an exception.
+
+**Example**
+
+Consider the following Hive table, which coincidentally covers all of Hive's data types, making it a good example:
+<pre>CREATE TABLE test_serializer(string1 STRING,
+ int1 INT,
+ tinyint1 TINYINT,
+ smallint1 SMALLINT,
+ bigint1 BIGINT,
+ boolean1 BOOLEAN,
+ float1 FLOAT,
+ double1 DOUBLE,
+ list1 ARRAY&lt;STRING&gt;,
+ map1 MAP&lt;STRING,INT&gt;,
+ struct1 STRUCT&lt;sint:INT,sboolean:BOOLEAN,sstring:STRING&gt;,
+ union1 uniontype&lt;FLOAT, BOOLEAN, STRING&gt;,
+ enum1 STRING,
+ nullableint INT,
+ bytes1 ARRAY&lt;TINYINT&gt;,
+ fixed1 ARRAY&lt;TINYINT&gt;)
+ ROW FORMAT DELIMITED FIELDS TERMINATED BY &#x27;,&#x27; COLLECTION ITEMS TERMINATED BY &#x27;:&#x27; MAP KEYS TERMINATED BY &#x27;#&#x27; LINES TERMINATED BY &#x27;\n&#x27;
+ STORED AS TEXTFILE;</pre>
+To save this table as an Avro file, create an equivalent Avro schema (the namespace and actual name of the record are not important):
+<pre>{
+ &quot;namespace&quot;: &quot;com.linkedin.haivvreo&quot;,
+ &quot;name&quot;: &quot;test_serializer&quot;,
+ &quot;type&quot;: &quot;record&quot;,
+ &quot;fields&quot;: [
+ { &quot;name&quot;:&quot;string1&quot;, &quot;type&quot;:&quot;string&quot; },
+ { &quot;name&quot;:&quot;int1&quot;, &quot;type&quot;:&quot;int&quot; },
+ { &quot;name&quot;:&quot;tinyint1&quot;, &quot;type&quot;:&quot;int&quot; },
+ { &quot;name&quot;:&quot;smallint1&quot;, &quot;type&quot;:&quot;int&quot; },
+ { &quot;name&quot;:&quot;bigint1&quot;, &quot;type&quot;:&quot;long&quot; },
+ { &quot;name&quot;:&quot;boolean1&quot;, &quot;type&quot;:&quot;boolean&quot; },
+ { &quot;name&quot;:&quot;float1&quot;, &quot;type&quot;:&quot;float&quot; },
+ { &quot;name&quot;:&quot;double1&quot;, &quot;type&quot;:&quot;double&quot; },
+ { &quot;name&quot;:&quot;list1&quot;, &quot;type&quot;:{&quot;type&quot;:&quot;array&quot;, &quot;items&quot;:&quot;string&quot;} },
+ { &quot;name&quot;:&quot;map1&quot;, &quot;type&quot;:{&quot;type&quot;:&quot;map&quot;, &quot;values&quot;:&quot;int&quot;} },
+ { &quot;name&quot;:&quot;struct1&quot;, &quot;type&quot;:{&quot;type&quot;:&quot;record&quot;, &quot;name&quot;:&quot;struct1_name&quot;, &quot;fields&quot;: [
+ { &quot;name&quot;:&quot;sInt&quot;, &quot;type&quot;:&quot;int&quot; }, { &quot;name&quot;:&quot;sBoolean&quot;, &quot;type&quot;:&quot;boolean&quot; }, { &quot;name&quot;:&quot;sString&quot;, &quot;type&quot;:&quot;string&quot; } ] } },
+ { &quot;name&quot;:&quot;union1&quot;, &quot;type&quot;:[&quot;float&quot;, &quot;boolean&quot;, &quot;string&quot;] },
+ { &quot;name&quot;:&quot;enum1&quot;, &quot;type&quot;:{&quot;type&quot;:&quot;enum&quot;, &quot;name&quot;:&quot;enum1_values&quot;, &quot;symbols&quot;:[&quot;BLUE&quot;,&quot;RED&quot;, &quot;GREEN&quot;]} },
+ { &quot;name&quot;:&quot;nullableint&quot;, &quot;type&quot;:[&quot;int&quot;, &quot;null&quot;] },
+ { &quot;name&quot;:&quot;bytes1&quot;, &quot;type&quot;:&quot;bytes&quot; },
+ { &quot;name&quot;:&quot;fixed1&quot;, &quot;type&quot;:{&quot;type&quot;:&quot;fixed&quot;, &quot;name&quot;:&quot;threebytes&quot;, &quot;size&quot;:3} }
+ ] }</pre>
+If the table were backed by a CSV file such as:
+<table>
+<tr><td>why hello there </td><td>42 </td><td>3 </td><td>100 </td><td>1412341 </td><td>true </td><td>42.43 </td><td>85.23423424 </td><td>alpha:beta:gamma </td><td>Earth#42:Control#86:Bob#31 </td><td>17:true:Abe Linkedin </td><td>0:3.141459 </td><td>BLUE </td><td>72 </td><td>0:1:2:3:4:5 </td><td>50:51:53</td></tr>
+<tr><td>another record </td><td>98 </td><td>4 </td><td>101 </td><td>9999999 </td><td>false </td><td>99.89 </td><td>0.00000009 </td><td>beta </td><td>Earth#101 </td><td>1134:false:wazzup </td><td>1:true </td><td>RED </td><td>NULL </td><td>6:7:8:9:10 </td><td>54:55:56</td></tr>
+<tr><td>third record </td><td>45 </td><td>5 </td><td>102 </td><td>999999999 </td><td>true </td><td>89.99 </td><td>0.00000000000009 </td><td>alpha:gamma </td><td>Earth#237:Bob#723 </td><td>102:false:BNL </td><td>2:Time to go home </td><td>GREEN </td><td>NULL </td><td>11:12:13 </td><td>57:58:59</td></tr>
+</table>
+one can write it out to Avro with:
+<pre>CREATE TABLE as_avro
+ ROW FORMAT SERDE
+ 'com.linkedin.haivvreo.AvroSerDe'
+ WITH SERDEPROPERTIES (
+ 'schema.url'='file:///path/to/the/schema/test_serializer.avsc')
+ STORED as INPUTFORMAT
+ 'com.linkedin.haivvreo.AvroContainerInputFormat'
+ OUTPUTFORMAT
+ 'com.linkedin.haivvreo.AvroContainerOutputFormat';
+insert overwrite table as_avro select * from test_serializer;</pre>
+The files written by the Hive job are valid Avro files; however, MapReduce doesn't add the standard .avro extension. If you copy these files out, you'll likely want to rename them with .avro.
+
+Hive is very forgiving about types: it will attempt to store whatever value matches the provided column in the equivalent column position in the new table. No matching is done on column names, for instance. Therefore, it is incumbent on the query writer to make sure the target column types are correct. If they are not, Avro may accept the type or it may throw an exception; this depends on the particular combination of types.
+
+Specifying the Avro schema for a table
+--------------------------------------
+There are three ways to provide the reader schema for an Avro table, all of which involve parameters to the serde. As the schema evolves, one can update these values by updating the parameters in the table.
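+
+For example, once the schema has evolved, an existing table can be pointed at the new version (the URL is illustrative and assumes the table was created with schema.url as in the earlier example):
+<pre>ALTER TABLE kst SET SERDEPROPERTIES ('schema.url'='http://schema_provider/kst_v2.avsc');</pre>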
+
+**Use schema.url**
+
+Specifies a URL from which to access the schema. For HTTP schemas, this works for testing and small-scale clusters, but as the schema will be accessed at least once from each task in the job, this can quickly turn the job into a DDoS attack against the URL provider (a web server, for instance). Use caution when using this parameter for anything other than testing.
+
+The schema can also point to a location on HDFS, for instance: hdfs://your-nn:9000/path/to/avsc/file. Haivvreo will then read the file from HDFS, which should provide resiliency against many reads at once. Note that the serde will read this file from every mapper, so it's a good idea to set the schema file's replication factor to a high value to provide good locality for the readers. The schema file itself should be relatively small, so this does not add significant overhead to the process.
+
+**Use schema.literal and embed the schema in the create statement**
+
+One can embed the schema directly into the create statement. This works if the schema doesn't have any single quotes (or they are appropriately escaped), as Hive uses single quotes to delimit the parameter value. For instance:
+<pre>CREATE TABLE embedded
+ COMMENT "just drop the schema right into the HQL"
+ ROW FORMAT SERDE
+ 'com.linkedin.haivvreo.AvroSerDe'
+ WITH SERDEPROPERTIES (
+ 'schema.literal'='{
+ "namespace": "com.linkedin.haivvreo",
+ "name": "some_schema",
+ "type": "record",
+ "fields": [ { "name":"string1","type":"string"}]
+ }')
+ STORED AS INPUTFORMAT
+ 'com.linkedin.haivvreo.AvroContainerInputFormat'
+ OUTPUTFORMAT
+ 'com.linkedin.haivvreo.AvroContainerOutputFormat';
+</pre>
+Note that the value is enclosed in single quotes and just pasted into the create statement.
+
+**Use schema.literal and pass the schema into the script**
+
+Hive supports simple variable substitution, so one can pass the schema to the script in a variable. Note that to do this, the schema must be completely escaped (carriage returns converted to \n, tabs to \t, quotes escaped, etc). An example:
+<pre>set hiveconf:schema;
+DROP TABLE example;
+CREATE TABLE example
+ ROW FORMAT SERDE
+ 'com.linkedin.haivvreo.AvroSerDe'
+ WITH SERDEPROPERTIES (
+ 'schema.literal'='${hiveconf:schema}')
+
+ STORED AS INPUTFORMAT
+ 'com.linkedin.haivvreo.AvroContainerInputFormat'
+ OUTPUTFORMAT
+ 'com.linkedin.haivvreo.AvroContainerOutputFormat';
+</pre>
+To execute this script file, assuming $SCHEMA has been defined to be the escaped schema value:
+<pre>hive -hiveconf schema="${SCHEMA}" -f your_script_file.sql</pre>
+Note that $SCHEMA is interpolated into the quotes to correctly handle spaces within the schema.
+
+If something goes wrong
+-----------------------
+Hive tends to swallow exceptions from Haivvreo that occur before job submission. To force Hive to be more verbose, it can be started with **hive -hiveconf hive.root.logger=INFO,console**, which will spit orders of magnitude more information to the console and will likely include any information Haivvreo is trying to give you about what went wrong. If Haivvreo encounters an error during MapReduce, the stack trace will be provided in the failed task log, which can be examined from the JobTracker's web interface. Haivvreo only emits HaivvreoException; look for these, and please include them in any bug reports. The most common cause is expected to be attempting to serialize a type that is incompatible with what Avro is expecting.
+
+What's next
+-----------
+We're currently testing Haivvreo in our production ETL process and have found it reliable and flexible. We'll be working on improving its performance (there are several opportunities for improvement both in Hive and Haivvreo itself). The next feature we'll add is schema induction when writing from Hive tables so that it is not necessary to provide an equivalent Avro schema ahead of time.
+
+Please kick the tires and file bugs.
+
+Change log
+----------
+* 1.0.2 Lazy initialization of the serializer/deserializer is a bit too lazy. Also, first open-source release.
+* 1.0.1 Add ability to specify schema via hdfs location.
+* 1.0.0 Initial release for testing.
48 src/main/java/com/linkedin/haivvreo/AvroContainerInputFormat.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.haivvreo;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class AvroContainerInputFormat extends FileInputFormat<NullWritable, AvroGenericRecordWritable> implements JobConfigurable {
+ protected JobConf jobConf;
+
+ @Override
+ protected FileStatus[] listStatus(JobConf job) throws IOException {
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ for (FileStatus file : super.listStatus(job))
+ // TODO: How to have output files end with .avro?
+ //if (file.getPath().getName().endsWith(".avro"))
+ result.add(file);
+ return result.toArray(new FileStatus[0]);
+ }
+
+ @Override
+ public RecordReader<NullWritable, AvroGenericRecordWritable> getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
+ return new AvroGenericRecordReader(jobConf, (FileSplit) inputSplit, reporter);
+ }
+
+ @Override
+ public void configure(JobConf jobConf) {
+ this.jobConf = jobConf;
+ }
+}
58 src/main/java/com/linkedin/haivvreo/AvroContainerOutputFormat.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.haivvreo;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Write to an Avro file from a Hive process.
+ */
+public class AvroContainerOutputFormat implements HiveOutputFormat<LongWritable, AvroGenericRecordWritable> {
+
+ @Override
+ public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf,
+ Path path,
+ Class<? extends Writable> valueClass,
+ boolean isCompressed,
+ Properties properties,
+ Progressable progressable) throws IOException {
+ Schema schema;
+ try {
+ schema = HaivvreoUtils.determineSchema(properties);
+ } catch (HaivvreoException e) {
+ throw new IOException(e);
+ }
+ GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<GenericRecord>(schema);
+ DataFileWriter<GenericRecord> dfw = new DataFileWriter<GenericRecord>(gdw);
+
+ dfw.create(schema, path.getFileSystem(jobConf).create(path));
+ return new AvroGenericRecordWriter(dfw);
+ }
+
+}
225 src/main/java/com/linkedin/haivvreo/AvroDeserializer.java
@@ -0,0 +1,225 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.haivvreo;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.util.Utf8;
+
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.*;
+import org.apache.hadoop.io.Writable;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+class AvroDeserializer {
+ /**
+ * When encountering a record with an older schema than the one we're trying
+ * to read, it is necessary to re-encode with a reader against the newer schema.
+ * Because Hive doesn't provide a way to pass extra information to the
+ * inputformat, we're unable to provide the newer schema when we have it and it
+ * would be most useful - when the inputformat is reading the file.
+ *
+ * This is a slow process, so we try to cache as many of the objects as possible.
+ */
+ static class SchemaReEncoder {
+ private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ private final GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<GenericRecord>();
+ private BinaryDecoder binaryDecoder = null;
+
+ public GenericRecord reencode(GenericRecord r, Schema readerSchema) throws HaivvreoException {
+ baos.reset();
+
+ BinaryEncoder be = new BinaryEncoder(baos);
+ gdw.setSchema(r.getSchema());
+ try {
+ gdw.write(r, be);
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+
+ binaryDecoder = DecoderFactory.defaultFactory().createBinaryDecoder(bais, binaryDecoder);
+ GenericDatumReader<GenericRecord> gdr = new GenericDatumReader<GenericRecord>(r.getSchema(), readerSchema);
+ return gdr.read(r, binaryDecoder);
+
+ } catch (IOException e) {
+ throw new HaivvreoException("Exception trying to re-encode record to new schema", e);
+ }
+ }
+ }
+
+ private List<Object> row;
+ private SchemaReEncoder reEncoder;
+
+ /**
+ * Deserialize an Avro record, recursing into its component fields and
+ * deserializing them as well. Fields of the record are matched by name
+ * against fields in the Hive row.
+ *
+ * Because Avro has some data types that Hive does not, these are converted
+ * during deserialization to types Hive will work with.
+ *
+ * @param columnNames List of columns Hive is expecting from record.
+ * @param columnTypes List of column types matched by index to names
+ * @param writable Instance of GenericAvroWritable to deserialize
+ * @param readerSchema Schema of the writable to deserialize
+ * @return A list of objects suitable for Hive to work with further
+   * @throws HaivvreoException For any exception during deserialization
+ */
+ public Object deserialize(List<String> columnNames, List<TypeInfo> columnTypes, Writable writable, Schema readerSchema) throws HaivvreoException {
+ if(!(writable instanceof AvroGenericRecordWritable))
+      throw new HaivvreoException("Expecting an AvroGenericRecordWritable");
+
+ if(row == null || row.size() != columnNames.size())
+ row = new ArrayList<Object>(columnNames.size());
+ else
+ row.clear();
+
+ AvroGenericRecordWritable recordWritable = (AvroGenericRecordWritable) writable;
+ GenericRecord r = recordWritable.getRecord();
+
+ // Check if we're working with an evolved schema
+ if(!r.getSchema().equals(readerSchema)) {
+ if(reEncoder == null) reEncoder = new SchemaReEncoder();
+ r = reEncoder.reencode(r, readerSchema);
+ }
+
+ workerBase(row, columnNames, columnTypes, r);
+ return row;
+ }
+
+ // The actual deserialization may involve nested records, which require recursion.
+ private List<Object> workerBase(List<Object> objectRow, List<String> columnNames, List<TypeInfo> columnTypes, GenericRecord record) throws HaivvreoException {
+ for(int i = 0; i < columnNames.size(); i++) {
+ TypeInfo columnType = columnTypes.get(i);
+ String columnName = columnNames.get(i);
+ Object datum = record.get(columnName);
+ Schema datumSchema = record.getSchema().getField(columnName).schema();
+
+ objectRow.add(worker(datum, datumSchema, columnType));
+ }
+
+ return objectRow;
+ }
+
+ private Object worker(Object datum, Schema recordSchema, TypeInfo columnType) throws HaivvreoException {
+ // Klaxon! Klaxon! Klaxon!
+ // Avro requires NULLable types to be defined as unions of some type T
+ // and NULL. This is annoying and we're going to hide it from the user.
+ if(HaivvreoUtils.isNullableType(recordSchema))
+ return deserializeNullableUnion(datum, recordSchema, columnType);
+
+ if(columnType == TypeInfoFactory.stringTypeInfo)
+      return datum.toString(); // To work around Avro's Utf8 string type
+ // This also gets us around the Enum issue since we just take the value and convert it to a string. Yay!
+
+ switch(columnType.getCategory()) {
+ case STRUCT:
+ return deserializeStruct((GenericData.Record) datum, (StructTypeInfo) columnType);
+ case UNION:
+ return deserializeUnion(datum, recordSchema, (UnionTypeInfo) columnType);
+ case LIST:
+ return deserializeList(datum, recordSchema, (ListTypeInfo) columnType);
+ case MAP:
+ return deserializeMap(datum, recordSchema, (MapTypeInfo) columnType);
+ default:
+ return datum; // Simple type.
+ }
+ }
+
+ /**
+ * Extract either a null or the correct type from a Nullable type. This is
+ * horrible in that we rebuild the TypeInfo every time.
+ * FIXME: Something better than building the damn TypeInfo each time.
+ */
+ private Object deserializeNullableUnion(Object datum, Schema recordSchema, TypeInfo columnType) throws HaivvreoException {
+ int tag = GenericData.get().resolveUnion(recordSchema, datum); // Determine index of value
+ Schema schema = recordSchema.getTypes().get(tag);
+ if(schema.getType().equals(Schema.Type.NULL))
+ return null;
+ return worker(datum, schema, SchemaToTypeInfo.generateTypeInfo(schema));
+
+ }
+
+ private Object deserializeStruct(GenericData.Record datum, StructTypeInfo columnType) throws HaivvreoException {
+ // No equivalent Java type for the backing structure, need to recurse and build a list
+ ArrayList<TypeInfo> innerFieldTypes = columnType.getAllStructFieldTypeInfos();
+ ArrayList<String> innerFieldNames = columnType.getAllStructFieldNames();
+ List<Object> innerObjectRow = new ArrayList<Object>(innerFieldTypes.size());
+
+ return workerBase(innerObjectRow, innerFieldNames, innerFieldTypes, datum);
+ }
+
+ private Object deserializeUnion(Object datum, Schema recordSchema, UnionTypeInfo columnType) throws HaivvreoException {
+ int tag = GenericData.get().resolveUnion(recordSchema, datum); // Determine index of value
+ Object desered = worker(datum, recordSchema.getTypes().get(tag), columnType.getAllUnionObjectTypeInfos().get(tag));
+ return new StandardUnionObjectInspector.StandardUnion((byte)tag, desered);
+ }
+
+ private Object deserializeList(Object datum, Schema recordSchema, ListTypeInfo columnType) throws HaivvreoException {
+ // Need to check the original schema to see if this is actually a Fixed.
+ if(recordSchema.getType().equals(Schema.Type.FIXED)) {
+      // We're faking out Hive to work through a type system impedance mismatch. Pull out the backing array and convert to a list.
+ GenericData.Fixed fixed = (GenericData.Fixed) datum;
+ List<Byte> asList = new ArrayList<Byte>(fixed.bytes().length);
+ for(int j = 0; j < fixed.bytes().length; j++) {
+ asList.add(fixed.bytes()[j]);
+ }
+ return asList;
+ } else if(recordSchema.getType().equals(Schema.Type.BYTES)) {
+ // This is going to be slow... hold on.
+ ByteBuffer bb = (ByteBuffer)datum;
+ List<Byte> asList = new ArrayList<Byte>(bb.capacity());
+ byte[] array = bb.array();
+ for(int j = 0; j < array.length; j++) {
+ asList.add(array[j]);
+ }
+ return asList;
+ } else { // An actual list, deser its values
+ List listData = (List) datum;
+ Schema listSchema = recordSchema.getElementType();
+ List<Object> listContents = new ArrayList<Object>(listData.size());
+ for(Object obj : listData) {
+ listContents.add(worker(obj, listSchema, columnType.getListElementTypeInfo()));
+ }
+ return listContents;
+ }
+ }
+
+ private Object deserializeMap(Object datum, Schema mapSchema, MapTypeInfo columnType) throws HaivvreoException {
+ // Avro only allows maps with Strings for keys, so we only have to worry
+ // about deserializing the values
+ Map<String, Object> map = new Hashtable<String, Object>();
+ Map<Utf8, Object> mapDatum = (Map)datum;
+ Schema valueSchema = mapSchema.getValueType();
+ TypeInfo valueTypeInfo = columnType.getMapValueTypeInfo();
+ for (Utf8 key : mapDatum.keySet()) {
+ Object value = mapDatum.get(key);
+ map.put(key.toString(), worker(value, valueSchema, valueTypeInfo));
+ }
+
+ return map;
+ }
+}
88 src/main/java/com/linkedin/haivvreo/AvroGenericRecordReader.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.haivvreo;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.FsInput;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.*;
+
+
+import java.io.IOException;
+
+/**
+ * RecordReader optimized against Avro GenericRecords that returns the record
+ * as the value of the key-value pair, as Hive requires.
+ */
+public class AvroGenericRecordReader implements RecordReader<NullWritable, AvroGenericRecordWritable>, JobConfigurable {
+ final private org.apache.avro.file.FileReader<GenericRecord> reader;
+ final private long start;
+ final private long stop;
+ protected JobConf jobConf;
+
+ public AvroGenericRecordReader(JobConf job, FileSplit split, Reporter reporter) throws IOException {
+ this.jobConf = job;
+ GenericDatumReader<GenericRecord> gdr = new GenericDatumReader<GenericRecord>();
+ this.reader = new DataFileReader<GenericRecord>(new FsInput(split.getPath(), job), gdr);
+ this.reader.sync(split.getStart());
+ this.start = reader.tell();
+ this.stop = split.getStart() + split.getLength();
+ }
+
+ @Override
+ public boolean next(NullWritable nullWritable, AvroGenericRecordWritable record) throws IOException {
+ if(!reader.hasNext() || reader.pastSync(stop)) return false;
+
+ GenericData.Record r = (GenericData.Record)reader.next();
+ record.setRecord(r);
+
+ return true;
+ }
+
+ @Override
+ public NullWritable createKey() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public AvroGenericRecordWritable createValue() {
+ return new AvroGenericRecordWritable();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return reader.tell();
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return stop == start ? 0.0f
+ : Math.min(1.0f, (getPos() - start) / (float)(stop - start));
+ }
+
+ @Override
+ public void configure(JobConf jobConf) {
+    this.jobConf = jobConf;
+ }
+}
82 src/main/java/com/linkedin/haivvreo/AvroGenericRecordWritable.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.haivvreo;
+
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.hadoop.io.Writable;
+
+import java.io.*;
+
+/**
+ * Wrapper around an Avro GenericRecord. Necessary because Hive's deserializer
+ * will happily deserialize any object - as long as it's a writable.
+ */
+public class AvroGenericRecordWritable implements Writable{
+ GenericRecord record;
+ private BinaryDecoder binaryDecoder;
+
+ // There are two areas of exploration for optimization here.
+ // 1. We're serializing the schema with every object. If we assume the schema
+  //    provided by the table is always correct, we don't need to do this and
+  //    can just send the serialized bytes.
+ // 2. We serialize/deserialize to/from bytes immediately. We may save some
+  //    time by doing this lazily, but until there's evidence this is useful,
+ // it's not worth adding the extra state.
+ public GenericRecord getRecord() {
+ return record;
+ }
+
+ public void setRecord(GenericRecord record) {
+ this.record = record;
+ }
+
+ public AvroGenericRecordWritable() {}
+
+ public AvroGenericRecordWritable(GenericRecord record) {
+ this.record = record;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ // Write schema since we need it to pull the data out. (see point #1 above)
+ String schemaString = record.getSchema().toString(false);
+ out.writeUTF(schemaString);
+
+ // Write record to byte buffer
+ GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<GenericRecord>();
+ BinaryEncoder be = new BinaryEncoder((DataOutputStream)out);
+
+ gdw.setSchema(record.getSchema());
+ gdw.write(record, be);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ Schema schema = Schema.parse(in.readUTF());
+ record = new GenericData.Record(schema);
+ binaryDecoder = DecoderFactory.defaultFactory().createBinaryDecoder((InputStream) in, binaryDecoder);
+ GenericDatumReader<GenericRecord> gdr = new GenericDatumReader<GenericRecord>(schema);
+ record = gdr.read(record, binaryDecoder);
+ }
+}
48 src/main/java/com/linkedin/haivvreo/AvroGenericRecordWriter.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.haivvreo;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+
+/**
+ * Write an Avro GenericRecord to an Avro data file.
+ */
+public class AvroGenericRecordWriter implements FileSinkOperator.RecordWriter{
+ final private DataFileWriter<GenericRecord> dfw;
+
+ public AvroGenericRecordWriter(DataFileWriter<GenericRecord> dfw) {
+ this.dfw = dfw;
+ }
+
+ @Override
+ public void write(Writable writable) throws IOException {
+ if(!(writable instanceof AvroGenericRecordWritable))
+      throw new IOException("Expecting instance of AvroGenericRecordWritable, but received " + writable.getClass().getCanonicalName());
+ AvroGenericRecordWritable r = (AvroGenericRecordWritable)writable;
+ dfw.append(r.getRecord());
+ }
+
+ @Override
+ public void close(boolean abort) throws IOException {
+ dfw.close();
+ }
+
+}
147 src/main/java/com/linkedin/haivvreo/AvroObjectInspectorGenerator.java
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.haivvreo;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An AvroObjectInspectorGenerator takes an Avro schema and creates the three
+ * data structures Hive needs to work with Avro-encoded data:
+ * * A list of the schema field names
+ * * A list of those fields equivalent types in Hive
+ * * An ObjectInspector capable of working with an instance of that datum.
+ */
+class AvroObjectInspectorGenerator {
+ final private List<String> columnNames;
+ final private List<TypeInfo> columnTypes;
+ final private ObjectInspector oi;
+
+ public AvroObjectInspectorGenerator(Schema schema) throws SerDeException {
+ verifySchemaIsARecord(schema);
+
+ this.columnNames = generateColumnNames(schema);
+ this.columnTypes = SchemaToTypeInfo.generateColumnTypes(schema);
+ assert columnNames.size() == columnTypes.size();
+ this.oi = createObjectInspector();
+ }
+
+ private void verifySchemaIsARecord(Schema schema) throws SerDeException {
+ if(!schema.getType().equals(Schema.Type.RECORD))
+ throw new HaivvreoException("Schema for table must be of type RECORD. " +
+ "Received type: " + schema.getType());
+ }
+
+ public List<String> getColumnNames() {
+ return columnNames;
+ }
+
+ public List<TypeInfo> getColumnTypes() {
+ return columnTypes;
+ }
+
+ public ObjectInspector getObjectInspector() {
+ return oi;
+ }
+
+ private ObjectInspector createObjectInspector() throws SerDeException {
+ List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>(columnNames.size());
+
+ // At this point we've verified the types are correct.
+ for(int i = 0; i < columnNames.size(); i++) {
+ columnOIs.add(i, createObjectInspectorWorker(columnTypes.get(i)));
+ }
+ return ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs);
+ }
+
+ private ObjectInspector createObjectInspectorWorker(TypeInfo ti) throws SerDeException {
+ // We don't need to do the check for U[T,Null] here because we'll give the real type
+ // at deserialization and the object inspector will never see the actual union.
+ if(!supportedCategories(ti))
+ throw new HaivvreoException("Don't yet support this type: " + ti);
+ ObjectInspector result;
+ switch(ti.getCategory()) {
+ case PRIMITIVE:
+ PrimitiveTypeInfo pti = (PrimitiveTypeInfo)ti;
+ result = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(pti.getPrimitiveCategory());
+ break;
+ case STRUCT:
+ StructTypeInfo sti = (StructTypeInfo)ti;
+ ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>(sti.getAllStructFieldTypeInfos().size());
+ for(TypeInfo typeInfo : sti.getAllStructFieldTypeInfos()) {
+ ois.add(createObjectInspectorWorker(typeInfo));
+ }
+
+ result = ObjectInspectorFactory.getStandardStructObjectInspector(sti.getAllStructFieldNames(), ois);
+
+ break;
+ case MAP:
+ MapTypeInfo mti = (MapTypeInfo)ti;
+ result = ObjectInspectorFactory.getStandardMapObjectInspector(
+ PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.STRING),
+ createObjectInspectorWorker(mti.getMapValueTypeInfo()));
+ break;
+ case LIST:
+ ListTypeInfo ati = (ListTypeInfo)ti;
+ result = ObjectInspectorFactory.getStandardListObjectInspector(createObjectInspectorWorker(ati.getListElementTypeInfo()));
+ break;
+ case UNION:
+ UnionTypeInfo uti = (UnionTypeInfo)ti;
+ List<TypeInfo> allUnionObjectTypeInfos = uti.getAllUnionObjectTypeInfos();
+ List<ObjectInspector> unionObjectInspectors = new ArrayList<ObjectInspector>(allUnionObjectTypeInfos.size());
+
+ for (TypeInfo typeInfo : allUnionObjectTypeInfos) {
+ unionObjectInspectors.add(createObjectInspectorWorker(typeInfo));
+ }
+
+ result = ObjectInspectorFactory.getStandardUnionObjectInspector(unionObjectInspectors);
+ break;
+ default:
+ throw new HaivvreoException("No Hive categories matched: " + ti);
+ }
+
+ return result;
+ }
+
+ private boolean supportedCategories(TypeInfo ti) {
+ final ObjectInspector.Category c = ti.getCategory();
+ return c.equals(ObjectInspector.Category.PRIMITIVE) ||
+ c.equals(ObjectInspector.Category.MAP) ||
+ c.equals(ObjectInspector.Category.LIST) ||
+ c.equals(ObjectInspector.Category.STRUCT) ||
+ c.equals(ObjectInspector.Category.UNION);
+ }
+
+ private List<String> generateColumnNames(Schema schema) {
+ List<Schema.Field> fields = schema.getFields();
+ List<String> fieldsList = new ArrayList<String>(fields.size());
+
+ for (Schema.Field field : fields) {
+ fieldsList.add(field.name());
+ }
+
+ return fieldsList;
+ }
+
+}
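A hypothetical sketch (not part of this commit) of what the generator produces for a small record schema; the class is package-private, so the sketch assumes it lives in com.linkedin.haivvreo, and the schema and field names are invented.

package com.linkedin.haivvreo;

import org.apache.avro.Schema;

public class ObjectInspectorGeneratorSketch {
  public static void main(String[] args) throws Exception {
    Schema s = Schema.parse("{\"type\":\"record\",\"name\":\"example\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"long\"},"
        + "{\"name\":\"name\",\"type\":\"string\"}]}");
    AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
    // Expected: id -> bigint, name -> string, plus a standard struct ObjectInspector.
    for (int i = 0; i < aoig.getColumnNames().size(); i++) {
      System.out.println(aoig.getColumnNames().get(i) + " -> "
          + aoig.getColumnTypes().get(i).getTypeName());
    }
    System.out.println(aoig.getObjectInspector().getTypeName());
  }
}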
86 src/main/java/com/linkedin/haivvreo/AvroSerDe.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.haivvreo;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Read or write Avro data from Hive.
+ */
+public class AvroSerDe implements SerDe {
+
+ private ObjectInspector oi;
+ private List<String> columnNames;
+ private List<TypeInfo> columnTypes;
+ private Schema schema;
+ private AvroDeserializer avroDeserializer = null;
+ private AvroSerializer avroSerializer = null;
+
+ @Override
+ public void initialize(Configuration configuration, Properties properties) throws SerDeException {
+ try {
+ schema = HaivvreoUtils.determineSchema(properties);
+ } catch (IOException e) {
+ throw new HaivvreoException(e);
+ }
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(schema);
+ this.columnNames = aoig.getColumnNames();
+ this.columnTypes = aoig.getColumnTypes();
+ this.oi = aoig.getObjectInspector();
+ }
+
+ @Override
+ public Class<? extends Writable> getSerializedClass() {
+ return AvroGenericRecordWritable.class;
+ }
+
+ @Override
+ public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
+ return getSerializer().serialize(o, objectInspector, columnNames, columnTypes, schema);
+ }
+
+ @Override
+ public Object deserialize(Writable writable) throws SerDeException {
+ return getDeserializer().deserialize(columnNames, columnTypes, writable, schema);
+ }
+
+ @Override
+ public ObjectInspector getObjectInspector() throws SerDeException {
+ return oi;
+ }
+
+ private AvroDeserializer getDeserializer() {
+ if(avroDeserializer == null) avroDeserializer = new AvroDeserializer();
+
+ return avroDeserializer;
+ }
+
+ private AvroSerializer getSerializer() {
+ if(avroSerializer == null) avroSerializer = new AvroSerializer();
+
+ return avroSerializer;
+ }
+}
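A hypothetical sketch (not part of this commit) of exercising the SerDe directly with a schema.literal property, the same path Hive takes when a table declares this SerDe; the schema here is invented.

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import com.linkedin.haivvreo.AvroSerDe;

public class SerDeInitSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // In Hive this property would normally come from the table's SERDEPROPERTIES/TBLPROPERTIES.
    props.setProperty("schema.literal",
        "{\"type\":\"record\",\"name\":\"example\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"long\"},"
        + "{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}");

    AvroSerDe serde = new AvroSerDe();
    serde.initialize(new Configuration(), props);

    // Expected: struct<id:bigint,name:string> -- the nullable union collapses to string.
    System.out.println(serde.getObjectInspector().getTypeName());
  }
}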
253 src/main/java/com/linkedin/haivvreo/AvroSerializer.java
@@ -0,0 +1,253 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.haivvreo;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.typeinfo.*;
+import org.apache.hadoop.io.Writable;
+import static org.apache.avro.Schema.Type.*;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+public class AvroSerializer {
+ private static final Log LOG = LogFactory.getLog(AvroSerializer.class);
+
+ AvroGenericRecordWritable cache = new AvroGenericRecordWritable();
+
+ // Hive is pretty simple (read: stupid) in writing out values via the serializer.
+ // We're just going to go through, matching indices. Hive formats normally
+ // handle mismatches with null. We don't have that option, so instead we'll
+ // end up throwing an exception for invalid records.
+ public Writable serialize(Object o, ObjectInspector objectInspector, List<String> columnNames, List<TypeInfo> columnTypes, Schema schema) throws HaivvreoException {
+ StandardStructObjectInspector ssoi = (StandardStructObjectInspector) objectInspector;
+ GenericData.Record record = new GenericData.Record(schema);
+
+ List<? extends StructField> outputFieldRefs = ssoi.getAllStructFieldRefs();
+ if(outputFieldRefs.size() != columnNames.size())
+      throw new HaivvreoException("Number of input columns was different than the number of output columns (in = " + columnNames.size() + " vs out = " + outputFieldRefs.size() + ")");
+
+ int size = schema.getFields().size();
+ if(outputFieldRefs.size() != size) // Hive does this check for us, so we should be ok.
+      throw new HaivvreoException("Hive passed in a different number of fields than the schema expected (Hive wanted " + outputFieldRefs.size() + ", Avro expected " + schema.getFields().size() + ")");
+
+ List<? extends StructField> allStructFieldRefs = ssoi.getAllStructFieldRefs();
+ List<Object> structFieldsDataAsList = ssoi.getStructFieldsDataAsList(o);
+
+ for(int i = 0; i < size; i++) {
+ Field field = schema.getFields().get(i);
+ TypeInfo typeInfo = columnTypes.get(i);
+ StructField structFieldRef = allStructFieldRefs.get(i);
+ Object structFieldData = structFieldsDataAsList.get(i);
+ ObjectInspector fieldOI = structFieldRef.getFieldObjectInspector();
+
+ Object val = serialize(typeInfo, fieldOI, structFieldData, field.schema());
+ record.put(field.name(), val);
+ }
+
+ if(!GenericData.get().validate(schema, record))
+ throw new SerializeToAvroException(schema, record);
+
+ cache.setRecord(record);
+
+ return cache;
+ }
+
+ private Object serialize(TypeInfo typeInfo, ObjectInspector fieldOI, Object structFieldData, Schema schema) throws HaivvreoException {
+ switch(typeInfo.getCategory()) {
+ case PRIMITIVE:
+ assert fieldOI instanceof PrimitiveObjectInspector;
+ return serializePrimitive(typeInfo, (PrimitiveObjectInspector) fieldOI, structFieldData);
+ case MAP:
+ assert fieldOI instanceof MapObjectInspector;
+ assert typeInfo instanceof MapTypeInfo;
+ return serializeMap((MapTypeInfo) typeInfo, (MapObjectInspector) fieldOI, structFieldData, schema);
+ case LIST:
+ assert fieldOI instanceof ListObjectInspector;
+ assert typeInfo instanceof ListTypeInfo;
+ return serializeList((ListTypeInfo) typeInfo, (ListObjectInspector) fieldOI, structFieldData, schema);
+ case UNION:
+ assert fieldOI instanceof UnionObjectInspector;
+ assert typeInfo instanceof UnionTypeInfo;
+ return serializeUnion((UnionTypeInfo) typeInfo, (UnionObjectInspector) fieldOI, structFieldData, schema);
+ case STRUCT:
+ assert fieldOI instanceof StructObjectInspector;
+ assert typeInfo instanceof StructTypeInfo;
+ return serializeStruct((StructTypeInfo) typeInfo, (StructObjectInspector) fieldOI, structFieldData, schema);
+ default:
+ throw new HaivvreoException("Ran out of TypeInfo Categories: " + typeInfo.getCategory());
+ }
+ }
+
+ private Object serializeStruct(StructTypeInfo typeInfo, StructObjectInspector ssoi, Object o, Schema schema) throws HaivvreoException {
+ int size = schema.getFields().size();
+ List<? extends StructField> allStructFieldRefs = ssoi.getAllStructFieldRefs();
+ List<Object> structFieldsDataAsList = ssoi.getStructFieldsDataAsList(o);
+ GenericData.Record record = new GenericData.Record(schema);
+ ArrayList<TypeInfo> allStructFieldTypeInfos = typeInfo.getAllStructFieldTypeInfos();
+
+ for(int i = 0; i < size; i++) {
+ Field field = schema.getFields().get(i);
+ TypeInfo colTypeInfo = allStructFieldTypeInfos.get(i);
+ StructField structFieldRef = allStructFieldRefs.get(i);
+ Object structFieldData = structFieldsDataAsList.get(i);
+ ObjectInspector fieldOI = structFieldRef.getFieldObjectInspector();
+
+ Object val = serialize(colTypeInfo, fieldOI, structFieldData, field.schema());
+ record.put(field.name(), val);
+ }
+ return record;
+ }
+
+ private Object serializePrimitive(TypeInfo typeInfo, PrimitiveObjectInspector fieldOI, Object structFieldData) throws HaivvreoException {
+ switch(fieldOI.getPrimitiveCategory()) {
+ case UNKNOWN:
+ throw new HaivvreoException("Received UNKNOWN primitive category.");
+ case VOID:
+ return null;
+ default: // All other primitive types are simple
+ return fieldOI.getPrimitiveJavaObject(structFieldData);
+ }
+ }
+
+ private Object serializeUnion(UnionTypeInfo typeInfo, UnionObjectInspector fieldOI, Object structFieldData, Schema schema) throws HaivvreoException {
+ byte tag = fieldOI.getTag(structFieldData);
+
+ // Invariant that Avro's tag ordering must match Hive's.
+ return serialize(typeInfo.getAllUnionObjectTypeInfos().get(tag),
+ fieldOI.getObjectInspectors().get(tag),
+ fieldOI.getField(structFieldData),
+ schema.getTypes().get(tag));
+ }
+
+ // Haivvreo treats FIXED and BYTES as arrays of tinyints within Hive. Check
+ // if we're dealing with either of these types and thus need to serialize
+ // them as their Avro types.
+ private boolean isTransformedType(Schema schema) {
+ return schema.getType().equals(FIXED) || schema.getType().equals(BYTES);
+ }
+
+ private Object serializeTransformedType(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws HaivvreoException {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Beginning to transform " + typeInfo + " with Avro schema " + schema.toString(false));
+ }
+    if(schema.getType().equals(FIXED)) return serializeAvroFixed(typeInfo, fieldOI, structFieldData, schema);
+ else return serializeAvroBytes(typeInfo, fieldOI, structFieldData, schema);
+
+ }
+
+ private Object serializeAvroBytes(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws HaivvreoException {
+ ByteBuffer bb = ByteBuffer.wrap(extraByteArray(fieldOI, structFieldData));
+ return bb.rewind();
+ }
+
+  private Object serializeAvroFixed(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws HaivvreoException {
+ return new GenericData.Fixed(extraByteArray(fieldOI, structFieldData));
+ }
+
+ // For transforming to BYTES and FIXED, pull out the byte array Avro will want
+ private byte[] extraByteArray(ListObjectInspector fieldOI, Object structFieldData) throws HaivvreoException {
+ // Grab a book. This is going to be slow.
+ int listLength = fieldOI.getListLength(structFieldData);
+ byte[] bytes = new byte[listLength];
+ assert fieldOI.getListElementObjectInspector() instanceof PrimitiveObjectInspector;
+ PrimitiveObjectInspector poi = (PrimitiveObjectInspector)fieldOI.getListElementObjectInspector();
+ List<?> list = fieldOI.getList(structFieldData);
+
+ for(int i = 0; i < listLength; i++) {
+ Object b = poi.getPrimitiveJavaObject(list.get(i));
+ if(!(b instanceof Byte))
+ throw new HaivvreoException("Attempting to transform to bytes, element was not byte but " + b.getClass().getCanonicalName());
+ bytes[i] = (Byte)b;
+ }
+ return bytes;
+ }
+
+ private Object serializeList(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws HaivvreoException {
+ if(isTransformedType(schema))
+ return serializeTransformedType(typeInfo, fieldOI, structFieldData, schema);
+
+ List<?> list = fieldOI.getList(structFieldData);
+    List<Object> serialized = new ArrayList<Object>(list.size());
+
+    TypeInfo listElementTypeInfo = typeInfo.getListElementTypeInfo();
+    ObjectInspector listElementObjectInspector = fieldOI.getListElementObjectInspector();
+    Schema elementType = schema.getElementType();
+
+    for(int i = 0; i < list.size(); i++) {
+      serialized.add(i, serialize(listElementTypeInfo, listElementObjectInspector, list.get(i), elementType));
+    }
+
+    return serialized;
+ }
+
+ private Object serializeMap(MapTypeInfo typeInfo, MapObjectInspector fieldOI, Object structFieldData, Schema schema) throws HaivvreoException {
+ // Avro only allows maps with string keys
+ if(!mapHasStringKey(fieldOI.getMapKeyObjectInspector()))
+ throw new HaivvreoException("Avro only supports maps with keys as Strings. Current Map is: " + typeInfo.toString());
+
+ ObjectInspector mapKeyObjectInspector = fieldOI.getMapKeyObjectInspector();
+ ObjectInspector mapValueObjectInspector = fieldOI.getMapValueObjectInspector();
+ TypeInfo mapKeyTypeInfo = typeInfo.getMapKeyTypeInfo();
+ TypeInfo mapValueTypeInfo = typeInfo.getMapValueTypeInfo();
+ Map<?,?> map = fieldOI.getMap(structFieldData);
+ Schema valueType = schema.getValueType();
+
+    Map<Object, Object> serialized = new Hashtable<Object, Object>(fieldOI.getMapSize(structFieldData));
+
+    for (Map.Entry<?, ?> entry : map.entrySet()) {
+      serialized.put(serialize(mapKeyTypeInfo, mapKeyObjectInspector, entry.getKey(), null), // This works, but is a bit fragile. Construct a single String schema?
+                     serialize(mapValueTypeInfo, mapValueObjectInspector, entry.getValue(), valueType));
+    }
+
+    return serialized;
+ }
+
+ private boolean mapHasStringKey(ObjectInspector mapKeyObjectInspector) {
+ return mapKeyObjectInspector instanceof PrimitiveObjectInspector &&
+ ((PrimitiveObjectInspector) mapKeyObjectInspector)
+ .getPrimitiveCategory()
+ .equals(PrimitiveObjectInspector.PrimitiveCategory.STRING);
+ }
+
+ /**
+ * Thrown when, during serialization of a Hive row to an Avro record, Avro
+   * cannot validate the converted row against the record's schema.
+ */
+ public static class SerializeToAvroException extends HaivvreoException {
+ final private Schema schema;
+ final private GenericData.Record record;
+
+ public SerializeToAvroException(Schema schema, GenericData.Record record) {
+ this.schema = schema;
+ this.record = record;
+ }
+
+ @Override
+ public String toString() {
+ return "Avro could not validate record against schema (record = " + record
+ + ") (schema = "+schema.toString(false) + ")";
+ }
+ }
+}
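As a side note on the validation step above, a small hypothetical sketch (not part of this commit) of the GenericData.validate() behaviour the serializer relies on: a record with an unset non-null field fails validation, which is exactly the condition that surfaces as SerializeToAvroException. The schema is invented.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

public class ValidateSketch {
  public static void main(String[] args) {
    Schema s = Schema.parse("{\"type\":\"record\",\"name\":\"v\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"long\"}]}");
    GenericData.Record r = new GenericData.Record(s);
    System.out.println(GenericData.get().validate(s, r)); // false: "id" was never set
    r.put("id", 1L);
    System.out.println(GenericData.get().validate(s, r)); // true
  }
}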
36 src/main/java/com/linkedin/haivvreo/HaivvreoException.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.haivvreo;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+
+public class HaivvreoException extends SerDeException {
+ public HaivvreoException() {
+ super();
+ }
+
+ public HaivvreoException(String message) {
+ super(message);
+ }
+
+ public HaivvreoException(Throwable cause) {
+ super(cause);
+ }
+
+ public HaivvreoException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
98 src/main/java/com/linkedin/haivvreo/HaivvreoUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.haivvreo;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.List;
+import java.util.Properties;
+
+class HaivvreoUtils {
+ public static final String SCHEMA_LITERAL = "schema.literal";
+ public static final String SCHEMA_URL = "schema.url";
+
+ /**
+   * Determine the schema that's been provided for Avro serde work.
+ * @param properties containing a key pointing to the schema, one way or another
+ * @return schema to use while serdeing the avro file
+   * @throws IOException if an error occurs while trying to read the schema from another location
+ * @throws HaivvreoException if unable to find a schema or pointer to it in the properties
+ */
+ public static Schema determineSchema(Properties properties) throws IOException, HaivvreoException {
+ String schemaString = properties.getProperty(SCHEMA_LITERAL);
+ if(schemaString != null)
+ return Schema.parse(schemaString);
+
+ // Try pulling directly from URL
+ schemaString = properties.getProperty(SCHEMA_URL);
+ if(schemaString == null)
+ throw new HaivvreoException("Neither " + SCHEMA_LITERAL + " nor "
+ + SCHEMA_URL + " specified, can't determine table schema");
+
+ try {
+ if(schemaString.startsWith("hdfs://"))
+ return getSchemaFromHDFS(schemaString, new Configuration());
+ } catch(IOException ioe) {
+ throw new HaivvreoException("Unable to read schema from HDFS: " + schemaString, ioe);
+ }
+
+ return Schema.parse(new URL(schemaString).openStream());
+ }
+
+ // Protected for testing and so we can pass in a conf for testing.
+ protected static Schema getSchemaFromHDFS(String schemaHDFSUrl, Configuration conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ FSDataInputStream in = null;
+
+ try {
+ in = fs.open(new Path(schemaHDFSUrl));
+ Schema s = Schema.parse(in);
+ return s;
+ } finally {
+ if(in != null) in.close();
+ }
+ }
+
+ /**
+ * Determine if an Avro schema is of type Union[T, NULL]. Avro supports nullable
+ * types via a union of type T and null. This is a very common use case.
+ * As such, we want to silently convert it to just T and allow the value to be null.
+ *
+ * @return true if type represents Union[T, Null], false otherwise
+ */
+ public static boolean isNullableType(Schema schema) {
+ return schema.getType().equals(Schema.Type.UNION) &&
+ schema.getTypes().size() == 2 &&
+ (schema.getTypes().get(0).getType().equals(Schema.Type.NULL) || // [null, null] not allowed, so this check is ok.
+ schema.getTypes().get(1).getType().equals(Schema.Type.NULL));
+ }
+
+ /**
+ * In a nullable type, get the schema for the non-nullable type. This method
+   * does no checking that the provided Schema is nullable.
+ */
+ public static Schema getOtherTypeFromNullableType(Schema schema) {
+ List<Schema> types = schema.getTypes();
+
+ return types.get(0).getType().equals(Schema.Type.NULL) ? types.get(1) : types.get(0);
+ }
+}
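A hypothetical sketch (not part of this commit) of the nullable-union helpers; HaivvreoUtils is package-private, so the sketch assumes it sits in com.linkedin.haivvreo, and the schemas are invented.

package com.linkedin.haivvreo;

import org.apache.avro.Schema;

public class NullableTypeSketch {
  public static void main(String[] args) {
    Schema nullableString = Schema.parse("[\"null\", \"string\"]");
    System.out.println(HaivvreoUtils.isNullableType(nullableString));               // true
    System.out.println(HaivvreoUtils.getOtherTypeFromNullableType(nullableString)); // "string"

    // A union of more than two types is not treated as nullable, even if it contains null.
    Schema threeWayUnion = Schema.parse("[\"null\", \"string\", \"int\"]");
    System.out.println(HaivvreoUtils.isNullableType(threeWayUnion));                // false
  }
}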
181 src/main/java/com/linkedin/haivvreo/SchemaToTypeInfo.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.haivvreo;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.util.*;
+
+import static org.apache.avro.Schema.Type.*;
+
+/**
+ * Convert an Avro Schema to a Hive TypeInfo
+ */
+class SchemaToTypeInfo {
+  // Conversion of Avro primitive types to Hive primitive types:
+  //   Avro      Hive
+  //   null      void
+  //   boolean   boolean
+  //   int       int
+  //   long      bigint
+  //   float     float
+  //   double    double
+  //   string    string
+  //   bytes     (no direct equivalent; handled below as array<tinyint>)
+  // Hive's tinyint and smallint have no direct Avro counterpart.
+
+  // Map of Avro's primitive types to Hive's (for those that are supported by both)
+ private static final Map<Schema.Type, TypeInfo> primitiveTypeToTypeInfo = initTypeMap();
+ private static Map<Schema.Type, TypeInfo> initTypeMap() {
+ Map<Schema.Type, TypeInfo> theMap = new Hashtable<Schema.Type, TypeInfo>();
+ theMap.put(STRING, TypeInfoFactory.getPrimitiveTypeInfo("string"));
+ theMap.put(INT, TypeInfoFactory.getPrimitiveTypeInfo("int"));
+ theMap.put(BOOLEAN, TypeInfoFactory.getPrimitiveTypeInfo("boolean"));
+ theMap.put(LONG, TypeInfoFactory.getPrimitiveTypeInfo("bigint"));
+ theMap.put(FLOAT, TypeInfoFactory.getPrimitiveTypeInfo("float"));
+ theMap.put(DOUBLE, TypeInfoFactory.getPrimitiveTypeInfo("double"));
+ theMap.put(NULL, TypeInfoFactory.getPrimitiveTypeInfo("void"));
+ return Collections.unmodifiableMap(theMap);
+ }
+
+ /**
+   * Generate a list of TypeInfos from an Avro schema. This method is
+ * currently public due to some weirdness in deserializing unions, but
+ * will be made private once that is resolved.
+ * @param schema Schema to generate field types for
+ * @return List of TypeInfos, each element of which is a TypeInfo derived
+ * from the schema.
+ * @throws HaivvreoException for problems during conversion.
+ */
+ public static List<TypeInfo> generateColumnTypes(Schema schema) throws HaivvreoException {
+ List<Schema.Field> fields = schema.getFields();
+
+ List<TypeInfo> types = new ArrayList<TypeInfo>(fields.size());
+
+ for (Schema.Field field : fields) {
+ types.add(generateTypeInfo(field.schema()));
+ }
+
+ return types;
+ }
+
+ /**
+ * Convert an Avro Schema into an equivalent Hive TypeInfo.
+   * @param schema the Avro schema to convert; may be any supported Avro type, not just a record
+ * @return TypeInfo matching the Avro schema
+ * @throws HaivvreoException for any problems during conversion.
+ */
+ public static TypeInfo generateTypeInfo(Schema schema) throws HaivvreoException {
+ // Avro requires NULLable types to be defined as unions of some type T
+ // and NULL. This is annoying and we're going to hide it from the user.
+ if(HaivvreoUtils.isNullableType(schema))
+ return generateTypeInfo(HaivvreoUtils.getOtherTypeFromNullableType(schema));
+
+ Schema.Type type = schema.getType();
+
+ if(primitiveTypeToTypeInfo.containsKey(type))
+ return primitiveTypeToTypeInfo.get(type);
+
+ switch(type) {
+ case BYTES: return generateBytesTypeInfo(schema);
+ case RECORD: return generateRecordTypeInfo(schema);
+ case MAP: return generateMapTypeInfo(schema);
+ case ARRAY: return generateArrayTypeInfo(schema);
+ case UNION: return generateUnionTypeInfo(schema);
+ case ENUM: return generateEnumTypeInfo(schema);
+ case FIXED: return generateFixedTypeInfo(schema);
+ default: throw new HaivvreoException("Do not yet support: " + schema);
+ }
+ }
+
+ private static TypeInfo generateRecordTypeInfo(Schema schema) throws HaivvreoException {
+ assert schema.getType().equals(Schema.Type.RECORD);
+
+ List<Schema.Field> fields = schema.getFields();
+ List<String> fieldNames = new ArrayList<String>(fields.size());
+ List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(fields.size());
+
+ for(int i = 0; i < fields.size(); i++) {
+ fieldNames.add(i, fields.get(i).name());
+ typeInfos.add(i, generateTypeInfo(fields.get(i).schema()));
+ }
+
+ return TypeInfoFactory.getStructTypeInfo(fieldNames, typeInfos);
+ }
+
+ /**
+ * Generate a TypeInfo for an Avro Map. This is made slightly simpler in that
+ * Avro only allows maps with strings for keys.
+ */
+ private static TypeInfo generateMapTypeInfo(Schema schema) throws HaivvreoException {
+ assert schema.getType().equals(Schema.Type.MAP);
+ Schema valueType = schema.getValueType();
+ TypeInfo ti = generateTypeInfo(valueType);
+
+ return TypeInfoFactory.getMapTypeInfo(TypeInfoFactory.getPrimitiveTypeInfo("string"), ti);
+ }
+
+ private static TypeInfo generateArrayTypeInfo(Schema schema) throws HaivvreoException {
+ assert schema.getType().equals(Schema.Type.ARRAY);
+ Schema itemsType = schema.getElementType();
+ TypeInfo itemsTypeInfo = generateTypeInfo(itemsType);
+
+ return TypeInfoFactory.getListTypeInfo(itemsTypeInfo);
+ }
+
+ private static TypeInfo generateUnionTypeInfo(Schema schema) throws HaivvreoException {
+ assert schema.getType().equals(Schema.Type.UNION);
+ List<Schema> types = schema.getTypes();
+
+
+ List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(types.size());
+
+ for(Schema type : types) {
+ typeInfos.add(generateTypeInfo(type));
+ }
+
+ return TypeInfoFactory.getUnionTypeInfo(typeInfos);
+ }
+
+ // Hive doesn't have an Enum type, so we're going to treat them as Strings.
+ // During the deserialize/serialize stage we'll check for enumness and
+ // convert as such.
+ private static TypeInfo generateEnumTypeInfo(Schema schema) {
+ assert schema.getType().equals(Schema.Type.ENUM);
+
+ return TypeInfoFactory.getPrimitiveTypeInfo("string");
+ }
+
+ // Hive doesn't have a Fixed type, so we're going to treat them as arrays of
+ // bytes
+ // TODO: Make note in documentation that Hive sends these out as signed bytes.
+ private static final TypeInfo FIXED_AND_BYTES_EQUIV = TypeInfoFactory.getListTypeInfo(TypeInfoFactory.byteTypeInfo);
+ private static TypeInfo generateFixedTypeInfo(Schema schema) {
+ assert schema.getType().equals(Schema.Type.FIXED);
+
+ return FIXED_AND_BYTES_EQUIV;
+ }
+
+ // Avro considers bytes to be a primitive type, but Hive doesn't. We'll
+ // convert them to a list of bytes, just like Fixed. Sigh.
+ private static TypeInfo generateBytesTypeInfo(Schema schema) {
+ assert schema.getType().equals(Schema.Type.BYTES);
+ return FIXED_AND_BYTES_EQUIV;
+ }
+}
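A hypothetical sketch (not part of this commit) of the resulting Hive type names for a few representative schemas, illustrating the bytes/fixed handling and the nullable-union collapse described in the comments above; SchemaToTypeInfo is package-private, so the sketch assumes the com.linkedin.haivvreo package, and the schemas are invented.

package com.linkedin.haivvreo;

import org.apache.avro.Schema;

public class SchemaToTypeInfoSketch {
  public static void main(String[] args) throws Exception {
    // Avro bytes and fixed both become Hive array<tinyint>.
    System.out.println(SchemaToTypeInfo.generateTypeInfo(
        Schema.parse("\"bytes\"")).getTypeName());
    System.out.println(SchemaToTypeInfo.generateTypeInfo(
        Schema.parse("{\"type\":\"fixed\",\"name\":\"f\",\"size\":4}")).getTypeName());
    // A nullable union collapses to its non-null branch: bigint.
    System.out.println(SchemaToTypeInfo.generateTypeInfo(
        Schema.parse("[\"null\", \"long\"]")).getTypeName());
  }
}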
98 src/test/avro/kitchensink.avsc
@@ -0,0 +1,98 @@
+{
+ "namespace": "com.linkedin.haivvreo",
+ "name": "kitchsink",
+ "doc": "this is the kitchensink schema with all types we support.",
+ "type": "record",
+ "fields": [
+ {
+ "name":"string1",
+ "type":"string",
+ "doc":"this field is string1"
+ },
+ {
+ "name":"string2",
+ "type":"string",
+ "doc":"this field is string2"
+ },
+ {
+ "name":"int1",
+ "type":"int",
+ "doc":"this field is int1"
+ },
+ {
+ "name":"boolean1",
+ "type":"boolean",
+ "doc":"this field is boolean1"
+ },
+ {
+ "name":"long1",
+ "type":"long",
+ "doc":"this field is long1"
+ },
+ {
+ "name":"float1",
+ "type":"float",
+ "doc":"this field is float1"
+ },
+ {
+ "name":"double1",
+ "type":"double",
+ "doc":"this field is double1"
+ },
+ {
+ "name":"inner_record1",
+ "type":{ "type":"record",
+ "name":"inner_record1_impl",
+ "fields": [
+ {"name":"int_in_inner_record1",
+ "type":"int",
+ "doc":"this field is int_in_inner_record1"},
+ {"name":"string_in_inner_record1",
+ "type":"string",
+ "doc":"this field is string_in_inner_record1"}
+ ]
+ },
+ "doc":"this field is inner_record1"
+ },
+ {
+ "name":"enum1",
+ "type":{"type":"enum", "name":"enum1_values", "symbols":["ENUM1_VALUES_VALUE1","ENUM1_VALUES_VALUE2", "ENUM1_VALUES_VALUE3"]},
+ "doc":"this field is enum1"
+ },
+ {
+ "name":"array1",
+ "type":{"type":"array", "items":"string"},
+ "doc":"this field is array1"
+ },
+ {
+ "name":"map1",
+ "type":{"type":"map", "values":"string"},
+ "doc":"this field is map1"
+ },
+ {
+ "name":"union1",
+ "type":["float", "boolean", "string"],
+ "doc":"this field is union1"
+ },
+ {
+ "name":"fixed1",
+ "type":{"type":"fixed", "name":"fourbytes", "size":4},
+ "doc":"this field is fixed1"
+ },
+ {
+ "name":"null1",
+ "type":"null",
+ "doc":"this field is null1"
+ },
+ {
+ "name":"UnionNullInt",
+ "type":["int", "null"],
+ "doc":"this field is UnionNullInt"
+ },
+ {
+ "name":"bytes1",
+ "type":"bytes",
+ "doc":"this field is bytes1"
+ }
+ ]
+}
23 src/test/avro/test_serializer.avsc
@@ -0,0 +1,23 @@
+{
+ "namespace": "com.linkedin.haivvreo",
+ "name": "test_serializer",
+ "type": "record",
+ "fields": [
+ { "name":"string1", "type":"string" },
+ { "name":"int1", "type":"int" },
+ { "name":"tinyint1", "type":"int" },
+ { "name":"smallint1", "type":"int" },
+ { "name":"bigint1", "type":"long" },
+ { "name":"boolean1", "type":"boolean" },
+ { "name":"float1", "type":"float" },
+ { "name":"double1", "type":"double" },
+ { "name":"list1", "type":{"type":"array", "items":"string"} },
+ { "name":"map1", "type":{"type":"map", "values":"int"} },
+ { "name":"struct1", "type":{"type":"record", "name":"struct1_name", "fields": [
+ { "name":"sInt", "type":"int" }, { "name":"sBoolean", "type":"boolean" }, { "name":"sString", "type":"string" } ] } },
+ { "name":"union1", "type":["float", "boolean", "string"] },
+ { "name":"enum1", "type":{"type":"enum", "name":"enum1_values", "symbols":["BLUE","RED", "GREEN"]} },
+ { "name":"nullableint", "type":["int", "null"] },
+ { "name":"bytes1", "type":"bytes" },
+ { "name":"fixed1", "type":{"type":"fixed", "name":"threebytes", "size":3} }
+ ] }
406 src/test/java/com/linkedin/haivvreo/TestAvroDeserializer.java
@@ -0,0 +1,406 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.haivvreo;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+public class TestAvroDeserializer {
+ private final GenericData GENERIC_DATA = GenericData.get();
+
+ @Test
+ public void canDeserializeVoidType() throws IOException, SerDeException {
+ String schemaString = "{\n" +
+ " \"type\": \"record\", \n" +
+ " \"name\": \"nullTest\",\n" +
+ " \"fields\" : [\n" +
+ " {\"name\": \"isANull\", \"type\": \"null\"}\n" +
+ " ]\n" +
+ "}";
+ Schema s = Schema.parse(schemaString);
+ GenericData.Record record = new GenericData.Record(s);
+
+ record.put("isANull", null);
+ assertTrue(GENERIC_DATA.validate(s, record));
+
+ AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ AvroDeserializer de = new AvroDeserializer();
+
+ ArrayList<Object> row = (ArrayList<Object>)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+ assertEquals(1, row.size());
+ Object theVoidObject = row.get(0);
+ assertNull(theVoidObject);
+
+ StandardStructObjectInspector oi = (StandardStructObjectInspector)aoig.getObjectInspector();
+ StructField fieldRef = oi.getStructFieldRef("isANull");
+
+ Object shouldBeNull = oi.getStructFieldData(row, fieldRef);
+ assertNull(shouldBeNull);
+ assertTrue(fieldRef.getFieldObjectInspector() instanceof VoidObjectInspector);
+ }
+
+ @Test
+ public void canDeserializeMapsWithPrimitiveKeys() throws SerDeException, IOException {
+ Schema s = Schema.parse(TestAvroObjectInspectorGenerator.MAP_WITH_PRIMITIVE_VALUE_TYPE);
+ GenericData.Record record = new GenericData.Record(s);
+
+ Map<String, Long> m = new Hashtable<String, Long>();
+ m.put("one", 1l);
+ m.put("two", 2l);
+ m.put("three", 3l);
+
+ record.put("aMap", m);
+ assertTrue(GENERIC_DATA.validate(s, record));
+ System.out.println("record = " + record);
+
+ AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ AvroDeserializer de = new AvroDeserializer();
+
+ ArrayList<Object> row = (ArrayList<Object>)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+ assertEquals(1, row.size());
+ Object theMapObject = row.get(0);
+ assertTrue(theMapObject instanceof Map);
+ Map theMap = (Map)theMapObject;
+
+ // Verify the raw object that's been created
+ assertEquals(1l, theMap.get("one"));
+ assertEquals(2l, theMap.get("two"));
+ assertEquals(3l, theMap.get("three"));
+
+ // Verify that the provided object inspector can pull out these same values
+ StandardStructObjectInspector oi = (StandardStructObjectInspector)aoig.getObjectInspector();
+
+ List<Object> z = oi.getStructFieldsDataAsList(row);
+ assertEquals(1, z.size());
+ StructField fieldRef = oi.getStructFieldRef("amap");
+
+ Map theMap2 = (Map)oi.getStructFieldData(row, fieldRef);
+ assertEquals(1l, theMap2.get("one"));
+ assertEquals(2l, theMap2.get("two"));
+ assertEquals(3l, theMap2.get("three"));
+ }
+
+ @Test
+ public void canDeserializeArrays() throws SerDeException, IOException {
+ Schema s = Schema.parse(TestAvroObjectInspectorGenerator.ARRAY_WITH_PRIMITIVE_ELEMENT_TYPE);
+ GenericData.Record record = new GenericData.Record(s);
+
+ List<String> list = new ArrayList<String>();
+ list.add("Eccleston");
+ list.add("Tennant");
+ list.add("Smith");
+
+ record.put("anArray", list);
+ assertTrue(GENERIC_DATA.validate(s, record));
+ System.out.println("Array-backed record = " + record);
+
+ AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ AvroDeserializer de = new AvroDeserializer();
+ ArrayList<Object> row = (ArrayList<Object>)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+ assertEquals(1, row.size());
+ Object theArrayObject = row.get(0);
+ assertTrue(theArrayObject instanceof List);
+ List theList = (List)theArrayObject;
+
+ // Verify the raw object that's been created
+ assertEquals("Eccleston", theList.get(0));
+ assertEquals("Tennant", theList.get(1));
+ assertEquals("Smith", theList.get(2));
+
+ // Now go the correct way, through objectinspectors
+ StandardStructObjectInspector oi = (StandardStructObjectInspector)aoig.getObjectInspector();
+ StructField fieldRefToArray = oi.getStructFieldRef("anArray");
+
+ Object anArrayData = oi.getStructFieldData(row, fieldRefToArray);
+ StandardListObjectInspector anArrayOI = (StandardListObjectInspector)fieldRefToArray.getFieldObjectInspector();
+ assertEquals(3, anArrayOI.getListLength(anArrayData));
+
+ JavaStringObjectInspector elementOI = (JavaStringObjectInspector)anArrayOI.getListElementObjectInspector();
+
+ Object firstElement = anArrayOI.getListElement(anArrayData, 0);
+ assertEquals("Eccleston", elementOI.getPrimitiveJavaObject(firstElement));
+ assertTrue(firstElement instanceof String);
+
+ Object secondElement = anArrayOI.getListElement(anArrayData, 1);
+ assertEquals("Tennant", elementOI.getPrimitiveJavaObject(secondElement));
+ assertTrue(secondElement instanceof String);
+
+ Object thirdElement = anArrayOI.getListElement(anArrayData, 2);
+ assertEquals("Smith", elementOI.getPrimitiveJavaObject(thirdElement));
+ assertTrue(thirdElement instanceof String);
+
+ }
+
+ @Test
+ public void canDeserializeRecords() throws SerDeException, IOException {
+ Schema s = Schema.parse(TestAvroObjectInspectorGenerator.RECORD_SCHEMA);
+ GenericData.Record record = new GenericData.Record(s);
+ GenericData.Record innerRecord = new GenericData.Record(s.getField("aRecord").schema());
+ innerRecord.put("int1", 42);
+ innerRecord.put("boolean1", true);
+ innerRecord.put("long1", 42432234234l);
+ record.put("aRecord", innerRecord);
+ assertTrue(GENERIC_DATA.validate(s, record));
+
+ AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ AvroDeserializer de = new AvroDeserializer();
+ ArrayList<Object> row = (ArrayList<Object>)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+ assertEquals(1, row.size());
+ Object theRecordObject = row.get(0);
+ System.out.println("theRecordObject = " + theRecordObject.getClass().getCanonicalName());
+
+ // The original record was lost in the deserialization, so just go the correct way, through objectinspectors
+ StandardStructObjectInspector oi = (StandardStructObjectInspector)aoig.getObjectInspector();
+ List<? extends StructField> allStructFieldRefs = oi.getAllStructFieldRefs();
+ assertEquals(1, allStructFieldRefs.size());
+ StructField fieldRefForaRecord = allStructFieldRefs.get(0);
+ assertEquals("arecord", fieldRefForaRecord.getFieldName());
+ Object innerRecord2 = oi.getStructFieldData(row, fieldRefForaRecord); // <--- use this!
+
+ // Extract innerRecord field refs
+ StandardStructObjectInspector innerRecord2OI = (StandardStructObjectInspector) fieldRefForaRecord.getFieldObjectInspector();
+
+ List<? extends StructField> allStructFieldRefs1 = innerRecord2OI.getAllStructFieldRefs();
+ assertEquals(3, allStructFieldRefs1.size());
+ assertEquals("int1", allStructFieldRefs1.get(0).getFieldName());
+ assertEquals("boolean1", allStructFieldRefs1.get(1).getFieldName());
+ assertEquals("long1", allStructFieldRefs1.get(2).getFieldName());
+
+    innerRecord2OI.getStructFieldsDataAsList(innerRecord2);