
Better HDFS Support #1556

Merged
merged 21 commits into locationtech:master on Jul 5, 2016

Conversation

3 participants
@echeipesh
Contributor

echeipesh commented Jun 21, 2016

This PR improves HDFS layer writing and random value reading support.

HadoopValueReader now opens a MapFile.Reader for each map file in the layer. These readers cache the index of available keys, so they are able to provide quick lookups. This replaces and improves on the previous method of using FileInputFormat to query for a single record.
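
As a rough illustration of the idea (plain Scala, not the GeoTrellis code; `CachedReader` and its fields are hypothetical names), caching each map file's key index turns a random read into an in-memory binary search:

```scala
// Hypothetical sketch: a "reader" that has cached the sorted key index of
// one map file. A lookup is then a binary search over the cached index
// rather than a scan of the file.
case class CachedReader(index: Array[Long], values: Map[Long, Array[Byte]]) {
  def lookup(key: Long): Option[Array[Byte]] = {
    var lo = 0
    var hi = index.length - 1
    while (lo <= hi) {
      val mid = (lo + hi) / 2
      if (index(mid) == key) return values.get(key)
      else if (index(mid) < key) lo = mid + 1
      else hi = mid - 1
    }
    None // key not present in this map file
  }
}
```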

HadoopRDDWriter has multiple improvements:

  • Accomplishes its task with a single shuffle step
    • This is possible because, instead of estimating the number of blocks required by counting, we simply roll over to a new file when the record being written is about to surpass the block boundary.
    • Instead of using a groupBy, we sort our records on shuffle, using the index to partition the records.
  • Writes happen by mapping over the partition iterator. This is possible because the partition arrives pre-sorted, and it greatly reduces memory pressure since records can be garbage collected after they are written.
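The roll-over rule can be sketched as a pure function (an illustrative assumption about the behavior, not the PR's actual code): each record goes to the current file unless appending it would cross the block boundary, in which case a new file is started. No up-front block count is needed.

```scala
// Sketch: assign sequentially written records to file indices, rolling to a
// new file when the next record would surpass the block-size boundary.
def assignFiles(recordSizes: Seq[Long], blockSize: Long): Seq[Int] = {
  var file = 0  // index of the current output file
  var used = 0L // bytes written to the current file so far
  recordSizes.map { size =>
    // roll over before writing a record that would cross the boundary
    if (used + size > blockSize && used > 0L) { file += 1; used = 0L }
    used += size
    file
  }
}
```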

Current testing shows:

  • HDFS ingest speeds exceed those seen in Accumulo
  • Tile fetching latency is visually acceptable
  • Jobs using the GroupedCumulativeIterator approach are able to complete in settings where .groupBy/sort/write jobs are killed for memory violations

Future improvements in mind:

  • Save a bloom filter index for layer MapFiles to reduce memory requirements for HadoopValueReader and to produce quicker lookups in most cases.
  • In HadoopValueReader, when fetching a record, cache all the tiles that share that index before filtering down to a single tile. These records are very likely to be asked for next and should be stored in an LRU cache.
  • Devise a method to compare KeyIndex instances so that, if the RDD to be saved is already partitioned by a compatible index (one where all key boundaries are shared), the extra shuffle can be avoided.
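The proposed LRU tile cache could look something like the following (a sketch under assumptions: the class name and generic key/value types are illustrative, not GeoTrellis API). A `java.util.LinkedHashMap` in access order makes eviction of the least-recently-used entry a one-line override:

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import java.util.Map.Entry

// Illustrative LRU cache: all tiles decoded from one MapFile record could
// be inserted here, since neighboring tiles are likely requested next.
class TileLruCache[K, V](maxSize: Int) {
  // accessOrder = true makes iteration order least- to most-recently used
  private val map = new JLinkedHashMap[K, V](16, 0.75f, true) {
    override def removeEldestEntry(eldest: Entry[K, V]): Boolean =
      size() > maxSize // evict once capacity is exceeded
  }
  def get(k: K): Option[V] = Option(map.get(k))
  def put(k: K, v: V): Unit = map.put(k, v)
}
```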
@@ -43,7 +43,7 @@ object HadoopRDDWriter extends LazyLogging {
MapFile.Writer.keyClass(classOf[LongWritable]),
MapFile.Writer.valueClass(classOf[BytesWritable]),
MapFile.Writer.compression(SequenceFile.CompressionType.NONE))
writer.setIndexInterval(1)
writer.setIndexInterval(32)

@pomadchin

pomadchin Jun 24, 2016

Member

How would it affect query time? (Just curious.)

@echeipesh

echeipesh Jun 24, 2016

Contributor

Layer query time would not be affected at all by this. When we're reading off ranges we're already seeking through the file, so not having as many index points would have minimal impact, if any. This will have more of an impact on random access through the value reader. Spinning up a cluster to figure that part out. For reference, the default interval value is 128.

import org.apache.spark.SparkContext
import spray.json._
import spray.json.DefaultJsonProtocol._
import scala.collection.immutable._
import scala.reflect.ClassTag
class HadoopValueReader(val attributeStore: HadoopAttributeStore)

@lossyrob

lossyrob Jun 24, 2016

Member

We need to remove spark context

@echeipesh echeipesh changed the title from [WIP] Better HDFS Support to Better HDFS Support Jul 5, 2016

/**
* When record being written would exceed the block size of the current MapFile
* opens a new file to continue writing. This allows to split partition into block-sized
* chunks without foreknowledge of how bit it is.

@lossyrob

lossyrob Jul 5, 2016

Member

bit => big

rdd: RDD[(K, V)],
path: Path,
keyIndex: KeyIndex[K],
tileSize: Int = 256*256*8,
compressionFactor: Double = 1.3

@lossyrob

lossyrob Jul 5, 2016

Member

👏

writer.close()
// TODO: collect statistics on written records and return those
Iterator.empty
}.count

val keyBounds = KeyBounds[K](key, key)
val filterDefinition = keyIndex.indexRanges(keyBounds).toArray
inputConf.setSerialized(FilterMapFileInputFormat.FILTER_INFO_KEY, filterDefinition)
// Map from last key in each reader to that reader

@lossyrob

lossyrob Jul 5, 2016

Member

Feel like this doc string is talking about an older version
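For context on the "map from last key in each reader to that reader" comment above: with a `java.util.TreeMap` keyed by each map file's largest key, `ceilingEntry` picks the one file whose key range can contain a requested index. This is an illustrative sketch, not the PR's implementation; the name `readerFor` is assumed.

```scala
import java.util.{TreeMap => JTreeMap}

// Given the largest key stored in each map file, find the reader whose
// range can contain `key`: the first entry whose last key is >= key.
def readerFor[R](lastKeyToReader: JTreeMap[Long, R], key: Long): Option[R] =
  Option(lastKeyToReader.ceilingEntry(key)).map(_.getValue)
```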

val valueWritable: BytesWritable =
ranges
.find{ row =>
index >= row._2 && index <= row._3

@lossyrob

lossyrob Jul 5, 2016

Member

Should this be index < row._3?
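The boundary question above comes down to whether the range's end index is stored inclusively. A toy version of the two predicates (illustrative only, not the PR's code) makes the off-by-one visible:

```scala
// With `<=` the range's last index is a hit; with `<` it is not.
// Which is correct depends on how the writer records the end index.
def containsInclusive(start: Long, end: Long, index: Long): Boolean =
  index >= start && index <= end

def containsExclusive(start: Long, end: Long, index: Long): Boolean =
  index >= start && index < end
```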

@lossyrob

Member

lossyrob commented Jul 5, 2016

+1 after comments addressed

@echeipesh echeipesh merged commit 41ba42d into locationtech:master Jul 5, 2016

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed

@lossyrob lossyrob added this to the 1.0 milestone Oct 18, 2016
