This repository has been archived by the owner on Oct 30, 2020. It is now read-only.

Fix camus-hourly mapper skew and improve mapper logging #209

Merged
merged 2 commits on Apr 21, 2015

Conversation

@zliu41 (Contributor) commented Apr 16, 2015

There were 4 issues in Camus that could cause mapper skew:

  • The biggest one is a bug in calculating avgRecordSize. Camus stores the average as a long and reconstructs the running total size as currentAvgSize * currentCount. This is simply wrong, because the stored average has already been truncated by integer division, so the reconstructed total keeps losing bytes on every update. For example, suppose we have pulled 10,000 records with avgSize = 100, so total size = 1 million. Then, even if we pull any number of additional records of size 200 each, avgRecordSize will still be 100, since each update like (1,000,000 + 200) / 10,001 truncates back to 100. And if we pull a single record with size < 100, avgRecordSize drops to 99. So avgRecordSize will keep decreasing.

The solution is to modify the way the running total size is computed. See EtlMultiOutputCommitter.addOffset(). A rough sketch of the difference follows.
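The arithmetic is easier to see in code. Below is a minimal sketch with hypothetical names (not the actual Camus code): the broken version rebuilds the total from the truncated average, while the fixed version carries the true byte total, which is the spirit of the change to EtlMultiOutputCommitter.addOffset().

```java
/** Minimal sketch with hypothetical names; not the actual Camus code. */
public class AvgSizeSketch {

    // Broken: the running total is reconstructed from the already-truncated average,
    // e.g. (100 * 10000 + 200) / 10001 == 100, so pulling any number of 200-byte
    // records leaves the average stuck at 100.
    static long brokenNextAvg(long avg, long count, long newRecordSize) {
        long total = avg * count;                     // truncated avg * count != true total
        return (total + newRecordSize) / (count + 1);
    }

    // Fixed: carry the true running byte total and divide only when reporting.
    private long totalBytes = 0;
    private long recordCount = 0;

    void addRecord(long recordSize) {
        totalBytes += recordSize;
        recordCount++;
    }

    long avgRecordSize() {
        return recordCount == 0 ? 0 : totalBytes / recordCount;
    }
}
```

Starting from avg = 100 and count = 10,000, repeated calls to brokenNextAvg with 200-byte records return 100 every time, matching the example above.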

  • There's another bug: say a mapper is responsible for 10 partitions but times out while processing the 5th partition. Then, for the 6th through 10th partitions, the avgMsgSize committed by Camus will be the size of the last record Camus read in the 5th partition.
    The cause of the bug: if Camus times out before reading a partition, it emits an (EtlKey, ExceptionWritable) pair, and the getMessageSize() of the ExceptionWritable object returns the size of the last record read. This pair is then used to calculate the avgMsgSize of that partition.

The solution is to force each mapper to pull at least 5 records from each partition assigned to it. See EtlRecordReader.nextKeyValue() line 204. A rough sketch of the idea follows.
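A rough sketch of the minimum-records idea, with hypothetical names rather than the real EtlRecordReader logic:

```java
import java.util.Iterator;

/** Rough sketch with hypothetical names; not the actual EtlRecordReader code. */
public class MinRecordsSketch {
    // The fixed minimum discussed above; could be made configurable.
    static final int MIN_RECORDS_PER_PARTITION = 5;

    /** Keep reading a partition until it is exhausted, or until the mapper has
     *  timed out AND at least the minimum number of records has been sampled,
     *  so every partition gets a real avgMsgSize estimate. */
    static boolean shouldKeepReading(boolean mapperTimedOut,
                                     int recordsReadInPartition,
                                     Iterator<byte[]> records) {
        if (mapperTimedOut && recordsReadInPartition >= MIN_RECORDS_PER_PARTITION) {
            return false;   // enough samples after the timeout; move to the next partition
        }
        return records.hasNext();
    }
}
```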

  • Camus may pull beyond the latestOffset of a partition if new events are added to the partition during the pull.

The solution is to only pull up to latestOffset. See KafkaReader.hasNext(). A sketch follows.
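A minimal sketch of the bound, assuming the latest offset is snapshotted before the pull starts; the names are hypothetical, and the real check is the one referenced above in KafkaReader.hasNext():

```java
/** Minimal sketch with hypothetical fields; not the actual KafkaReader code. */
public class OffsetBoundedReader {
    private long currentOffset;
    private final long latestOffset;   // snapshot of the partition's latest offset, taken before the pull

    OffsetBoundedReader(long startOffset, long latestOffset) {
        this.currentOffset = startOffset;
        this.latestOffset = latestOffset;
    }

    /** Stop at the snapshot, so events appended during the pull are left for the next run. */
    boolean hasNext() {
        return currentOffset < latestOffset;
    }

    void advanceTo(long nextOffset) {
        currentOffset = nextOffset;
    }
}
```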

  • If events in a partition cannot be decoded, Camus keeps trying to decode the next event until all events in the partition have been consumed, ignoring the 10-minute timeout in this case.

The solution is to check for the timeout in this case as well. See EtlRecordReader.nextKeyValue() line 302. A sketch follows.
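A minimal sketch of checking the timeout inside the decode-retry loop, with hypothetical names (the actual check is the one added in EtlRecordReader.nextKeyValue()):

```java
import java.util.Iterator;

/** Minimal sketch with hypothetical names; not the actual EtlRecordReader code. */
public class DecodeRetrySketch {

    /** Skip undecodable events, but stop once the overall pull time budget is exhausted
     *  instead of draining the whole partition. Returns null on timeout or exhaustion. */
    static byte[] nextDecodableEvent(Iterator<byte[]> events, long startTimeMs, long maxPullTimeMs) {
        while (events.hasNext()) {
            if (System.currentTimeMillis() - startTimeMs > maxPullTimeMs) {
                return null;                 // timed out: give up on this partition
            }
            byte[] decoded = tryDecode(events.next());
            if (decoded != null) {
                return decoded;              // decoded successfully
            }
            // decode failed: try the next event, still bounded by the timeout check above
        }
        return null;
    }

    /** Placeholder for a real decoder; returns null when decoding fails. */
    static byte[] tryDecode(byte[] raw) {
        return (raw != null && raw.length > 0) ? raw : null;
    }
}
```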

Tested on Nertz; the skew has largely disappeared. The remaining occasional skew is due to a single large partition, which has to be assigned to a single mapper.

Also improved mapper logging: removed the log message "Received message with null message.key()", which dominated the mapper log and caused other important messages to be truncated, and added the time spent on each partition, the total bytes read, and the average record size to the mapper log. See EtlRecordReader.nextKeyValue(). An example of the kind of summary line is sketched below.
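A small sketch of the kind of per-partition summary this produces; the names are hypothetical and java.util.logging is used purely for illustration, not Camus's actual logger:

```java
import java.util.logging.Logger;

/** Small sketch with hypothetical names; not the actual Camus logging code. */
public class PartitionSummaryLog {
    private static final Logger LOG = Logger.getLogger(PartitionSummaryLog.class.getName());

    static void logPartitionSummary(String topic, int partition,
                                    long elapsedMs, long totalBytes, long recordCount) {
        long avgRecordSize = recordCount == 0 ? 0 : totalBytes / recordCount;
        LOG.info("Finished " + topic + ":" + partition + " in " + elapsedMs + " ms;"
                + " read " + totalBytes + " bytes in " + recordCount + " records"
                + " (avg " + avgRecordSize + " bytes/record)");
    }
}
```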

@chavdar (Contributor) commented Apr 16, 2015

I don't understand the read-at-least-5-records change. Why 5 records and not 6 or 16? Can't we just reuse the average size of the last partition (average sizes between partitions should not differ too much) or reuse the average size from the previous run?

Everything else LGTM.

@zliu41 (Contributor, Author) commented Apr 16, 2015

@chavdar The choice of 5 records is because we don't want to spend too much time after the timeout, and I think 5 records is usually sufficient to compute a rough avgSize. I can make it configurable.

Reusing the avgSize from the previous run would require a bigger code change, since the previous run's avgSize is retrieved in EtlInputFormat.getSplits(), and we would have to somehow pass those numbers to all the mappers so that the mappers can commit them. If I understand correctly, mappers don't have access to variables defined in the InputFormat.

@chavdar (Contributor) commented Apr 17, 2015

@zliu41 I was a bit facetious. Any fixed number (even configurable) seems random to me. Depending on broker or mapper performance, message sizes, etc., even 5 messages * N partitions may take some time. 5 messages may or may not give an accurate estimate of the average size. Isn't it possible for the mapper to return -1 and then the job to get the historic data?

If that's too much work, then you can go ahead and commit since these are important fixes. I also want to make sure that the Gobblin adapter does not use such arbitrary numbers. :)

@zliu41 (Contributor, Author) commented Apr 21, 2015

@chavdar I'm going to merge it for now, since doing this properly seems to involve more work than I currently have time for, and the estimated avgSize doesn't need to be very accurate. I'll monitor the Camus-hourly job and see how it actually behaves, i.e., whether it takes much longer than the 10-minute limit and whether there is still any mapper skew due to avgSize being inaccurate.

zliu41 added a commit that referenced this pull request Apr 21, 2015
Fix camus-hourly mapper skew and improve mapper logging
zliu41 merged commit 2d23f37 into LinkedInAttic:master on Apr 21, 2015