Benchmarks: Microbenchmark - Add distributed inference benchmark cpp implementation #586

Merged: 48 commits, Dec 10, 2023

Commits
835b9ef - add dist inference cpp (yzygitzh, Dec 6, 2023)
16bb728 - fix cmake (yzygitzh, Dec 6, 2023)
b01043d - fix mpi cmake (yzygitzh, Dec 6, 2023)
7ef9df5 - revise cmake (yzygitzh, Dec 6, 2023)
1de8864 - fix cmake (yzygitzh, Dec 6, 2023)
e361aa0 - fix cpp lint (yzygitzh, Dec 6, 2023)
611dccc - add python wrapper, revise config, add tests, add example, add doc (yzygitzh, Dec 6, 2023)
1f230dc - fix lint (yzygitzh, Dec 6, 2023)
d81d7af - address lint issues (yzygitzh, Dec 7, 2023)
56bc282 - address lint (yzygitzh, Dec 7, 2023)
f386641 - fix lint (yzygitzh, Dec 7, 2023)
ac08f84 - add test data (yzygitzh, Dec 7, 2023)
49d4600 - fix test (yzygitzh, Dec 7, 2023)
06a1cfc - fix lint (yzygitzh, Dec 7, 2023)
a4338f0 - fix import (yzygitzh, Dec 7, 2023)
a84b135 - fix test (yzygitzh, Dec 7, 2023)
e3e3b3a - fix test (yzygitzh, Dec 7, 2023)
d498a92 - Merge branch 'main' into ziyue/add-dist-inf-cpp (yzygitzh, Dec 7, 2023)
bbba691 - fix lint (yzygitzh, Dec 7, 2023)
92f1c6b - Merge branch 'ziyue/add-dist-inf-cpp' of https://github.com/yzygitzh/… (yzygitzh, Dec 7, 2023)
639ea27 - address comments (yzygitzh, Dec 8, 2023)
254c499 - recover file (yzygitzh, Dec 8, 2023)
e4643fa - remove files (yzygitzh, Dec 8, 2023)
c008324 - fix lint (yzygitzh, Dec 8, 2023)
520c753 - revert benchmark name (yzygitzh, Dec 8, 2023)
c913bfe - fix test (yzygitzh, Dec 8, 2023)
e393332 - fix lint (yzygitzh, Dec 8, 2023)
efb2cf1 - address comment (yzygitzh, Dec 8, 2023)
5b77341 - fix bug (yzygitzh, Dec 8, 2023)
45f0d27 - revise doc (yzygitzh, Dec 9, 2023)
9241e84 - Merge branch 'main' into ziyue/add-dist-inf-cpp (yzygitzh, Dec 10, 2023)
491542b - fix cmakefile (yzygitzh, Dec 10, 2023)
b379db3 - fix dockerfile (yzygitzh, Dec 10, 2023)
2c13689 - adapt to rocm hipblaslt (yzygitzh, Dec 10, 2023)
dbf0c89 - fix test (yzygitzh, Dec 10, 2023)
ebbfc1d - fix test 2 (yzygitzh, Dec 10, 2023)
fc33676 - fix test (yzygitzh, Dec 10, 2023)
4cb0351 - fix name (yzygitzh, Dec 10, 2023)
9bed36b - fix lint (yzygitzh, Dec 10, 2023)
443ccbf - fix lint (yzygitzh, Dec 10, 2023)
0f6f21a - fix test (yzygitzh, Dec 10, 2023)
a7e8fba - fix test (yzygitzh, Dec 10, 2023)
5720d9d - fix test (yzygitzh, Dec 10, 2023)
998e089 - fix test (yzygitzh, Dec 10, 2023)
cb2ff64 - fix test (yzygitzh, Dec 10, 2023)
db12f51 - try fix coverage (yzygitzh, Dec 10, 2023)
19cc31f - cover logging in test (yzygitzh, Dec 10, 2023)
c875a81 - fix test (yzygitzh, Dec 10, 2023)

Files changed

2 changes: 1 addition & 1 deletion docs/user-tutorial/benchmarks/micro-benchmarks.md
@@ -418,7 +418,7 @@ Test the performance of large scale matmul operation with multiple GPUs:

#### Introduction

Test the performance of distributed model inference.
Test the performance of distributed model inference. Both PyTorch and cpp implementations are supported.

#### Metrics

207 changes: 147 additions & 60 deletions superbench/benchmarks/micro_benchmarks/dist_inference.py
@@ -12,7 +12,7 @@

from superbench.common.utils import logger
from superbench.benchmarks import DistributedImpl, DistributedBackend, BenchmarkRegistry, ReturnCode, Precision
from superbench.benchmarks.micro_benchmarks import MicroBenchmark
from superbench.benchmarks.micro_benchmarks import MicroBenchmarkWithInvoke
from superbench.benchmarks.context import Enum
from superbench.benchmarks.reducer import ReduceType

@@ -168,7 +168,7 @@ def forward(self, x):
return activation_out


class DistInference(MicroBenchmark):
class DistInference(MicroBenchmarkWithInvoke):
"""The base class of micro-benchmarks."""
def __init__(self, name, parameters=''):
"""Constructor.
@@ -182,7 +182,9 @@ def __init__(self, name, parameters=''):
self.__local_rank = 0
torch.backends.cudnn.benchmark = True
self.__device = None
self.__cuda_available = False

# For cpp impl path
self._bin_name = 'dist_inference'

def __timer(self):
"""Returns the current time which ensures all previous CUDA events have been finished.
@@ -193,14 +195,19 @@ def __timer(self):
Return:
Current time in second.
"""
if self.__cuda_available:
torch.cuda.synchronize()
torch.cuda.synchronize()
return time.time()

def add_parser_arguments(self):
"""Add the specified arguments."""
super().add_parser_arguments()

self._parser.add_argument(
'--use_pytorch',
action='store_true',
required=False,
help='Whether to use pytorch implementation. If not, cpp implementation will be used.',
)
self._parser.add_argument(
'--batch_size',
type=int,
@@ -222,6 +229,20 @@ def add_parser_arguments(self):
required=False,
help='Hidden size.',
)
self._parser.add_argument(
'--alpha',
type=float,
default=1.0,
required=False,
help='Coefficient alpha in D = alpha*(A*B) + beta*(C).',
)
self._parser.add_argument(
'--beta',
type=float,
default=1.0,
required=False,
help='Coefficient beta in D = alpha*(A*B) + beta*(C).',
)
self._parser.add_argument(
'--num_layers',
type=int,
@@ -285,6 +306,12 @@ def add_parser_arguments(self):
required=False,
help='Distributed backends. E.g. {}.'.format(' '.join(DistributedBackend.get_values())),
)
self._parser.add_argument(
'--use_cuda_graph',
action='store_true',
required=False,
help='Whether to launch kernels in CUDA graph mode.',
)

def _preprocess(self):
"""Preprocess/preparation operations before the benchmarking.
@@ -295,32 +322,41 @@
if not super()._preprocess():
return False

if self._args.distributed_impl != DistributedImpl.DDP:
self._result.set_return_code(ReturnCode.DISTRIBUTED_SETTING_INIT_FAILURE)
logger.error(
'Unsupported distributed implementation - model: {}, distributed implementation: {}.'.format(
self._name, self._args.distributed_impl
if self._args.use_pytorch:
# Initialize PyTorch if pytorch impl path
if self._args.distributed_impl != DistributedImpl.DDP:
return self._set_error_code_and_print_error_msg(
ReturnCode.DISTRIBUTED_SETTING_INIT_FAILURE,
'Unsupported distributed implementation - model: {}, distributed implementation: {}.'.format(
self._name, self._args.distributed_impl
)
)
)
return False

try:
torch.distributed.init_process_group(backend=self._args.distributed_backend.value)
self.__world_size = int(os.environ['WORLD_SIZE'])
self.__local_rank = int(os.environ['LOCAL_RANK'])
except BaseException as e:
self._result.set_return_code(ReturnCode.DISTRIBUTED_SETTING_INIT_FAILURE)
torch.distributed.destroy_process_group()
logger.error('Initialize distributed env failed - benchmark: {}, message: {}.'.format(self._name, str(e)))
return False
try:
torch.distributed.init_process_group(backend=self._args.distributed_backend.value)
self.__world_size = int(os.environ['WORLD_SIZE'])
self.__local_rank = int(os.environ['LOCAL_RANK'])
assert (torch.cuda.is_available())
except BaseException as e:
torch.distributed.destroy_process_group()
return self._set_error_code_and_print_error_msg(
ReturnCode.DISTRIBUTED_SETTING_INIT_FAILURE,
'Initialize distributed env failed - benchmark: {}, message: {}.'.format(self._name, str(e))
)

if torch.cuda.is_available():
torch.cuda.set_device(self.__local_rank)
self.__device = torch.device('cuda:{}'.format(self.__local_rank))
self.__cuda_available = True
else:
self.__device = torch.device('cpu:{}'.format(self.__local_rank))
self.__cuda_available = False
# Assemble commands if cpp impl path
self.__bin_path = os.path.join(self._args.bin_dir, self._bin_name)

args = '-m %d -n %d -k %d' % (self._args.hidden_size, self._args.batch_size, self._args.input_size)
args += ' --alpha %g --beta %g' % (self._args.alpha, self._args.beta)
args += ' --num_layers %d --num_warmups %d --num_iters %d' % \
(self._args.num_layers, self._args.num_warmup, self._args.num_steps)
if self._args.use_cuda_graph:
args += ' --use_cuda_graph'
self._commands = ['%s %s' % (self.__bin_path, args)]

return True

Expand All @@ -347,8 +383,7 @@ def _prepare_model(
self.__device
)
model = model.to(dtype=getattr(torch, precision.value))
if self.__cuda_available:
model = model.cuda()
model = model.cuda()
return model

def _run_model(self, model, batch_size, input_size, precision, device, num_warmup, num_steps):
@@ -401,38 +436,78 @@ def _benchmark(self):
Return:
True if _benchmark succeeds.
"""
batch_size = self._args.batch_size
input_size = self._args.input_size
hidden_size = self._args.hidden_size
num_layers = self._args.num_layers
computation = self._args.computation_kernel
communication = self._args.communication_kernel
activation = self._args.activation_kernel
precision = self._args.precision
num_warmup = self._args.num_warmup
num_steps = self._args.num_steps

if self.__local_rank == 0:
logger.info(
'Distributed Inference - using {} GPUs: '
'batch_size={}, input_size={}, hidden_size={}, num_layers={}, '
'computation_kernel={}, communication_kernel={}, activation_kernel={}, precision={}, '
'num_warmup={} num_steps={}'.format(
self.__world_size, batch_size, input_size, hidden_size, num_layers, computation, communication,
activation, precision, num_warmup, num_steps
if self._args.use_pytorch:
# Execute PyTorch model if pytorch impl path
batch_size = self._args.batch_size
input_size = self._args.input_size
hidden_size = self._args.hidden_size
num_layers = self._args.num_layers
computation = self._args.computation_kernel
communication = self._args.communication_kernel
activation = self._args.activation_kernel
precision = self._args.precision
num_warmup = self._args.num_warmup
num_steps = self._args.num_steps

if self.__local_rank == 0:
logger.info(
'Distributed Inference - using {} GPUs: '
'batch_size={}, input_size={}, hidden_size={}, num_layers={}, '
'computation_kernel={}, communication_kernel={}, activation_kernel={}, precision={}, '
'num_warmup={} num_steps={}'.format(
self.__world_size, batch_size, input_size, hidden_size, num_layers, computation, communication,
activation, precision, num_warmup, num_steps
)
)

# Prepare model
model = self._prepare_model(
input_size, hidden_size, num_layers, computation, communication, activation, precision,
self.__world_size
)

# Prepare model
model = self._prepare_model(
input_size, hidden_size, num_layers, computation, communication, activation, precision, self.__world_size
)
# Run model
step_times = self._run_model(model, batch_size, input_size, precision, self.__device, num_warmup, num_steps)

# Run model
step_times = self._run_model(model, batch_size, input_size, precision, self.__device, num_warmup, num_steps)
# Process data and return
return self._process_data(step_times)
else:
# Execute commands if cpp impl path
if not super()._benchmark():
return False
return True

def _process_raw_result(self, cmd_idx, raw_output):
"""Function to parse raw results and save the summarized results.

self._result.add_raw_data() and self._result.add_result() need to be called to save the results.

Args:
cmd_idx (int): the index of command corresponding with the raw_output.
raw_output (str): raw output string of the micro-benchmark.

Return:
True if the raw output string is valid and result can be extracted.
"""
self._result.add_raw_data('raw_output_' + str(cmd_idx), raw_output, self._args.log_raw_data)

try:
output_lines = [x.strip() for x in raw_output.strip().splitlines()]
step_time = None
for output_line in output_lines:
if ' ms per iteration' in output_line:
step_time = float(output_line.split(' ms per iteration')[0].split()[-1])
break
return self._process_numeric_result(
'step_times', [step_time], reduce_type=ReduceType.MAX, cal_percentile=True
)
except BaseException as e:
return self._set_error_code_and_print_error_msg(
ReturnCode.MICROBENCHMARK_RESULT_PARSING_FAILURE,
'The result format is invalid - round: {}, benchmark: {}, raw output: {}, message: {}.'.format(
self._curr_run_index, self._name, raw_output, str(e)
)
)

def _postprocess(self):
"""Postprocess/cleanup operations after the benchmarking.
Expand All @@ -443,14 +518,26 @@ def _postprocess(self):
if not super()._postprocess():
return False

try:
torch.distributed.destroy_process_group()
except BaseException as e:
self._result.set_return_code(ReturnCode.DISTRIBUTED_SETTING_DESTROY_FAILURE)
logger.error('Post process failed - benchmark: {}, message: {}.'.format(self._name, str(e)))
return False
if self._args.use_pytorch:
try:
torch.distributed.destroy_process_group()
except BaseException as e:
return self._set_error_code_and_print_error_msg(
ReturnCode.DISTRIBUTED_SETTING_DESTROY_FAILURE,
'Post process failed - benchmark: {}, message: {}.'.format(self._name, str(e))
)

return True

def _set_error_code_and_print_error_msg(self, error_code, error_msg):
"""Set error code and print error log upon error.

Return:
False, representing error.
"""
self._result.set_return_code(error_code)
logger.error(error_msg)
return False


BenchmarkRegistry.register_benchmark('pytorch-dist-inference', DistInference, parameters='')
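
Given that registration, a minimal launch sketch for the two paths; the platform and parameter values below are assumptions for illustration, while the registered name 'pytorch-dist-inference' and the --use_pytorch flag come from the changes above.

```python
# Hypothetical launch sketch; only the benchmark name and flag names are taken from this PR.
from superbench.benchmarks import BenchmarkRegistry, Platform

# Default path: the cpp implementation (the dist_inference binary) is invoked.
context = BenchmarkRegistry.create_benchmark_context(
    'pytorch-dist-inference',
    platform=Platform.CUDA,
    parameters='--batch_size 64 --num_warmup 20 --num_steps 100',
)
benchmark = BenchmarkRegistry.launch_benchmark(context)

# Adding --use_pytorch switches to the PyTorch implementation, which reads WORLD_SIZE and
# LOCAL_RANK from the environment, so run it under torchrun or an MPI launcher.
context = BenchmarkRegistry.create_benchmark_context(
    'pytorch-dist-inference',
    platform=Platform.CUDA,
    parameters='--use_pytorch --batch_size 64 --num_warmup 20 --num_steps 100',
)
benchmark = BenchmarkRegistry.launch_benchmark(context)
```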
CMakeLists.txt for the dist_inference cpp benchmark (new file, 40 additions)
@@ -0,0 +1,40 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

cmake_minimum_required(VERSION 3.18)

project(dist_inference LANGUAGES CXX)

find_package(MPI REQUIRED)
include_directories(SYSTEM ${MPI_INCLUDE_PATH})

find_package(CUDAToolkit QUIET)

# Cuda environment
if(CUDAToolkit_FOUND)
message(STATUS "Found CUDA: " ${CUDAToolkit_VERSION})

include(../cuda_common.cmake)
add_executable(dist_inference dist_inference.cu)
set_property(TARGET dist_inference PROPERTY CUDA_ARCHITECTURES ${NVCC_ARCHS_SUPPORTED})
target_link_libraries(dist_inference MPI::MPI_CXX nccl cublasLt)
else()
# ROCm environment
include(../rocm_common.cmake)
find_package(hip QUIET)
if(hip_FOUND)
message(STATUS "Found ROCm: " ${HIP_VERSION})

# Convert cuda code to hip code in cpp
execute_process(COMMAND hipify-perl -print-stats -o dist_inference.cpp dist_inference.cu WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/)

# link hip device lib
add_executable(dist_inference dist_inference.cpp)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O2 -DROCM_USE_FLOAT16=1")
target_link_libraries(dist_inference MPI::MPI_CXX rccl hipblaslt hip::device)
else()
message(FATAL_ERROR "No CUDA or ROCm environment found.")
endif()
endif()

install(TARGETS dist_inference RUNTIME DESTINATION bin)
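
The install step above places the dist_inference binary into bin. To make the cpp path concrete end to end, the sketch below reproduces the command string the Python wrapper assembles in _preprocess and the parsing in _process_raw_result. The bin_dir value, the numeric parameters, and the sample output line are assumptions; only the flag layout and the ' ms per iteration' match come from the diff.

```python
# Standalone sketch of the cpp-path plumbing; values below are examples, not the PR's defaults.
import os

bin_dir = '/opt/superbench/bin'    # hypothetical location of the installed dist_inference binary
hidden_size, batch_size, input_size = 1024, 64, 1024
alpha, beta = 1.0, 1.0
num_layers, num_warmup, num_steps = 50, 20, 100

# Mirrors the command assembled in _preprocess (-m/-n/-k map to hidden/batch/input sizes).
cmd = '%s -m %d -n %d -k %d --alpha %g --beta %g --num_layers %d --num_warmups %d --num_iters %d' % (
    os.path.join(bin_dir, 'dist_inference'), hidden_size, batch_size, input_size,
    alpha, beta, num_layers, num_warmup, num_steps
)
print(cmd)

# Mirrors _process_raw_result: the first line containing ' ms per iteration' yields the step time.
raw_output = 'Using 8 GPUs ...\n1.234 ms per iteration\n'    # assumed example output, not real logs
for line in raw_output.strip().splitlines():
    if ' ms per iteration' in line:
        step_time = float(line.split(' ms per iteration')[0].split()[-1])
        print('step_times:', [step_time])
        break
```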