Skip to content

Commit

Permalink
[SPARK-30964][CORE][WEBUI] Accelerate InMemoryStore with a new index
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Spark uses the class `InMemoryStore` as the KV storage for live UI and history server(by default if no LevelDB file path is provided).
In `InMemoryStore`, all the task data in one application is stored in a hashmap, which key is the task ID and the value is the task data. This fine for getting or deleting with a provided task ID.
However, Spark stage UI always shows all the task data in one stage and the current implementation is to look up all the values in the hashmap. The time complexity is O(numOfTasks).
Also, when there are too many stages (>spark.ui.retainedStages), Spark will linearly try to look up all the task data of the stages to be deleted as well.

This can be very bad for a large application with many stages and tasks. We can improve it by allowing the natural key of an entity to have a real parent index. So that on each lookup with parent node provided, Spark can look up all the natural keys(in our case, the task IDs) first, and then find the data with the natural keys in the hashmap.

### Why are the changes needed?

The in-memory KV store becomes really slow for large applications. We can improve it with a new index. The performance can be 10 times, 100 times, even 1000 times faster.
This is also possible to make the Spark driver more stable for large applications.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Existing unit tests.
Also, I run a benchmark with the following code
```
  val store = new InMemoryStore()
  val numberOfTasksPerStage = 10000
   (0 until 1000).map { sId =>
     (0 until numberOfTasksPerStage).map { taskId =>
       val task = newTaskData(sId * numberOfTasksPerStage + taskId, "SUCCESS", sId)
       store.write(task)
     }
   }
  val appStatusStore = new AppStatusStore(store)
  var start = System.nanoTime()
  appStatusStore.taskSummary(2, attemptId, Array(0, 0.25, 0.5, 0.75, 1))
  println("task summary run time: " + ((System.nanoTime() - start) / 1000000))
  val stageIds = Seq(1, 11, 66, 88)
  val stageKeys = stageIds.map(Array(_, attemptId))
  start = System.nanoTime()
  store.removeAllByIndexValues(classOf[TaskDataWrapper], TaskIndexNames.STAGE,
    stageKeys.asJavaCollection)
   println("clean up tasks run time: " + ((System.nanoTime() - start) / 1000000))
```

Task summary before the changes: 98642ms
Task summary after the changes: 120ms

Task clean up before the changes:  4900ms
Task clean up before the changes: 4ms

It's 800x faster after the changes in the micro-benchmark.

Closes apache#27716 from gengliangwang/liveUIStore.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
gengliangwang authored and Seongjin Cho committed Apr 14, 2020
1 parent a78a8d3 commit 08b7c10
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@ public void clear() {
}
}

/**
* An alias class for the type "ConcurrentHashMap<Comparable<Object>, Boolean>", which is used
* as a concurrent hashset for storing natural keys and the boolean value doesn't matter.
*/
private static class NaturalKeys extends ConcurrentHashMap<Comparable<Object>, Boolean> {}

private static class InstanceList<T> {

/**
Expand Down Expand Up @@ -205,23 +211,50 @@ public void accept(Comparable<Object> key, T value) {
private final KVTypeInfo ti;
private final KVTypeInfo.Accessor naturalKey;
private final ConcurrentMap<Comparable<Object>, T> data;
private final String naturalParentIndexName;
private final Boolean hasNaturalParentIndex;
// A mapping from parent to the natural keys of its children.
// For example, a mapping from a stage ID to all the task IDs in the stage.
private final ConcurrentMap<Comparable<Object>, NaturalKeys> parentToChildrenMap;

private InstanceList(Class<?> klass) {
this.ti = new KVTypeInfo(klass);
this.naturalKey = ti.getAccessor(KVIndex.NATURAL_INDEX_NAME);
this.data = new ConcurrentHashMap<>();
this.naturalParentIndexName = ti.getParentIndexName(KVIndex.NATURAL_INDEX_NAME);
this.parentToChildrenMap = new ConcurrentHashMap<>();
this.hasNaturalParentIndex = !naturalParentIndexName.isEmpty();
}

KVTypeInfo.Accessor getIndexAccessor(String indexName) {
return ti.getAccessor(indexName);
}

int countingRemoveAllByIndexValues(String index, Collection<?> indexValues) {
Predicate<? super T> filter = getPredicate(ti.getAccessor(index), indexValues);
CountingRemoveIfForEach<T> callback = new CountingRemoveIfForEach<>(data, filter);
if (hasNaturalParentIndex && naturalParentIndexName.equals(index)) {
// If there is a parent index for the natural index and `index` happens to be it,
// Spark can use the `parentToChildrenMap` to get the related natural keys, and then
// delete them from `data`.
int count = 0;
for (Object indexValue : indexValues) {
Comparable<Object> parentKey = asKey(indexValue);
NaturalKeys children = parentToChildrenMap.getOrDefault(parentKey, new NaturalKeys());
for (Comparable<Object> naturalKey : children.keySet()) {
data.remove(naturalKey);
count ++;
}
parentToChildrenMap.remove(parentKey);
}
return count;
} else {
Predicate<? super T> filter = getPredicate(ti.getAccessor(index), indexValues);
CountingRemoveIfForEach<T> callback = new CountingRemoveIfForEach<>(data, filter);

data.forEach(callback);
return callback.count();
// Go through all the values in `data` and delete objects that meets the predicate `filter`.
// This can be slow when there is a large number of entries in `data`.
data.forEach(callback);
return callback.count();
}
}

public T get(Object key) {
Expand All @@ -230,18 +263,35 @@ public T get(Object key) {

public void put(T value) throws Exception {
data.put(asKey(naturalKey.get(value)), value);
if (hasNaturalParentIndex) {
Comparable<Object> parentKey = asKey(getIndexAccessor(naturalParentIndexName).get(value));
NaturalKeys children =
parentToChildrenMap.computeIfAbsent(parentKey, k -> new NaturalKeys());
children.put(asKey(naturalKey.get(value)), true);
}
}

public void delete(Object key) {
data.remove(asKey(key));
if (hasNaturalParentIndex) {
for (NaturalKeys v : parentToChildrenMap.values()) {
if (v.remove(asKey(key))) {
// `v` can be empty after removing the natural key and we can remove it from
// `parentToChildrenMap`. However, `parentToChildrenMap` is a ConcurrentMap and such
// checking and deleting can be slow.
// This method is to delete one object with certain key, let's make it simple here.
break;
}
}
}
}

public int size() {
return data.size();
}

public InMemoryView<T> view() {
return new InMemoryView<>(data.values(), ti);
return new InMemoryView<>(data, ti, naturalParentIndexName, parentToChildrenMap);
}

private static <T> Predicate<? super T> getPredicate(
Expand Down Expand Up @@ -271,22 +321,32 @@ private static Object indexValueForEntity(KVTypeInfo.Accessor getter, Object ent

private static class InMemoryView<T> extends KVStoreView<T> {
private static final InMemoryView<?> EMPTY_VIEW =
new InMemoryView<>(Collections.emptyList(), null);
new InMemoryView<>(new ConcurrentHashMap<>(), null, "", new ConcurrentHashMap<>());

private final Collection<T> elements;
private final ConcurrentMap<Comparable<Object>, T> data;
private final KVTypeInfo ti;
private final KVTypeInfo.Accessor natural;

InMemoryView(Collection<T> elements, KVTypeInfo ti) {
this.elements = elements;
private final ConcurrentMap<Comparable<Object>, NaturalKeys> parentToChildrenMap;
private final String naturalParentIndexName;
private final Boolean hasNaturalParentIndex;

InMemoryView(
ConcurrentMap<Comparable<Object>, T> data,
KVTypeInfo ti,
String naturalParentIndexName,
ConcurrentMap<Comparable<Object>, NaturalKeys> parentToChildrenMap) {
this.data = data;
this.ti = ti;
this.natural = ti != null ? ti.getAccessor(KVIndex.NATURAL_INDEX_NAME) : null;
this.naturalParentIndexName = naturalParentIndexName;
this.parentToChildrenMap = parentToChildrenMap;
this.hasNaturalParentIndex = !naturalParentIndexName.isEmpty();
}

@Override
public Iterator<T> iterator() {
if (elements.isEmpty()) {
return new InMemoryIterator<>(elements.iterator());
if (data.isEmpty()) {
return new InMemoryIterator<>(Collections.emptyIterator());
}

KVTypeInfo.Accessor getter = index != null ? ti.getAccessor(index) : null;
Expand Down Expand Up @@ -322,15 +382,31 @@ public Iterator<T> iterator() {
*/
private List<T> copyElements() {
if (parent != null) {
KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index);
Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index.");
Comparable<?> parentKey = asKey(parent);

return elements.stream()
.filter(e -> compare(e, parentGetter, parentKey) == 0)
.collect(Collectors.toList());
Comparable<Object> parentKey = asKey(parent);
if (hasNaturalParentIndex && naturalParentIndexName.equals(ti.getParentIndexName(index))) {
// If there is a parent index for the natural index and the parent of `index` happens to
// be it, Spark can use the `parentToChildrenMap` to get the related natural keys, and
// then copy them from `data`.
NaturalKeys children = parentToChildrenMap.getOrDefault(parentKey, new NaturalKeys());
ArrayList<T> elements = new ArrayList<>();
for (Comparable<Object> naturalKey : children.keySet()) {
data.computeIfPresent(naturalKey, (k, v) -> {
elements.add(v);
return v;
});
}
return elements;
} else {
// Go through all the values in `data` and collect all the objects has certain parent
// value. This can be slow when there is a large number of entries in `data`.
KVTypeInfo.Accessor parentGetter = ti.getParentAccessor(index);
Preconditions.checkArgument(parentGetter != null, "Parent filter for non-child index.");
return data.values().stream()
.filter(e -> compare(e, parentGetter, parentKey) == 0)
.collect(Collectors.toList());
}
} else {
return new ArrayList<>(elements);
return new ArrayList<>(data.values());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ public KVTypeInfo(Class<?> type) {

Preconditions.checkArgument(indices.containsKey(KVIndex.NATURAL_INDEX_NAME),
"No natural index defined for type %s.", type.getName());
Preconditions.checkArgument(indices.get(KVIndex.NATURAL_INDEX_NAME).parent().isEmpty(),
"Natural index of %s cannot have a parent.", type.getName());

for (KVIndex idx : indices.values()) {
if (!idx.parent().isEmpty()) {
Expand Down Expand Up @@ -117,6 +115,11 @@ Accessor getParentAccessor(String indexName) {
return index.parent().isEmpty() ? null : getAccessor(index.parent());
}

String getParentIndexName(String indexName) {
KVIndex index = indices.get(indexName);
return index.parent();
}

/**
* Abstracts the difference between invoking a Field and a Method.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,13 @@ class LevelDBTypeInfo {

// First create the parent indices, then the child indices.
ti.indices().forEach(idx -> {
if (idx.parent().isEmpty()) {
// In LevelDB, there is no parent index for the NUTURAL INDEX.
if (idx.parent().isEmpty() || idx.value().equals(KVIndex.NATURAL_INDEX_NAME)) {
indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()), null));
}
});
ti.indices().forEach(idx -> {
if (!idx.parent().isEmpty()) {
if (!idx.parent().isEmpty() && !idx.value().equals(KVIndex.NATURAL_INDEX_NAME)) {
indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()),
indices.get(idx.parent())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ private[spark] object TaskIndexNames {
private[spark] class TaskDataWrapper(
// Storing this as an object actually saves memory; it's also used as the key in the in-memory
// store, so in that case you'd save the extra copy of the value here.
@KVIndexParam
@KVIndexParam(parent = TaskIndexNames.STAGE)
val taskId: JLong,
@KVIndexParam(value = TaskIndexNames.TASK_INDEX, parent = TaskIndexNames.STAGE)
val index: Int,
Expand Down

0 comments on commit 08b7c10

Please sign in to comment.