Add an offheap indexmap implementation using LinkedIn PalDB as an option. Resolving Issue #4 #17
Conversation
 *
 * @author yizhou
 */
trait IndexMap extends Map[String, Int] with java.io.Serializable {
- Why do we extend this trait with java.io.Serializable here? If this trait already extends scala.collection.immutable.Map, doesn't it become serializable automatically?
- Nitpick: it might be helpful to extend this trait with scala.collection.Map (via import scala.collection.Map) rather than scala.collection.immutable.Map (type Map[A, +B] = scala.collection.immutable.Map[A, B]), because the former is more generic, and the "toMap" function in Scala converts the underlying collection into a "scala.collection.Map", not a "scala.collection.immutable.Map".
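For concreteness, a minimal sketch of the two suggestions above; the two members shown are hypothetical, added only to make the trait self-contained:

```scala
import scala.collection.Map

// Sketch of both suggestions: drop the explicit java.io.Serializable
// and extend the more generic scala.collection.Map instead.
trait IndexMap extends Map[String, Int] {
  // Hypothetical members, for illustration only (not the actual API).
  def getIndex(featureName: String): Int
  def getFeatureName(idx: Int): Option[String]
}
```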
Hi Xianxing, those are two very good points. You are right, extending java.io.Serializable is no longer necessary. And I'll make it extend scala.collection.Map instead of scala.collection.immutable.Map. I'll modify this interface accordingly.
@XianXing, I'd suggest we still stick with scala.collection.immutable.Map for now.
I get the following issues with the other map:
/home/yizhou/Dev/opensource/new-github/photon-ml/photon-ml/src/main/scala/com/linkedin/photon/ml/Driver.scala:210: type mismatch;
found : com.linkedin.photon.ml.util.IndexMap
required: Map[String,Int]
new SystemReport(suite.featureKeyToIdMap, params, summaryOption),
^
/home/yizhou/Dev/opensource/new-github/photon-ml/photon-ml/src/main/scala/com/linkedin/photon/ml/Driver.scala:345: type mismatch;
found : com.linkedin.photon.ml.util.IndexMap
required: Map[String,Int]
val varImportanceDiagnostic = new VarianceFeatureImportanceDiagnostic(suite.featureKeyToIdMap)
^
/home/yizhou/Dev/opensource/new-github/photon-ml/photon-ml/src/main/scala/com/linkedin/photon/ml/Driver.scala:346: type mismatch;
found : com.linkedin.photon.ml.util.IndexMap
required: Map[String,Int]
val meanImportanceDiagnostic = new ExpectedMagnitudeFeatureImportanceDiagnostic(suite.featureKeyToIdMap)
^
/home/yizhou/Dev/opensource/new-github/photon-ml/photon-ml/src/main/scala/com/linkedin/photon/ml/Driver.scala:375: type mismatch;
found : com.linkedin.photon.ml.util.IndexMap
required: Map[String,Int]
Error occurred in an application involving default arguments.
val bootstrapDiagnostic = new BootstrapTrainingDiagnostic(suite.featureKeyToIdMap)
^
/home/yizhou/Dev/opensource/new-github/photon-ml/photon-ml/src/main/scala/com/linkedin/photon/ml/Driver.scala:430: type mismatch;
found : com.linkedin.photon.ml.util.IndexMap
required: Map[String,Int]
suite.featureKeyToIdMap,
^
/home/yizhou/Dev/opensource/new-github/photon-ml/photon-ml/src/main/scala/com/linkedin/photon/ml/io/GLMSuite.scala:201: non-variable type argument Map[String,Any] in type pattern List[Map[String,Any]](the underlying of List[Map[String,Any]]) is unchecked since it is eliminated by erasure
case Some(parsed: List[Map[String, Any]]) =>
^
one warning found
5 errors found
So the immutable Map already prevails in many places in our code. I'd suggest we do another round of clean-up to remove the immutable.Map interface in a separate PR.
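For illustration, a minimal reproduction of the mismatch above (names are hypothetical): `Map[String, Int]` in a signature resolves through `Predef` to `scala.collection.immutable.Map`, so a value typed as the generic `scala.collection.Map` is rejected there:

```scala
object MapMismatchDemo {
  // Predef.Map is an alias for scala.collection.immutable.Map,
  // so this signature requires the immutable variant.
  def report(featureKeyToIdMap: Map[String, Int]): Int = featureKeyToIdMap.size

  val generic: scala.collection.Map[String, Int] = scala.collection.Map("f1" -> 0)

  // report(generic)  // type mismatch: found scala.collection.Map, required Map[String,Int]
  val ok: Int = report(generic.toMap)  // explicit conversion compiles
}
```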
@@ -286,6 +304,9 @@ protected[ml] class Driver(
    }
  }

  /* TODO: we potentially have an excessive memory usage issue at this step;
   * the 2M-feature sponsor ads dataset will fail here due to OOM
This feels like sensitive information and I'd suggest modifying it to just mention the number of features at which it would fail. I don't think the specifics about which dataset it is should be disclosed here.
Good catch. I'll remove the mention of the dataset.
This RB looks good to me, although I raised many questions. Will go back and review more later after lunch. One general feeling I have after reading the code is that some refactoring of GLMSuite is necessary in a separate PR, as it is taking on much more responsibility than I ever imagined one and a half years ago.
      res
    }.mapPartitions { iter =>
      // Step 2. map features to (hashCode, featureName)
      val set = new scala.collection.mutable.HashSet[String]()
Nitpick: a better practice might be to import scala.collection.mutable first, then use mutable.HashSet[String] directly.
Some folks might argue that we should use the builder for HashSet (HashSet.newBuilder) here for immutability, but I think either way is fine.
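For concreteness, a small sketch of both styles mentioned:

```scala
import scala.collection.mutable

// Suggested style: import the package once, then qualify the type.
val set = mutable.HashSet.empty[String]
set += "featureA"

// Builder alternative, if an immutable result is preferred.
val builder = Set.newBuilder[String]
builder += "featureA"
val frozen: Set[String] = builder.result()
```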
…with PalDBIndexMap's storeReader
      }
    }

println(s"Partition [${idx}] total record number: ${i}") |
Use PhotonLogger / spark.logInfo() if this is important, or delete it.
I'll delete this one. Actually, it's not correct to use PhotonLogger here anyway, since this is an executor log rather than the driver's. I'll add one more log line at the end of the job indicating how many records we have indexed.
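A hedged sketch of that end-of-job log line, assuming an `indexedRecords` RDD and a driver-side `logger` (both names illustrative):

```scala
// Count on the driver rather than printing per-partition totals on executors.
val totalIndexed = indexedRecords.count()
logger.info(s"Feature indexing job finished; total records indexed: $totalIndexed")
```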
    val config = new Configuration()
    config.set(Configuration.CACHE_ENABLED, "true")
    // Allow 200MB in-memory cache in total
    config.set(Configuration.CACHE_BYTES, String.valueOf(209715200L / _partitionsNum))
This is interesting. How is 200MB determined? Does it relate to the number of features in any way?
It's not related to the number of features; it's more about the upper bound we'd like to allow the executor's JVM memory to spend on the off-heap map's LRU cache. I'm just being conservative here; 200MB doesn't hurt anyway.
Actually, even turning it off is okay. A TODO item would be to allow config customization in the future, but I don't think we need that unless a real use case comes up.
Do you have numbers for performance with/without the cache? Since there is almost no serialization overhead and memory mapping is on, the cache might actually not be worth it.
If we're going to keep this, we should make it configurable, or extract it into a constant with a comment explaining the value.
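A sketch of the constant-extraction option, reusing the PalDB `Configuration` calls from the diff above; the object name is hypothetical, and `_partitionsNum` is the field from the surrounding class:

```scala
import com.linkedin.paldb.api.Configuration

object PalDBIndexMapDefaults {
  // Conservative total LRU cache budget shared across all store partitions;
  // 200MB is an empirical "safe" bound, not derived from the feature count.
  val TotalCacheBytes: Long = 200L * 1024L * 1024L
}

val config = new Configuration()
config.set(Configuration.CACHE_ENABLED, "true")
config.set(Configuration.CACHE_BYTES,
  String.valueOf(PalDBIndexMapDefaults.TotalCacheBytes / _partitionsNum))
```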
Folks, please DON'T click the merge button until we have confirmed that everything (including integTest) runs. It takes too long to run integTest every time, so I'll just run it a few times once we've accumulated enough changes. The last run would be right before we merge.
Sure, I will make sure all the tests pass before merging, and as we discussed offline last time, it should be the responsibility of the person who merges the PR to check whether all tests pass.
 * An off-heap index map implementation using PalDB.
 *
 * The internal implementation assumes the following things:
 * 1. One DB storage is partitioned into multiple pieces we call partitions. It should be generated and controlled by
What's the motivation for partitioning the store? Do you only ever ship a subset of the partitions?
It's mainly to speed up the index-building process: we can partition the full feature space into pieces by hash code and then build them in parallel. This isn't useful when the feature space is small, but it speeds up index building when the feature space is very large.
So this implementation always assumes that we load the full set of partitions as a whole.
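Roughly, the scheme described above could look like this; a sketch only, with `featureNames` and `numPartitions` as illustrative names:

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Route each feature name to a store partition by hash code, then build the
// per-partition PalDB stores in parallel, one per Spark task.
def buildPartitions(featureNames: RDD[String], numPartitions: Int): Unit = {
  featureNames
    .map(name => ((name.hashCode & Int.MaxValue) % numPartitions, name)) // mask sign bit
    .partitionBy(new HashPartitioner(numPartitions))
    .foreachPartition { iter =>
      // Placeholder: each task would open a PalDB writer here and persist its
      // slice of the feature space as one partition file.
      iter.foreach { case (_, _) => () }
    }
}
```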
Makes sense. I wonder what kind of sizes you'd need to make this difference significant. (As a bonus though, now you aren't constrained by PalDB's 2GB index size limit.)
Nice work @timyitong! I've left some comments in-line.
…ame and improve the query efficiency via binarySearch
@XianXing for index building, I'm reverting back to the distinct-and-then-groupByKey approach. reduceByKey has a minor hash-collision issue that loses a few features (I lost 7 features in one run over a 200k-feature dataset). For now, even building the index map against 2M features takes less than 5 minutes, so we probably don't need to optimize this part until we reach a much larger scale.
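To illustrate the collision issue (names here are illustrative, not the actual job code): keying by hash code and reducing collapses distinct names whose hashes collide, whereas deduping on the full name keeps every feature.

```scala
import org.apache.spark.rdd.RDD

// Keying by hashCode silently drops one name per hash collision.
def lossyNames(featureNames: RDD[String]): RDD[String] =
  featureNames
    .map(name => (name.hashCode, name))
    .reduceByKey((left, _) => left) // keeps only one name per colliding hash
    .values

// distinct() compares the full names, so nothing is lost.
def safeNames(featureNames: RDD[String]): RDD[String] =
  featureNames.distinct()
```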
… in a few hash collisions
@XianXing @joshvfleming @namitk for the index map pull request, I've addressed most of the comments. A few things remain to do, but I'd suggest we don't further complicate this PR:
I think his suggestion about separating the (name -> index) and (index -> name) stores to save memory-mapping overheads also makes sense. But considering our current situation, we might not need to optimize this much, since a 2M-feature store only takes around 150MB, which is relatively small. Plus, currently the only use case for the reverse mapping (index -> name) is at the model-writing stage, so I think a more proper improvement is to iterate directly over the full map to write out models instead of hitting the getFeatures method; in that case we don't even need the reverse feature mapping (see the sketch below). Do these sound reasonable? Please suggest what we should address next in this request.
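A sketch of that model-writing path; `indexMap`, `coefficients`, and `writer` are illustrative names, not the actual API:

```scala
// Iterate the forward (name -> index) map directly; no reverse lookup needed.
indexMap.foreach { case (featureName, idx) =>
  writer.write(s"$featureName\t${coefficients(idx)}\n")
}
```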
LGTM. Let me merge it. For the reverse index mapping, I agree that it's actually not required for writing the Photon model, because the Photon model can be written to HDFS locally on the driver, and there is no need to broadcast the reverse feature index map to the executors. The current way of writing the model to HDFS as an RDD is cumbersome; I will open an issue to discuss and address the problem. However, when we write GAME models, such a broadcast/PalDB-based reverse index map is necessary and critical, and the separate map logic suggested by @mtth would make a lot of sense there. Since GAME is beyond the scope of this PR, I'd suggest we keep an eye on it but address the problem later.
This PR introduces an off-heap feature IndexMap and an independent FeatureIndexingJob to bypass the scalability limitations of the previous feature index map.
See Issue #4 for some early thoughts we've discussed.
As I proceeded with this improvement, I found multiple interesting things. I will create separate issues for them.
What this PR has done:
What this PR is not doing:
Why did I use PalDB?
Four candidate embeddable off-heap storage options came to mind:
How does it perform?
Indexing normally takes only about 5 minutes for 200k features. 2M features takes longer, but I also only tried a fairly harsh setting (50 executors x 2g memory). With the current implementation, the index-building job is definitely linearly scalable.
For the training job, the off-heap map is, surprisingly, even a bit faster than the default map (though that's just a single-data-point comparison). This looks very promising to me; I suggest we do a thorough perf study later.
With the off-heap map, we can handle 2M features with ease, and I'm not even using a very aggressive setting (only 10g or even 5g of driver memory, and 2g per executor). Though at the model-selection stage, I think it will fail for other reasons.