
Add LZO Thrift read support in Presto-hive. #75

Merged

Conversation

Collaborator

@Yaliang Yaliang commented Mar 14, 2017

I want to expose this code in the twitter fork's repo first, to make it possible to discuss the implementation with the team and get some feedback. The DummyClass can somehow be bypassed; we may be able to read from the thrift blob and transform it into a Presto block directly. That way it may be more meaningful to upstream, since it would no longer need elephant-bird.

* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.elephantbird.hive.serde;
Collaborator

why elephantbird in the package name? This class is generic and not dependent on EB.

Collaborator Author

That's because dal-sync was using elephant-bird's hive SerDe. Later I realized it isn't worth using their SerDe class: since the elephant-bird dependency is not in hive, I cannot use hive-cli to alter the SerDe class. Meanwhile, I don't want to put dal-sync through one more round of initial backfill, which last time caused dal to fail all of its clients' requests, and some people got paged.

Collaborator

Well, we should move this class to its proper home (without elephantbird) before we ship, since it's basically naming technical debt, which is much easier to fix before we go live than after.

Collaborator Author

Yes, completely right. Let's wait for dal to finish their fix.

* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;
Collaborator

Could we either commit this upstream or put it in the twitter package? Same for ThriftHiveRecordCursor.java.

Collaborator Author

Sure, let's put them in the twitter package first.

@Yaliang Yaliang changed the title Add LZO Thrift support in Hive. Add LZO Thrift read support in Presto-hive. Mar 16, 2017
@Yaliang
Collaborator Author

Yaliang commented Mar 29, 2017

+cc @maosongfu

@Yaliang
Collaborator Author

Yaliang commented Mar 31, 2017

Upstream modified BackgroundHiveSplitLoader to support a customized InputFormat. I will take a look at whether we can adapt to that approach; since we need a FileSystem to read the index file, it may be a little different. I searched for the annotation on the customized InputFormat; it's from uber/hoodie.

@Yaliang
Collaborator Author

Yaliang commented Apr 13, 2017

Ready for review. @billonahill @dabaitu

Collaborator

@billonahill billonahill left a comment

Please provide unit tests for the 7 new classes, especially tests for how we read fields from thrift objects.

throws IOException
{
if (bucketHandle.isPresent()) {
throw new PrestoException(StandardErrorCode.NOT_SUPPORTED, "Bucketed table in ThriftGeneralInputFormat is not yet supported");
Collaborator

Please include info in the exception message to indicate which table is not supported. Also, what is a bucketed table? We should make the exception clearer to a user without detailed knowledge of the code.

Collaborator Author

I basically followed this line:

https://github.com/twitter-forks/presto/blob/twitter-master/presto-hive/src/main/java/com/facebook/presto/hive/BackgroundHiveSplitLoader.java#L327

I think a bucketed table means the rows are pre-sampled based on some columns in order to save query time. Since we are using external tables for everything, simply raising NOT_SUPPORTED should be fine. Here is the hive wiki:

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL+BucketedTables

Collaborator

I see. It's basically a field-partitioned table. We should include the table name in the exception message.
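A minimal sketch of the clearer message, assuming a table identifier (here called table) is in scope at this point and using String.format:

if (bucketHandle.isPresent()) {
    throw new PrestoException(StandardErrorCode.NOT_SUPPORTED,
            format("Bucketed table %s is not supported by ThriftGeneralInputFormat", table));
}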


while (hiveFileIterator.hasNext()) {
LocatedFileStatus file = hiveFileIterator.next();
if (!ThriftGeneralInputFormat.lzoSuffixFilter.accept(file.getPath())) {
Collaborator

Instead of exposing a public static member of this class with an accept method, could we expose a static method on ThriftGeneralInputFormat like isLzoFile? That makes the expected usage more straightforward and the implementation more flexible to change.

Collaborator Author

Sure, but what I am doing mixes the logic from LzoInputFormat in elephant-bird with LzoIndex in hadoop-lzo. I am not quite sure whether exposing either a static method isLzoFile or a static member lzoSuffixFilter is the best way to filter the files. We could also return an empty list if the passed file path shows it is not an LZO file.

Collaborator

I'm not familiar enough with those classes OTTOMH to comment on the mixing, but I think it's better to support a boolean method than to return an empty list.
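A sketch of that boolean method, assuming the standard hadoop-lzo ".lzo" suffix is the deciding factor:

public static boolean isLzoFile(Path path)
{
    return path.getName().endsWith(".lzo");
}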

while (hiveFileIterator.hasNext()) {
LocatedFileStatus file = hiveFileIterator.next();
if (!ThriftGeneralInputFormat.lzoSuffixFilter.accept(file.getPath())) {
continue;
Collaborator

What's the use case for why non-lzo files would be passed to addLzoThriftSplitsToQueue?

Collaborator Author

The index file will be passed.

this.root = root;
}

public ThriftFieldIdResolver initialize(Properties schema)
Collaborator

Since this method is not initializing the instance but instead returning a new one, it should be a static factory method and the above constructors should be private. Same for the next method:

public static ThriftFieldIdResolver newResolver(Properties schema)
public static ThriftFieldIdResolver newNestedResolver(JsonNode root, int hiveIndex)

Collaborator Author

I will think about these more carefully. With newNestedResolver(JsonNode root, int hiveIndex) we lose any solution other than a JSON field in the schema. The purpose of making ThriftFieldIdResolver an interface with HiveThriftFieldIdResolver as an implementation was to allow a more flexible solution in the future. Say we later make a service call to DAL to resolve the thrift field id: we could just call initialize to set some context and return the resolver itself as a delegate to DAL, always using the same object.

Collaborator

I think the interface/impl pattern is fine, just don't mix in the builder pattern as well via instance methods on the impl. The interface should have just what's needed by its consumers (i.e., short getThriftId(int hiveIndex)). The builder methods should not be included and should be handled separately as static factory methods. When changing the implementation, a new factory method on a new class can be used, but the downstream consumers would remain unaffected since the interface contract they care about stays consistent.

Collaborator Author

You are right; I was thinking the same way, that we should separate builder and instance methods. I agree on the initial constructor, but not on how to build the nested resolver. The nested resolver builder must be part of the interface, because the nested resolver relies on the instantiated root resolver. If we had to hold the JsonNode root ourselves, we just wouldn't need this class at all.

Collaborator

+1 for the refactor.
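For reference, a minimal sketch of the interface shape this refactor converges on (method names taken from this thread and the tests below, not necessarily the final code):

public interface ThriftFieldIdResolver
{
    short getThriftId(int hiveIndex);

    ThriftFieldIdResolver getNestedResolver(int hiveIndex);
}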

try {
return new HiveThriftFieldIdResolver(objectMapper.readTree(jsonData));
}
catch (Exception e) {
Collaborator

Why catch and swallow Exception here and below? Can we be more specific about which types of exceptions we want to ignore? This is too loose, in that any kind of bug or exception will change what's returned, probably without a log warning. Is this code path really expected?

Collaborator Author

I think it should be fine. It's saying: try to read this JSON property; if it's not present or is corrupted, no worries, fall back to a default id resolver which simply does hiveIndex + 1.

Another place that uses a similar try/catch for JSON is here:

https://github.com/twitter-forks/presto/blob/twitter-master/presto-record-decoder/src/main/java/com/facebook/presto/decoder/json/JsonRowDecoder.java#L62

Collaborator

This pattern (of catching Exception) is really bad, because it encapsulates anything that could possibly go wrong and assigns a single solution to it. If you want to handle the case where the json is not present, handle it with a null check. Same for the case where it might be corrupted: handle it by catching the expected exception that would be thrown in that case (JsonProcessingException?) and logging it.

What if you had corrupt json in the system due to a bug in dalsync? The current approach would mask that forever as an expected condition (you have to assume we're not running in production with debug logging enabled, and even if we were, the scenario I describe is worthy of error logging).
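A minimal sketch of that narrower handling, reusing names that appear elsewhere in this diff (THRIFT_FIELD_ID_JSON, PLUSONE, objectMapper, log); Jackson's readTree throws an IOException (JsonProcessingException) on corrupt input:

String jsonData = schema.getProperty(THRIFT_FIELD_ID_JSON);
if (jsonData == null) {
    return PLUSONE; // mapping absent: expected case, no logging needed
}
try {
    return new HiveThriftFieldIdResolver(objectMapper.readTree(jsonData));
}
catch (IOException e) {
    log.warn(e, "Corrupt thrift field id JSON, falling back to the default resolver: %s", jsonData);
    return PLUSONE;
}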

Collaborator Author

I will add the null check.
For the rest, let's break this conversation into more specific questions:

  1. Should we have a default hiveIndex + 1 behavior?
  2. At what level should we log which types of exceptions, given that the resolver will be called very frequently?

Collaborator

If you encounter what we think is a bug, we should log it and ideally processing should fail. If it's an expected case, like an unsupported type, we should proceed without logging.

For the hiveIndex + 1 thing, why do we do that currently? Is that because the id space is offset by 1?

Collaborator Author

Yes. It's because the id space is offset by 1.

super(new MultiInputFormat());
}

private void initialize(FileSplit split, JobConf job) throws IOException
Collaborator

Can this be part of the constructor?

Collaborator Author

No, the initialize method is called from the getRecordReader method. There is nothing specific to ThriftGenericRow, even though we only use ThriftGenericRow. Also, in general the constructor of an InputFormat is often called without parameters, so we had better leave the initialize method as is.

Collaborator

If there's a valid reason why object creation and initialization need to happen at different times in the code, then it's fine to keep it as is.

setInputFormatInstance(new MultiInputFormat(new TypeRef(thriftClass) {}));
}
catch (ClassNotFoundException e) {
throw new RuntimeException("Failed getting class for " + thriftClassName);
Collaborator

Is there a better presto exception we can throw here than RuntimeException?


Collaborator

Can we imitate https://github.com/twitter-forks/presto/blob/twitter-master/presto-hive/src/main/java/com/facebook/presto/hive/HiveUtil.java#L184 instead and throw PrestoException, which is a type of RuntimeException? Throwing a plain RuntimeException should be avoided. PrestoException was created as a specific type of runtime exception that can be handled specifically and can carry more contextual information. For example, our error reports break down the types of presto exceptions.

I don't see the advantage of using RuntimeException over PrestoException in this case.

Collaborator

Can we throw PrestoException here?
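A sketch of what that could look like, following the HiveUtil pattern linked above (HIVE_SERDE_NOT_FOUND is an assumption about which HiveErrorCode fits best; chaining e preserves the cause):

catch (ClassNotFoundException e) {
    throw new PrestoException(HIVE_SERDE_NOT_FOUND, "Failed getting class for " + thriftClassName, e);
}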

throws IOException
{
job.setBoolean("elephantbird.mapred.input.bad.record.check.only.in.close", false);
job.setFloat("elephantbird.mapred.input.bad.record.threshold", 0.0f);
Collaborator

We used to have to configure these often with EB in map reduce. We should make these configurable by table.

Collaborator Author

Sure, I will allow JobConf to override these.

case TType.MAP:
return readMap(iprot);
default:
TProtocolUtil.skip(iprot, type);
Collaborator

Why skip if we don't find a known type? Is this an error condition that should be logged? What type(s) do we expect here?

Collaborator Author

I simply followed the scrooge-generated code, which skips any unmatched types and field ids. Maybe we can log it, but most code I have seen simply skips all unexpected conditions. What do you think?

Collaborator

We might not want to log in this case (maybe debug logging?), since it could be verbose if there are many unsupported fields.

default:
TProtocolUtil.skip(iprot, type);
}
return null;
Collaborator

Can we move this into the default clause to make it explicit? Also what does returning null mean? Could we instead throw a TypeNotFoundException or something like that? Typically returning null is an anti-pattern.

Collaborator Author

Returning null means there is a field here but we cannot understand what it is, so we mark it as null and continue. It may be because the file is corrupt or something. Yes, we could just throw an exception and stop. Do you think we should prevent this loose conversion?

Collaborator

If the contract we're providing is that we return null when we can't handle the type, then we should stick with this. It would still be good to include the return in the default block though, so it's clear that default should a) skip and b) return null.
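That is, something like:

default:
    // unknown/unsupported type: skip the value and surface it as null
    TProtocolUtil.skip(iprot, type);
    return null;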

Collaborator Author

Will move it.

}

Configuration targetConfiguration = hdfsEnvironment.getConfiguration(file.getPath());
JobConf targetJob = new JobConf(targetConfiguration);
Collaborator

Working with Hadoop configs can actually be expensive; they're notoriously inefficient. If there's a way we can get these configs once outside of the loop, we should.

Collaborator Author

  1. I think it's better to keep it in the loop, since the path changes for each file and we should keep the logic correct.
  2. For Presto, hdfsEnvironment.getConfiguration(file.getPath()) will always return the same Configuration. I think this is also the reason why we have to use a soft link to dwrev files. Code:
    https://github.com/twitter-forks/presto/blob/twitter-master/presto-hive/src/main/java/com/facebook/presto/hive/HiveHdfsConfiguration.java#L66

Collaborator

That's fine then. Let's just be aware of it and note it as something to keep an eye on w.r.t. performance.

return (short) (hiveIndex + 1);
}

Short thriftId = thriftIds.get(Integer.valueOf(hiveIndex));
Collaborator

@billonahill billonahill Apr 14, 2017

Integer.valueOf is probably not needed here due to auto-boxing. Same for below in the else statement.
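For example, the lookup auto-boxes the int key on its own:

Short thriftId = thriftIds.get(hiveIndex); // int auto-boxes to Integer for the map lookup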

else {
chunkLength = index.alignSliceEndToIndex(offset + chunkLength, length) - offset;
}
log.debug("lzo split: %s (%s:%s)", path, offset, offset + chunkLength);
Collaborator

After testing can we remove this line? I suspect it would make it so debug logging is saturated with these and not useful for anything else. Have you found that to be the case?

Collaborator Author

Yes. I use this line for manual verification. It will be removed later.

while (chunkOffset >= blockLocation.getLength()) {
// allow overrun for lzo compressed file for intermediate blocks
if (!isLzoCompressedFile(filePath) || blockLocation.getOffset() + blockLocation.getLength() >= length) {
checkState(chunkOffset == blockLocation.getLength(), "Error splitting blocks");
Collaborator

Please add more context to the "Error splitting blocks" message to better describe what error was encountered.
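For example (a sketch reusing the variables from the snippet above; Guava's checkState supports %s templating):

checkState(chunkOffset == blockLocation.getLength(),
        "Error splitting blocks: chunk offset %s does not match block length %s for file %s",
        chunkOffset, blockLocation.getLength(), filePath);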

@@ -535,13 +543,21 @@ private void addToHiveSplitSourceRoundRobin(List<Iterator<HiveSplit>> iteratorLi
Map<Integer, HiveType> columnCoercions)
throws IOException
{
Path filePath = new Path(path);
Collaborator

This class has a lot of mixed-in code now, which will make merging from master difficult and risky in the future. We should either contribute this code back to presto (preferred), or make it more merge-friendly if our plan is to not contribute it. In that case, is there a way to minimize changes to this class to only a few one-liners that call methods added to the bottom in a block, or maybe methods in a twitter-specific class? Or maybe adding method calls with empty bodies and then subclassing is possible?

Collaborator Author

I plan to contribute back to OSS since they are working on LZO text which could benefit from this change.

Collaborator Author

Included in the upstream PR prestodb#7916


The code added in this file is to handle lzo/lzop decompression, right? Would switching to the airlift lib help get rid of our additions here? Or is it that our lzop files might not all be indexed, so we need to handle that here?

Collaborator Author

The code added in this method performs flexible splitting; it is not for decompression. If the index file is not present, line 592 will make it return the whole file as a single split.


This index being present vs absent is only with Thrift files?

Collaborator Author

The more general truth is that the index comes with lzop files; it is not aware of the data format. That's the reason we want to contribute the splittable lzo patch to upstream: they have support for lzo text and should be able to benefit from our patch.

@@ -279,6 +310,21 @@ static boolean isSplittable(InputFormat<?, ?> inputFormat, FileSystem fileSystem
}
}

public static boolean isLzoCompressedFile(Path filePath)
Collaborator

Unless we plan to contribute this code back, please move these methods to a twitter-specific class for merge-friendliness.

Collaborator Author

I plan to contribute it to upstream.

import static com.facebook.presto.hive.HiveUtil.checkCondition;
import static com.google.common.base.MoreObjects.toStringHelper;

public class HiveThriftFieldIdResolver
Collaborator

please make a unit test for this class

Collaborator Author

Added TestHiveThriftFieldIdResolver


iprot.readStructEnd();
}

private Object readElem(TProtocol iprot, byte type) throws TException
Collaborator

Can you create a unit test for this class/method that constructs a thrift object with all supported (and unsupported) types and then asserts that it reads (or fails appropriately) for each?

Collaborator Author

@Yaliang Yaliang Apr 26, 2017

It should be covered in testLZOThrift.

oprot.writeStructEnd();
}

private void writeField(Map.Entry<Short, Object> field, TProtocol oprot)
Collaborator

Could we change this signature to more clearly convey what's in the Entry by passing the key and value instead? This reads much more clearly:

private void writeField(short thriftFieldId, Object fieldValue, TProtocol oprot)

Collaborator Author

Good point. Will address.
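The call site would then unpack the entry once, e.g. (assuming the fields live in a Map<Short, Object>, here called fieldMap):

for (Map.Entry<Short, Object> field : fieldMap.entrySet()) {
    writeField(field.getKey(), field.getValue(), oprot);
}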

private static final String LAZY_BINARY_SERDE = "org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe";
private static final String THRIFT_GENERIC_ROW = ThriftGenericRow.class.getName();
private static final Set<String> THRIFT_SERDE_CLASS_NAMES = ImmutableSet.<String>builder()
.add("com.facebook.presto.twitter.hive.thrift.ThriftGeneralSerDe")
Collaborator

These classes are already dependencies, so we should do ThriftGeneralSerDe.class.getName().
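That is:

.add(ThriftGeneralSerDe.class.getName())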

return Optional.empty();
}

setPropertyIfUnset(schema, "elephantbird.mapred.input.bad.record.check.only.in.close", Boolean.toString(false));
Collaborator

These settings should be configurable, ideally per datatype. We should expect that bad records exist.

Collaborator Author

What kind of configurability are you expecting here? It's currently configurable at the table level: if the table's properties contain these settings (users can set them with hive-cli), it will respect the values from the table.

Collaborator

I didn't realize that value could be set using hive-cli; that's great.
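For reference, a plausible shape for the setPropertyIfUnset helper used above (a sketch, not necessarily the exact implementation); the table's own properties win because the default is applied only when the key is absent:

private static void setPropertyIfUnset(Properties schema, String key, String value)
{
    if (schema.getProperty(key) == null) {
        schema.setProperty(key, value);
    }
}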

@Yaliang
Collaborator Author

Yaliang commented Apr 27, 2017

With commit d0d2ae3, we now have an integrated unit test for lzo thrift.

[presto-hive (yaliangw/addThriftInHive)] $ pmvn test -Dtest=TestHiveFileFormats
...
[INFO] ------------------------------------------------------------------------
[INFO] Building presto-hive 0.170-tw-0.32
[INFO] ------------------------------------------------------------------------
...
-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running com.facebook.presto.hive.TestHiveFileFormats
...
Results :

Tests run: 40, Failures: 0, Errors: 0, Skipped: 0
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
...

The reason I called it integrated is that the test actually walks through the whole process, including ThriftHiveRecordCursorProvider, ThriftHiveRecordCursor, ThriftGeneralDeserializer, ThriftGenericRow, HiveThriftFieldIdResolverFactory, and HiveThriftFieldIdResolver. I will add a separate unit test to verify that HiveThriftFieldIdResolver correctly understands the JSON string of the hive-id-to-thrift-id mapping.

@Yaliang
Collaborator Author

Yaliang commented Apr 27, 2017

I have submitted a separate PR prestodb#7916 upstream for splitting splittable lzo. Support for the LZO/LZOP codecs was just merged this morning. Hopefully we'll get their review soon.

@Yaliang
Collaborator Author

Yaliang commented May 3, 2017

Ready for another round of review. @billonahill @dabaitu

@@ -238,6 +262,13 @@ static String getInputFormatName(Properties schema)
return name;
}

public static String getSerializationClassName(Properties schema)
Collaborator

This class has deps on com.facebook.presto.twitter.hive.thrift.ThriftGeneralInputFormat and references elephantbird, so I assume you're not planning on contributing this to OSS. If that's the case, can we put these new methods in another twitter-specific class? That way we leave less of a footprint on this class and can more easily merge from OSS in the future.

Collaborator Author

Let me clarify the changes in this class. The change at line 195 must stay in this class, since it calls getRecordReader directly and we get no chance to rename the property once we use HiveMultiInputFormat directly and remove com.facebook.presto.twitter.hive.thrift.ThriftGeneralInputFormat, which is just a mirror of EB's HiveMultiInputFormat. The change to HiveMultiInputFormat has been merged in EB, so we will remove lines 249 to 252 once EB makes a new release. The method getSerializationClassName could be moved somewhere outside this class. The rest of the changes are included in PR prestodb#7916, so they will go to OSS.

private static final Logger log = Logger.get(HiveThriftFieldIdResolverFactory.class);
private static final ObjectMapper objectMapper = new ObjectMapper();
public static final String THRIFT_FIELD_ID_JSON = "thrift.field.id.json";
public static final ThriftFieldIdResolver PLUSONE = new HiveThriftFieldIdResolver(null);
Collaborator

Why PLUSONE? Can we give this a more descriptive name?

Collaborator Author

Because it does the shift ThriftId = HiveId plus one.
Any suggestions?

Collaborator

@billonahill billonahill May 3, 2017

I see, I missed that. Instead of naming it based on its current implementation details, it seems more natural to name it based on what it is: HIVE_THRIFT_FIELD_ID_RESOLVER. That makes it clearer to the reader what class is returned by default.

@Test
public class TestHiveThriftFieldIdResolver
{
private static final Map<String, Object> STRUCT_FIELD_ID_AS_MAP = ImmutableMap.of(
Collaborator

You could use Map<String, Short> and avoid the cast.

Collaborator Author

I get this error:

incompatible types: inference variable V has incompatible bounds
[ERROR] equality constraints: java.lang.Short
[ERROR] lower bounds: java.lang.Integer

Collaborator

I thought auto-boxing would handle that for you. I guess not...
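Java auto-boxes int to Integer only; there is no implicit int-to-Short conversion, so the explicit cast is required either way:

Map<String, Short> bad = ImmutableMap.of("id", 2);         // does not compile: 2 boxes to Integer, not Short
Map<String, Short> ok = ImmutableMap.of("id", (short) 2);  // fine: explicit narrowing, then boxing to Short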

"1", (short) 2,
"id", (short) 4);

private static final Map<String, Object> LIST_FIELD_ID_AS_MAP = ImmutableMap.of(
Collaborator

why are we mixing string and short values in the same map?

Collaborator Author

The value for 0 can be any map. It represents the nested structure for the element's thrift id mapping.

Collaborator

got it

"0", "{}",
"id", (short) 5);

private static final Map<String, Object> VERBOSE_PRIMARY_FIELD_ID_AS_MAP = ImmutableMap.of(
Collaborator

Map<String, Short>

public void testDefaultResolver()
throws Exception
{
ThriftFieldIdResolver plusOne = resolverFactory.createResolver(new Properties());
Collaborator

Please use a more descriptive name than plusOne, like resolver.

{
ThriftFieldIdResolver plusOne = resolverFactory.createResolver(new Properties());

assertEquals(plusOne.getThriftId(0), 1);
Collaborator

why are these values found when using a resolverFactory initialized with nothing in it?

Collaborator Author

Because the default non-informative resolver just does hiveId + 1 as the result.

assertEquals(plusOne.getThriftId(0), 1);
assertEquals(plusOne.getThriftId(10), 11);
assertEquals(plusOne.getThriftId(5), 6);
assertEquals(plusOne.getNestedResolver(2), plusOne);
Collaborator

why does getNestedResolver return the same resolver?

Collaborator Author

Because the default non-informative resolver should return itself: its nested resolver must also be the non-informative resolver.

@@ -78,6 +80,9 @@
import static com.facebook.presto.hive.HiveSessionProperties.getMaxSplitSize;
import static com.facebook.presto.hive.HiveUtil.checkCondition;
import static com.facebook.presto.hive.HiveUtil.getInputFormat;
import static com.facebook.presto.hive.HiveUtil.getLzopIndexPath;
import static com.facebook.presto.hive.HiveUtil.isLzopCompressedFile;
import static com.facebook.presto.hive.HiveUtil.isLzopIndexFile;

Is it easy to switch to the airlift lzo decompressor now? If not, can you add a note to do that in the future?
I don't know the technical details, but from the website description the airlift decompressor is supposed to be faster.

Collaborator Author

Right, that's potentially an optimization we can do. Unfortunately, if we want to switch to the airlift decompressor, we have to either write airlift's codec class name into the LZO thrift data file or pull the lzo decompression logic out of EB, especially this line: https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/LzoRecordReader.java#L84


@@ -91,6 +93,11 @@
TEXTFILE(LazySimpleSerDe.class.getName(),
TextInputFormat.class.getName(),
HiveIgnoreKeyTextOutputFormat.class.getName(),
new DataSize(8, Unit.MEGABYTE)),

how did you estimate this writer size?

Collaborator Author

It's based on HiveIgnoreKeyTextOutputFormat, which simply writes binary as I understand it. We do not use the write functionality, but the unit test requires a valid StorageFormat for the test file format.

this.root = root;
}

public short getThriftId(int hiveIndex)

Can you add some comments about how thrift ids could be discontinuous and hive ids are continuous to help us follow the logic of index mapping?

Collaborator Author

Will do.
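Something along these lines (the ids here are hypothetical, purely to illustrate the mapping):

// Hive column indexes are always continuous (0, 1, 2, ...), while thrift
// field ids may be sparse (e.g. 1, 3, 7 after fields are deprecated), so
// the resolver maps each hive index to its thrift id: {0 -> 1, 1 -> 3, 2 -> 7}.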


Very nice!

recordReader.close();
}
catch (IOException e) {
throw Throwables.propagate(e);

why not just
throw new RuntimeException(e);

Collaborator Author

Good catch.

@Yaliang
Collaborator Author

Yaliang commented May 4, 2017

@dabaitu Updated. Will you take a look again?

@dabaitu dabaitu left a comment

👍 ^1000 for coding this.


@Yaliang Yaliang merged commit 71b47c6 into twitter-forks:twitter-master May 4, 2017
@Yaliang Yaliang deleted the yaliangw/addThriftInHive branch May 4, 2017 22:54
luohao pushed a commit that referenced this pull request May 1, 2019
Port original PR (#75) to the prestosql code base:

* 3c9b124
* 9a65316
* 7bb2994
* 489d3d2
* d0d2ae3
* d526964