Rather than throwing an exception on schema problems, signal the problem
to the user via a message in the form of a schema description. This
leaves the serde in a good state, and Hive allows the properties to be
changed to correct it.
1 parent 32ea941 commit 2641d294e0e0c9b5aa0a92d76743d73200e330e4 @jghoman committed Jul 25, 2011
@@ -224,6 +224,10 @@ Hive tends to swallow exceptions from Haivvreo that occur before job submission.
FAQ
---
+* Why do I get **error-error-error-error-error-error-error** and a message to check schema.literal and schema.url when describing a table or running a query against it?
+
+> Haivvreo returns this message when it cannot find or parse the schema provided by the schema.literal or schema.url value. It cannot be more specific because Hive expects every call to the serde's configuration methods to succeed, which means an actual exception cannot be thrown. By signaling the error through this message instead, the table is left in a good state and the incorrect value can be corrected with a call to *alter table T set serdeproperties*.
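+
+> For example, assuming a table named *avro_table* whose *schema.literal* was mistyped (the table name and schema below are purely illustrative), the property can be repaired in place:
+
+>     alter table avro_table set serdeproperties (
+>       'schema.literal'='{"type":"record","name":"example_record","namespace":"com.example","fields":[{"name":"field1","type":"string"}]}'
+>     );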
+
* Why do I get a **java.io.IOException: com.linkedin.haivvreo.HaivvreoException: Neither schema.literal nor schema.url specified, can't determine table schema** when pruning by a partition that doesn't exist?
> Hive creates a temporary empty file for non-existent partitions so that queries referencing them succeed (returning a count of zero rows). However, when doing so, it doesn't pass the correct information to the RecordWriter, leaving Haivvreo unable to construct one. This problem has been corrected in [Hive 0.8](https://issues.apache.org/jira/browse/HIVE-2260). With earlier versions of Hive, either be sure to filter only on existing partitions or apply HIVE-2260.
@@ -44,7 +44,7 @@
Progressable progressable) throws IOException {
Schema schema;
try {
- schema = HaivvreoUtils.determineSchema(properties);
+ schema = HaivvreoUtils.determineSchemaOrThrowException(properties);
} catch (HaivvreoException e) {
throw new IOException(e);
}
@@ -82,7 +82,7 @@ private Schema getSchema(JobConf job) throws HaivvreoException, IOException {
for (PartitionDesc pd : aliasToPartnInfo.values()) {
Properties props = pd.getProperties();
if(props.containsKey(HaivvreoUtils.SCHEMA_LITERAL) || props.containsKey(HaivvreoUtils.SCHEMA_URL)) {
- Schema schema = HaivvreoUtils.determineSchema(props);
+ Schema schema = HaivvreoUtils.determineSchemaOrThrowException(props);
// while we're here, let's stash the schema in the jobconf for anyone coming after us
job.set(AvroSerDe.HAIVVREO_SCHEMA, schema.toString(false));
return schema;
@@ -23,15 +23,13 @@
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.Writable;
-import java.io.IOException;
import java.util.List;
import java.util.Properties;
/**
* Read or write Avro data from Hive.
*/
public class AvroSerDe implements SerDe {
-
public static final String HAIVVREO_SCHEMA = "haivvreo.schema";
private ObjectInspector oi;
private List<String> columnNames;
@@ -40,14 +38,20 @@
private AvroDeserializer avroDeserializer = null;
private AvroSerializer avroSerializer = null;
+ private boolean badSchema = false;
+
@Override
public void initialize(Configuration configuration, Properties properties) throws SerDeException {
- try {
- schema = HaivvreoUtils.determineSchema(properties);
- configuration.set(HAIVVREO_SCHEMA, schema.toString(false));
- } catch (IOException e) {
- throw new HaivvreoException(e);
- }
+ // Reset member variables so we don't get in a half-constructed state
+ schema = null;
+ oi = null;
+ columnNames = null;
+ columnTypes = null;
+
+ schema = HaivvreoUtils.determineSchemaOrReturnErrorSchema(properties);
+ configuration.set(HAIVVREO_SCHEMA, schema.toString(false));
+
+ badSchema = schema.equals(SchemaResolutionProblem.SIGNAL_BAD_SCHEMA);
AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(schema);
this.columnNames = aoig.getColumnNames();
@@ -62,11 +66,13 @@ public void initialize(Configuration configuration, Properties properties) throw
@Override
public Writable serialize(Object o, ObjectInspector objectInspector) throws SerDeException {
+ if(badSchema) throw new BadSchemaException();
return getSerializer().serialize(o, objectInspector, columnNames, columnTypes, schema);
}
@Override
public Object deserialize(Writable writable) throws SerDeException {
+ if(badSchema) throw new BadSchemaException();
return getDeserializer().deserialize(columnNames, columnTypes, writable, schema);
}
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.haivvreo;
+
+public class BadSchemaException extends HaivvreoException {
+}
@@ -16,6 +16,8 @@
package com.linkedin.haivvreo;
import org.apache.avro.Schema;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -27,6 +29,8 @@
import java.util.Properties;
class HaivvreoUtils {
+ private static final Log LOG = LogFactory.getLog(HaivvreoUtils.class);
+
public static final String SCHEMA_LITERAL = "schema.literal";
public static final String SCHEMA_URL = "schema.url";
public static final String SCHEMA_NONE = "none";
@@ -40,7 +44,7 @@
* @throws IOException if error while trying to read the schema from another location
* @throws HaivvreoException if unable to find a schema or pointer to it in the properties
*/
- public static Schema determineSchema(Properties properties) throws IOException, HaivvreoException {
+ public static Schema determineSchemaOrThrowException(Properties properties) throws IOException, HaivvreoException {
String schemaString = properties.getProperty(SCHEMA_LITERAL);
if(schemaString != null && !schemaString.equals(SCHEMA_NONE))
return Schema.parse(schemaString);
@@ -60,6 +64,24 @@ public static Schema determineSchema(Properties properties) throws IOException,
return Schema.parse(new URL(schemaString).openStream());
}
+ /**
+ * Attempt to determine the schema via the usual means, but do not throw
+ * an exception if we fail. Instead, signal failure via a special
+ * schema. This is needed because Hive calls initialize on the serde for
+ * every operation, including calls that update the serde properties, so
+ * if initialization threw, a serde in a bad state could never be fixed.
+ */
+ public static Schema determineSchemaOrReturnErrorSchema(Properties props) {
+ try {
+ return determineSchemaOrThrowException(props);
+ } catch(HaivvreoException he) {
+ LOG.warn("Encountered HaivvreoException determing schema. Returning signal schema to indicate problem", he);
+ return SchemaResolutionProblem.SIGNAL_BAD_SCHEMA;
+ } catch (Exception e) {
+ LOG.warn("Encountered exception determing schema. Returning signal schema to indicate problem", e);
+ return SchemaResolutionProblem.SIGNAL_BAD_SCHEMA;
+ }
+ }
// Protected for testing and so we can pass in a conf for testing.
protected static Schema getSchemaFromHDFS(String schemaHDFSUrl, Configuration conf) throws IOException {
FileSystem fs = FileSystem.get(conf);
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.haivvreo;
+
+
+import org.apache.avro.Schema;
+
+class SchemaResolutionProblem {
+ static final String sentinelString = "{\n" +
+ " \"namespace\": \"com.linkedin.haivvreo\",\n" +
+ " \"name\": \"CannotDetermineSchemaSentinel\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"ERROR-ERROR-ERROR-ERROR-ERROR-ERROR-ERROR\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"Cannot_determine_schema\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"check:\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"schema.url:\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"and:\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"schema.literal:\",\n" +
+ " \"type\":\"string\"\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ public final static Schema SIGNAL_BAD_SCHEMA = Schema.parse(sentinelString);
+}
@@ -18,13 +18,19 @@
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Writable;
import org.junit.Test;
+import org.mockito.Mockito;
+import java.util.List;
import java.util.Properties;
import static com.linkedin.haivvreo.AvroSerDe.HAIVVREO_SCHEMA;
import static com.linkedin.haivvreo.HaivvreoUtils.SCHEMA_LITERAL;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
public class TestAvroSerde {
static final String originalSchemaString = "{\n" +
@@ -72,4 +78,86 @@ public void initializeDoesNotReuseSchemasFromConf() throws SerDeException {
// in via the properties
assertEquals(newSchema, Schema.parse(conf.get(HAIVVREO_SCHEMA)));
}
+
+ @Test
+ public void noSchemaProvidedReturnsErrorSchema() throws SerDeException {
+ Properties props = new Properties();
+
+ verifyErrorSchemaReturned(props);
+ }
+
+ @Test
+ public void gibberishSchemaProvidedReturnsErrorSchema() throws SerDeException {
+ Properties props = new Properties();
+ props.put(HaivvreoUtils.SCHEMA_LITERAL, "blahblahblah");
+
+ verifyErrorSchemaReturned(props);
+ }
+
+ @Test
+ public void emptySchemaProvidedReturnsErrorSchema() throws SerDeException {
+ Properties props = new Properties();
+ props.put(HaivvreoUtils.SCHEMA_LITERAL, "");
+
+ verifyErrorSchemaReturned(props);
+ }
+
+ @Test
+ public void badSchemaURLProvidedReturnsErrorSchema() throws SerDeException {
+ Properties props = new Properties();
+ props.put(HaivvreoUtils.SCHEMA_URL, "not://a/url");
+
+ verifyErrorSchemaReturned(props);
+ }
+
+ @Test
+ public void emptySchemaURLProvidedReturnsErrorSchema() throws SerDeException {
+ Properties props = new Properties();
+ props.put(HaivvreoUtils.SCHEMA_URL, "");
+
+ verifyErrorSchemaReturned(props);
+ }
+
+ @Test
+ public void bothPropertiesSetToNoneReturnsErrorSchema() throws SerDeException {
+ Properties props = new Properties();
+ props.put(HaivvreoUtils.SCHEMA_URL, HaivvreoUtils.SCHEMA_NONE);
+ props.put(HaivvreoUtils.SCHEMA_LITERAL, HaivvreoUtils.SCHEMA_NONE);
+
+ verifyErrorSchemaReturned(props);
+ }
+
+ private void verifyErrorSchemaReturned(Properties props) throws SerDeException {
+ AvroSerDe asd = new AvroSerDe();
+ asd.initialize(new Configuration(), props);
+ assertTrue(asd.getObjectInspector() instanceof StandardStructObjectInspector);
+ StandardStructObjectInspector oi = (StandardStructObjectInspector)asd.getObjectInspector();
+ List<? extends StructField> allStructFieldRefs = oi.getAllStructFieldRefs();
+ assertEquals(SchemaResolutionProblem.SIGNAL_BAD_SCHEMA.getFields().size(), allStructFieldRefs.size());
+ StructField firstField = allStructFieldRefs.get(0);
+ assertTrue(firstField.toString().contains("error-error-error-error-error-error-error"));
+
+ try {
+ Writable mock = Mockito.mock(Writable.class);
+ asd.deserialize(mock);
+ fail("Should have thrown a BadSchemaException");
+ } catch (BadSchemaException bse) {
+ // good
+ }
+
+ try {
+ Object o = Mockito.mock(Object.class);
+ ObjectInspector mockOI = Mockito.mock(ObjectInspector.class);
+ asd.serialize(o, mockOI);
+ fail("Should have thrown a BadSchemaException");
+ } catch (BadSchemaException bse) {
+ // good
+ }
+ }
+
+ @Test
+ public void getSerializedClassReturnsCorrectType() {
+ AvroSerDe asd = new AvroSerDe();
+ assertEquals(AvroGenericRecordWritable.class, asd.getSerializedClass());
+ }
}
@@ -20,7 +20,6 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
@@ -110,7 +109,7 @@ public void getTypeFromNullableTypePositiveCase() {
@Test(expected=HaivvreoException.class)
public void determineSchemaThrowsExceptionIfNoSchema() throws IOException, HaivvreoException {
Properties prop = new Properties();
- HaivvreoUtils.determineSchema(prop);
+ HaivvreoUtils.determineSchemaOrThrowException(prop);
}
@Test
@@ -119,7 +118,7 @@ public void determineSchemaFindsLiterals() throws Exception {
Properties props = new Properties();
props.put(HaivvreoUtils.SCHEMA_LITERAL, schema);
Schema expected = Schema.parse(schema);
- assertEquals(expected, HaivvreoUtils.determineSchema(props));
+ assertEquals(expected, HaivvreoUtils.determineSchemaOrThrowException(props));
}
@Test
@@ -128,7 +127,7 @@ public void detemineSchemaTriesToOpenUrl() throws HaivvreoException, IOException
props.put(HaivvreoUtils.SCHEMA_URL, "not:///a.real.url");
try {
- HaivvreoUtils.determineSchema(props);
+ HaivvreoUtils.determineSchemaOrThrowException(props);
fail("Should have tried to open that URL");
} catch(MalformedURLException e) {
assertEquals("unknown protocol: not", e.getMessage());
@@ -143,7 +142,7 @@ public void noneOptionWorksForSpecifyingSchemas() throws IOException, HaivvreoEx
props.put(SCHEMA_URL, SCHEMA_NONE);
props.put(SCHEMA_LITERAL, SCHEMA_NONE);
try {
- determineSchema(props);
+ determineSchemaOrThrowException(props);
fail("Should have thrown exception with none set for both url and literal");
} catch(HaivvreoException he) {
assertEquals(EXCEPTION_MESSAGE, he.getMessage());
@@ -153,7 +152,7 @@ public void noneOptionWorksForSpecifyingSchemas() throws IOException, HaivvreoEx
props.put(SCHEMA_LITERAL, TestAvroObjectInspectorGenerator.RECORD_SCHEMA);
Schema s;
try {
- s = determineSchema(props);
+ s = determineSchemaOrThrowException(props);
assertNotNull(s);
assertEquals(Schema.parse(TestAvroObjectInspectorGenerator.RECORD_SCHEMA), s);
} catch(HaivvreoException he) {
@@ -164,7 +163,7 @@ public void noneOptionWorksForSpecifyingSchemas() throws IOException, HaivvreoEx
props.put(SCHEMA_LITERAL, SCHEMA_NONE);
props.put(SCHEMA_URL, "not:///a.real.url");
try {
- determineSchema(props);
+ determineSchemaOrThrowException(props);
fail("Should have tried to open that bogus URL");
} catch(MalformedURLException e) {
assertEquals("unknown protocol: not", e.getMessage());
