[AutoTVM][Ansor] Enable random fill and CPU cache flush for AutoTVM and Ansor (apache#6391)

* [AutoTVM][Ansor] Enable random fill and CPU cache flush for AutoTVM and Ansor

* Trigger CI

* Add assert check for the random fill function

* Remove duplicate tvm.get_global_func call

* Add check that random_fill exists on remote devices

* Fix pylint warnings
FrozenGene authored and kevinthesun committed Sep 17, 2020
1 parent 9399aef commit 304ed59
Showing 5 changed files with 110 additions and 42 deletions.
10 changes: 8 additions & 2 deletions include/tvm/auto_scheduler/measure.h
@@ -276,6 +276,8 @@ class ProgramRunnerNode : public Object {
int min_repeat_ms;
/*! \brief The cool down interval between two measurements. */
double cooldown_interval;
/*! \brief Whether to flush cache on CPU between repeated measurements. */
bool enable_cpu_cache_flush;

/*!
* \brief Run measurement and return results.
@@ -358,8 +360,10 @@ class LocalRunner : public ProgramRunner {
* \param repeat The number of times to repeat the measurement.
* \param min_repeat_ms The minimum duration of one repeat in milliseconds.
* \param cooldown_interval The cool down interval between two measurements.
* \param enable_cpu_cache_flush Whether to flush cache on CPU between repeated measurements.
*/
LocalRunner(int timeout, int number, int repeat, int min_repeat_ms, double cooldown_interval);
LocalRunner(int timeout, int number, int repeat, int min_repeat_ms, double cooldown_interval,
bool enable_cpu_cache_flush);

TVM_DEFINE_MUTABLE_OBJECT_REF_METHODS(LocalRunner, ProgramRunner, LocalRunnerNode);
};
@@ -408,9 +412,11 @@ class RPCRunner : public ProgramRunner {
* \param repeat The number of times to repeat the measurement.
* \param min_repeat_ms The minimum duration of one repeat in milliseconds.
* \param cooldown_interval The cool down interval between two measurements.
* \param enable_cpu_cache_flush Whether to flush cache on CPU between repeated measurements.
*/
RPCRunner(const String& key, const String& host, int port, int priority, int n_parallel,
int timeout, int number, int repeat, int min_repeat_ms, double cooldown_interval);
int timeout, int number, int repeat, int min_repeat_ms, double cooldown_interval,
bool enable_cpu_cache_flush);

TVM_DEFINE_MUTABLE_OBJECT_REF_METHODS(RPCRunner, ProgramRunner, RPCRunnerNode);
};
93 changes: 71 additions & 22 deletions python/tvm/auto_scheduler/measure.py
@@ -202,8 +202,6 @@ def __init__(self,
class LocalRunner(ProgramRunner):
""" LocalRunner that uses local CPU/GPU to measures the time cost of programs.
TODO(FrozenGene): Add cpu cache flush to this runner.
Parameters
----------
timeout : int = 10
@@ -227,16 +225,24 @@ class LocalRunner(ProgramRunner):
will be automatically increased.
cooldown_interval : float = 0.0
The cool down interval between two measurements.
enable_cpu_cache_flush : bool = False
Whether to flush the CPU cache between repeated measurements.
Flushing the cache can make the measured latency of one operator closer to
its actual latency during end-to-end inference.
To make this option effective, the argument `number` should also be set to 1.
This only has an effect on CPU tasks.
"""

def __init__(self,
timeout=10,
number=3,
repeat=1,
min_repeat_ms=0,
cooldown_interval=0.0):
cooldown_interval=0.0,
enable_cpu_cache_flush=False):
self.__init_handle_by_constructor__(
_ffi_api.LocalRunner, timeout, number, repeat, min_repeat_ms, cooldown_interval)
_ffi_api.LocalRunner, timeout, number, repeat, min_repeat_ms, cooldown_interval,
enable_cpu_cache_flush)
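For context (not part of this diff): a minimal sketch of how the new flag is used when constructing this runner, assuming the class is re-exported as tvm.auto_scheduler.LocalRunner as in the tutorials of this period.

    from tvm import auto_scheduler

    # number=1 so each measured run starts from a cold cache; repeat averages
    # over several such cold-cache runs. min_repeat_ms stays 0, otherwise
    # `number` would be increased automatically and defeat the flush.
    runner = auto_scheduler.LocalRunner(
        timeout=10,
        number=1,
        repeat=10,
        min_repeat_ms=0,
        enable_cpu_cache_flush=True,
    )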


@tvm._ffi.register_object("auto_scheduler.RPCRunner")
@@ -245,8 +251,6 @@ class RPCRunner(ProgramRunner):
Or sometimes we may need to use RPC even in local running to insulate the thread environment.
(e.g. running CUDA programs)
TODO(FrozenGene): Add cpu cache flush to this runner.
Parameters
----------
key : str
@@ -280,14 +284,20 @@ class RPCRunner(ProgramRunner):
will be automatically increased.
cooldown_interval : float = 0.0
The cool down interval between two measurements.
enable_cpu_cache_flush : bool = False
Whether to flush the CPU cache between repeated measurements.
Flushing the cache can make the measured latency of one operator closer to
its actual latency during end-to-end inference.
To make this option effective, the argument `number` should also be set to 1.
This only has an effect on CPU tasks.
"""

def __init__(self, key, host, port,
priority=1, n_parallel=1, timeout=10, number=3, repeat=1,
min_repeat_ms=0, cooldown_interval=0.0):
min_repeat_ms=0, cooldown_interval=0.0, enable_cpu_cache_flush=False):
self.__init_handle_by_constructor__(
_ffi_api.RPCRunner, key, host, port, priority, n_parallel, timeout,
number, repeat, min_repeat_ms, cooldown_interval)
number, repeat, min_repeat_ms, cooldown_interval, enable_cpu_cache_flush)

if check_remote(key, host, port, priority, timeout):
print("Get devices for measurement successfully!")
@@ -302,8 +312,6 @@ class LocalRPCMeasureContext:
""" A context wrapper for running RPCRunner locally.
This will launch a local RPC Tracker and local RPC Server.
TODO(FrozenGene): Add cpu cache flush to this RPC context.
Parameters
----------
priority : int = 1
@@ -331,10 +339,16 @@ class LocalRPCMeasureContext:
will be automatically increased.
cooldown_interval : float = 0.0
The cool down interval between two measurements.
enable_cpu_cache_flush : bool = False
Whether to flush the CPU cache between repeated measurements.
Flushing the cache can make the measured latency of one operator closer to
its actual latency during end-to-end inference.
To make this option effective, the argument `number` should also be set to 1.
This only has an effect on CPU tasks.
"""

def __init__(self, priority=1, n_parallel=1, timeout=10, number=3, repeat=1,
min_repeat_ms=0, cooldown_interval=0.0):
min_repeat_ms=0, cooldown_interval=0.0, enable_cpu_cache_flush=False):
ctx = tvm.context("cuda", 0)
if ctx.exist:
cuda_arch = "sm_" + "".join(ctx.compute_version.split('.'))
@@ -347,7 +361,7 @@ def __init__(self, priority=1, n_parallel=1, timeout=10, number=3, repeat=1,
tracker_addr=(self.tracker.host, self.tracker.port))
self.runner = RPCRunner(device_key, host, self.tracker.port, priority,
n_parallel, timeout, number, repeat,
min_repeat_ms, cooldown_interval)
min_repeat_ms, cooldown_interval, enable_cpu_cache_flush)
# Wait for the processes to start
time.sleep(0.5)
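A usage sketch for this wrapper (assuming the surrounding auto_scheduler tuning API of this period, e.g. TuningOptions and RecordToFile).

    from tvm import auto_scheduler

    # Keep the context object alive for the whole tuning session; its tracker
    # and server are shut down when the object is destroyed.
    measure_ctx = auto_scheduler.LocalRPCMeasureContext(
        number=1, repeat=10, min_repeat_ms=0, enable_cpu_cache_flush=True)
    tune_option = auto_scheduler.TuningOptions(
        num_measure_trials=64,
        runner=measure_ctx.runner,
        measure_callbacks=[auto_scheduler.RecordToFile("tuning_records.json")],
    )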

@@ -507,7 +521,7 @@ def local_builder_build(inputs, timeout, n_parallel, build_func='default', verbo
@tvm._ffi.register_func("auto_scheduler.local_runner.run")
def local_run(inputs, build_results,
timeout=10, number=3, repeat=1, min_repeat_ms=0, cooldown_interval=0,
verbose=1):
enable_cpu_cache_flush=False, verbose=1):
"""
Run function of LocalRunner to test the performance of the input BuildResults.
@@ -538,6 +552,12 @@ def local_run(inputs, build_results,
will be automatically increased.
cooldown_interval : float = 0.0
The cool down interval between two measurements.
enable_cpu_cache_flush : bool = False
Whether to flush the CPU cache between repeated measurements.
Flushing the cache can make the measured latency of one operator closer to
its actual latency during end-to-end inference.
To make this option effective, the argument `number` should also be set to 1.
This only has an effect on CPU tasks.
verbose: int = 1
Verbosity level. 0 for silent, 1 to output information during program measuring.
@@ -555,9 +575,15 @@ def timed_func(inp, build_res):
try:
func = module.load_module(build_res.filename)
ctx = ndarray.context(str(inp.task.target), 0)
# TODO(FrozenGene): Add cpu cache flush to this function.
# Limitation:
# We cannot get a PackedFunc directly in remote mode as it is wrapped
# under std::function. We could lift the restriction later once we fold
# the PackedFunc into an object. Currently, we pass the function name to
# work around it.
f_prepare = 'cache_flush_cpu_non_first_arg' if enable_cpu_cache_flush else ''
time_f = func.time_evaluator(
func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms)
func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms,
f_preproc=f_prepare)
# pylint: disable=broad-except
except Exception:
costs = (max_float,)
@@ -566,9 +592,12 @@ def timed_func(inp, build_res):

if error_no == 0:
try:
# TODO(FrozenGene): Update to ndarray.non-empty.
args = [ndarray.empty(get_const_tuple(x.shape), x.dtype, ctx) for x in
build_res.args]
random_fill = tvm.get_global_func("tvm.contrib.random.random_fill", True)
assert random_fill, "Please make sure USE_RANDOM is ON in the config.cmake"
for arg in args:
random_fill(arg)
ctx.sync()
costs = time_f(*args).results
# pylint: disable=broad-except
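The f_preproc hook above is the whole mechanism: instead of shipping a PackedFunc across the RPC boundary, the runner passes the name of a registered preprocessing function, and the time evaluator looks it up and runs it before each measured batch. A standalone sketch follows (the module path, argument shapes, and number of arguments are made up for illustration).

    import tvm

    lib = tvm.runtime.load_module("compiled_kernel.so")   # hypothetical artifact
    ctx = tvm.cpu(0)
    # An empty string disables the hook; this registered name enables the
    # CPU cache flush wired up by this commit.
    f_prepare = "cache_flush_cpu_non_first_arg"
    time_f = lib.time_evaluator(
        lib.entry_name, ctx, number=1, repeat=10, min_repeat_ms=0,
        f_preproc=f_prepare)
    args = [tvm.nd.empty((1024, 1024), "float32", ctx) for _ in range(3)]
    costs = time_f(*args).results   # tuple of per-repeat latencies in seconds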
@@ -626,7 +655,8 @@ def rpc_run_worker(index):
"""
global GLOBAL_RUN_ARGUMENTS
inputs, build_results, key, host, port, priority, timeout, number, \
repeat, min_repeat_ms, cooldown_interval, verbose = GLOBAL_RUN_ARGUMENTS
repeat, min_repeat_ms, cooldown_interval, enable_cpu_cache_flush, \
verbose = GLOBAL_RUN_ARGUMENTS

max_float = 1e10 # We use 1e10 instead of sys.float_info.max for better readability in log
inp = inputs[index]
@@ -646,9 +676,15 @@ def timed_func():
remote.upload(build_res.filename)
func = remote.load_module(os.path.split(build_res.filename)[1])
ctx = remote.context(str(inp.task.target), 0)
# TODO(FrozenGene): Add cpu cache flush to this function.
# Limitation:
# We cannot get a PackedFunc directly in remote mode as it is wrapped
# under std::function. We could lift the restriction later once we fold
# the PackedFunc into an object. Currently, we pass the function name to
# work around it.
f_prepare = 'cache_flush_cpu_non_first_arg' if enable_cpu_cache_flush else ''
time_f = func.time_evaluator(
func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms)
func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms,
f_preproc=f_prepare)
# pylint: disable=broad-except
except Exception:
costs = (max_float,)
@@ -657,9 +693,15 @@ def timed_func():

if error_no == 0:
try:
# TODO(FrozenGene): Update to ndarray.non-empty.
args = [ndarray.empty(get_const_tuple(x.shape), x.dtype, ctx) for x in
build_res.args]
try:
random_fill = remote.get_function("tvm.contrib.random.random_fill")
except AttributeError:
raise AttributeError("Please make sure USE_RANDOM is ON in the config.cmake "
"on the remote devices")
for arg in args:
random_fill(arg)
ctx.sync()

costs = time_f(*args).results
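The remote path differs from the local one only in where random_fill comes from: over RPC it must be fetched from the remote session so the buffers are filled on the device itself. A condensed helper sketch, assuming an already-connected RPC session and device context:

    from tvm.runtime import ndarray

    def fill_random_remote(remote, ctx, arg_specs):
        # arg_specs: list of (shape, dtype) tuples describing the buffers.
        try:
            random_fill = remote.get_function("tvm.contrib.random.random_fill")
        except AttributeError:
            raise AttributeError("The remote runtime was built without USE_RANDOM")
        args = [ndarray.empty(shape, dtype, ctx) for shape, dtype in arg_specs]
        for arg in args:
            random_fill(arg)
        ctx.sync()  # make sure the fills have finished before timing starts
        return args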
@@ -698,7 +740,7 @@ def timed_func():
@tvm._ffi.register_func("auto_scheduler.rpc_runner.run")
def rpc_runner_run(inputs, build_results, key, host, port,
priority=1, n_parallel=1, timeout=10, number=3, repeat=1, min_repeat_ms=0,
cooldown_interval=0.0, verbose=1):
cooldown_interval=0.0, enable_cpu_cache_flush=False, verbose=1):
""" Run function of RPCRunner to test the performance of the input BuildResults.
Parameters
@@ -738,6 +780,12 @@ def rpc_runner_run(inputs, build_results, key, host, port,
will be automatically increased.
cooldown_interval : float = 0.0
The cool down interval between two measurements.
enable_cpu_cache_flush : bool = False
Whether to flush the CPU cache between repeated measurements.
Flushing the cache can make the measured latency of one operator closer to
its actual latency during end-to-end inference.
To make this option effective, the argument `number` should also be set to 1.
This only has an effect on CPU tasks.
verbose: int = 1
Verbosity level. 0 for silent, 1 to output information during program measuring.
@@ -748,7 +796,8 @@ def rpc_runner_run(inputs, build_results, key, host, port,
"""
global GLOBAL_RUN_ARGUMENTS
GLOBAL_RUN_ARGUMENTS = (inputs, build_results, key, host, port, priority, timeout, number,
repeat, min_repeat_ms, cooldown_interval, verbose)
repeat, min_repeat_ms, cooldown_interval, enable_cpu_cache_flush,
verbose)

assert len(inputs) == len(build_results), \
"Measure input size should be equal to build results"
10 changes: 7 additions & 3 deletions python/tvm/autotvm/measure/measure_methods.py
@@ -511,10 +511,14 @@ def run_through_rpc(measure_input, build_result,
if ref_input:
args = [nd.array(x, ctx=ctx) for x in ref_input]
else:
# create empty arrays on the remote device and copy them once.
# This can avoid some memory issues that make the measurement results unreliable.
try:
random_fill = remote.get_function("tvm.contrib.random.random_fill")
except AttributeError:
raise AttributeError("Please make sure USE_RANDOM is ON in the config.cmake "
"on the remote devices")
args = [nd.empty(x[0], dtype=x[1], ctx=ctx) for x in build_result.arg_info]
args = [nd.array(x, ctx=ctx) for x in args]
for arg in args:
random_fill(arg)
ctx.sync()

costs = time_f(*args).results
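Both AutoTVM and the auto-scheduler now depend on the tvm.contrib.random.random_fill packed function, so the runtime must be compiled with USE_RANDOM ON. A quick local sanity check (a sketch, not part of this diff):

    import tvm

    random_fill = tvm.get_global_func("tvm.contrib.random.random_fill", True)
    assert random_fill is not None, "Rebuild TVM with USE_RANDOM ON in config.cmake"

    ctx = tvm.cpu(0)
    arr = tvm.nd.empty((4, 4), "float32", ctx)
    random_fill(arr)   # fills the buffer in place with random values
    print(arr.asnumpy())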
22 changes: 13 additions & 9 deletions src/auto_scheduler/measure.cc
@@ -123,21 +123,23 @@ Array<BuildResult> LocalBuilderNode::Build(const Array<MeasureInput>& inputs, in

/********** LocalRunner **********/
LocalRunner::LocalRunner(int timeout, int number, int repeat, int min_repeat_ms,
double cooldown_interval) {
double cooldown_interval, bool enable_cpu_cache_flush) {
ObjectPtr<LocalRunnerNode> node = make_object<LocalRunnerNode>();
node->timeout = timeout;
node->number = number;
node->repeat = repeat;
node->min_repeat_ms = min_repeat_ms;
node->cooldown_interval = cooldown_interval;
node->enable_cpu_cache_flush = enable_cpu_cache_flush;
data_ = std::move(node);
}

Array<MeasureResult> LocalRunnerNode::Run(const Array<MeasureInput>& inputs,
const Array<BuildResult>& build_results, int verbose) {
if (const auto* f = runtime::Registry::Get("auto_scheduler.local_runner.run")) {
Array<MeasureResult> results = (*f)(inputs, build_results, timeout, number, repeat,
min_repeat_ms, cooldown_interval, verbose);
Array<MeasureResult> results =
(*f)(inputs, build_results, timeout, number, repeat, min_repeat_ms, cooldown_interval,
enable_cpu_cache_flush, verbose);
return results;
}
LOG(FATAL) << "auto_scheduler.local_runner.run is not registered. "
@@ -149,7 +151,7 @@ Array<MeasureResult> LocalRunnerNode::Run(const Array<MeasureInput>& inputs,
/********** RPCRunner **********/
RPCRunner::RPCRunner(const String& key, const String& host, int port, int priority, int n_parallel,
int timeout, int number, int repeat, int min_repeat_ms,
double cooldown_interval) {
double cooldown_interval, bool enable_cpu_cache_flush) {
auto node = make_object<RPCRunnerNode>();
node->key = key;
node->host = host;
@@ -161,6 +163,7 @@ RPCRunner::RPCRunner(const String& key, const String& host, int port, int priori
node->repeat = repeat;
node->min_repeat_ms = min_repeat_ms;
node->cooldown_interval = cooldown_interval;
node->enable_cpu_cache_flush = enable_cpu_cache_flush;
data_ = std::move(node);
}

@@ -169,7 +172,7 @@ Array<MeasureResult> RPCRunnerNode::Run(const Array<MeasureInput>& inputs,
if (const auto* f = runtime::Registry::Get("auto_scheduler.rpc_runner.run")) {
Array<MeasureResult> results =
(*f)(inputs, build_results, key, host, port, priority, n_parallel, timeout, number, repeat,
min_repeat_ms, cooldown_interval, verbose);
min_repeat_ms, cooldown_interval, enable_cpu_cache_flush, verbose);
return results;
} else {
LOG(FATAL) << "auto_scheduler.rpc_runner.run is not registered. "
@@ -356,16 +359,17 @@ TVM_REGISTER_GLOBAL("auto_scheduler.LocalBuilder")

TVM_REGISTER_GLOBAL("auto_scheduler.LocalRunner")
.set_body_typed([](int timeout, int number, int repeat, int min_repeat_ms,
double cooldown_interval) {
return LocalRunner(timeout, number, repeat, min_repeat_ms, cooldown_interval);
double cooldown_interval, bool enable_cpu_cache_flush) {
return LocalRunner(timeout, number, repeat, min_repeat_ms, cooldown_interval,
enable_cpu_cache_flush);
});

TVM_REGISTER_GLOBAL("auto_scheduler.RPCRunner")
.set_body_typed([](const String& key, const String& host, int port, int priority,
int n_parallel, int timeout, int number, int repeat, int min_repeat_ms,
double cooldown_interval) {
double cooldown_interval, bool enable_cpu_cache_flush) {
return RPCRunner(key, host, port, priority, n_parallel, timeout, number, repeat,
min_repeat_ms, cooldown_interval);
min_repeat_ms, cooldown_interval, enable_cpu_cache_flush);
});

} // namespace auto_scheduler
