
Benchmarks: Microbenchmark - Add distributed inference benchmark cpp implementation #586

Merged 48 commits into main from ziyue/add-dist-inf-cpp on Dec 10, 2023. The diff below shows changes from 27 of the 48 commits.

Commits:
835b9ef  add dist inference cpp (yzygitzh, Dec 6, 2023)
16bb728  fix cmake (yzygitzh, Dec 6, 2023)
b01043d  fix mpi cmake (yzygitzh, Dec 6, 2023)
7ef9df5  revise cmake (yzygitzh, Dec 6, 2023)
1de8864  fix cmake (yzygitzh, Dec 6, 2023)
e361aa0  fix cpp lint (yzygitzh, Dec 6, 2023)
611dccc  add python wrapper, revise config, add tests, add example, add doc (yzygitzh, Dec 6, 2023)
1f230dc  fix lint (yzygitzh, Dec 6, 2023)
d81d7af  address lint issues (yzygitzh, Dec 7, 2023)
56bc282  address lint (yzygitzh, Dec 7, 2023)
f386641  fix lint (yzygitzh, Dec 7, 2023)
ac08f84  add test data (yzygitzh, Dec 7, 2023)
49d4600  fix test (yzygitzh, Dec 7, 2023)
06a1cfc  fix lint (yzygitzh, Dec 7, 2023)
a4338f0  fix import (yzygitzh, Dec 7, 2023)
a84b135  fix test (yzygitzh, Dec 7, 2023)
e3e3b3a  fix test (yzygitzh, Dec 7, 2023)
d498a92  Merge branch 'main' into ziyue/add-dist-inf-cpp (yzygitzh, Dec 7, 2023)
bbba691  fix lint (yzygitzh, Dec 7, 2023)
92f1c6b  Merge branch 'ziyue/add-dist-inf-cpp' of https://github.com/yzygitzh/… (yzygitzh, Dec 7, 2023)
639ea27  address comments (yzygitzh, Dec 8, 2023)
254c499  recover file (yzygitzh, Dec 8, 2023)
e4643fa  remove files (yzygitzh, Dec 8, 2023)
c008324  fix lint (yzygitzh, Dec 8, 2023)
520c753  revert benchmark name (yzygitzh, Dec 8, 2023)
c913bfe  fix test (yzygitzh, Dec 8, 2023)
e393332  fix lint (yzygitzh, Dec 8, 2023)
efb2cf1  address comment (yzygitzh, Dec 8, 2023)
5b77341  fix bug (yzygitzh, Dec 8, 2023)
45f0d27  revise doc (yzygitzh, Dec 9, 2023)
9241e84  Merge branch 'main' into ziyue/add-dist-inf-cpp (yzygitzh, Dec 10, 2023)
491542b  fix cmakefile (yzygitzh, Dec 10, 2023)
b379db3  fix dockerfile (yzygitzh, Dec 10, 2023)
2c13689  adapt to rocm hipblaslt (yzygitzh, Dec 10, 2023)
dbf0c89  fix test (yzygitzh, Dec 10, 2023)
ebbfc1d  fix test 2 (yzygitzh, Dec 10, 2023)
fc33676  fix test (yzygitzh, Dec 10, 2023)
4cb0351  fix name (yzygitzh, Dec 10, 2023)
9bed36b  fix lint (yzygitzh, Dec 10, 2023)
443ccbf  fix lint (yzygitzh, Dec 10, 2023)
0f6f21a  fix test (yzygitzh, Dec 10, 2023)
a7e8fba  fix test (yzygitzh, Dec 10, 2023)
5720d9d  fix test (yzygitzh, Dec 10, 2023)
998e089  fix test (yzygitzh, Dec 10, 2023)
cb2ff64  fix test (yzygitzh, Dec 10, 2023)
db12f51  try fix coverage (yzygitzh, Dec 10, 2023)
19cc31f  cover logging in test (yzygitzh, Dec 10, 2023)
c875a81  fix test (yzygitzh, Dec 10, 2023)
204 changes: 145 additions & 59 deletions superbench/benchmarks/micro_benchmarks/dist_inference.py
@@ -12,7 +12,7 @@

from superbench.common.utils import logger
from superbench.benchmarks import DistributedImpl, DistributedBackend, BenchmarkRegistry, ReturnCode, Precision
from superbench.benchmarks.micro_benchmarks import MicroBenchmark
from superbench.benchmarks.micro_benchmarks import MicroBenchmarkWithInvoke
from superbench.benchmarks.context import Enum
from superbench.benchmarks.reducer import ReduceType

@@ -168,7 +168,7 @@ def forward(self, x):
return activation_out


class DistInference(MicroBenchmark):
class DistInference(MicroBenchmarkWithInvoke):
"""The base class of micro-benchmarks."""
def __init__(self, name, parameters=''):
"""Constructor.
@@ -184,6 +184,9 @@ def __init__(self, name, parameters=''):
self.__device = None
self.__cuda_available = False

# For cpp impl path
self._bin_name = 'dist_inference'

def __timer(self):
"""Returns the current time which ensures all previous CUDA events have been finished.

@@ -201,6 +204,12 @@ def add_parser_arguments(self):
"""Add the specified arguments."""
super().add_parser_arguments()

self._parser.add_argument(
'--use_cpp_impl',
action='store_true',
required=False,
help='Whether to use cpp-based implementation.',
)
self._parser.add_argument(
'--batch_size',
type=int,
@@ -222,6 +231,20 @@ def add_parser_arguments(self):
required=False,
help='Hidden size.',
)
self._parser.add_argument(
'--alpha',
type=float,
default=1.0,
required=False,
help='Coefficient alpha in D = alpha*(A*B) + beta*(C).',
)
self._parser.add_argument(
'--beta',
type=float,
default=1.0,
required=False,
help='Coefficient beta in D = alpha*(A*B) + beta*(C).',
)
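# Note: alpha/beta follow the standard GEMM epilogue D = alpha * (A @ B) + beta * C,
# the same convention used by cuBLASLt/hipBLASLt, which the cpp binary links against.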
self._parser.add_argument(
'--num_layers',
type=int,
@@ -285,6 +308,12 @@ def add_parser_arguments(self):
required=False,
help='Distributed backends. E.g. {}.'.format(' '.join(DistributedBackend.get_values())),
)
self._parser.add_argument(
'--use_cuda_graph',
action='store_true',
required=False,
help='Whether to launch kernels in CUDA graph mode.',
)
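# (CUDA graph mode records the per-iteration kernel sequence once and replays it,
# which removes most kernel-launch overhead from the measured step time.)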

def _preprocess(self):
"""Preprocess/preparation operations before the benchmarking.
@@ -295,32 +324,47 @@ def _preprocess(self):
if not super()._preprocess():
return False

if self._args.distributed_impl != DistributedImpl.DDP:
self._result.set_return_code(ReturnCode.DISTRIBUTED_SETTING_INIT_FAILURE)
logger.error(
'Unsupported distributed implementation - model: {}, distributed implementation: {}.'.format(
self._name, self._args.distributed_impl
if self._args.use_cpp_impl:
# Assemble commands if cpp impl path
self.__bin_path = os.path.join(self._args.bin_dir, self._bin_name)

args = '-m %d -n %d -k %d' % (self._args.input_size, self._args.batch_size, self._args.hidden_size)
args += ' --alpha %g --beta %g' % (self._args.alpha, self._args.beta)
args += ' --num_layers %d --num_warmups %d --num_iters %d' % \
(self._args.num_layers, self._args.num_warmup, self._args.num_steps)
if self._args.use_cuda_graph:
args += ' --use_cuda_graph'
self._commands = ['%s %s' % (self.__bin_path, args)]
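# For example (illustrative values only), with input_size=1024, batch_size=64,
# hidden_size=1024, num_layers=50, num_warmup=20, num_steps=100, the assembled
# command is roughly:
#   <bin_dir>/dist_inference -m 1024 -n 64 -k 1024 --alpha 1 --beta 1 \
#       --num_layers 50 --num_warmups 20 --num_iters 100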
else:
# Initialize PyTorch if pytorch impl path
if self._args.distributed_impl != DistributedImpl.DDP:
self._result.set_return_code(ReturnCode.DISTRIBUTED_SETTING_INIT_FAILURE)
logger.error(
'Unsupported distributed implementation - model: {}, distributed implementation: {}.'.format(
self._name, self._args.distributed_impl
)
)
)
return False
return False

try:
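# WORLD_SIZE and LOCAL_RANK are expected to be injected by the distributed
# launcher (e.g. torchrun / torch.distributed.launch).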
torch.distributed.init_process_group(backend=self._args.distributed_backend.value)
self.__world_size = int(os.environ['WORLD_SIZE'])
self.__local_rank = int(os.environ['LOCAL_RANK'])
except BaseException as e:
self._result.set_return_code(ReturnCode.DISTRIBUTED_SETTING_INIT_FAILURE)
torch.distributed.destroy_process_group()
logger.error(
'Initialize distributed env failed - benchmark: {}, message: {}.'.format(self._name, str(e))
)
return False

try:
torch.distributed.init_process_group(backend=self._args.distributed_backend.value)
self.__world_size = int(os.environ['WORLD_SIZE'])
self.__local_rank = int(os.environ['LOCAL_RANK'])
except BaseException as e:
self._result.set_return_code(ReturnCode.DISTRIBUTED_SETTING_INIT_FAILURE)
torch.distributed.destroy_process_group()
logger.error('Initialize distributed env failed - benchmark: {}, message: {}.'.format(self._name, str(e)))
return False

if torch.cuda.is_available():
torch.cuda.set_device(self.__local_rank)
self.__device = torch.device('cuda:{}'.format(self.__local_rank))
self.__cuda_available = True
else:
self.__device = torch.device('cpu:{}'.format(self.__local_rank))
self.__cuda_available = False
if torch.cuda.is_available():
torch.cuda.set_device(self.__local_rank)
self.__device = torch.device('cuda:{}'.format(self.__local_rank))
self.__cuda_available = True
else:
self.__device = torch.device('cpu:{}'.format(self.__local_rank))
self.__cuda_available = False

return True

@@ -401,38 +445,79 @@ def _benchmark(self):
Return:
True if _benchmark succeeds.
"""
batch_size = self._args.batch_size
input_size = self._args.input_size
hidden_size = self._args.hidden_size
num_layers = self._args.num_layers
computation = self._args.computation_kernel
communication = self._args.communication_kernel
activation = self._args.activation_kernel
precision = self._args.precision
num_warmup = self._args.num_warmup
num_steps = self._args.num_steps

if self.__local_rank == 0:
logger.info(
'Distributed Inference - using {} GPUs: '
'batch_size={}, input_size={}, hidden_size={}, num_layers={}, '
'computation_kernel={}, communication_kernel={}, activation_kernel={}, precision={}, '
'num_warmup={} num_steps={}'.format(
self.__world_size, batch_size, input_size, hidden_size, num_layers, computation, communication,
activation, precision, num_warmup, num_steps
if self._args.use_cpp_impl:
# Execute commands if cpp impl path
if not super()._benchmark():
return False
return True
else:
# Execute PyTorch model if pytorch impl path
batch_size = self._args.batch_size
input_size = self._args.input_size
hidden_size = self._args.hidden_size
num_layers = self._args.num_layers
computation = self._args.computation_kernel
communication = self._args.communication_kernel
activation = self._args.activation_kernel
precision = self._args.precision
num_warmup = self._args.num_warmup
num_steps = self._args.num_steps

if self.__local_rank == 0:
logger.info(
'Distributed Inference - using {} GPUs: '
'batch_size={}, input_size={}, hidden_size={}, num_layers={}, '
'computation_kernel={}, communication_kernel={}, activation_kernel={}, precision={}, '
'num_warmup={} num_steps={}'.format(
self.__world_size, batch_size, input_size, hidden_size, num_layers, computation, communication,
activation, precision, num_warmup, num_steps
)
)

# Prepare model
model = self._prepare_model(
input_size, hidden_size, num_layers, computation, communication, activation, precision,
self.__world_size
)

# Prepare model
model = self._prepare_model(
input_size, hidden_size, num_layers, computation, communication, activation, precision, self.__world_size
)
# Run model
step_times = self._run_model(model, batch_size, input_size, precision, self.__device, num_warmup, num_steps)

# Process data and return
return self._process_data(step_times)

# Run model
step_times = self._run_model(model, batch_size, input_size, precision, self.__device, num_warmup, num_steps)
def _process_raw_result(self, cmd_idx, raw_output):
"""Function to parse raw results and save the summarized results.

# Process data and return
return self._process_data(step_times)
self._result.add_raw_data() and self._result.add_result() need to be called to save the results.

Args:
cmd_idx (int): the index of command corresponding with the raw_output.
raw_output (str): raw output string of the micro-benchmark.

Return:
True if the raw output string is valid and result can be extracted.
"""
self._result.add_raw_data('raw_output_' + str(cmd_idx), raw_output, self._args.log_raw_data)

try:
output_lines = [x.strip() for x in raw_output.strip().splitlines()]
step_time = None
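# A (hypothetical) output line such as 'step time: 1.234 ms per iteration'
# yields step_time = 1.234 via the split below.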
for output_line in output_lines:
if ' ms per iteration' in output_line:
step_time = float(output_line.split(' ms per iteration')[0].split()[-1])
break
return self._process_numeric_result(
'step_times', [step_time], reduce_type=ReduceType.MAX, cal_percentile=True
)
except BaseException as e:
self._result.set_return_code(ReturnCode.MICROBENCHMARK_RESULT_PARSING_FAILURE)
logger.error(
'The result format is invalid - round: {}, benchmark: {}, raw output: {}, message: {}.'.format(
self._curr_run_index, self._name, raw_output, str(e)
)
)
return False

def _postprocess(self):
"""Postprocess/cleanup operations after the benchmarking.
@@ -443,12 +528,13 @@ def _postprocess(self):
if not super()._postprocess():
return False

try:
torch.distributed.destroy_process_group()
except BaseException as e:
self._result.set_return_code(ReturnCode.DISTRIBUTED_SETTING_DESTROY_FAILURE)
logger.error('Post process failed - benchmark: {}, message: {}.'.format(self._name, str(e)))
return False
if not self._args.use_cpp_impl:
try:
torch.distributed.destroy_process_group()
except BaseException as e:
self._result.set_return_code(ReturnCode.DISTRIBUTED_SETTING_DESTROY_FAILURE)
logger.error('Post process failed - benchmark: {}, message: {}.'.format(self._name, str(e)))
return False

return True
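
For orientation, a minimal sketch of how the new cpp path could be driven end to end, assuming the benchmark is registered under the name 'dist-inference' (the registration call sits outside the hunks shown) and following the pattern of SuperBench's other example scripts:

from superbench.benchmarks import BenchmarkRegistry, Platform

# Hypothetical invocation; the benchmark name and parameter values are assumptions.
context = BenchmarkRegistry.create_benchmark_context(
    'dist-inference',
    platform=Platform.CUDA,
    parameters='--use_cpp_impl --batch_size 64 --num_layers 50 --use_cuda_graph',
)
benchmark = BenchmarkRegistry.launch_benchmark(context)
if benchmark:
    print(benchmark.name, benchmark.return_code, benchmark.result)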

New file: CMakeLists.txt for the dist_inference binary (40 additions; full path not shown in this view)
@@ -0,0 +1,40 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

cmake_minimum_required(VERSION 3.18)

project(dist_inference LANGUAGES CXX)

find_package(MPI REQUIRED)
include_directories(SYSTEM ${MPI_INCLUDE_PATH})

find_package(CUDAToolkit QUIET)

# Cuda environment
if(CUDAToolkit_FOUND)
message(STATUS "Found CUDA: " ${CUDAToolkit_VERSION})

include(../cuda_common.cmake)
add_executable(dist_inference dist_inference.cu)
set_property(TARGET dist_inference PROPERTY CUDA_ARCHITECTURES ${NVCC_ARCHS_SUPPORTED})
target_link_libraries(dist_inference MPI::MPI_CXX nccl cublasLt)
else()
# ROCm environment
include(../rocm_common.cmake)
find_package(hip QUIET)
if(hip_FOUND)
message(STATUS "Found ROCm: " ${HIP_VERSION})
# Convert cuda code to hip code inplace
execute_process(COMMAND hipify-perl -print-stats -o dist_inference.cpp dist_inference.cu
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/)

# Add HIP targets
add_executable(dist_inference dist_inference.cpp)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O2 -DROCM_USE_FLOAT16=1")
target_link_libraries(dist_inference MPI::MPI_CXX rccl hipblaslt hip::device)
else()
message(FATAL_ERROR "No CUDA or ROCm environment found.")
endif()
endif()

install(TARGETS dist_inference RUNTIME DESTINATION bin)
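
For reference, a standalone configure/build/run of this target might look like the following (a sketch: MPI plus a CUDA or ROCm toolchain are assumed to be installed, and all flag values are illustrative):

cmake -S . -B build
cmake --build build
mpirun -np 8 ./build/dist_inference -m 1024 -n 64 -k 1024 --num_layers 50 --num_warmups 20 --num_iters 100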