[CORE] Execution runtime / native memory manager refactor (apache#6243)
zhztheplayer committed Jul 1, 2024
1 parent 231f00c commit ae2d125
Showing 84 changed files with 1,304 additions and 1,547 deletions.
27 changes: 19 additions & 8 deletions .github/workflows/velox_docker.yml
@@ -292,7 +292,7 @@ jobs:
- name: TPC-DS SF30.0 Parquet local spark3.2 Q67/Q95 low memory, memory isolation off
run: |
cd tools/gluten-it \
-&& GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh parameterized \
+&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
--local --preset=velox --benchmark-type=ds --error-on-memleak --queries=q67,q95 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
--skip-data-gen -m=OffHeapExecutionMemory \
-d=ISOLATION:OFF,spark.gluten.memory.isolation=false \
@@ -304,7 +304,7 @@ jobs:
- name: TPC-DS SF30.0 Parquet local spark3.2 Q67 low memory, memory isolation on
run: |
cd tools/gluten-it \
-&& GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh parameterized \
+&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
--local --preset=velox --benchmark-type=ds --error-on-memleak --queries=q67 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
--skip-data-gen -m=OffHeapExecutionMemory \
-d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1 \
@@ -315,7 +315,7 @@ jobs:
- name: (To be fixed) TPC-DS SF30.0 Parquet local spark3.2 Q95 low memory, memory isolation on
run: |
cd tools/gluten-it \
-&& GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh parameterized \
+&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
--local --preset=velox --benchmark-type=ds --error-on-memleak --queries=q95 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
--skip-data-gen -m=OffHeapExecutionMemory \
-d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1 \
@@ -326,10 +326,21 @@ jobs:
- name: TPC-DS SF30.0 Parquet local spark3.2 Q23A/Q23B low memory
run: |
cd tools/gluten-it \
-&& GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh parameterized \
+&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
--local --preset=velox --benchmark-type=ds --error-on-memleak --queries=q23a,q23b -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
--skip-data-gen -m=OffHeapExecutionMemory \
-d=ISOLATION:OFF,spark.gluten.memory.isolation=false \
-d=OFFHEAP_SIZE:2g,spark.memory.offHeap.size=2g \
-d=FLUSH_MODE:DISABLED,spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \
-d=FLUSH_MODE:ABANDONED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \
-d=FLUSH_MODE:FLUSHED,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=0.05,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=0.1,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0
+- name: (To be fixed) TPC-DS SF30.0 Parquet local spark3.2 Q23A/Q23B low memory, memory isolation on
+if: false # Disabled as error https://gist.github.com/zhztheplayer/abd5e83ccdc48730678ae7ebae479fcc
+run: |
+cd tools/gluten-it \
+&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
+--local --preset=velox --benchmark-type=ds --error-on-memleak --queries=q23a,q23b -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
+--skip-data-gen -m=OffHeapExecutionMemory \
+-d=ISOLATION:ON,spark.gluten.memory.isolation=true,spark.memory.storageFraction=0.1 \
+-d=OFFHEAP_SIZE:2g,spark.memory.offHeap.size=2g \
+-d=FLUSH_MODE:DISABLED,spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false,spark.gluten.sql.columnar.backend.velox.maxPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.maxExtendedPartialAggregationMemoryRatio=1.0,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=100,spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=0 \
@@ -338,7 +349,7 @@ jobs:
- name: (To be fixed) TPC-DS SF30.0 Parquet local spark3.2 Q97 low memory # The case currently causes crash with "free: invalid size".
run: |
cd tools/gluten-it \
-&& GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh parameterized \
+&& GLUTEN_IT_JVM_ARGS=-Xmx3G sbin/gluten-it.sh parameterized \
--local --preset=velox --benchmark-type=ds --error-on-memleak --queries=q97 -s=30.0 --threads=12 --shuffle-partitions=72 --iterations=1 \
--skip-data-gen -m=OffHeapExecutionMemory \
-d=ISOLATION:OFF,spark.gluten.memory.isolation=false \
@@ -455,7 +466,7 @@ jobs:
strategy:
fail-fast: false
matrix:
spark: ["spark-3.2"]
spark: [ "spark-3.2" ]
runs-on: ubuntu-20.04
container: centos:8
steps:
@@ -520,8 +531,8 @@ jobs:
strategy:
fail-fast: false
matrix:
spark: ["spark-3.2"]
celeborn: ["celeborn-0.4.1", "celeborn-0.3.2-incubating"]
spark: [ "spark-3.2" ]
celeborn: [ "celeborn-0.4.1", "celeborn-0.3.2-incubating" ]
runs-on: ubuntu-20.04
container: ubuntu:22.04
steps:

CHNativeMemoryAllocators.java
@@ -19,13 +19,12 @@
import org.apache.gluten.memory.SimpleMemoryUsageRecorder;
import org.apache.gluten.memory.memtarget.MemoryTargets;
import org.apache.gluten.memory.memtarget.Spiller;
+import org.apache.gluten.memory.memtarget.Spillers;

import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.TaskResources;

-import java.util.Arrays;
import java.util.Collections;
-import java.util.List;

/**
* Built-in toolkit for managing native memory allocations. To use the facility, one should import
@@ -46,12 +45,12 @@ private CHNativeMemoryAllocators() {}
private static CHNativeMemoryAllocatorManager createNativeMemoryAllocatorManager(
String name,
TaskMemoryManager taskMemoryManager,
-List<Spiller> spillers,
+Spiller spiller,
SimpleMemoryUsageRecorder usage) {

CHManagedCHReservationListener rl =
new CHManagedCHReservationListener(
-MemoryTargets.newConsumer(taskMemoryManager, name, spillers, Collections.emptyMap()),
+MemoryTargets.newConsumer(taskMemoryManager, name, spiller, Collections.emptyMap()),
usage);
return new CHNativeMemoryAllocatorManagerImpl(CHNativeMemoryAllocator.createListenable(rl));
}
@@ -67,7 +66,7 @@ public static CHNativeMemoryAllocator contextInstance() {
createNativeMemoryAllocatorManager(
"ContextInstance",
TaskResources.getLocalTaskContext().taskMemoryManager(),
-Collections.emptyList(),
+Spillers.NOOP,
TaskResources.getSharedUsage());
TaskResources.addResource(id, manager);
}
@@ -78,7 +77,7 @@ public static CHNativeMemoryAllocator contextInstanceForUT() {
return CHNativeMemoryAllocator.getDefaultForUT();
}

-public static CHNativeMemoryAllocator createSpillable(String name, Spiller... spillers) {
+public static CHNativeMemoryAllocator createSpillable(String name, Spiller spiller) {
if (!TaskResources.inSparkTask()) {
throw new IllegalStateException("spiller must be used in a Spark task");
}
@@ -87,7 +86,7 @@ public static CHNativeMemoryAllocator createSpillable(String name, Spiller... sp
createNativeMemoryAllocatorManager(
name,
TaskResources.getLocalTaskContext().taskMemoryManager(),
-Arrays.asList(spillers),
+spiller,
TaskResources.getSharedUsage());
TaskResources.addAnonymousResource(manager);
// force add memory consumer to task memory manager, will release by inactivate
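A note on the API shape above: the allocator factories now take exactly one Spiller (Spillers.NOOP for targets that have nothing to spill) where they previously took a Spiller.../List<Spiller>. A minimal sketch of the new call shape, using only signatures visible in this file's hunks; the sketch class and the "ExampleOperator" name are hypothetical:

import org.apache.gluten.memory.alloc.CHNativeMemoryAllocator;
import org.apache.gluten.memory.alloc.CHNativeMemoryAllocators;
import org.apache.gluten.memory.memtarget.Spillers;

public class AllocatorUsageSketch {
  // Shared per-task allocator; the manager behind it is now wired with
  // Spillers.NOOP instead of Collections.emptyList().
  static CHNativeMemoryAllocator shared() {
    return CHNativeMemoryAllocators.contextInstance();
  }

  // Spillable allocator: exactly one Spiller now, not varargs. Must run
  // inside a Spark task; createSpillable throws IllegalStateException otherwise.
  static CHNativeMemoryAllocator spillable() {
    // A real caller passes a working Spiller; NOOP keeps the sketch minimal.
    return CHNativeMemoryAllocators.createSpillable("ExampleOperator", Spillers.NOOP);
  }
}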

CHColumnarShuffleWriter.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.{SparkDirectoryUtil, Utils}

import java.io.IOException
-import java.util
import java.util.{Locale, UUID}

class CHColumnarShuffleWriter[K, V](
@@ -122,7 +121,10 @@ class CHColumnarShuffleWriter[K, V](
CHNativeMemoryAllocators.createSpillable(
"ShuffleWriter",
new Spiller() {
-override def spill(self: MemoryTarget, size: Long): Long = {
+override def spill(self: MemoryTarget, phase: Spiller.Phase, size: Long): Long = {
+if (!Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) {
+return 0L;
+}
if (nativeSplitter == 0) {
throw new IllegalStateException(
"Fatal: spill() called before a shuffle writer " +
@@ -134,8 +136,6 @@ class CHColumnarShuffleWriter[K, V](
logError(s"Gluten shuffle writer: Spilled $spilled / $size bytes of data")
spilled
}

-override def applicablePhases(): util.Set[Spiller.Phase] = Spillers.PHASE_SET_SPILL_ONLY
}
)
}
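The hunk above is the clearest view of the commit's central Spiller contract change: spill() now receives the active phase and returns early for phases it does not serve, replacing the removed applicablePhases() override. The same pattern rendered in Java, as a rough sketch — it assumes spill() is the interface's only remaining abstract method, that MemoryTarget sits in the memtarget package alongside Spiller and Spillers, and that doNativeSpill stands in for a real reclamation call:

import org.apache.gluten.memory.memtarget.MemoryTarget;
import org.apache.gluten.memory.memtarget.Spiller;
import org.apache.gluten.memory.memtarget.Spillers;

public class PhaseGuardedSpiller implements Spiller {
  @Override
  public long spill(MemoryTarget self, Spiller.Phase phase, long size) {
    // The guard replaces applicablePhases(): ignore phases this target can't serve.
    if (!Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) {
      return 0L;
    }
    return doNativeSpill(size); // bytes actually freed
  }

  private long doNativeSpill(long size) {
    return 0L; // stub for illustration only
  }
}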

@@ -21,6 +21,7 @@
import org.apache.gluten.memory.alloc.CHNativeMemoryAllocator;
import org.apache.gluten.memory.alloc.CHNativeMemoryAllocatorManagerImpl;
import org.apache.gluten.memory.memtarget.MemoryTargets;
+import org.apache.gluten.memory.memtarget.Spillers;

import org.apache.spark.SparkConf;
import org.apache.spark.internal.config.package$;
@@ -52,7 +53,7 @@ public void initMemoryManager() {
listener =
new CHManagedCHReservationListener(
MemoryTargets.newConsumer(
taskMemoryManager, "test", Collections.emptyList(), Collections.emptyMap()),
taskMemoryManager, "test", Spillers.NOOP, Collections.emptyMap()),
new SimpleMemoryUsageRecorder());

manager = new CHNativeMemoryAllocatorManagerImpl(new CHNativeMemoryAllocator(-1L, listener));

VeloxBatchAppender.java
@@ -18,8 +18,6 @@

import org.apache.gluten.exec.Runtime;
import org.apache.gluten.exec.Runtimes;
-import org.apache.gluten.memory.nmm.NativeMemoryManager;
-import org.apache.gluten.memory.nmm.NativeMemoryManagers;
import org.apache.gluten.vectorized.ColumnarBatchInIterator;
import org.apache.gluten.vectorized.ColumnarBatchOutIterator;

@@ -30,12 +28,10 @@
public final class VeloxBatchAppender {
public static ColumnarBatchOutIterator create(
int minOutputBatchSize, Iterator<ColumnarBatch> in) {
-final Runtime runtime = Runtimes.contextInstance();
-final NativeMemoryManager nmm = NativeMemoryManagers.contextInstance("VeloxBatchAppender");
+final Runtime runtime = Runtimes.contextInstance("VeloxBatchAppender");
long outHandle =
-VeloxBatchAppenderJniWrapper.forRuntime(runtime)
-.create(
-nmm.getNativeInstanceHandle(), minOutputBatchSize, new ColumnarBatchInIterator(in));
-return new ColumnarBatchOutIterator(runtime, outHandle, nmm);
+VeloxBatchAppenderJniWrapper.create(runtime)
+.create(minOutputBatchSize, new ColumnarBatchInIterator(in));
+return new ColumnarBatchOutIterator(runtime, outHandle);
}
}
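The pattern here recurs through the Velox-side files below: the standalone NativeMemoryManager disappears from the JNI boundary, a Runtime obtained via Runtimes.contextInstance(name) carries the native memory context, and wrappers are constructed per runtime with create(runtime). A rough sketch of the resulting call shape, limited to calls visible in this diff — the operator name and batch size are placeholders, and the sketch assumes it compiles in the same package as VeloxBatchAppenderJniWrapper, whose package is not shown in this hunk:

import java.util.Iterator;

import org.apache.gluten.exec.Runtime;
import org.apache.gluten.exec.Runtimes;
import org.apache.gluten.vectorized.ColumnarBatchInIterator;
import org.apache.gluten.vectorized.ColumnarBatchOutIterator;

import org.apache.spark.sql.vectorized.ColumnarBatch;

public final class RuntimeScopedAppendSketch {
  static ColumnarBatchOutIterator appendBatches(Iterator<ColumnarBatch> in) {
    // One named runtime replaces the former pair of Runtimes.contextInstance()
    // and NativeMemoryManagers.contextInstance(name).
    final Runtime runtime = Runtimes.contextInstance("ExampleAppender");
    long outHandle =
        VeloxBatchAppenderJniWrapper.create(runtime)
            .create(4096, new ColumnarBatchInIterator(in)); // placeholder batch size
    // The out-iterator no longer takes a NativeMemoryManager handle.
    return new ColumnarBatchOutIterator(runtime, outHandle);
  }
}

Runtime names elsewhere in the commit follow a Class or Class#method convention ("VeloxBloomFilter", "VeloxTransformerApi#getNativePlanString"), which suggests they feed memory-usage attribution.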

VeloxBatchAppenderJniWrapper.java
@@ -27,7 +27,7 @@ private VeloxBatchAppenderJniWrapper(Runtime runtime) {
this.runtime = runtime;
}

-public static VeloxBatchAppenderJniWrapper forRuntime(Runtime runtime) {
+public static VeloxBatchAppenderJniWrapper create(Runtime runtime) {
return new VeloxBatchAppenderJniWrapper(runtime);
}

@@ -36,6 +36,5 @@ public long handle() {
return runtime.getHandle();
}

-public native long create(
-long memoryManagerHandle, int minOutputBatchSize, ColumnarBatchInIterator itr);
+public native long create(int minOutputBatchSize, ColumnarBatchInIterator itr);
}

VeloxBloomFilter.java
@@ -16,6 +16,8 @@
*/
package org.apache.gluten.utils;

+import org.apache.gluten.exec.Runtimes;
+
import org.apache.commons.io.IOUtils;
import org.apache.spark.util.sketch.BloomFilter;
import org.apache.spark.util.sketch.IncompatibleMergeException;
@@ -27,17 +29,15 @@
import java.io.OutputStream;

public class VeloxBloomFilter extends BloomFilter {

-private final VeloxBloomFilterJniWrapper jni;
+private final VeloxBloomFilterJniWrapper jni =
+VeloxBloomFilterJniWrapper.create(Runtimes.contextInstance("VeloxBloomFilter"));
private final long handle;

private VeloxBloomFilter(byte[] data) {
-jni = VeloxBloomFilterJniWrapper.create();
handle = jni.init(data);
}

private VeloxBloomFilter(int capacity) {
-jni = VeloxBloomFilterJniWrapper.create();
handle = jni.empty(capacity);
}


VeloxBloomFilterJniWrapper.java
@@ -18,7 +18,6 @@

import org.apache.gluten.exec.Runtime;
import org.apache.gluten.exec.RuntimeAware;
-import org.apache.gluten.exec.Runtimes;

public class VeloxBloomFilterJniWrapper implements RuntimeAware {
private final Runtime runtime;
@@ -27,8 +26,8 @@ private VeloxBloomFilterJniWrapper(Runtime runtime) {
this.runtime = runtime;
}

-public static VeloxBloomFilterJniWrapper create() {
-return new VeloxBloomFilterJniWrapper(Runtimes.contextInstance());
+public static VeloxBloomFilterJniWrapper create(Runtime runtime) {
+return new VeloxBloomFilterJniWrapper(runtime);
}

@Override

VeloxSparkPlanExecApi.scala
@@ -110,8 +110,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
GenericExpressionTransformer(condFuncName, Seq(left), condExpr),
right,
left,
-newExpr
-)
+newExpr)
}

/** Transform Uuid to Substrait. */
@@ -488,8 +487,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
left,
right,
isSkewJoin,
-projectList
-)
+projectList)
}
override def genCartesianProductExecTransformer(
left: SparkPlan,
Expand All @@ -498,8 +496,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
CartesianProductExecTransformer(
ColumnarCartesianProductBridge(left),
ColumnarCartesianProductBridge(right),
-condition
-)
+condition)
}

override def genBroadcastNestedLoopJoinExecTransformer(
Expand All @@ -508,13 +505,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
buildSide: BuildSide,
joinType: JoinType,
condition: Option[Expression]): BroadcastNestedLoopJoinExecTransformer =
-VeloxBroadcastNestedLoopJoinExecTransformer(
-left,
-right,
-buildSide,
-joinType,
-condition
-)
+VeloxBroadcastNestedLoopJoinExecTransformer(left, right, buildSide, joinType, condition)

override def genHashExpressionTransformer(
substraitExprName: String,
Expand Down Expand Up @@ -795,10 +786,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
*
* @return
*/
-override def genExtendedOptimizers(): List[SparkSession => Rule[LogicalPlan]] = List(
-CollectRewriteRule.apply,
-HLLRewriteRule.apply
-)
+override def genExtendedOptimizers(): List[SparkSession => Rule[LogicalPlan]] =
+List(CollectRewriteRule.apply, HLLRewriteRule.apply)

/**
* Generate extended columnar pre-rules, in the validation phase.
Expand Down Expand Up @@ -879,8 +868,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
requiredChildOutput: Seq[Attribute],
outer: Boolean,
generatorOutput: Seq[Attribute],
-child: SparkPlan
-): GenerateExecTransformerBase = {
+child: SparkPlan): GenerateExecTransformerBase = {
GenerateExecTransformer(generator, requiredChildOutput, outer, generatorOutput, child)
}


VeloxTransformerApi.scala
@@ -17,6 +17,7 @@
package org.apache.gluten.backendsapi.velox

import org.apache.gluten.backendsapi.TransformerApi
+import org.apache.gluten.exec.Runtimes
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode}
import org.apache.gluten.utils.InputPartitionsUtil
@@ -83,7 +84,8 @@

override def getNativePlanString(substraitPlan: Array[Byte], details: Boolean): String = {
TaskResources.runUnsafe {
-val jniWrapper = PlanEvaluatorJniWrapper.create()
+val jniWrapper = PlanEvaluatorJniWrapper.create(
+Runtimes.contextInstance("VeloxTransformerApi#getNativePlanString"))
jniWrapper.nativePlanString(substraitPlan, details)
}
}