Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace Preconditions.checkNotNull with warning in SequenceFileStorage.putNext #89

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 122 additions & 70 deletions src/java/com/twitter/elephantbird/pig/load/SequenceFileLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
Expand All @@ -52,32 +51,29 @@

/**
* Pig LoadFunc supporting conversion from key, value objects stored within {@link SequenceFile}s to
* Pig objects. Example:
* Pig objects. Example usage:
*
* <pre>
* key_val = LOAD '$INPUT' USING com.twitter.elephantbird.pig.load.SequenceFileLoader (
* pairs = LOAD '$INPUT' USING com.twitter.elephantbird.pig.load.SequenceFileLoader (
* '-c com.twitter.elephantbird.pig.util.IntWritableConverter',
* '-c com.twitter.elephantbird.pig.util.TextConverter'
* ) as (
* key: int,
* val: chararray
* value: chararray
* );
*
* -- or, making use of defaults
* key_val = LOAD '$INPUT' USING com.twitter.elephantbird.pig.load.SequenceFileLoader () as (
* key: chararray,
* val: chararray
* );
* pairs = LOAD '$INPUT' USING com.twitter.elephantbird.pig.load.SequenceFileLoader ();
* </pre>
*
* @author Andy Schlaikjer
* @see WritableConverter
*/
public class SequenceFileLoader<K extends Writable, V extends Writable> extends FileInputLoadFunc
implements LoadPushDown, LoadMetadata {
protected static final String CONVERTER_PARAM = "converter";
protected static final String READ_KEY_PARAM = "_readKey";
protected static final String READ_VALUE_PARAM = "_readValue";
public static final String CONVERTER_PARAM = "converter";
private static final String READ_KEY_PARAM = "_readKey";
private static final String READ_VALUE_PARAM = "_readValue";
protected final CommandLine keyArguments;
protected final CommandLine valueArguments;
protected final WritableConverter<K> keyConverter;
Expand All @@ -89,7 +85,8 @@ public class SequenceFileLoader<K extends Writable, V extends Writable> extends
private final TupleFactory tupleFactory = TupleFactory.getInstance();
protected String signature;
private RecordReader<DataInputBuffer, DataInputBuffer> reader;
private boolean readKey = true, readValue = true;
private boolean readKey = true;
private boolean readValue = true;

/**
* Parses key and value options from argument strings. Available options for both key and value
Expand All @@ -102,11 +99,18 @@ public class SequenceFileLoader<K extends Writable, V extends Writable> extends
* Any extra arguments found will be treated as String arguments for the WritableConverter
* constructor. For instance, the argument string {@code "-c MyConverter 123 abc"} specifies
* WritableConverter class {@code MyConverter} along with two constructor arguments {@code "123"}
* and {@code "abc"}. This will cause SequenceFileLoader to invoke
* {@code MyConverter(String arg1, String arg2)} with the given values when creating a new
* instance of MyConverter. If no such constructor exists, constructor
* {@code new MyConverter(String[] args)} is attempted. If this also fails, a RuntimeException
* will be thrown.
* and {@code "abc"}. This will cause SequenceFileLoader to attempt to invoke the following
* constructors, in order, to create a new instance of MyConverter:
* <ol>
* <li><code>MyConverter(String arg1, String arg2)</code> -- constructor arguments are passed as
* explicit arguments.</li>
* <li><code>MyConverter(String[] args)</code> -- constructor arguments are passed within a String
* array.</li>
* <li><code>MyConverter(String... args)</code> -- same as above, with var args syntax.</li>
* <li><code>MyConverter(String argString)</code> -- constructor arguments are joined with space
* char to create {@code argString}.</li>
* </ol>
* If none of these constructors exist, a RuntimeException will be thrown.
*
* <p>
* Note that WritableConverter constructor arguments prefixed by one or more hyphens will be
Expand All @@ -121,25 +125,32 @@ public class SequenceFileLoader<K extends Writable, V extends Writable> extends
* );
* </pre>
*
* The above Pig script will cause SequenceFileLoader to attempt execution of
* {@code MyComplexWritableConverter("basic", "options", "here", "--complex", "-options", "here")}.
*
* @param keyArgs
* @param valueArgs
* @throws ParseException
* @throws IOException
*/
public SequenceFileLoader(String keyArgs, String valueArgs) throws ParseException {
keyArguments = parseArguments(keyArgs);
valueArguments = parseArguments(valueArgs);
public SequenceFileLoader(String keyArgs, String valueArgs) throws ParseException, IOException {
// parse key, value arguments
Options options = getOptions();
keyArguments = parseArguments(options, keyArgs);
valueArguments = parseArguments(options, valueArgs);

// construct key, value converters
keyConverter = getWritableConverter(keyArguments);
valueConverter = getWritableConverter(valueArguments);

// initialize key, value converters
initialize();
}

/**
* Default constructor. Defaults used for all options.
*
* @throws ParseException
* @throws IOException
*/
public SequenceFileLoader() throws ClassCastException, ParseException, ClassNotFoundException,
InstantiationException, IllegalAccessException {
public SequenceFileLoader() throws ParseException, IOException {
this("", "");
}

Expand All @@ -155,7 +166,7 @@ protected Options getOptions() {
.withArgName("cls")
.withDescription(
"Converter type to use for conversion of data." + " Defaults to '"
+ TextConverter.class.getName() + "' for key and value.").create("c");
+ TextConverter.class.getName() + "'.").create("c");
return new Options().addOption(converterOption);
}

Expand All @@ -164,8 +175,7 @@ protected Options getOptions() {
* @return CommandLine instance containing options parsed from argument string.
* @throws ParseException
*/
private CommandLine parseArguments(String args) throws ParseException {
Options options = getOptions();
private static CommandLine parseArguments(Options options, String args) throws ParseException {
CommandLine cmdline = null;
try {
cmdline = new GnuParser().parse(options, args.split(" "));
Expand All @@ -177,43 +187,71 @@ private CommandLine parseArguments(String args) throws ParseException {
return cmdline;
}

/**
* @param arguments
* @return new WritableConverter instance constructed using given arguments.
*/
@SuppressWarnings("unchecked")
private static <T extends Writable> WritableConverter<T> getWritableConverter(
CommandLine arguments) {
// get remaining non-empty argument strings from commandline
String[] converterArgs = removeEmptyArgs(arguments.getArgs());

// create writable converter instance
WritableConverter<T> converter = null;
try {
// get writable converter class

// get converter classname
String converterClassName =
arguments.getOptionValue(CONVERTER_PARAM, TextConverter.class.getName());

// get converter class
Class<WritableConverter<T>> converterClass =
(Class<WritableConverter<T>>) Class.forName(arguments.getOptionValue(CONVERTER_PARAM,
TextConverter.class.getName()));
(Class<WritableConverter<T>>) Class.forName(converterClassName);

// construct converter instance
if (converterArgs == null || converterArgs.length == 0) {

// use default ctor
converter = converterClass.newInstance();
return converterClass.newInstance();

} else {
try {

// look up ctor having explicit number of String arguments
Class<?>[] parameterTypes = new Class<?>[converterArgs.length];
Arrays.fill(parameterTypes, String.class);
Constructor<WritableConverter<T>> ctor = converterClass.getConstructor(parameterTypes);
converter = ctor.newInstance((Object[]) converterArgs);
return ctor.newInstance((Object[]) converterArgs);

} catch (NoSuchMethodException e) {
// look up ctor having single String[] (or String... varargs) argument
Constructor<WritableConverter<T>> ctor =
converterClass.getConstructor(new Class<?>[] { String[].class });
converter = ctor.newInstance((Object) converterArgs);
try {

// look up ctor having single String[] (or String... varargs) argument
Constructor<WritableConverter<T>> ctor =
converterClass.getConstructor(new Class<?>[] { String[].class });
return ctor.newInstance((Object) converterArgs);

} catch (NoSuchMethodException e2) {

// look up ctor having single String argument and join args together
Constructor<WritableConverter<T>> ctor =
converterClass.getConstructor(new Class<?>[] { String.class });
StringBuilder sb = new StringBuilder(converterArgs[0]);
for (int i = 1; i < converterArgs.length; ++i) {
sb.append(" ").append(converterArgs[i]);
}
return ctor.newInstance(sb.toString());

}
}
}
} catch (Exception e) {
throw new RuntimeException("Failed to create WritableConverter instance", e);
}
return converter;
}

/**
* @param args
* @return new String[] containing non-empty values from args.
*/
private static String[] removeEmptyArgs(String[] args) {
List<String> converterArgsFiltered = Lists.newArrayList();
for (String arg : args) {
Expand All @@ -224,18 +262,14 @@ private static String[] removeEmptyArgs(String[] args) {
return converterArgsFiltered.toArray(new String[0]);
}

private Properties getContextProperties() {
Preconditions.checkNotNull(signature, "Signature is null");
return UDFContext.getUDFContext().getUDFProperties(getClass(), new String[] { signature });
}

private void setContextProperty(String name, String value) {
Preconditions.checkNotNull(name, "Context property name is null");
getContextProperties().setProperty(signature + name, value);
}

private String getContextProperty(String name, String defaultValue) {
return getContextProperties().getProperty(signature + name, defaultValue);
/**
* Initializes key, value WritableConverters.
*
* @throws IOException
*/
protected void initialize() throws IOException {
keyConverter.initialize(null);
valueConverter.initialize(null);
}

@Override
Expand All @@ -258,6 +292,20 @@ public void setUDFContextSignature(String signature) {
this.signature = signature;
}

protected Properties getContextProperties() {
Preconditions.checkNotNull(signature, "Signature is null");
return UDFContext.getUDFContext().getUDFProperties(getClass(), new String[] { signature });
}

protected void setContextProperty(String name, String value) {
Preconditions.checkNotNull(name, "Context property name is null");
getContextProperties().setProperty(signature + name, value);
}

protected String getContextProperty(String name, String defaultValue) {
return getContextProperties().getProperty(signature + name, defaultValue);
}

@Override
public List<OperatorSet> getFeatures() {
return ImmutableList.of(OperatorSet.PROJECTION);
Expand All @@ -273,6 +321,7 @@ public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
switch (i) {
case 0:
readKey = true;
// TODO(Andy Schlaikjer) enable schema checking here?
// try {
// keyConverter.checkLoadSchema(ResourceSchemaUtil.createResourceFieldSchema(field));
// } catch (IOException e) {
Expand All @@ -281,6 +330,7 @@ public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
break;
case 1:
readValue = true;
// TODO(Andy Schlaikjer) enable schema checking here?
// try {
// valueConverter.checkLoadSchema(ResourceSchemaUtil.createResourceFieldSchema(field));
// } catch (IOException e) {
Expand All @@ -297,30 +347,24 @@ public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
return new RequiredFieldResponse(true);
}

@Override
public void setLocation(String location, Job job) throws IOException {
Preconditions.checkNotNull(location, "Location is null");
Preconditions.checkNotNull(location, "Job is null");
FileInputFormat.setInputPaths(job, new Path(location));
readKey = Boolean.parseBoolean(getContextProperty(READ_KEY_PARAM, "true"));
readValue = Boolean.parseBoolean(getContextProperty(READ_VALUE_PARAM, "true"));
}

@Override
public ResourceSchema getSchema(String location, Job job) throws IOException {
ResourceSchema resourceSchema = new ResourceSchema();
// determine key field schema
ResourceFieldSchema keySchema = keyConverter.getLoadSchema();
if (keySchema == null) {
keySchema = new ResourceFieldSchema();
keySchema.setType(DataType.BYTEARRAY);
return null;
}
keySchema.setName("key");

// determine value field schema
ResourceFieldSchema valueSchema = valueConverter.getLoadSchema();
if (valueSchema == null) {
valueSchema = new ResourceFieldSchema();
valueSchema.setType(DataType.BYTEARRAY);
return null;
}
valueSchema.setName("value");

// return tuple schema
ResourceSchema resourceSchema = new ResourceSchema();
resourceSchema.setFields(new ResourceFieldSchema[] { keySchema, valueSchema });
return resourceSchema;
}
Expand Down Expand Up @@ -359,11 +403,19 @@ public void setPartitionFilter(Expression expression) throws IOException {
}

@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public void setLocation(String location, Job job) throws IOException {
Preconditions.checkNotNull(location, "Location is null");
Preconditions.checkNotNull(job, "Job is null");
Path inputPath = new Path(location);
FileInputFormat.setInputPaths(job, inputPath);
readKey = Boolean.parseBoolean(getContextProperty(READ_KEY_PARAM, "true"));
readValue = Boolean.parseBoolean(getContextProperty(READ_VALUE_PARAM, "true"));
}

@Override
@SuppressWarnings({ "rawtypes", "unchecked" })
public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
this.reader = reader;
keyConverter.initialize(null);
valueConverter.initialize(null);
}

@Override
Expand Down
Loading